Merge pull request #18 from normanmaurer/bitmask

Use bitmasking to reduce the number of boolean variables and so save …
This commit is contained in:
Josef Grieb 2020-09-02 09:23:10 +02:00 committed by GitHub
commit c7f6ba0a55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -56,14 +56,18 @@ import static io.netty.channel.unix.UnixChannelUtil.*;
import static io.netty.util.internal.ObjectUtil.*; import static io.netty.util.internal.ObjectUtil.*;
abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel { abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel {
static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIOUringChannel.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIOUringChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false);
final LinuxSocket socket; final LinuxSocket socket;
protected volatile boolean active; protected volatile boolean active;
private boolean pollInPending = false;
private boolean pollOutPending = false; // Different masks for outstanding I/O operations.
private boolean writeScheduled = false; private static final int POLL_IN_SCHEDULED = 1;
private boolean readScheduled = false; private static final int POLL_OUT_SCHEDULED = 1 << 2;
private static final int WRITE_SCHEDULED = 1 << 3;
private static final int READ_SCHEDULED = 1 << 4;
private int ioState;
boolean inputClosedSeenErrorOnRead; boolean inputClosedSeenErrorOnRead;
static final int SOCK_ADDR_LEN = 128; static final int SOCK_ADDR_LEN = 128;
@ -211,13 +215,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
active = false; active = false;
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
if (pollInPending) { if ((ioState & POLL_IN_SCHEDULED) != 0) {
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_IN); submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_IN);
pollInPending = false; ioState &= ~POLL_IN_SCHEDULED;
} }
if (pollOutPending) { if ((ioState & POLL_OUT_SCHEDULED) != 0) {
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_OUT); submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_OUT);
pollOutPending = false; ioState &= ~POLL_OUT_SCHEDULED;
} }
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_RDHUP); submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_RDHUP);
submissionQueue.submit(); submissionQueue.submit();
@ -266,14 +270,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
@Override @Override
protected void doBeginRead() { protected void doBeginRead() {
final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe(); final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe();
if (!pollInPending) { if ((ioState & POLL_IN_SCHEDULED) == 0) {
unsafe.schedulePollIn(); unsafe.schedulePollIn();
} }
} }
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) { protected void doWrite(ChannelOutboundBuffer in) {
if (writeScheduled) { if ((ioState & WRITE_SCHEDULED) != 0) {
return; return;
} }
@ -307,7 +311,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
if (iovecArray.count() > 0) { if (iovecArray.count() > 0) {
submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count()); submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count());
submissionQueue().submit(); submissionQueue().submit();
writeScheduled = true; ioState |= WRITE_SCHEDULED;
} }
} else { } else {
// We were not be able to create a new iovec, fallback to single write. // We were not be able to create a new iovec, fallback to single write.
@ -321,13 +325,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
buf.writerIndex()); buf.writerIndex());
submissionQueue.submit(); submissionQueue.submit();
writeScheduled = true; ioState |= WRITE_SCHEDULED;
} }
//POLLOUT //POLLOUT
private void addPollOut() { private void addPollOut() {
assert !pollOutPending; assert (ioState & POLL_OUT_SCHEDULED) == 0;
pollOutPending = true; ioState |= POLL_OUT_SCHEDULED;
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollOut(socket.intValue()); submissionQueue.addPollOut(socket.intValue());
submissionQueue.submit(); submissionQueue.submit();
@ -341,7 +345,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
// Flush immediately only when there's no pending flush. // Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later, // If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now. // and thus there's no need to call it now.
if (!pollOutPending) { if ((ioState & POLL_OUT_SCHEDULED) == 0) {
super.flush0(); super.flush0();
} }
} }
@ -433,18 +437,18 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
void schedulePollIn() { void schedulePollIn() {
assert !pollInPending; assert (ioState & POLL_IN_SCHEDULED) == 0;
if (!isActive() || shouldBreakIoUringInReady(config())) { if (!isActive() || shouldBreakIoUringInReady(config())) {
return; return;
} }
pollInPending = true; ioState |= POLL_IN_SCHEDULED;
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollIn(socket.intValue()); submissionQueue.addPollIn(socket.intValue());
submissionQueue.submit(); submissionQueue.submit();
} }
final void readComplete(int res) { final void readComplete(int res) {
readScheduled = false; ioState &= ~READ_SCHEDULED;
readComplete0(res); readComplete0(res);
} }
@ -456,7 +460,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
*/ */
final void pollRdHup(int res) { final void pollRdHup(int res) {
if (isActive()) { if (isActive()) {
if (!readScheduled) { if ((ioState & READ_SCHEDULED) == 0) {
scheduleRead(); scheduleRead();
} }
} else { } else {
@ -469,15 +473,15 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
* Called once POLLIN event is ready to be processed * Called once POLLIN event is ready to be processed
*/ */
final void pollIn(int res) { final void pollIn(int res) {
pollInPending = false; ioState &= ~POLL_IN_SCHEDULED;
if (!readScheduled) { if ((ioState & READ_SCHEDULED) == 0) {
scheduleRead(); scheduleRead();
} }
} }
protected final void scheduleRead() { protected final void scheduleRead() {
readScheduled = true; ioState |= READ_SCHEDULED;
scheduleRead0(); scheduleRead0();
} }
@ -487,7 +491,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
* Called once POLLOUT event is ready to be processed * Called once POLLOUT event is ready to be processed
*/ */
final void pollOut(int res) { final void pollOut(int res) {
pollOutPending = false; ioState &= ~POLL_OUT_SCHEDULED;
// pending connect // pending connect
if (connectPromise != null) { if (connectPromise != null) {
@ -524,7 +528,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
final void writeComplete(int res) { final void writeComplete(int res) {
writeScheduled = false; ioState &= ~WRITE_SCHEDULED;
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (iovecMemoryAddress != -1) { if (iovecMemoryAddress != -1) {