From 20ef4690e74a7a5007468815f070a6c1f5074d0d Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 10 Apr 2014 08:27:27 +0200 Subject: [PATCH] [#2375] [#2404] Fix bug in respecting ChannelConfig.setAutoRead(false) and also fix Channel.read() for OIO Motivation: At the moment ChanneConfig.setAutoRead(false) only is guaranteer to not have an extra channelRead(...) triggered when used from within the channelRead(...) or channelReadComplete(...) method. This is not the correct behaviour as it should also work from other methods that are triggered from within the EventLoop. For example a valid use case is to have it called from within a ChannelFutureListener, which currently not work as expected. Beside this there is another bug which is kind of related. Currently Channel.read() will not work as expected for OIO as we will stop try to read even if nothing could be read there after one read operation on the socket (when the SO_TIMEOUT kicks in). Modifications: Implement the logic the right way for the NIO/OIO/SCTP and native transport, specific to the transport implementation. Also correctly handle Channel.read() for OIO transport by trigger a new read if SO_TIMEOUT was catched. Result: It is now also possible to use ChannelConfig.setAutoRead(false) from other methods that are called from within the EventLoop and have direct effect. Conflicts: transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java --- .../transport/socket/SocketAutoReadTest.java | 205 ++++++++++++++++++ .../channel/epoll/AbstractEpollChannel.java | 28 ++- .../channel/epoll/EpollDatagramChannel.java | 2 +- .../epoll/EpollDatagramChannelConfig.java | 7 +- .../epoll/EpollServerSocketChannel.java | 2 +- .../epoll/EpollServerSocketChannelConfig.java | 4 + .../channel/epoll/EpollSocketChannel.java | 4 +- .../epoll/EpollSocketChannelConfig.java | 5 + .../channel/sctp/nio/NioSctpChannel.java | 23 +- .../sctp/nio/NioSctpServerChannel.java | 13 +- .../channel/sctp/oio/OioSctpChannel.java | 13 +- .../sctp/oio/OioSctpServerChannel.java | 13 +- .../netty/channel/DefaultChannelConfig.java | 8 + .../channel/nio/AbstractNioByteChannel.java | 16 +- .../netty/channel/nio/AbstractNioChannel.java | 11 +- .../nio/AbstractNioMessageChannel.java | 9 +- .../channel/oio/AbstractOioByteChannel.java | 17 +- .../netty/channel/oio/AbstractOioChannel.java | 21 +- .../oio/AbstractOioMessageChannel.java | 13 +- .../socket/nio/NioDatagramChannel.java | 5 + .../socket/nio/NioDatagramChannelConfig.java | 11 + .../socket/nio/NioServerSocketChannel.java | 14 +- .../channel/socket/nio/NioSocketChannel.java | 14 +- .../DefaultOioServerSocketChannelConfig.java | 12 + .../oio/DefaultOioSocketChannelConfig.java | 12 + .../socket/oio/OioServerSocketChannel.java | 5 + .../channel/socket/oio/OioSocketChannel.java | 5 + 27 files changed, 456 insertions(+), 36 deletions(-) create mode 100644 testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java new file mode 100644 index 0000000000..229efbc40d --- /dev/null +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java @@ -0,0 +1,205 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ReferenceCountUtil; +import org.junit.Test; + +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.fail; + +public class SocketAutoReadTest extends AbstractSocketTest { + private static final Random random = new Random(); + static final byte[] data = new byte[1024]; + + static { + random.nextBytes(data); + } + + // See https://github.com/netty/netty/pull/2375 + @Test(timeout = 30000) + public void testAutoReadDisableOutsideChannelRead() throws Throwable { + run(); + } + + public void testAutoReadDisableOutsideChannelRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { + TestHandler sh = new TestHandler() { + private boolean allBytesReceived; + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + assertFalse(allBytesReceived); + ctx.writeAndFlush(msg); + ctx.channel().eventLoop().execute(new Runnable() { + @Override + public void run() { + ctx.channel().config().setAutoRead(false); + allBytesReceived = true; + } + }); + } + }; + sb.childHandler(sh); + + TestHandler ch = new TestHandler(); + cb.handler(ch); + Channel sc = sb.bind().sync().channel(); + Channel cc = cb.connect().sync().channel(); + cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); + Thread.sleep(500); + cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); + Thread.sleep(500); + cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); + Thread.sleep(500); + + cc.close().sync(); + sc.close().sync(); + + if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { + throw sh.exception.get(); + } + if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { + throw ch.exception.get(); + } + if (sh.exception.get() != null) { + throw sh.exception.get(); + } + if (ch.exception.get() != null) { + throw ch.exception.get(); + } + } + + // See https://github.com/netty/netty/pull/2375 + @Test(timeout = 30000) + public void testAutoReadDisableOutsideChannelReadManualRead() throws Throwable { + run(); + } + + public void testAutoReadDisableOutsideChannelReadManualRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { + + ServerTestHandler sh = new ServerTestHandler(); + sb.childHandler(sh); + + TestHandler ch = new TestHandler(); + cb.handler(ch); + Channel sc = sb.bind().sync().channel(); + Channel cc = cb.connect().sync().channel(); + cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); + Thread.sleep(500); + cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); + Thread.sleep(500); + cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); + Thread.sleep(500); + sh.await(); + cc.close().sync(); + sc.close().sync(); + + if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { + throw sh.exception.get(); + } + if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { + throw ch.exception.get(); + } + if (sh.exception.get() != null) { + throw sh.exception.get(); + } + if (ch.exception.get() != null) { + throw ch.exception.get(); + } + } + + public static class ServerTestHandler extends TestHandler { + enum State { + AUTO_READ, + SCHEDULED, + BYTES_RECEIVED, + READ_SCHEDULED + } + private final CountDownLatch latch = new CountDownLatch(1); + + private State state = State.AUTO_READ; + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + ctx.writeAndFlush(msg); + switch (state) { + case READ_SCHEDULED: + latch.countDown(); + break; + case AUTO_READ: + state = State.SCHEDULED; + ctx.channel().eventLoop().execute(new Runnable() { + @Override + public void run() { + ctx.channel().config().setAutoRead(false); + state = State.BYTES_RECEIVED; + ctx.channel().eventLoop().schedule(new Runnable() { + @Override + public void run() { + state = State.READ_SCHEDULED; + ctx.channel().read(); + } + }, 2, TimeUnit.SECONDS); + } + }); + break; + case BYTES_RECEIVED: + // Once the state is BYTES_RECEIVED we should not receive anymore data. + fail(); + break; + case SCHEDULED: + // nothing to do + break; + } + } + + public void await() throws InterruptedException { + latch.await(); + } + } + + private static class TestHandler extends ChannelInboundHandlerAdapter { + final AtomicReference exception = new AtomicReference(); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, + Throwable cause) throws Exception { + if (exception.compareAndSet(null, cause)) { + cause.printStackTrace(); + ctx.close(); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.release(msg); + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 6ffae33bea..02a7b4d317 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -19,6 +19,7 @@ import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelMetadata; import io.netty.channel.EventLoop; +import io.netty.util.internal.OneTimeTask; import java.net.InetSocketAddress; import java.nio.channels.UnresolvedAddressException; @@ -103,10 +104,22 @@ abstract class AbstractEpollChannel extends AbstractChannel { } } - protected final void clearEpollIn() { - if ((flags & readFlag) != 0) { - flags &= ~readFlag; - ((EpollEventLoop) eventLoop()).modify(this); + final void clearEpollIn() { + final EventLoop loop = eventLoop(); + final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe(); + if (loop.inEventLoop()) { + unsafe.clearEpollIn0(); + } else { + // schedule a task to clear the EPOLLIN as it is not safe to modify it directly + loop.execute(new OneTimeTask() { + @Override + public void run() { + if (!config().isAutoRead() && !unsafe.readPending) { + // Still no read triggered so clear it now + unsafe.clearEpollIn0(); + } + } + }); } } @@ -183,5 +196,12 @@ abstract class AbstractEpollChannel extends AbstractChannel { private boolean isFlushPending() { return (flags & Native.EPOLLOUT) != 0; } + + protected final void clearEpollIn0() { + if ((flags & readFlag) != 0) { + flags &= ~readFlag; + ((EpollEventLoop) eventLoop()).modify(AbstractEpollChannel.this); + } + } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index c562528de6..9774d816d4 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -301,7 +301,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements } if (remoteAddress == null) { - remoteAddress = this.remote; + remoteAddress = remote; if (remoteAddress == null) { throw new NotYetConnectedException(); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java index 18c754611e..8a57706b70 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannelConfig.java @@ -34,7 +34,7 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple EpollDatagramChannelConfig(EpollDatagramChannel channel) { super(channel); - this.datagramChannel = channel; + datagramChannel = channel; setRecvByteBufAllocator(DEFAULT_RCVBUF_ALLOCATOR); } @@ -278,4 +278,9 @@ public final class EpollDatagramChannelConfig extends DefaultChannelConfig imple public EpollDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) { throw new UnsupportedOperationException("Multicast not supported"); } + + @Override + protected void autoReadCleared() { + datagramChannel.clearEpollIn(); + } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java index d9b89d73ff..3ed89eca28 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannel.java @@ -124,7 +124,7 @@ public final class EpollServerSocketChannel extends AbstractEpollChannel impleme // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !readPending) { - clearEpollIn(); + clearEpollIn0(); } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java index 9b8a947def..1f3f8e8c40 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollServerSocketChannelConfig.java @@ -197,4 +197,8 @@ public final class EpollServerSocketChannelConfig extends DefaultChannelConfig return this; } + @Override + protected void autoReadCleared() { + channel.clearEpollIn(); + } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 12d5d1c365..346d63e548 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -355,7 +355,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So inputShutdown = true; if (isOpen()) { if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { - clearEpollIn(); + clearEpollIn0(); pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { close(voidPromise()); @@ -657,7 +657,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !readPending) { - clearEpollIn(); + clearEpollIn0(); } } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java index cab8ac0e4b..171b9821de 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannelConfig.java @@ -277,4 +277,9 @@ public final class EpollSocketChannelConfig extends DefaultChannelConfig super.setMessageSizeEstimator(estimator); return this; } + + @Override + protected void autoReadCleared() { + channel.clearEpollIn(); + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index 80d4b7e6e9..714227e441 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -103,7 +103,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett super(parent, sctpChannel, SelectionKey.OP_READ); try { sctpChannel.configureBlocking(false); - config = new DefaultSctpChannelConfig(this, sctpChannel); + config = new NioSctpChannelConfig(this, sctpChannel); notificationHandler = new SctpNotificationHandler(this); } catch (IOException e) { try { @@ -373,11 +373,11 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett private static final class NioSctpChannelOutboundBuffer extends ChannelOutboundBuffer { private static final Recycler RECYCLER = new Recycler() { - @Override - protected NioSctpChannelOutboundBuffer newObject(Handle handle) { - return new NioSctpChannelOutboundBuffer(handle); - } - }; + @Override + protected NioSctpChannelOutboundBuffer newObject(Handle handle) { + return new NioSctpChannelOutboundBuffer(handle); + } + }; static NioSctpChannelOutboundBuffer newInstance(AbstractChannel channel) { NioSctpChannelOutboundBuffer buffer = RECYCLER.get(); @@ -402,4 +402,15 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett return msg; } } + + private final class NioSctpChannelConfig extends DefaultSctpChannelConfig { + private NioSctpChannelConfig(NioSctpChannel channel, SctpChannel javaChannel) { + super(channel, javaChannel); + } + + @Override + protected void autoReadCleared() { + setReadPending(false); + } + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index d5d578b8bf..c9577079c9 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -64,7 +64,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel */ public NioSctpServerChannel() { super(null, newSocket(), SelectionKey.OP_ACCEPT); - config = new DefaultSctpServerChannelConfig(this, javaChannel()); + config = new NioSctpServerChannelConfig(this, javaChannel()); } @Override @@ -220,4 +220,15 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } + + private final class NioSctpServerChannelConfig extends DefaultSctpServerChannelConfig { + private NioSctpServerChannelConfig(NioSctpServerChannel channel, SctpServerChannel javaChannel) { + super(channel, javaChannel); + } + + @Override + protected void autoReadCleared() { + setReadPending(false); + } + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java index 58c0f666aa..84dbe56612 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java @@ -121,7 +121,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel ch.register(writeSelector, SelectionKey.OP_WRITE); ch.register(connectSelector, SelectionKey.OP_CONNECT); - config = new DefaultSctpChannelConfig(this, ch); + config = new OioSctpChannelConfig(this, ch); notificationHandler = new SctpNotificationHandler(this); success = true; } catch (Exception e) { @@ -448,4 +448,15 @@ public class OioSctpChannel extends AbstractOioMessageChannel } return promise; } + + private final class OioSctpChannelConfig extends DefaultSctpChannelConfig { + private OioSctpChannelConfig(OioSctpChannel channel, SctpChannel javaChannel) { + super(channel, javaChannel); + } + + @Override + protected void autoReadCleared() { + setReadPending(false); + } + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java index f787e7304d..37914d13be 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java @@ -91,7 +91,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel sch.configureBlocking(false); selector = Selector.open(); sch.register(selector, SelectionKey.OP_ACCEPT); - config = new DefaultSctpServerChannelConfig(this, sch); + config = new OioSctpServerChannelConfig(this, sch); success = true; } catch (Exception e) { throw new ChannelException("failed to initialize a sctp server channel", e); @@ -289,4 +289,15 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel protected void doWrite(ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } + + private final class OioSctpServerChannelConfig extends DefaultSctpServerChannelConfig { + private OioSctpServerChannelConfig(OioSctpServerChannel channel, SctpServerChannel javaChannel) { + super(channel, javaChannel); + } + + @Override + protected void autoReadCleared() { + setReadPending(false); + } + } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index 53085c1981..ccacd5a5f6 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -266,10 +266,18 @@ public class DefaultChannelConfig implements ChannelConfig { this.autoRead = autoRead; if (autoRead && !oldAutoRead) { channel.read(); + } else if (!autoRead && oldAutoRead) { + autoReadCleared(); } return this; } + /** + * Is called once {@link #setAutoRead(boolean)} is called with {@code false} and {@link #isAutoRead()} was + * {@code true} before. + */ + protected void autoReadCleared() { } + @Override public boolean isAutoClose() { return autoClose; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 4a5e1dfac8..18ed477afe 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -72,7 +72,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { ByteBuf byteBuf, Throwable cause, boolean close) { if (byteBuf != null) { if (byteBuf.isReadable()) { - readPending = false; + setReadPending(false); pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); @@ -88,6 +88,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { @Override public void read() { final ChannelConfig config = config(); + if (!config.isAutoRead() && !isReadPending()) { + // ChannelConfig.setAutoRead(false) was called in the meantime + removeReadOp(); + return; + } + final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); @@ -102,6 +108,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { try { int byteBufCapacity = allocHandle.guess(); int totalReadAmount = 0; + boolean readPendingReset = false; do { byteBuf = allocator.ioBuffer(byteBufCapacity); int writable = byteBuf.writableBytes(); @@ -112,7 +119,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { close = localReadAmount < 0; break; } - readPending = false; + if (!readPendingReset) { + readPendingReset = true; + setReadPending(false); + } pipeline.fireChannelRead(byteBuf); byteBuf = null; @@ -152,7 +162,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead() && !readPending) { + if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 1ea06db141..c71467ce9e 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -48,6 +48,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { protected final int readInterestOp; private volatile SelectionKey selectionKey; private volatile boolean inputShutdown; + private volatile boolean readPending; /** * The future of the current connection attempt. If not null, subsequent @@ -111,6 +112,14 @@ public abstract class AbstractNioChannel extends AbstractChannel { return selectionKey; } + protected boolean isReadPending() { + return readPending; + } + + protected void setReadPending(boolean readPending) { + this.readPending = readPending; + } + /** * Return {@code true} if the input of this {@link Channel} is shutdown */ @@ -149,8 +158,6 @@ public abstract class AbstractNioChannel extends AbstractChannel { protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { - protected boolean readPending; - protected final void removeReadOp() { SelectionKey key = selectionKey(); // Check first if the key is still valid as it may be canceled as part of the deregistration diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 47581cc698..c9e537431a 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -52,6 +52,11 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); + if (!config.isAutoRead() && !isReadPending()) { + // ChannelConfig.setAutoRead(false) was called in the meantime + removeReadOp(); + return; + } final int maxMessagesPerRead = config.getMaxMessagesPerRead(); final ChannelPipeline pipeline = pipeline(); @@ -81,7 +86,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { } catch (Throwable t) { exception = t; } - readPending = false; + setReadPending(false); int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); @@ -112,7 +117,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead() && !readPending) { + if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index c459a5aa3e..e4fd801c4a 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -17,6 +17,7 @@ package io.netty.channel.oio; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; @@ -72,7 +73,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { if (checkInputShutdown()) { return; } - + final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); // TODO: calculate size as in 3.x @@ -80,9 +81,10 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { boolean closed = false; boolean read = false; Throwable exception = null; + int localReadAmount = 0; try { for (;;) { - int localReadAmount = doReadBytes(byteBuf); + localReadAmount = doReadBytes(byteBuf); if (localReadAmount > 0) { read = true; } else if (localReadAmount < 0) { @@ -112,7 +114,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { } } } - if (!config().isAutoRead()) { + if (!config.isAutoRead()) { // stop reading until next Channel.read() call // See https://github.com/netty/netty/issues/1363 break; @@ -149,6 +151,15 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { } } } + if (localReadAmount == 0 && isActive()) { + // If the read amount was 0 and the channel is still active we need to trigger a new read() + // as otherwise we will never try to read again and the user will never know. + // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are + // able to process the rest of the tasks in the queue first. + // + // See https://github.com/netty/netty/issues/2404 + read(); + } } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index eff7651cff..bd163b2799 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -31,12 +31,17 @@ public abstract class AbstractOioChannel extends AbstractChannel { protected static final int SO_TIMEOUT = 1000; - private boolean readInProgress; + private volatile boolean readPending; private final Runnable readTask = new Runnable() { @Override public void run() { - readInProgress = false; + if (!isReadPending() && !config().isAutoRead()) { + // ChannelConfig.setAutoRead(false) was called in the meantime so just return + return; + } + + setReadPending(false); doRead(); } }; @@ -94,13 +99,21 @@ public abstract class AbstractOioChannel extends AbstractChannel { @Override protected void doBeginRead() throws Exception { - if (readInProgress) { + if (isReadPending()) { return; } - readInProgress = true; + setReadPending(true); eventLoop().execute(readTask); } protected abstract void doRead(); + + protected boolean isReadPending() { + return readPending; + } + + protected void setReadPending(boolean readPending) { + this.readPending = readPending; + } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java index f981698268..3e7033da8f 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java @@ -36,15 +36,16 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { @Override protected void doRead() { + final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); boolean closed = false; - final ChannelConfig config = config(); final int maxMessagesPerRead = config.getMaxMessagesPerRead(); Throwable exception = null; + int localRead = 0; try { for (;;) { - int localRead = doReadMessages(readBuf); + localRead = doReadMessages(readBuf); if (localRead == 0) { break; } @@ -80,6 +81,14 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { if (isOpen()) { unsafe().close(unsafe().voidPromise()); } + } else if (localRead == 0 && isActive()) { + // If the read amount was 0 and the channel is still active we need to trigger a new read() + // as otherwise we will never try to read again and the user will never know. + // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are + // able to process the rest of the tasks in the queue first. + // + // See https://github.com/netty/netty/issues/2404 + read(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index b3bb45281e..0d11bfece8 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -540,4 +540,9 @@ public final class NioDatagramChannel protected ChannelOutboundBuffer newOutboundBuffer() { return NioDatagramChannelOutboundBuffer.newInstance(this); } + + @Override + protected void setReadPending(boolean readPending) { + super.setReadPending(readPending); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java index 0401db8184..c85c962402 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java @@ -161,6 +161,17 @@ class NioDatagramChannelConfig extends DefaultDatagramChannelConfig { return this; } + @Override + public DatagramChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + protected void autoReadCleared() { + ((NioDatagramChannel) channel).setReadPending(false); + } + private Object getOption0(Object option) { if (PlatformDependent.javaVersion() < 7) { throw new UnsupportedOperationException(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index d6751727c9..c0dc36bc76 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -26,6 +26,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.SocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; @@ -81,7 +82,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel */ public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); - config = new DefaultServerSocketChannelConfig(this, javaChannel().socket()); + config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } @Override @@ -177,4 +178,15 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } + + private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig { + private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) { + super(channel, javaSocket); + } + + @Override + protected void autoReadCleared() { + setReadPending(false); + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index ea45f24d09..456714e712 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -32,6 +32,7 @@ import io.netty.util.internal.OneTimeTask; import java.io.IOException; import java.net.InetSocketAddress; +import java.net.Socket; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; @@ -91,7 +92,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty */ public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); - config = new DefaultSocketChannelConfig(this, socket.socket()); + config = new NioSocketChannelConfig(this, socket.socket()); } @Override @@ -321,4 +322,15 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty protected ChannelOutboundBuffer newOutboundBuffer() { return NioSocketChannelOutboundBuffer.newInstance(this); } + + private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { + private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { + super(channel, javaSocket); + } + + @Override + protected void autoReadCleared() { + setReadPending(false); + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java index ab2fa06ca7..934fc23f8b 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java @@ -35,10 +35,15 @@ import static io.netty.channel.ChannelOption.*; public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChannelConfig implements OioServerSocketChannelConfig { + @Deprecated public DefaultOioServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) { super(channel, javaSocket); } + DefaultOioServerSocketChannelConfig(OioServerSocketChannel channel, ServerSocket javaSocket) { + super(channel, javaSocket); + } + @Override public Map, Object> getOptions() { return getOptions( @@ -145,6 +150,13 @@ public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChan return this; } + @Override + protected void autoReadCleared() { + if (channel instanceof OioServerSocketChannel) { + ((OioServerSocketChannel) channel).setReadPending(false); + } + } + @Override public OioServerSocketChannelConfig setAutoClose(boolean autoClose) { super.setAutoClose(autoClose); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java index ac9fdf72cf..b9d8369e46 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java @@ -33,10 +33,15 @@ import static io.netty.channel.ChannelOption.*; * Default {@link OioSocketChannelConfig} implementation */ public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig implements OioSocketChannelConfig { + @Deprecated public DefaultOioSocketChannelConfig(SocketChannel channel, Socket javaSocket) { super(channel, javaSocket); } + DefaultOioSocketChannelConfig(OioSocketChannel channel, Socket javaSocket) { + super(channel, javaSocket); + } + @Override public Map, Object> getOptions() { return getOptions( @@ -173,6 +178,13 @@ public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig im return this; } + @Override + protected void autoReadCleared() { + if (channel instanceof OioSocketChannel) { + ((OioSocketChannel) channel).setReadPending(false); + } + } + @Override public OioSocketChannelConfig setAutoClose(boolean autoClose) { super.setAutoClose(autoClose); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 04428df54d..72ce74c521 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -193,4 +193,9 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel protected void doDisconnect() throws Exception { throw new UnsupportedOperationException(); } + + @Override + protected void setReadPending(boolean readPending) { + super.setReadPending(readPending); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index 7ab6e4541f..dc48bc4e4b 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -232,4 +232,9 @@ public class OioSocketChannel extends OioByteStreamChannel } return false; } + + @Override + protected void setReadPending(boolean readPending) { + super.setReadPending(readPending); + } }