From a3585492e99cca1eac0d95588636e32ec283d103 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 28 Aug 2020 21:34:26 +0200 Subject: [PATCH] Correctly handle POLL*, handle errors, cleanup Motivation: We not correctly handled errors and also had some problems with home POLL* was handled. Modifictions: - Cleanup - No need to for links anymore - Add error handling for most operations (poll still missing) - Add better handling for RDHUP - Correctly handle writeScheduled flag for writev Result: Cleaner and more correct code --- .../channel/uring/AbstractIOUringChannel.java | 262 +++++++----------- .../uring/AbstractIOUringServerChannel.java | 66 ++--- .../uring/AbstractIOUringStreamChannel.java | 85 +++--- .../java/io/netty/channel/uring/IOUring.java | 4 +- .../channel/uring/IOUringCompletionQueue.java | 2 +- .../netty/channel/uring/IOUringEventLoop.java | 42 +-- .../channel/uring/IOUringSubmissionQueue.java | 13 +- .../netty/channel/uring/IovecArrayPool.java | 4 +- .../java/io/netty/channel/unix/Errors.java | 2 +- .../io/netty/channel/AbstractChannel.java | 42 +-- 10 files changed, 230 insertions(+), 292 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index c07ba80541..383903d9d9 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -34,21 +34,13 @@ import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownReadComplete; import io.netty.channel.socket.SocketChannelConfig; -import io.netty.channel.unix.Buffer; -import io.netty.channel.unix.FileDescriptor; -import io.netty.channel.unix.NativeInetAddress; -import io.netty.channel.unix.Socket; -import io.netty.channel.unix.UnixChannel; -import io.netty.channel.unix.UnixChannelUtil; +import io.netty.channel.unix.*; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.io.FileNotFoundException; import java.io.IOException; -import java.net.ConnectException; import java.net.InetSocketAddress; -import java.net.NoRouteToHostException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AlreadyConnectedException; @@ -70,7 +62,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha protected volatile boolean active; private boolean pollInScheduled = false; - //boolean uringInReadyPending; boolean inputClosedSeenErrorOnRead; static final int SOCK_ADDR_LEN = 128; @@ -235,11 +226,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha connectPromise = null; } - ScheduledFuture future = connectTimeoutFuture; - if (future != null) { - future.cancel(false); - connectTimeoutFuture = null; - } + cancelConnectTimeoutFuture(); if (isRegistered()) { // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor @@ -271,7 +258,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha // Channel/ChannelHandlerContext.read() was called @Override protected void doBeginRead() { - System.out.println("Begin Read"); final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe(); if (!pollInScheduled) { unsafe.schedulePollIn(); @@ -279,7 +265,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } @Override - protected void doWrite(ChannelOutboundBuffer in) throws Exception { + protected void doWrite(ChannelOutboundBuffer in) { logger.trace("IOUring doWrite message size: {}", in.size()); if (writeScheduled) { @@ -290,22 +276,27 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha doWriteMultiple(in); //Object msg = in.current(); //doWriteSingle((ByteBuf) msg); - } else if(msgCount == 1) { + } else if (msgCount == 1) { Object msg = in.current(); doWriteSingle((ByteBuf) msg); } } - private void doWriteMultiple(ChannelOutboundBuffer in) throws Exception { + private void doWriteMultiple(ChannelOutboundBuffer in) { final IovecArrayPool iovecArray = ((IOUringEventLoop) eventLoop()).getIovecArrayPool(); iovecMemoryAddress = iovecArray.createNewIovecMemoryAddress(); if (iovecMemoryAddress != -1) { - in.forEachFlushedMessage(iovecArray); + try { + in.forEachFlushedMessage(iovecArray); + } catch (Exception e) { + + } if (iovecArray.count() > 0) { submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count()); submissionQueue().submit(); + writeScheduled = true; } } //Todo error handling @@ -313,24 +304,17 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha protected final void doWriteSingle(ByteBuf buf) { - if (buf.hasMemoryAddress()) { - //link pollwrite operation - addPollOut(); - - IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); - IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); - submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), - buf.writerIndex()); - submissionQueue.submit(); - writeScheduled = true; - } + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), + buf.writerIndex()); + submissionQueue.submit(); + writeScheduled = true; } //POLLOUT private void addPollOut() { - IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); - IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); - submissionQueue.addPollOutLink(socket.intValue()); + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addPollOut(socket.intValue()); submissionQueue.submit(); } @@ -396,14 +380,43 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha return allocHandle; } + void shutdownInput(boolean rdHup) { + logger.trace("shutdownInput Fd: {}", fd().intValue()); + if (!socket.isInputShutdown()) { + if (isAllowHalfClosure(config())) { + try { + socket.shutdown(true, false); + } catch (IOException ignored) { + // We attempted to shutdown and failed, which means the input has already effectively been + // shutdown. + fireEventAndClose(ChannelInputShutdownEvent.INSTANCE); + return; + } catch (NotYetConnectedException ignore) { + // We attempted to shutdown and failed, which means the input has already effectively been + // shutdown. + } + pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + close(voidPromise()); + } + } else if (!rdHup) { + inputClosedSeenErrorOnRead = true; + pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE); + } + } + + private void fireEventAndClose(Object evt) { + pipeline().fireUserEventTriggered(evt); + close(voidPromise()); + } + void schedulePollIn() { assert !pollInScheduled; if (!isActive() || shouldBreakIoUringInReady(config())) { return; } pollInScheduled = true; - IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); - IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); + IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollIn(socket.intValue()); submissionQueue.submit(); } @@ -413,11 +426,26 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha readComplete0(res); } - abstract void readComplete0(int res); + protected abstract void readComplete0(int res); + + /** + * Called once POLLRDHUP event is ready to be processed + */ + final void pollRdHup(int res) { + if (isActive()) { + // If it is still active, we need to call epollInReady as otherwise we may miss to + // read pending data from the underlying file descriptor. + // See https://github.com/netty/netty/issues/3709 + pollIn(res); + } else { + // Just to be safe make sure the input marked as closed. + shutdownInput(true); + } + } abstract void pollIn(int res); - void pollOut(int res) { + final void pollOut(int res) { // pending connect if (connectPromise != null) { // Note this method is invoked by the event loop only if the connection attempt was @@ -440,50 +468,55 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha if (!connectStillInProgress) { // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used // See https://github.com/netty/netty/issues/1770 - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } + cancelConnectTimeoutFuture(); connectPromise = null; } } + } else if (!getSocket().isOutputShutdown()) { + doWrite(unsafe().outboundBuffer()); } } - void writeComplete(int res) { + final void writeComplete(int res) { writeScheduled = false; ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); - if (iovecMemoryAddress != 0) { + if (iovecMemoryAddress != -1) { ((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress); + iovecMemoryAddress = -1; } - if (res > 0) { + if (res >= 0) { channelOutboundBuffer.removeBytes(res); + doWrite(channelOutboundBuffer); + } else { try { - doWrite(channelOutboundBuffer); - } catch (Exception e) { - e.printStackTrace(); + if (ioResult("io_uring write", res) == 0) { + // We were not able to write everything, let's register for POLLOUT + addPollOut(); + } + } catch (Throwable cause) { + handleWriteError(cause); } } } - - void connectComplete(int res) { + final void connectComplete(int res) { if (res == 0) { fulfillConnectPromise(connectPromise, active); } else { if (res == ERRNO_EINPROGRESS_NEGATIVE) { // connect not complete yet need to wait for poll_out event - IOUringSubmissionQueue submissionQueue = submissionQueue(); - submissionQueue.addPollOut(fd().intValue()); - submissionQueue.submit(); + addPollOut(); } else { - /* - if (res == -1 || res == -4) { - submissionQueue.addConnect(fd, channel.getRemoteAddressMemoryAddress(), - AbstractIOUringChannel.SOCK_ADDR_LEN); - submissionQueue.submit(); - break; - } - */ + try { + Errors.throwConnectException("io_uring connect", res); + } catch (Throwable cause) { + fulfillConnectPromise(connectPromise, cause); + } finally { + // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used + // See https://github.com/netty/netty/issues/1770 + cancelConnectTimeoutFuture(); + connectPromise = null; + } } } } @@ -504,9 +537,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress; NativeInetAddress address = NativeInetAddress.newInstance(inetSocketAddress.getAddress()); socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(),remoteAddressMemoryAddress); - IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); - final IOUringSubmissionQueue ioUringSubmissionQueue = - ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); + final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue(); ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN); ioUringSubmissionQueue.submit(); @@ -537,9 +568,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } + cancelConnectTimeoutFuture(); connectPromise = null; close(voidPromise()); } @@ -603,7 +632,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha /** * Connect to the remote peer */ - protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + private void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress instanceof InetSocketAddress) { checkResolvable((InetSocketAddress) localAddress); } @@ -626,87 +655,16 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } -// public void setRemote() { -// remote = remoteSocketAddr == null ? -// remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress()); -// } - - private boolean doConnect0(SocketAddress remote) throws Exception { - boolean success = false; - try { - boolean connected = socket.connect(remote); - if (!connected) { - //setFlag(Native.EPOLLOUT); - } - success = true; - return connected; - } finally { - if (!success) { - doClose(); - } - } - } - - void shutdownInput(boolean rdHup) { - logger.trace("shutdownInput Fd: {}", this.socket.intValue()); - if (!socket.isInputShutdown()) { - if (isAllowHalfClosure(config())) { - try { - socket.shutdown(true, false); - } catch (IOException ignored) { - // We attempted to shutdown and failed, which means the input has already effectively been - // shutdown. - fireEventAndClose(ChannelInputShutdownEvent.INSTANCE); - return; - } catch (NotYetConnectedException ignore) { - // We attempted to shutdown and failed, which means the input has already effectively been - // shutdown. - } - pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); - } else { - close(voidPromise()); - } - } else if (!rdHup) { - inputClosedSeenErrorOnRead = true; - pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE); - } - } - - - //Todo we should move it to a error class - // copy unix Errors - static void throwConnectException(String method, int err) - throws IOException { - if (err == ERROR_EALREADY_NEGATIVE) { - throw new ConnectionPendingException(); - } - if (err == ERROR_ENETUNREACH_NEGATIVE) { - throw new NoRouteToHostException(); - } - if (err == ERROR_EISCONN_NEGATIVE) { - throw new AlreadyConnectedException(); - } - if (err == ERRNO_ENOENT_NEGATIVE) { - throw new FileNotFoundException(); - } - throw new ConnectException(method + "(..) failed: "); - } - private static boolean isAllowHalfClosure(ChannelConfig config) { return config instanceof SocketChannelConfig && ((SocketChannelConfig) config).isAllowHalfClosure(); } - private void fireEventAndClose(Object evt) { - pipeline().fireUserEventTriggered(evt); - close(voidPromise()); - } - - void cancelTimeoutFuture() { + private void cancelConnectTimeoutFuture() { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); + connectTimeoutFuture = null; } - connectPromise = null; } private boolean doFinishConnect() throws Exception { @@ -718,35 +676,17 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha return true; } - IOUringSubmissionQueue submissionQueue = submissionQueue(); - submissionQueue.addPollOut(fd().intValue()); - submissionQueue.submit(); + addPollOut(); return false; } - void computeRemote() { + private void computeRemote() { if (requestedRemoteAddress instanceof InetSocketAddress) { remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress()); } } - final boolean shouldBreakIoUringInReady(ChannelConfig config) { + private boolean shouldBreakIoUringInReady(ChannelConfig config) { return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config)); } - - public long getRemoteAddressMemoryAddress() { - return remoteAddressMemoryAddress; - } - - public ChannelPromise getConnectPromise() { - return connectPromise; - } - - public ScheduledFuture getConnectTimeoutFuture() { - return connectTimeoutFuture; - } - - public SocketAddress getRequestedRemoteAddress() { - return requestedRemoteAddress; - } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index 7917c68884..269c1e46bc 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -21,9 +21,12 @@ import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.ServerChannel; +import io.netty.channel.unix.Errors; import java.net.SocketAddress; +import static io.netty.channel.unix.Errors.*; + abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel { AbstractIOUringServerChannel(int fd) { @@ -40,7 +43,7 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple } @Override - protected void doWrite(ChannelOutboundBuffer in) throws Exception { + protected void doWrite(ChannelOutboundBuffer in) { throw new UnsupportedOperationException(); } @@ -53,41 +56,43 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { private final byte[] acceptedAddress = new byte[26]; - - @Override - void pollIn(int res) { + private void acceptSocket() { IOUringSubmissionQueue submissionQueue = submissionQueue(); //Todo get network addresses submissionQueue.addAccept(fd().intValue()); submissionQueue.submit(); } - void readComplete0(int res) { - if (res >= 0) { - final IOUringRecvByteAllocatorHandle allocHandle = - (IOUringRecvByteAllocatorHandle) unsafe() - .recvBufAllocHandle(); - final ChannelPipeline pipeline = pipeline(); + @Override + void pollIn(int res) { + acceptSocket(); + } - allocHandle.incMessagesRead(1); - try { - final Channel childChannel = newChildChannel(res); - - // all childChannels should poll POLLRDHUP - IOUringSubmissionQueue submissionQueue = submissionQueue(); - submissionQueue.addPollRdHup(res); - submissionQueue.submit(); - - pipeline.fireChannelRead(childChannel); - } catch (Exception e) { - e.printStackTrace(); + // TODO: Respect MAX_MESSAGES_READ + protected void readComplete0(int res) { + final IOUringRecvByteAllocatorHandle allocHandle = + (IOUringRecvByteAllocatorHandle) unsafe() + .recvBufAllocHandle(); + final ChannelPipeline pipeline = pipeline(); + if (res >= 0) { + allocHandle.incMessagesRead(1); + try { + pipeline.fireChannelRead(newChildChannel(res)); + } catch (Throwable cause) { + allocHandle.readComplete(); + pipeline.fireExceptionCaught(cause); + pipeline.fireChannelReadComplete(); + } + acceptSocket(); + } else { + allocHandle.readComplete(); + // Check if we did fail because there was nothing to accept atm. + if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) { + // Something bad happened. Convert to an exception. + pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res)); + } + pipeline.fireChannelReadComplete(); } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); - } else { - // TODO: Fix me - schedulePollIn(); - } } @Override @@ -96,10 +101,5 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple promise.setFailure(new UnsupportedOperationException()); } } - - @Override - protected void doClose() throws Exception { - super.doClose(); - } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java index ed9d2b5e6b..ecfaa55806 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java @@ -34,6 +34,8 @@ import java.net.SocketAddress; import java.io.IOException; import java.util.concurrent.Executor; +import static io.netty.channel.unix.Errors.ioResult; + abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel implements DuplexChannel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIOUringStreamChannel.class); @@ -47,18 +49,15 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple AbstractIOUringStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) { super(parent, fd, remote); - // Add EPOLLRDHUP so we are notified once the remote peer close the connection. } @Override public ChannelFuture shutdown() { - System.out.println("AbstractStreamChannel shutdown"); return shutdown(newPromise()); } @Override public ChannelFuture shutdown(final ChannelPromise promise) { - ChannelFuture shutdownOutputFuture = shutdownOutput(); if (shutdownOutputFuture.isDone()) { shutdownOutputDone(shutdownOutputFuture, promise); @@ -173,7 +172,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple private static void shutdownDone(ChannelFuture shutdownOutputFuture, ChannelFuture shutdownInputFuture, ChannelPromise promise) { - System.out.println("AbstractStreamChannel ShutdownDone"); Throwable shutdownOutputCause = shutdownOutputFuture.cause(); Throwable shutdownInputCause = shutdownInputFuture.cause(); if (shutdownOutputCause != null) { @@ -189,6 +187,15 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple } } + @Override + protected void doRegister() throws Exception { + super.doRegister(); + // all non-server channels should poll POLLRDHUP + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addPollRdHup(fd().intValue()); + submissionQueue.submit(); + } + class IOUringStreamUnsafe extends AbstractUringUnsafe { // Overridden here just to be able to access this method from AbstractEpollStreamChannel @@ -197,8 +204,14 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple return super.prepareToClose(); } + private ByteBuf readBuffer; + @Override void pollIn(int res) { + readFromSocket(); + } + + private void readFromSocket() { final ChannelConfig config = config(); final ByteBufAllocator allocator = config.getAllocator(); @@ -206,44 +219,42 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple allocHandle.reset(config); ByteBuf byteBuf = allocHandle.allocate(allocator); - doReadBytes(byteBuf); - } - - private ByteBuf readBuffer; - - public void doReadBytes(ByteBuf byteBuf) { - assert readBuffer == null; - IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); - IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); - + IOUringSubmissionQueue submissionQueue = submissionQueue(); unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes()); - if (byteBuf.hasMemoryAddress()) { - readBuffer = byteBuf; - submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(), - byteBuf.writerIndex(), byteBuf.capacity()); - submissionQueue.submit(); - } + assert readBuffer == null; + readBuffer = byteBuf; + submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(), + byteBuf.writerIndex(), byteBuf.capacity()); + submissionQueue.submit(); } - void readComplete0(int localReadAmount) { + // TODO: Respect MAX_MESSAGE_READ. + protected void readComplete0(int res) { boolean close = false; - ByteBuf byteBuf = null; + final IOUringRecvByteAllocatorHandle allocHandle = (IOUringRecvByteAllocatorHandle) unsafe() .recvBufAllocHandle(); final ChannelPipeline pipeline = pipeline(); + ByteBuf byteBuf = this.readBuffer; + this.readBuffer = null; + assert byteBuf != null; + + boolean writable = true; + try { - logger.trace("EventLoop Read Res: {}", localReadAmount); - logger.trace("EventLoop Fd: {}", fd().intValue()); - byteBuf = this.readBuffer; - this.readBuffer = null; - - if (localReadAmount > 0) { - byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount); + if (res < 0) { + // If res is negative we should pass it to ioResult(...) which will either throw + // or convert it to 0 if we could not read because the socket was not readable. + allocHandle.lastBytesRead(ioResult("io_uring read", res)); + } else if (res > 0) { + byteBuf.writerIndex(byteBuf.writerIndex() + res); + allocHandle.lastBytesRead(res); + } else { + // EOF which we signal with -1. + allocHandle.lastBytesRead(-1); } - - allocHandle.lastBytesRead(localReadAmount); if (allocHandle.lastBytesRead() <= 0) { // nothing was read, release the buffer. byteBuf.release(); @@ -259,13 +270,21 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple } allocHandle.incMessagesRead(1); + writable = byteBuf.isWritable(); pipeline.fireChannelRead(byteBuf); byteBuf = null; - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } + if (!close) { + if (!writable) { + // Let's schedule another read. + readFromSocket(); + } else { + // We did not fill the whole ByteBuf so we should break the "read loop" and try again later. + pipeline.fireChannelReadComplete(); + } + } } private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java index ae982a118f..26e7010afc 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java @@ -15,7 +15,6 @@ */ package io.netty.channel.uring; -import io.netty.channel.unix.FileDescriptor; import io.netty.util.internal.SystemPropertyUtil; final class IOUring { @@ -31,9 +30,8 @@ final class IOUring { static final int OP_CONNECT = 16; static final int POLLMASK_IN = 1; - static final int POLLMASK_OUT_LINK = 4; - static final int POLLMASK_RDHUP = 8192; static final int POLLMASK_OUT = 4; + static final int POLLMASK_RDHUP = 8192; static { Throwable cause = null; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java index 23111e0f2f..4877637ae3 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java @@ -55,7 +55,7 @@ final class IOUringCompletionQueue { this.ringFd = ringFd; } - public int process(IOUringCompletionQueueCallback callback) throws Exception { + public int process(IOUringCompletionQueueCallback callback) { int i = 0; for (;;) { long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 9e8d0a25ec..2ce47f8123 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -153,11 +153,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements } } - try { - completionQueue.process(this); - } catch (Exception e) { - //Todo handle exception - } + completionQueue.process(this); if (hasTasks()) { runAllTasks(); @@ -201,15 +197,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements if (writeChannel == null) { break; } - //localFlushAmount -> res - logger.trace("EventLoop Write Res: {}", res); - logger.trace("EventLoop Fd: {}", fd); - - if (res == SOCKET_ERROR_EPIPE) { - writeChannel.shutdownInput(false); - } else { - ((AbstractIOUringChannel.AbstractUringUnsafe) writeChannel.unsafe()).writeComplete(res); - } + ((AbstractIOUringChannel.AbstractUringUnsafe) writeChannel.unsafe()).writeComplete(res); break; case IOUring.IO_TIMEOUT: if (res == ETIME) { @@ -225,14 +213,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements } if (eventfd.intValue() == fd) { pendingWakeup = false; - // We need to consume the data as otherwise we would see another event - // in the completionQueue without - // an extra eventfd_write(....) - Native.eventFdRead(eventfd.intValue()); - - submissionQueue.addPollIn(eventfd.intValue()); - // Submit so its picked up - submissionQueue.submit(); + handleEventFd(submissionQueue); } else { AbstractIOUringChannel channel = channels.get(fd); if (channel == null) { @@ -246,9 +227,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollOut(res); break; case IOUring.POLLMASK_RDHUP: - if (!channel.isActive()) { - channel.shutdownInput(true); - } + ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollRdHup(res); break; default: break; @@ -266,18 +245,27 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements case IOUring.OP_CONNECT: AbstractIOUringChannel channel = channels.get(fd); - System.out.println("Connect res: " + res); if (channel != null) { ((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).connectComplete(res); } break; - default: break; } return true; } + private void handleEventFd(IOUringSubmissionQueue submissionQueue) { + // We need to consume the data as otherwise we would see another event + // in the completionQueue without + // an extra eventfd_write(....) + Native.eventFdRead(eventfd.intValue()); + + submissionQueue.addPollIn(eventfd.intValue()); + // Submit so its picked up + submissionQueue.submit(); + } + @Override protected void cleanup() { try { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 4d1d9411aa..0fbb966623 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -126,12 +126,7 @@ final class IOUringSubmissionQueue { long uData = convertToUserData(op, fd, pollMask); PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData); - //pollread or accept operation - if (op == 6 && (pollMask == IOUring.POLLMASK_OUT_LINK)) { - PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) IOSQE_IO_LINK); - } else { - PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); - } + PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); //c union set Rw-Flags or accept_flags if (op != IOUring.OP_ACCEPT) { @@ -152,8 +147,6 @@ final class IOUringSubmissionQueue { PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask); } - - logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD)); logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD)); logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD)); @@ -174,10 +167,6 @@ final class IOUringSubmissionQueue { return addPoll(fd, IOUring.POLLMASK_IN); } - public boolean addPollOutLink(int fd) { - return addPoll(fd, IOUring.POLLMASK_OUT_LINK); - } - public boolean addPollRdHup(int fd) { return addPoll(fd, IOUring.POLLMASK_RDHUP); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IovecArrayPool.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IovecArrayPool.java index 1aa0f42d0b..576f681c39 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IovecArrayPool.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IovecArrayPool.java @@ -54,7 +54,7 @@ final class IovecArrayPool implements MessageProcessor { count = 0; if (remainingIovec.empty()) { - //Todo allocate new Memory + // Todo allocate new Memory return -1; } long index = remainingIovec.pop(); @@ -135,7 +135,7 @@ final class IovecArrayPool implements MessageProcessor { } @Override - public boolean processMessage(Object msg) throws Exception { + public boolean processMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buffer = (ByteBuf) msg; return add(buffer, buffer.readerIndex(), buffer.readableBytes()); diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Errors.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Errors.java index e75b580fe4..c387f37aaf 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Errors.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Errors.java @@ -107,7 +107,7 @@ public final class Errors { } } - static void throwConnectException(String method, int err) + public static void throwConnectException(String method, int err) throws IOException { if (err == ERROR_EALREADY_NEGATIVE) { throw new ConnectionPendingException(); diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index e7016578ca..9f83c19dca 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -929,30 +929,34 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doWrite(outboundBuffer); } catch (Throwable t) { - if (t instanceof IOException && config().isAutoClose()) { - /** - * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of - * failing all flushed messages and also ensure the actual close of the underlying transport - * will happen before the promises are notified. - * - * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} - * may still return {@code true} even if the channel should be closed as result of the exception. - */ - initialCloseCause = t; - close(voidPromise(), t, newClosedChannelException(t), false); - } else { - try { - shutdownOutput(voidPromise(), t); - } catch (Throwable t2) { - initialCloseCause = t; - close(voidPromise(), t2, newClosedChannelException(t), false); - } - } + handleWriteError(t); } finally { inFlush0 = false; } } + protected final void handleWriteError(Throwable t) { + if (t instanceof IOException && config().isAutoClose()) { + /** + * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of + * failing all flushed messages and also ensure the actual close of the underlying transport + * will happen before the promises are notified. + * + * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} + * may still return {@code true} even if the channel should be closed as result of the exception. + */ + initialCloseCause = t; + close(voidPromise(), t, newClosedChannelException(t), false); + } else { + try { + shutdownOutput(voidPromise(), t); + } catch (Throwable t2) { + initialCloseCause = t; + close(voidPromise(), t2, newClosedChannelException(t), false); + } + } + } + private ClosedChannelException newClosedChannelException(Throwable cause) { ClosedChannelException exception = new ClosedChannelException(); if (cause != null) {