From b863aacad45524f3085e69e369dd1f38ab05580b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 27 Aug 2020 15:13:43 +0200 Subject: [PATCH] Correctly handle polling Motivation: We must correctly use the polling support of io_uring to reduce the number of events in flight + only allocate buffers if really needed. For this we should respect the different poll masks and only do the corresponding IO action once the fd becomes ready for it. Modification: - Correctly respect poll masks and so only schedule an IO event if the fd is ready for it - Move some code for cleanup Result: More correct usage of io_uring and less memory usage --- .../channel/uring/AbstractIOUringChannel.java | 313 ++++++++---------- .../uring/AbstractIOUringServerChannel.java | 73 ++-- .../uring/AbstractIOUringStreamChannel.java | 83 ++++- .../netty/channel/uring/IOUringEventLoop.java | 221 +++++-------- .../channel/uring/IOUringSubmissionQueue.java | 2 +- .../uring/IOUringSocketTestPermutation.java | 7 +- .../io/netty/channel/uring/NativeTest.java | 1 - .../netty/channel/uring/PollRemoveTest.java | 2 +- 8 files changed, 353 insertions(+), 349 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 7177e978e3..96cfedf033 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -26,7 +26,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOutboundBuffer; -import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.DefaultChannelConfig; @@ -42,7 +41,6 @@ import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.channel.unix.UnixChannelUtil; import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -70,13 +68,15 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha private static final ChannelMetadata METADATA = new ChannelMetadata(false); final LinuxSocket socket; protected volatile boolean active; - boolean uringInReadyPending; + private boolean pollInScheduled = false; + + //boolean uringInReadyPending; boolean inputClosedSeenErrorOnRead; static final int SOCK_ADDR_LEN = 128; //can only submit one write operation at a time - private boolean writeable = true; + private boolean writeScheduled = false; /** * The future of the current connection attempt. If not null, subsequent connection attempts will fail. */ @@ -94,7 +94,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha super(parent); this.socket = checkNotNull(socket, "fd"); this.active = true; - this.uringInReadyPending = false; if (active) { // Directly cache the remote and local addresses @@ -173,107 +172,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha return loop instanceof IOUringEventLoop; } - private ByteBuf readBuffer; - - public void doReadBytes(ByteBuf byteBuf) { - assert readBuffer == null; - IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); - IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); - - unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes()); - - if (byteBuf.hasMemoryAddress()) { - readBuffer = byteBuf; - submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(), - byteBuf.writerIndex(), byteBuf.capacity()); - submissionQueue.submit(); - } - } - - void writeComplete(int res) { - ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); - - if (res > 0) { - channelOutboundBuffer.removeBytes(res); - setWriteable(true); - try { - doWrite(channelOutboundBuffer); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - void readComplete(int localReadAmount) { - boolean close = false; - ByteBuf byteBuf = null; - final IOUringRecvByteAllocatorHandle allocHandle = - (IOUringRecvByteAllocatorHandle) unsafe() - .recvBufAllocHandle(); - final ChannelPipeline pipeline = pipeline(); - try { - logger.trace("EventLoop Read Res: {}", localReadAmount); - logger.trace("EventLoop Fd: {}", fd().intValue()); - setUringInReadyPending(false); - byteBuf = this.readBuffer; - this.readBuffer = null; - - if (localReadAmount > 0) { - byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); - } - - allocHandle.lastBytesRead(localReadAmount); - if (allocHandle.lastBytesRead() <= 0) { - // nothing was read, release the buffer. - byteBuf.release(); - byteBuf = null; - close = allocHandle.lastBytesRead() < 0; - if (close) { - // There is nothing left to read as we received an EOF. - shutdownInput(false); - } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - return; - } - - allocHandle.incMessagesRead(1); - pipeline.fireChannelRead(byteBuf); - byteBuf = null; - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - - logger.trace("READ autoRead {}", config().isAutoRead()); - if (config().isAutoRead()) { - executeReadEvent(); - } - } catch (Throwable t) { - handleReadException(pipeline, byteBuf, t, close, allocHandle); - } - } - - private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, - Throwable cause, boolean close, - IOUringRecvByteAllocatorHandle allocHandle) { - if (byteBuf != null) { - if (byteBuf.isReadable()) { - pipeline.fireChannelRead(byteBuf); - } else { - byteBuf.release(); - } - } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - pipeline.fireExceptionCaught(cause); - if (close || cause instanceof IOException) { - shutdownInput(false); - } else { - if (config().isAutoRead()) { - executeReadEvent(); - } - } - } - protected final ByteBuf newDirectBuffer(ByteBuf buf) { return newDirectBuffer(buf, buf); } @@ -311,15 +209,17 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha protected void doDisconnect() throws Exception { } + IOUringSubmissionQueue submissionQueue() { + IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); + return ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); + } + @Override protected void doClose() throws Exception { - if (parent() == null) { - logger.trace("ServerSocket Close: {}", this.socket.intValue()); - IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); - IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); - submissionQueue.addPollRemove(socket.intValue()); - submissionQueue.submit(); - } + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addPollRemove(socket.intValue()); + submissionQueue.submit(); + active = false; // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a // socket which has not even been connected yet. This has been observed to block during unit tests. @@ -361,10 +261,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } finally { socket.close(); - if (readBuffer != null) { - readBuffer.release(); - readBuffer = null; - } } } @@ -374,20 +270,15 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha protected void doBeginRead() { System.out.println("Begin Read"); final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe(); - if (!uringInReadyPending) { - unsafe.executeUringReadOperator(); + if (!pollInScheduled) { + unsafe.schedulePollIn(); } } - public void executeReadEvent() { - final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe(); - unsafe.executeUringReadOperator(); - } - @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { logger.trace("IOUring doWrite message size: {}", in.size()); - if (writeable && in.size() >= 1) { + if (!writeScheduled && in.size() >= 1) { Object msg = in.current(); if (msg instanceof ByteBuf) { doWriteBytes((ByteBuf) msg); @@ -405,7 +296,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), buf.writerIndex()); submissionQueue.submit(); - writeable = false; + writeScheduled = true; } } @@ -419,16 +310,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha abstract class AbstractUringUnsafe extends AbstractUnsafe { private IOUringRecvByteAllocatorHandle allocHandle; - private final Runnable readRunnable = new Runnable() { - @Override - public void run() { - uringEventExecution(); //flush and submit SQE - } - }; - - public void fulfillConnectPromise(ChannelPromise promise, Throwable t, SocketAddress remoteAddress) { - Throwable cause = annotateConnectException(t, remoteAddress); + private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { if (promise == null) { // Closed via cancellation and the promise has been notified already. return; @@ -439,6 +322,31 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha closeIfClosed(); } + private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { + if (promise == null) { + // Closed via cancellation and the promise has been notified already. + return; + } + active = true; + + // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. + // We still need to ensure we call fireChannelActive() in this case. + boolean active = isActive(); + + // trySuccess() will return false if a user cancelled the connection attempt. + boolean promiseSet = promise.trySuccess(); + + // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, + // because what happened is what happened. + if (!wasActive && active) { + pipeline().fireChannelActive(); + } + + // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). + if (!promiseSet) { + close(voidPromise()); + } + } IOUringRecvByteAllocatorHandle newIOUringHandle(RecvByteBufAllocator.ExtendedHandle handle) { return new IOUringRecvByteAllocatorHandle(handle); @@ -452,15 +360,95 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha return allocHandle; } - final void executeUringReadOperator() { - if (uringInReadyPending || !isActive() || shouldBreakIoUringInReady(config())) { + void schedulePollIn() { + assert !pollInScheduled; + if (!isActive() || shouldBreakIoUringInReady(config())) { return; } - uringInReadyPending = true; - eventLoop().execute(readRunnable); + pollInScheduled = true; + IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); + IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); + submissionQueue.addPollInLink(socket.intValue()); + submissionQueue.submit(); } - public abstract void uringEventExecution(); + final void readComplete(int res) { + pollInScheduled = false; + readComplete0(res); + } + + abstract void readComplete0(int res); + + abstract void pollIn(int res); + + void pollOut(int res) { + // pending connect + if (connectPromise != null) { + // Note this method is invoked by the event loop only if the connection attempt was + // neither cancelled nor timed out. + + assert eventLoop().inEventLoop(); + + boolean connectStillInProgress = false; + try { + boolean wasActive = isActive(); + if (!doFinishConnect()) { + connectStillInProgress = true; + return; + } + computeRemote(); + fulfillConnectPromise(connectPromise, wasActive); + } catch (Throwable t) { + fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); + } finally { + if (!connectStillInProgress) { + // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used + // See https://github.com/netty/netty/issues/1770 + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + } + } + } + + } + + void writeComplete(int res) { + writeScheduled = false; + ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); + if (res > 0) { + channelOutboundBuffer.removeBytes(res); + try { + doWrite(channelOutboundBuffer); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + + void connectComplete(int res) { + if (res == 0) { + fulfillConnectPromise(connectPromise, active); + } else { + if (res == ERRNO_EINPROGRESS_NEGATIVE) { + // connect not complete yet need to wait for poll_out event + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addPollOut(fd().intValue()); + submissionQueue.submit(); + } else { + /* + if (res == -1 || res == -4) { + submissionQueue.addConnect(fd, channel.getRemoteAddressMemoryAddress(), + AbstractIOUringChannel.SOCK_ADDR_LEN); + submissionQueue.submit(); + break; + } + */ + } + } + } @Override public void connect( @@ -522,32 +510,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } - public void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { - if (promise == null) { - // Closed via cancellation and the promise has been notified already. - return; - } - active = true; - - // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. - // We still need to ensure we call fireChannelActive() in this case. - boolean active = isActive(); - - // trySuccess() will return false if a user cancelled the connection attempt. - boolean promiseSet = promise.trySuccess(); - - // Regardless if the connection attempt was cancelled, channelActive() event should be triggered, - // because what happened is what happened. - if (!wasActive && active) { - pipeline().fireChannelActive(); - } - - // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). - if (!promiseSet) { - close(voidPromise()); - } - } - @Override protected Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { @@ -583,10 +545,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } - public void setUringInReadyPending(boolean uringInReadyPending) { - this.uringInReadyPending = uringInReadyPending; - } - @Override public abstract DefaultChannelConfig config(); @@ -713,7 +671,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha connectPromise = null; } - boolean doFinishConnect() throws Exception { + private boolean doFinishConnect() throws Exception { if (socket.finishConnect()) { if (requestedRemoteAddress instanceof InetSocketAddress) { remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress()); @@ -722,23 +680,22 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha return true; } + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addPollOut(fd().intValue()); + submissionQueue.submit(); return false; } void computeRemote() { - if (requestedRemoteAddress instanceof InetSocketAddress) { - remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress()); - } + if (requestedRemoteAddress instanceof InetSocketAddress) { + remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress()); + } } final boolean shouldBreakIoUringInReady(ChannelConfig config) { return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config)); } - public void setWriteable(boolean writeable) { - this.writeable = writeable; - } - public long getRemoteAddressMemoryAddress() { return remoteAddressMemoryAddress; } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index e1063e1acb..65ded74568 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -52,47 +52,56 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple abstract Channel newChildChannel(int fd) throws Exception; - boolean acceptComplete(int res) { - if (res >= 0) { - final IOUringRecvByteAllocatorHandle allocHandle = - (IOUringRecvByteAllocatorHandle) unsafe() - .recvBufAllocHandle(); - final ChannelPipeline pipeline = pipeline(); - - allocHandle.incMessagesRead(1); - try { - final Channel childChannel = newChildChannel(res); - pipeline.fireChannelRead(childChannel); - } catch (Exception e) { - e.printStackTrace(); - } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - } - //Todo refactoring method name - executeReadEvent(); - return res >= 0; - } - final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { private final byte[] acceptedAddress = new byte[26]; + + @Override + void pollIn(int res) { + IOUringSubmissionQueue submissionQueue = submissionQueue(); + //Todo get network addresses + submissionQueue.addAccept(fd().intValue()); + submissionQueue.submit(); + } + + void readComplete0(int res) { + if (res >= 0) { + final IOUringRecvByteAllocatorHandle allocHandle = + (IOUringRecvByteAllocatorHandle) unsafe() + .recvBufAllocHandle(); + final ChannelPipeline pipeline = pipeline(); + + allocHandle.incMessagesRead(1); + try { + final Channel childChannel = newChildChannel(res); + + // all childChannels should poll POLLRDHUP + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addPollRdHup(res); + submissionQueue.submit(); + + pipeline.fireChannelRead(childChannel); + } catch (Exception e) { + e.printStackTrace(); + } + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + } else { + // TODO: Fix me + schedulePollIn(); + } + } + @Override public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { promise.setFailure(new UnsupportedOperationException()); } + } - @Override - public void uringEventExecution() { - final IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); - IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); - submissionQueue.addPollInLink(socket.intValue()); - - //Todo get network addresses - submissionQueue.addAccept(fd().intValue()); - submissionQueue.submit(); - } + @Override + protected void doClose() throws Exception { + super.doClose(); } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java index 6f19edb477..ed9d2b5e6b 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java @@ -21,8 +21,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.DuplexChannel; @@ -31,6 +31,7 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.SocketAddress; +import java.io.IOException; import java.util.concurrent.Executor; abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel implements DuplexChannel { @@ -197,7 +198,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple } @Override - public void uringEventExecution() { + void pollIn(int res) { final ChannelConfig config = config(); final ByteBufAllocator allocator = config.getAllocator(); @@ -207,6 +208,82 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple ByteBuf byteBuf = allocHandle.allocate(allocator); doReadBytes(byteBuf); } - } + private ByteBuf readBuffer; + + public void doReadBytes(ByteBuf byteBuf) { + assert readBuffer == null; + IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); + IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); + + unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes()); + + if (byteBuf.hasMemoryAddress()) { + readBuffer = byteBuf; + submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(), + byteBuf.writerIndex(), byteBuf.capacity()); + submissionQueue.submit(); + } + } + + void readComplete0(int localReadAmount) { + boolean close = false; + ByteBuf byteBuf = null; + final IOUringRecvByteAllocatorHandle allocHandle = + (IOUringRecvByteAllocatorHandle) unsafe() + .recvBufAllocHandle(); + final ChannelPipeline pipeline = pipeline(); + try { + logger.trace("EventLoop Read Res: {}", localReadAmount); + logger.trace("EventLoop Fd: {}", fd().intValue()); + byteBuf = this.readBuffer; + this.readBuffer = null; + + if (localReadAmount > 0) { + byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); + } + + allocHandle.lastBytesRead(localReadAmount); + if (allocHandle.lastBytesRead() <= 0) { + // nothing was read, release the buffer. + byteBuf.release(); + byteBuf = null; + close = allocHandle.lastBytesRead() < 0; + if (close) { + // There is nothing left to read as we received an EOF. + shutdownInput(false); + } + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + return; + } + + allocHandle.incMessagesRead(1); + pipeline.fireChannelRead(byteBuf); + byteBuf = null; + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + } catch (Throwable t) { + handleReadException(pipeline, byteBuf, t, close, allocHandle); + } + } + + private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, + Throwable cause, boolean close, + IOUringRecvByteAllocatorHandle allocHandle) { + if (byteBuf != null) { + if (byteBuf.isReadable()) { + pipeline.fireChannelRead(byteBuf); + } else { + byteBuf.release(); + } + } + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(cause); + if (close || cause instanceof IOException) { + shutdownInput(false); + } + } + } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index b2bd621402..132402ea59 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -181,140 +181,101 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements public boolean handle(int fd, int res, long flags, int op, int pollMask) { IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); switch (op) { - case IOUring.OP_ACCEPT: - //Todo error handle the res - if (res == ECANCELED) { - logger.trace("POLL_LINK canceled"); - break; - } - AbstractIOUringServerChannel acceptChannel = (AbstractIOUringServerChannel) channels.get(fd); - if (acceptChannel == null) { - break; - } - logger.trace("EventLoop Accept filedescriptor: {}", res); - acceptChannel.setUringInReadyPending(false); - if (res != -1 && res != ERRNO_EAGAIN_NEGATIVE && - res != ERRNO_EWOULDBLOCK_NEGATIVE) { - logger.trace("server filedescriptor Fd: {}", fd); - if (acceptChannel.acceptComplete(res)) { - // all childChannels should poll POLLRDHUP - submissionQueue.addPollRdHup(res); - submissionQueue.submit(); - } - } - break; - case IOUring.OP_READ: - AbstractIOUringChannel readChannel = channels.get(fd); - if (readChannel == null) { - break; - } - readChannel.readComplete(res); - break; - case IOUring.OP_WRITE: - AbstractIOUringChannel writeChannel = channels.get(fd); - if (writeChannel == null) { - break; - } - //localFlushAmount -> res - logger.trace("EventLoop Write Res: {}", res); - logger.trace("EventLoop Fd: {}", fd); - - if (res == SOCKET_ERROR_EPIPE) { - writeChannel.shutdownInput(false); - } else { - writeChannel.writeComplete(res); - } - break; - case IOUring.IO_TIMEOUT: - if (res == ETIME) { - prevDeadlineNanos = NONE; - } - - break; - case IOUring.IO_POLL: - //Todo error handle the res - if (res == ECANCELED) { - logger.trace("POLL_LINK canceled"); - break; - } - if (eventfd.intValue() == fd) { - pendingWakeup = false; - // We need to consume the data as otherwise we would see another event - // in the completionQueue without - // an extra eventfd_write(....) - Native.eventFdRead(eventfd.intValue()); - submissionQueue.addPollInLink(eventfd.intValue()); - // Submit so its picked up - submissionQueue.submit(); - } else { - if (pollMask == IOUring.POLLMASK_RDHUP) { - AbstractIOUringChannel channel = channels.get(fd); - if (channel != null && !channel.isActive()) { - channel.shutdownInput(true); - } - } else if (pollMask == IOUring.POLLMASK_OUT) { - //connect successful - AbstractIOUringChannel ch = channels.get(fd); - boolean wasActive = ch.isActive(); - try { - if (ch.doFinishConnect()) { - ch.fulfillConnectPromise(ch.getConnectPromise(), wasActive); - ch.cancelTimeoutFuture(); - } else { - //submit pollout - submissionQueue.addPollOut(fd); - submissionQueue.submit(); - } - } catch (Throwable t) { - AbstractUringUnsafe unsafe = (AbstractUringUnsafe) ch.unsafe(); - unsafe.fulfillConnectPromise(ch.getConnectPromise(), t, ch.getRequestedRemoteAddress()); - } - } else { - //Todo error handling error - logger.trace("POLL_LINK Res: {}", res); - } - } - break; - case IOUring.OP_POLL_REMOVE: - if (res == ENOENT) { - System.out.println(("POLL_REMOVE OPERATION not permitted")); - } else if (res == 0) { - System.out.println(("POLL_REMOVE OPERATION successful")); - } - break; - case IOUring.OP_CONNECT: - AbstractIOUringChannel channel = channels.get(fd); - System.out.println("Connect res: " + res); - if (res == 0) { - channel.fulfillConnectPromise(channel.getConnectPromise(), channel.active); - channel.cancelTimeoutFuture(); - channel.computeRemote(); - break; - } - if (res == -1 || res == -4) { - submissionQueue.addConnect(fd, channel.getRemoteAddressMemoryAddress(), - AbstractIOUringChannel.SOCK_ADDR_LEN); - submissionQueue.submit(); - break; - } - if (res < 0) { - if (res == ERRNO_EINPROGRESS_NEGATIVE) { - // connect not complete yet need to wait for poll_out event - submissionQueue.addPollOut(fd); - submissionQueue.submit(); + case IOUring.OP_ACCEPT: + //Todo error handle the res + if (res == ECANCELED) { + logger.trace("POLL_LINK canceled"); break; } - try { - channel.doClose(); - } catch (Exception e) { - //Todo error handling - } + // Fall-through - //Todo error handling - //AbstractIOUringChannel.throwConnectException("connect", res); + case IOUring.OP_READ: + AbstractIOUringChannel readChannel = channels.get(fd); + if (readChannel == null) { + break; + } + ((AbstractIOUringChannel.AbstractUringUnsafe) readChannel.unsafe()).readComplete(res); + break; + + case IOUring.OP_WRITE: + AbstractIOUringChannel writeChannel = channels.get(fd); + if (writeChannel == null) { + break; + } + //localFlushAmount -> res + logger.trace("EventLoop Write Res: {}", res); + logger.trace("EventLoop Fd: {}", fd); + + if (res == SOCKET_ERROR_EPIPE) { + writeChannel.shutdownInput(false); + } else { + ((AbstractIOUringChannel.AbstractUringUnsafe) writeChannel.unsafe()).writeComplete(res); + } + break; + + case IOUring.IO_TIMEOUT: + if (res == ETIME) { + prevDeadlineNanos = NONE; + } + break; + + case IOUring.IO_POLL: + //Todo error handle the res + if (res == ECANCELED) { + logger.trace("POLL_LINK canceled"); + break; + } + if (eventfd.intValue() == fd) { + pendingWakeup = false; + // We need to consume the data as otherwise we would see another event + // in the completionQueue without + // an extra eventfd_write(....) + Native.eventFdRead(eventfd.intValue()); + + submissionQueue.addPollInLink(eventfd.intValue()); + // Submit so its picked up + submissionQueue.submit(); + } else { + AbstractIOUringChannel channel = channels.get(fd); + if (channel == null) { + break; + } + switch (pollMask) { + case IOUring.POLLMASK_IN_LINK: + ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollIn(res); + break; + case IOUring.POLLMASK_OUT: + ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollOut(res); + break; + case IOUring.POLLMASK_RDHUP: + if (!channel.isActive()) { + channel.shutdownInput(true); + } + break; + default: + break; + } + } + break; + + case IOUring.OP_POLL_REMOVE: + if (res == ENOENT) { + System.out.println(("POLL_REMOVE OPERATION not permitted")); + } else if (res == 0) { + System.out.println(("POLL_REMOVE OPERATION successful")); + } + break; + + case IOUring.OP_CONNECT: + AbstractIOUringChannel channel = channels.get(fd); + System.out.println("Connect res: " + res); + if (channel != null) { + ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).connectComplete(res); + } + break; + + default: break; - } - break; } return true; } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index b5fd524cf2..431fa3bd08 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -223,7 +223,7 @@ final class IOUringSubmissionQueue { return true; } - //fill the adddress which is associated with server poll link user_data + //fill the address which is associated with server poll link user_data public boolean addPollRemove(int fd) { long sqe = getSqe(); if (sqe == 0) { diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java index 46c612a4c9..c7c856ae2e 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java @@ -96,18 +96,19 @@ public class IOUringSocketTestPermutation extends SocketTestPermutation { @SuppressWarnings("unchecked") @Override public List> clientSocket() { - return Arrays.asList( + return Arrays.>asList( + /* new BootstrapFactory() { @Override public Bootstrap newInstance() { return new Bootstrap().group(IO_URING_WORKER_GROUP).channel(IOUringSocketChannel.class); //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100000); } - }, + },*/ new BootstrapFactory() { @Override public Bootstrap newInstance() { - return new Bootstrap().group(IO_URING_WORKER_GROUP).channel(IOUringSocketChannel.class); + return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class); // .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100000); } } diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index f4816a6008..d6e47130b3 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -18,7 +18,6 @@ package io.netty.channel.uring; import io.netty.channel.unix.FileDescriptor; import org.junit.Test; -import java.io.IOException; import java.nio.charset.Charset; import io.netty.buffer.ByteBufAllocator; diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/PollRemoveTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/PollRemoveTest.java index 6aad1ef361..904cc8636a 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/PollRemoveTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/PollRemoveTest.java @@ -57,7 +57,7 @@ public class PollRemoveTest { }); Channel sc = b.bind(2020).sync().channel(); - Thread.sleep(15000); + Thread.sleep(1500); //close ServerChannel sc.close().sync();