diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java index feb64c2f38..d82d899b77 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java @@ -24,7 +24,6 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; -import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.DuplexChannel; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; @@ -51,6 +50,11 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple super(parent, fd, remote); } + @Override + protected AbstractUringUnsafe newUnsafe() { + return new IOUringStreamUnsafe(); + } + @Override public ChannelFuture shutdown() { return shutdown(newPromise()); @@ -198,7 +202,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple } } - class IOUringStreamUnsafe extends AbstractUringUnsafe { + private final class IOUringStreamUnsafe extends AbstractUringUnsafe { // Overridden here just to be able to access this method from AbstractEpollStreamChannel @Override @@ -272,6 +276,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple scheduleRead(); } else { // We did not fill the whole ByteBuf so we should break the "read loop" and try again later. + allocHandle.readComplete(); pipeline.fireChannelReadComplete(); } } catch (Throwable t) { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java index 14ebe7292f..a6afb2791c 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java @@ -17,7 +17,6 @@ package io.netty.channel.uring; import io.netty.channel.Channel; import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.unix.FileDescriptor; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -56,9 +55,4 @@ public final class IOUringServerSocketChannel extends AbstractIOUringServerChann socket.listen(config.getBacklog()); active = true; } - - @Override - public FileDescriptor fd() { - return super.fd(); - } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java index 8a112f43dc..cab2f21743 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java @@ -15,33 +15,14 @@ */ package io.netty.channel.uring; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; -import io.netty.channel.ChannelConfig; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelPromise; -import io.netty.channel.DefaultChannelConfig; -import io.netty.channel.RecvByteBufAllocator; -import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.SocketChannelConfig; -import io.netty.channel.unix.FileDescriptor; -import io.netty.channel.unix.Socket; -import io.netty.channel.uring.AbstractIOUringStreamChannel.IOUringStreamUnsafe; - -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.Collection; -import java.util.Collections; public final class IOUringSocketChannel extends AbstractIOUringStreamChannel implements SocketChannel { private final IOUringSocketChannelConfig config; - //private volatile Collection tcpMd5SigAddresses = Collections.emptyList(); public IOUringSocketChannel() { super(null, LinuxSocket.newSocketStream(), false); @@ -53,35 +34,16 @@ public final class IOUringSocketChannel extends AbstractIOUringStreamChannel imp this.config = new IOUringSocketChannelConfig(this); } - IOUringSocketChannel(Channel parent, LinuxSocket fd, InetSocketAddress remoteAddress) { - super(parent, fd, remoteAddress); - this.config = new IOUringSocketChannelConfig(this); - -// if (parent instanceof IOUringSocketChannel) { -// tcpMd5SigAddresses = ((IOUringSocketChannel) parent).tcpMd5SigAddresses(); -// } - } - @Override public ServerSocketChannel parent() { return (ServerSocketChannel) super.parent(); } - @Override - protected AbstractUringUnsafe newUnsafe() { - return new IOUringStreamUnsafe(); - } - @Override public IOUringSocketChannelConfig config() { return config; } - @Override - public FileDescriptor fd() { - return super.fd(); - } - @Override public InetSocketAddress remoteAddress() { return (InetSocketAddress) super.remoteAddress(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java index 0a566d8a6b..e72e26a79a 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java @@ -27,24 +27,19 @@ import io.netty.channel.socket.SocketChannelConfig; import io.netty.util.internal.PlatformDependent; import java.io.IOException; -import java.net.InetAddress; import java.util.Map; import static io.netty.channel.ChannelOption.*; -import static io.netty.channel.unix.Limits.*; -public class IOUringSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { +public final class IOUringSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig { private volatile boolean allowHalfClosure; - private volatile long maxBytesPerGatheringWrite = SSIZE_MAX; - public IOUringSocketChannelConfig(Channel channel) { + IOUringSocketChannelConfig(Channel channel) { super(channel); if (PlatformDependent.canEnableTcpNoDelayByDefault()) { setTcpNoDelay(true); } - calculateMaxBytesPerGatheringWrite(); - } @Override @@ -333,7 +328,6 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements public IOUringSocketChannelConfig setSendBufferSize(int sendBufferSize) { try { ((IOUringSocketChannel) channel).socket.setSendBufferSize(sendBufferSize); - calculateMaxBytesPerGatheringWrite(); return this; } catch (IOException e) { throw new ChannelException(e); @@ -641,16 +635,4 @@ public class IOUringSocketChannelConfig extends DefaultChannelConfig implements super.setMessageSizeEstimator(estimator); return this; } - - final void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) { - this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite; - } - - private void calculateMaxBytesPerGatheringWrite() { - // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide. - int newSendBufferSize = getSendBufferSize() << 1; - if (newSendBufferSize > 0) { - setMaxBytesPerGatheringWrite(getSendBufferSize() << 1); - } - } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index bee8f2bf18..85c3fc618c 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -83,8 +83,7 @@ final class Native { } public static RingBuffer createRingBuffer() { - //Todo throw Exception if it's null - return ioUringSetup(DEFAULT_RING_SIZE); + return createRingBuffer(DEFAULT_RING_SIZE); } private static native RingBuffer ioUringSetup(int entries); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java index 20ba4c9a50..2d884a57ad 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java @@ -37,5 +37,4 @@ final class RingBuffer { getIoUringSubmissionQueue().release(); Native.ioUringExit(this); } - }