From 5b48fc284ebe85ca4974985e3be005d37626e980 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 5 Apr 2016 10:13:18 -0700 Subject: [PATCH] Make OIO/NIO/EPOLL autoReadClear consistent Motivation: OIO/NIO use a volatile variable to track if a read is pending. EPOLL does not use a volatile an executes a Runnable on the event loop thread to set readPending to false. These mechansims should be consistent, and not using a volatile variable is preferable because the variable is written to frequently in the event loop thread. OIO also does not set readPending to false before each fireChannelRead operation and may result in reading more data than the user desires. Modifications: - OIO/NIO should not use a volatile variable for readPending - OIO should set readPending to false before each fireChannelRead Result: OIO/NIO/EPOLL are more consistent w.r.t. readPending and volatile variable operations are reduced Fixes https://github.com/netty/netty/issues/5069 --- .../transport/socket/SocketAutoReadTest.java | 313 ++++++------ .../socket/SocketExceptionHandlingTest.java | 114 +++++ .../socket/SocketReadPendingTest.java | 199 ++++++++ .../socket/SocketStringEchoTest.java | 4 +- .../epoll/EpollETSocketAutoReadTest.java | 39 ++ .../EpollETSocketExceptionHandlingTest.java | 39 ++ .../epoll/EpollETSocketReadPendingTest.java | 39 ++ .../epoll/EpollLTSocketAutoReadTest.java | 39 ++ .../EpollLTSocketExceptionHandlingTest.java | 39 ++ .../epoll/EpollLTSocketReadPendingTest.java | 39 ++ .../channel/epoll/EpollSocketChannelTest.java | 473 ------------------ .../channel/sctp/nio/NioSctpChannel.java | 2 +- .../sctp/nio/NioSctpServerChannel.java | 2 +- .../channel/sctp/oio/OioSctpChannel.java | 2 +- .../sctp/oio/OioSctpServerChannel.java | 2 +- .../channel/nio/AbstractNioByteChannel.java | 11 +- .../netty/channel/nio/AbstractNioChannel.java | 53 +- .../nio/AbstractNioMessageChannel.java | 9 +- .../channel/oio/AbstractOioByteChannel.java | 54 +- .../netty/channel/oio/AbstractOioChannel.java | 59 ++- .../oio/AbstractOioMessageChannel.java | 37 +- .../socket/nio/NioDatagramChannel.java | 5 + .../socket/nio/NioDatagramChannelConfig.java | 2 +- .../socket/nio/NioServerSocketChannel.java | 2 +- .../channel/socket/nio/NioSocketChannel.java | 2 +- .../DefaultOioServerSocketChannelConfig.java | 2 +- .../oio/DefaultOioSocketChannelConfig.java | 2 +- .../socket/oio/OioServerSocketChannel.java | 5 + .../channel/socket/oio/OioSocketChannel.java | 5 + 29 files changed, 902 insertions(+), 691 deletions(-) create mode 100644 testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketExceptionHandlingTest.java create mode 100644 testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java index 2ec708e4b4..319a133600 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java @@ -17,10 +17,16 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.RecvByteBufAllocator; import io.netty.util.ReferenceCountUtil; import org.junit.Test; @@ -28,174 +34,193 @@ import java.io.IOException; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class SocketAutoReadTest extends AbstractSocketTest { - private static final Random random = new Random(); - static final byte[] data = new byte[1024]; - - static { - random.nextBytes(data); - } - - // See https://github.com/netty/netty/pull/2375 - @Test(timeout = 30000) - public void testAutoReadDisableOutsideChannelRead() throws Throwable { + @Test + public void testAutoReadOffDuringReadOnlyReadsOneTime() throws Throwable { run(); } - public void testAutoReadDisableOutsideChannelRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - TestHandler sh = new TestHandler() { - private boolean allBytesReceived; - @Override - public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - assertFalse(allBytesReceived); - ctx.writeAndFlush(msg); - ctx.channel().eventLoop().execute(new Runnable() { - @Override - public void run() { - ctx.channel().config().setAutoRead(false); - allBytesReceived = true; - } - }); + public void testAutoReadOffDuringReadOnlyReadsOneTime(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testAutoReadOffDuringReadOnlyReadsOneTime(true, sb, cb); + testAutoReadOffDuringReadOnlyReadsOneTime(false, sb, cb); + } + + private void testAutoReadOffDuringReadOnlyReadsOneTime(boolean readOutsideEventLoopThread, + ServerBootstrap sb, Bootstrap cb) throws Throwable { + Channel serverChannel = null; + Channel clientChannel = null; + try { + AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread); + AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread); + sb.option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.AUTO_READ, true) + .childOption(ChannelOption.AUTO_READ, true) + // We want to ensure that we attempt multiple individual read operations per read loop so we can + // test the auto read feature being turned off when data is first read. + .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator()) + .childHandler(serverInitializer); + + serverChannel = sb.bind().syncUninterruptibly().channel(); + + cb.remoteAddress(serverChannel.localAddress()) + .option(ChannelOption.AUTO_READ, true) + // We want to ensure that we attempt multiple individual read operations per read loop so we can + // test the auto read feature being turned off when data is first read. + .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator()) + .handler(clientInitializer); + + clientChannel = cb.connect().syncUninterruptibly().channel(); + + // 3 bytes means 3 independent reads for TestRecvByteBufAllocator + clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3])); + serverInitializer.autoReadHandler.assertSingleRead(); + + // 3 bytes means 3 independent reads for TestRecvByteBufAllocator + serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3])); + clientInitializer.autoReadHandler.assertSingleRead(); + + if (readOutsideEventLoopThread) { + serverInitializer.channel.read(); } - }; - sb.childHandler(sh); + serverInitializer.autoReadHandler.assertSingleReadSecondTry(); - TestHandler ch = new TestHandler(); - cb.handler(ch); - Channel sc = sb.bind().sync().channel(); - Channel cc = cb.connect().sync().channel(); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - - cc.close().sync(); - sc.close().sync(); - - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); - } - if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { - throw ch.exception.get(); - } - if (sh.exception.get() != null) { - throw sh.exception.get(); - } - if (ch.exception.get() != null) { - throw ch.exception.get(); + if (readOutsideEventLoopThread) { + clientChannel.read(); + } + clientInitializer.autoReadHandler.assertSingleReadSecondTry(); + } finally { + if (clientChannel != null) { + clientChannel.close().sync(); + } + if (serverChannel != null) { + serverChannel.close().sync(); + } } } - // See https://github.com/netty/netty/pull/2375 - @Test(timeout = 30000) - public void testAutoReadDisableOutsideChannelReadManualRead() throws Throwable { - run(); - } + private static class AutoReadInitializer extends ChannelInitializer { + final AutoReadHandler autoReadHandler; + volatile Channel channel; - public void testAutoReadDisableOutsideChannelReadManualRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - - ServerTestHandler sh = new ServerTestHandler(); - sb.childHandler(sh); - - TestHandler ch = new TestHandler(); - cb.handler(ch); - Channel sc = sb.bind().sync().channel(); - Channel cc = cb.connect().sync().channel(); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - sh.await(); - cc.close().sync(); - sc.close().sync(); - - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); + AutoReadInitializer(boolean readInEventLoop) { + autoReadHandler = new AutoReadHandler(readInEventLoop); } - if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { - throw ch.exception.get(); - } - if (sh.exception.get() != null) { - throw sh.exception.get(); - } - if (ch.exception.get() != null) { - throw ch.exception.get(); + + @Override + protected void initChannel(Channel ch) throws Exception { + channel = ch; + ch.pipeline().addLast(autoReadHandler); } } - public static class ServerTestHandler extends TestHandler { - enum State { - AUTO_READ, - SCHEDULED, - BYTES_RECEIVED, - READ_SCHEDULED - } + private static final class AutoReadHandler extends ChannelInboundHandlerAdapter { + private final AtomicInteger count = new AtomicInteger(); private final CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch2; + private final boolean callRead; - private State state = State.AUTO_READ; - - @Override - public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - ctx.writeAndFlush(msg); - switch (state) { - case READ_SCHEDULED: - latch.countDown(); - break; - case AUTO_READ: - state = State.SCHEDULED; - ctx.channel().eventLoop().execute(new Runnable() { - @Override - public void run() { - ctx.channel().config().setAutoRead(false); - state = State.BYTES_RECEIVED; - ctx.channel().eventLoop().schedule(new Runnable() { - @Override - public void run() { - state = State.READ_SCHEDULED; - ctx.channel().read(); - } - }, 2, TimeUnit.SECONDS); - } - }); - break; - case BYTES_RECEIVED: - // Once the state is BYTES_RECEIVED we should not receive anymore data. - fail(); - break; - case SCHEDULED: - // nothing to do - break; - } - } - - public void await() throws InterruptedException { - latch.await(); - } - } - - private static class TestHandler extends ChannelInboundHandlerAdapter { - final AtomicReference exception = new AtomicReference(); - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, - Throwable cause) throws Exception { - if (exception.compareAndSet(null, cause)) { - cause.printStackTrace(); - ctx.close(); - } + AutoReadHandler(boolean callRead) { + this.callRead = callRead; + latch2 = new CountDownLatch(callRead ? 3 : 2); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ReferenceCountUtil.release(msg); + if (count.incrementAndGet() == 1) { + ctx.channel().config().setAutoRead(false); + } + if (callRead) { + // Test calling read in the EventLoop thread to ensure a read is eventually done. + ctx.read(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + latch.countDown(); + latch2.countDown(); + } + + void assertSingleRead() throws InterruptedException { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertTrue(count.get() > 0); + } + + void assertSingleReadSecondTry() throws InterruptedException { + assertTrue(latch2.await(5, TimeUnit.SECONDS)); + assertEquals(callRead ? 3 : 2, count.get()); + } + } + + /** + * Designed to keep reading as long as autoread is enabled. + */ + private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator { + @Override + public Handle newHandle() { + return new Handle() { + private ChannelConfig config; + private int attemptedBytesRead; + private int lastBytesRead; + @Override + public ByteBuf allocate(ByteBufAllocator alloc) { + return alloc.ioBuffer(guess(), guess()); + } + + @Override + public int guess() { + return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. + } + + @Override + public void reset(ChannelConfig config) { + this.config = config; + } + + @Override + public void incMessagesRead(int numMessages) { + // No need to track the number of messages read because it is not used. + } + + @Override + public void lastBytesRead(int bytes) { + lastBytesRead = bytes; + } + + @Override + public int lastBytesRead() { + return lastBytesRead; + } + + @Override + public void attemptedBytesRead(int bytes) { + attemptedBytesRead = bytes; + } + + @Override + public int attemptedBytesRead() { + return attemptedBytesRead; + } + + @Override + public boolean continueReading() { + return config.isAutoRead(); + } + + @Override + public void readComplete() { + // Nothing needs to be done or adjusted after each read cycle is completed. + } + }; } } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketExceptionHandlingTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketExceptionHandlingTest.java new file mode 100644 index 0000000000..36aad045a3 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketExceptionHandlingTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.util.ReferenceCountUtil; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SocketExceptionHandlingTest extends AbstractSocketTest { + @Test + public void testReadPendingIsResetAfterEachRead() throws Throwable { + run(); + } + + public void testReadPendingIsResetAfterEachRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { + Channel serverChannel = null; + Channel clientChannel = null; + try { + MyInitializer serverInitializer = new MyInitializer(); + sb.option(ChannelOption.SO_BACKLOG, 1024); + sb.childHandler(serverInitializer); + + serverChannel = sb.bind().syncUninterruptibly().channel(); + + cb.remoteAddress(serverChannel.localAddress()) + .handler(new MyInitializer()); + clientChannel = cb.connect().syncUninterruptibly().channel(); + + clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[1024])); + + // We expect to get 2 exceptions (1 from BuggyChannelHandler and 1 from ExceptionHandler). + assertTrue(serverInitializer.exceptionHandler.latch1.await(5, TimeUnit.SECONDS)); + + // After we get the first exception, we should get no more, this is expected to timeout. + assertFalse("Encountered " + serverInitializer.exceptionHandler.count.get() + + " exceptions when 1 was expected", + serverInitializer.exceptionHandler.latch2.await(1, TimeUnit.SECONDS)); + } finally { + if (serverChannel != null) { + serverChannel.close().syncUninterruptibly(); + } + if (clientChannel != null) { + clientChannel.close().syncUninterruptibly(); + } + } + } + + private static class MyInitializer extends ChannelInitializer { + final ExceptionHandler exceptionHandler = new ExceptionHandler(); + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + + pipeline.addLast(new BuggyChannelHandler()); + pipeline.addLast(exceptionHandler); + } + } + + private static class BuggyChannelHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.release(msg); + throw new NullPointerException("I am a bug!"); + } + } + + private static class ExceptionHandler extends ChannelInboundHandlerAdapter { + final AtomicLong count = new AtomicLong(); + /** + * We expect to get 1 call to {@link #exceptionCaught(ChannelHandlerContext, Throwable)}. + */ + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (count.incrementAndGet() <= 2) { + latch1.countDown(); + } else { + latch2.countDown(); + } + // This should not throw any exception. + ctx.close(); + } + } +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java new file mode 100644 index 0000000000..e3f2846520 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java @@ -0,0 +1,199 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.util.ReferenceCountUtil; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SocketReadPendingTest extends AbstractSocketTest { + @Test + public void testReadPendingIsResetAfterEachRead() throws Throwable { + run(); + } + + public void testReadPendingIsResetAfterEachRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { + Channel serverChannel = null; + Channel clientChannel = null; + try { + ReadPendingInitializer serverInitializer = new ReadPendingInitializer(); + ReadPendingInitializer clientInitializer = new ReadPendingInitializer(); + sb.option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.AUTO_READ, true) + .childOption(ChannelOption.AUTO_READ, false) + // We intend to do 2 reads per read loop wakeup + .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2)) + .childHandler(serverInitializer); + + serverChannel = sb.bind().syncUninterruptibly().channel(); + + cb.remoteAddress(serverChannel.localAddress()) + .option(ChannelOption.AUTO_READ, false) + // We intend to do 2 reads per read loop wakeup + .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2)) + .handler(clientInitializer); + clientChannel = cb.connect().syncUninterruptibly().channel(); + + // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator + clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4])); + + // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator + assertTrue(serverInitializer.channelInitLatch.await(5, TimeUnit.SECONDS)); + serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4])); + + serverInitializer.channel.read(); + serverInitializer.readPendingHandler.assertAllRead(); + + clientChannel.read(); + clientInitializer.readPendingHandler.assertAllRead(); + } finally { + if (serverChannel != null) { + serverChannel.close().syncUninterruptibly(); + } + if (clientChannel != null) { + clientChannel.close().syncUninterruptibly(); + } + } + } + + private static class ReadPendingInitializer extends ChannelInitializer { + final ReadPendingReadHandler readPendingHandler = new ReadPendingReadHandler(); + final CountDownLatch channelInitLatch = new CountDownLatch(1); + volatile Channel channel; + + @Override + protected void initChannel(Channel ch) throws Exception { + channel = ch; + ch.pipeline().addLast(readPendingHandler); + channelInitLatch.countDown(); + } + } + + private static final class ReadPendingReadHandler extends ChannelInboundHandlerAdapter { + private final AtomicInteger count = new AtomicInteger(); + private final CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch2 = new CountDownLatch(2); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.release(msg); + if (count.incrementAndGet() == 1) { + // Call read the first time, to ensure it is not reset the second time. + ctx.read(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + latch.countDown(); + latch2.countDown(); + } + + void assertAllRead() throws InterruptedException { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + // We should only do 1 read loop, because we only called read() on the first channelRead. + assertFalse(latch2.await(1, TimeUnit.SECONDS)); + assertEquals(2, count.get()); + } + } + + /** + * Designed to keep reading as long as autoread is enabled. + */ + private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator { + private final int numReads; + TestNumReadsRecvByteBufAllocator(int numReads) { + this.numReads = numReads; + } + + @Override + public Handle newHandle() { + return new Handle() { + private int attemptedBytesRead; + private int lastBytesRead; + private int numMessagesRead; + @Override + public ByteBuf allocate(ByteBufAllocator alloc) { + return alloc.ioBuffer(guess(), guess()); + } + + @Override + public int guess() { + return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. + } + + @Override + public void reset(ChannelConfig config) { + numMessagesRead = 0; + } + + @Override + public void incMessagesRead(int numMessages) { + numMessagesRead += numMessages; + } + + @Override + public void lastBytesRead(int bytes) { + lastBytesRead = bytes; + } + + @Override + public int lastBytesRead() { + return lastBytesRead; + } + + @Override + public void attemptedBytesRead(int bytes) { + attemptedBytesRead = bytes; + } + + @Override + public int attemptedBytesRead() { + return attemptedBytesRead; + } + + @Override + public boolean continueReading() { + return numMessagesRead < numReads; + } + + @Override + public void readComplete() { + // Nothing needs to be done or adjusted after each read cycle is completed. + } + }; + } + } +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java index c5bd0883c0..77f2c4ea5c 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java @@ -52,7 +52,7 @@ public class SocketStringEchoTest extends AbstractSocketTest { } } - @Test + @Test(timeout = 30000) public void testStringEcho() throws Throwable { run(); } @@ -61,7 +61,7 @@ public class SocketStringEchoTest extends AbstractSocketTest { testStringEcho(sb, cb, true); } - @Test + @Test(timeout = 30000) public void testStringEchoNotAutoRead() throws Throwable { run(); } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java new file mode 100644 index 0000000000..8e35662646 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketAutoReadTest; + +import java.util.List; + +public class EpollETSocketAutoReadTest extends SocketAutoReadTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java new file mode 100644 index 0000000000..29ccc9a2fa --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketExceptionHandlingTest; + +import java.util.List; + +public class EpollETSocketExceptionHandlingTest extends SocketExceptionHandlingTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java new file mode 100644 index 0000000000..38697ded19 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketReadPendingTest; + +import java.util.List; + +public class EpollETSocketReadPendingTest extends SocketReadPendingTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java new file mode 100644 index 0000000000..cd4f7c2d0b --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketAutoReadTest; + +import java.util.List; + +public class EpollLTSocketAutoReadTest extends SocketAutoReadTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java new file mode 100644 index 0000000000..d9ad6c1eec --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketExceptionHandlingTest; + +import java.util.List; + +public class EpollLTSocketExceptionHandlingTest extends SocketExceptionHandlingTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java new file mode 100644 index 0000000000..b12516a318 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketReadPendingTest; + +import java.util.List; + +public class EpollLTSocketReadPendingTest extends SocketReadPendingTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java index d791b90ef2..54b2ded43d 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java @@ -17,18 +17,14 @@ package io.netty.channel.epoll; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; -import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.ServerChannel; import io.netty.util.ReferenceCountUtil; import org.junit.Assert; @@ -38,10 +34,8 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -120,471 +114,4 @@ public class EpollSocketChannelTest { Assert.assertTrue(info.rcvSpace() >= 0); Assert.assertTrue(info.totalRetrans() >= 0); } - - @Test - public void testExceptionHandlingDoesNotInfiniteLoop() throws InterruptedException { - EventLoopGroup group = new EpollEventLoopGroup(); - try { - runExceptionHandleFeedbackLoop(group, EpollServerSocketChannel.class, EpollSocketChannel.class, - new InetSocketAddress(0)); - runExceptionHandleFeedbackLoop(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class, - EpollSocketTestPermutation.newSocketAddress()); - } finally { - group.shutdownGracefully(); - } - } - - @Test - public void testAutoReadOffDuringReadOnlyReadsOneTime() throws InterruptedException { - EventLoopGroup group = new EpollEventLoopGroup(); - try { - runAutoReadTest(true, group, EpollServerSocketChannel.class, - EpollSocketChannel.class, new InetSocketAddress(0)); - runAutoReadTest(true, group, EpollServerDomainSocketChannel.class, - EpollDomainSocketChannel.class, EpollSocketTestPermutation.newSocketAddress()); - runAutoReadTest(false, group, EpollServerSocketChannel.class, - EpollSocketChannel.class, new InetSocketAddress(0)); - runAutoReadTest(false, group, EpollServerDomainSocketChannel.class, - EpollDomainSocketChannel.class, EpollSocketTestPermutation.newSocketAddress()); - } finally { - group.shutdownGracefully(); - } - } - - @Test - public void testReadPendingIsResetAfterEachRead() throws InterruptedException { - EventLoopGroup group = new EpollEventLoopGroup(); - try { - runReadPendingTest(group, EpollServerSocketChannel.class, EpollSocketChannel.class, - new InetSocketAddress(0)); - runReadPendingTest(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class, - EpollSocketTestPermutation.newSocketAddress()); - } finally { - group.shutdownGracefully(); - } - } - - private void runAutoReadTest(boolean readOutsideEventLoopThread, EventLoopGroup group, - Class serverChannelClass, - Class channelClass, SocketAddress bindAddr) - throws InterruptedException { - Channel serverChannel = null; - Channel clientChannel = null; - try { - AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread); - AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread); - ServerBootstrap sb = new ServerBootstrap(); - sb.option(ChannelOption.SO_BACKLOG, 1024) - .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .option(ChannelOption.AUTO_READ, true) - .group(group) - .channel(serverChannelClass) - .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .childOption(ChannelOption.AUTO_READ, true) - // We want to ensure that we attempt multiple individual read operations per read loop so we can - // test the auto read feature being turned off when data is first read. - .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator()) - .childHandler(serverInitializer); - - serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel(); - - Bootstrap b = new Bootstrap() - .group(group) - .channel(channelClass) - .remoteAddress(serverChannel.localAddress()) - .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .option(ChannelOption.AUTO_READ, true) - // We want to ensure that we attempt multiple individual read operations per read loop so we can - // test the auto read feature being turned off when data is first read. - .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator()) - .handler(clientInitializer); - clientChannel = b.connect().syncUninterruptibly().channel(); - - // 3 bytes means 3 independent reads for TestRecvByteBufAllocator - clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3])); - serverInitializer.autoReadHandler.assertSingleRead(); - - // 3 bytes means 3 independent reads for TestRecvByteBufAllocator - serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3])); - clientInitializer.autoReadHandler.assertSingleRead(); - - if (readOutsideEventLoopThread) { - serverInitializer.channel.read(); - } - serverInitializer.autoReadHandler.assertSingleReadSecondTry(); - - if (readOutsideEventLoopThread) { - clientChannel.read(); - } - clientInitializer.autoReadHandler.assertSingleReadSecondTry(); - } finally { - if (serverChannel != null) { - serverChannel.close().syncUninterruptibly(); - } - if (clientChannel != null) { - clientChannel.close().syncUninterruptibly(); - } - } - } - - private void runReadPendingTest(EventLoopGroup group, - Class serverChannelClass, - Class channelClass, SocketAddress bindAddr) - throws InterruptedException { - Channel serverChannel = null; - Channel clientChannel = null; - try { - ReadPendingInitializer serverInitializer = new ReadPendingInitializer(); - ReadPendingInitializer clientInitializer = new ReadPendingInitializer(); - ServerBootstrap sb = new ServerBootstrap(); - sb.option(ChannelOption.SO_BACKLOG, 1024) - .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .option(ChannelOption.AUTO_READ, true) - .group(group) - .channel(serverChannelClass) - .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .childOption(ChannelOption.AUTO_READ, false) - // We intend to do 2 reads per read loop wakeup - .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2)) - .childHandler(serverInitializer); - - serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel(); - - Bootstrap b = new Bootstrap() - .group(group) - .channel(channelClass) - .remoteAddress(serverChannel.localAddress()) - .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .option(ChannelOption.AUTO_READ, false) - // We intend to do 2 reads per read loop wakeup - .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2)) - .handler(clientInitializer); - clientChannel = b.connect().syncUninterruptibly().channel(); - - // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator - clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4])); - - // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator - assertTrue(serverInitializer.channelInitLatch.await(5, TimeUnit.SECONDS)); - serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4])); - - serverInitializer.channel.read(); - serverInitializer.readPendingHandler.assertAllRead(); - - clientChannel.read(); - clientInitializer.readPendingHandler.assertAllRead(); - } finally { - if (serverChannel != null) { - serverChannel.close().syncUninterruptibly(); - } - if (clientChannel != null) { - clientChannel.close().syncUninterruptibly(); - } - } - } - - private void runExceptionHandleFeedbackLoop(EventLoopGroup group, Class serverChannelClass, - Class channelClass, SocketAddress bindAddr) throws InterruptedException { - Channel serverChannel = null; - Channel clientChannel = null; - try { - MyInitializer serverInitializer = new MyInitializer(); - ServerBootstrap sb = new ServerBootstrap(); - sb.option(ChannelOption.SO_BACKLOG, 1024); - sb.group(group) - .channel(serverChannelClass) - .childHandler(serverInitializer); - - serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel(); - - Bootstrap b = new Bootstrap(); - b.group(group); - b.channel(channelClass); - b.remoteAddress(serverChannel.localAddress()); - b.handler(new MyInitializer()); - clientChannel = b.connect().syncUninterruptibly().channel(); - - clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[1024])); - - // We expect to get 2 exceptions (1 from BuggyChannelHandler and 1 from ExceptionHandler). - assertTrue(serverInitializer.exceptionHandler.latch1.await(2, TimeUnit.SECONDS)); - - // After we get the first exception, we should get no more, this is expected to timeout. - assertFalse("Encountered " + serverInitializer.exceptionHandler.count.get() + - " exceptions when 1 was expected", - serverInitializer.exceptionHandler.latch2.await(2, TimeUnit.SECONDS)); - } finally { - if (serverChannel != null) { - serverChannel.close().syncUninterruptibly(); - } - if (clientChannel != null) { - clientChannel.close().syncUninterruptibly(); - } - } - } - - /** - * Designed to keep reading as long as autoread is enabled. - */ - private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator { - private final int numReads; - TestNumReadsRecvByteBufAllocator(int numReads) { - this.numReads = numReads; - } - - @Override - public Handle newHandle() { - return new Handle() { - private ChannelConfig config; - private int attemptedBytesRead; - private int lastBytesRead; - private int numMessagesRead; - @Override - public ByteBuf allocate(ByteBufAllocator alloc) { - return alloc.ioBuffer(guess()); - } - - @Override - public int guess() { - return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. - } - - @Override - public void reset(ChannelConfig config) { - this.config = config; - numMessagesRead = 0; - } - - @Override - public void incMessagesRead(int numMessages) { - numMessagesRead += numMessages; - } - - @Override - public void lastBytesRead(int bytes) { - lastBytesRead = bytes; - } - - @Override - public int lastBytesRead() { - return lastBytesRead; - } - - @Override - public void attemptedBytesRead(int bytes) { - attemptedBytesRead = bytes; - } - - @Override - public int attemptedBytesRead() { - return attemptedBytesRead; - } - - @Override - public boolean continueReading() { - return numMessagesRead < numReads; - } - - @Override - public void readComplete() { - } - }; - } - } - - /** - * Designed to keep reading as long as autoread is enabled. - */ - private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator { - @Override - public Handle newHandle() { - return new Handle() { - private ChannelConfig config; - private int attemptedBytesRead; - private int lastBytesRead; - @Override - public ByteBuf allocate(ByteBufAllocator alloc) { - return alloc.ioBuffer(guess()); - } - - @Override - public int guess() { - return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. - } - - @Override - public void reset(ChannelConfig config) { - this.config = config; - } - - @Override - public void incMessagesRead(int numMessages) { - } - - @Override - public void lastBytesRead(int bytes) { - lastBytesRead = bytes; - } - - @Override - public int lastBytesRead() { - return lastBytesRead; - } - - @Override - public void attemptedBytesRead(int bytes) { - attemptedBytesRead = bytes; - } - - @Override - public int attemptedBytesRead() { - return attemptedBytesRead; - } - - @Override - public boolean continueReading() { - return config.isAutoRead(); - } - - @Override - public void readComplete() { - } - }; - } - } - - private static class AutoReadInitializer extends ChannelInitializer { - final AutoReadHandler autoReadHandler; - volatile Channel channel; - - AutoReadInitializer(boolean readInEventLoop) { - autoReadHandler = new AutoReadHandler(readInEventLoop); - } - - @Override - protected void initChannel(Channel ch) throws Exception { - channel = ch; - ch.pipeline().addLast(autoReadHandler); - } - } - - private static class ReadPendingInitializer extends ChannelInitializer { - final ReadPendingReadHandler readPendingHandler = new ReadPendingReadHandler(); - final CountDownLatch channelInitLatch = new CountDownLatch(1); - volatile Channel channel; - - @Override - protected void initChannel(Channel ch) throws Exception { - channel = ch; - ch.pipeline().addLast(readPendingHandler); - channelInitLatch.countDown(); - } - } - - private static class MyInitializer extends ChannelInitializer { - final ExceptionHandler exceptionHandler = new ExceptionHandler(); - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - - pipeline.addLast(new BuggyChannelHandler()); - pipeline.addLast(exceptionHandler); - } - } - - private static class BuggyChannelHandler extends ChannelInboundHandlerAdapter { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ReferenceCountUtil.release(msg); - throw new NullPointerException("I am a bug!"); - } - } - - private static final class ReadPendingReadHandler extends ChannelInboundHandlerAdapter { - private final AtomicInteger count = new AtomicInteger(); - private final CountDownLatch latch = new CountDownLatch(1); - private final CountDownLatch latch2 = new CountDownLatch(2); - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ReferenceCountUtil.release(msg); - if (count.incrementAndGet() == 1) { - // Call read the first time, to ensure it is not reset the second time. - ctx.read(); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - latch.countDown(); - latch2.countDown(); - } - - void assertAllRead() throws InterruptedException { - assertTrue(latch.await(5, TimeUnit.SECONDS)); - // We should only do 1 read loop, because we only called read() on the first channelRead. - assertFalse(latch2.await(1, TimeUnit.SECONDS)); - assertEquals(2, count.get()); - } - } - - private static final class AutoReadHandler extends ChannelInboundHandlerAdapter { - private final AtomicInteger count = new AtomicInteger(); - private final CountDownLatch latch = new CountDownLatch(1); - private final CountDownLatch latch2; - private final boolean callRead; - - AutoReadHandler(boolean callRead) { - this.callRead = callRead; - latch2 = new CountDownLatch(callRead ? 3 : 2); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ReferenceCountUtil.release(msg); - if (count.incrementAndGet() == 1) { - ctx.channel().config().setAutoRead(false); - } - if (callRead) { - // Test calling read in the EventLoop thread to ensure a read is eventually done. - ctx.read(); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - latch.countDown(); - latch2.countDown(); - } - - void assertSingleRead() throws InterruptedException { - assertTrue(latch.await(5, TimeUnit.SECONDS)); - assertTrue(count.get() > 0); - } - - void assertSingleReadSecondTry() throws InterruptedException { - assertTrue(latch2.await(5, TimeUnit.SECONDS)); - assertEquals(callRead ? 3 : 2, count.get()); - } - } - - private static class ExceptionHandler extends ChannelInboundHandlerAdapter { - final AtomicLong count = new AtomicLong(); - /** - * We expect to get 1 call to {@link #exceptionCaught(ChannelHandlerContext, Throwable)}. - */ - final CountDownLatch latch1 = new CountDownLatch(1); - final CountDownLatch latch2 = new CountDownLatch(1); - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (count.incrementAndGet() <= 2) { - latch1.countDown(); - } else { - latch2.countDown(); - } - // This should not throw any exception. - ctx.close(); - } - } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index f8d07a0364..7bca31b035 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -398,7 +398,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index 1e420d49e9..a27a26830d 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -234,7 +234,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java index 830f3aef7c..22b9235edc 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java @@ -464,7 +464,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java index 37c778b68c..6a65213e4f 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java @@ -303,7 +303,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index d5550b0fc8..69f99b1575 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -76,7 +76,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { RecvByteBufAllocator.Handle allocHandle) { if (byteBuf != null) { if (byteBuf.isReadable()) { - setReadPending(false); + readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); @@ -93,11 +93,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { @Override public final void read() { final ChannelConfig config = config(); - if (!config.isAutoRead() && !isReadPending()) { - // ChannelConfig.setAutoRead(false) was called in the meantime - removeReadOp(); - return; - } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); @@ -118,7 +113,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } allocHandle.incMessagesRead(1); - setReadPending(false); + readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); @@ -138,7 +133,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead() && !isReadPending()) { + if (!readPending && !config.isAutoRead()) { removeReadOp(); } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index ba68408c23..981f7c8b15 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -61,7 +61,13 @@ public abstract class AbstractNioChannel extends AbstractChannel { protected final int readInterestOp; volatile SelectionKey selectionKey; private volatile boolean inputShutdown; - private volatile boolean readPending; + boolean readPending; + private final Runnable clearReadPendingRunnable = new Runnable() { + @Override + public void run() { + readPending = false; + } + }; /** * The future of the current connection attempt. If not null, subsequent @@ -125,12 +131,53 @@ public abstract class AbstractNioChannel extends AbstractChannel { return selectionKey; } + /** + * @deprecated No longer supported. + * No longer supported. + */ + @Deprecated protected boolean isReadPending() { return readPending; } - protected void setReadPending(boolean readPending) { - this.readPending = readPending; + /** + * @deprecated Use {@link #clearReadPending()} if appropriate instead. + * No longer supported. + */ + @Deprecated + protected void setReadPending(final boolean readPending) { + if (isRegistered()) { + EventLoop eventLoop = eventLoop(); + if (eventLoop.inEventLoop()) { + this.readPending = readPending; + } else { + eventLoop.execute(new OneTimeTask() { + @Override + public void run() { + AbstractNioChannel.this.readPending = readPending; + } + }); + } + } else { + this.readPending = readPending; + } + } + + /** + * Set read pending to {@code false}. + */ + protected final void clearReadPending() { + if (isRegistered()) { + EventLoop eventLoop = eventLoop(); + if (eventLoop.inEventLoop()) { + readPending = false; + } else { + eventLoop.execute(clearReadPendingRunnable); + } + } else { + // Best effort if we are not registered yet clear readPending. This happens during channel initialization. + readPending = false; + } } /** diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 7f1c804fd3..1d606e075b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -54,11 +54,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); - if (!config.isAutoRead() && !isReadPending()) { - // ChannelConfig.setAutoRead(false) was called in the meantime - removeReadOp(); - return; - } final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); @@ -85,7 +80,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { int size = readBuf.size(); for (int i = 0; i < size; i ++) { - setReadPending(false); + readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); @@ -115,7 +110,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead() && !isReadPending()) { + if (!readPending && !config.isAutoRead()) { removeReadOp(); } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index 129a6b69a1..8c10cf2b20 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -93,7 +93,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { RecvByteBufAllocator.Handle allocHandle) { if (byteBuf != null) { if (byteBuf.isReadable()) { - setReadPending(false); + readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); @@ -110,12 +110,14 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { @Override protected void doRead() { final ChannelConfig config = config(); - if (isInputShutdown() || !config.isAutoRead() && !isReadPending()) { - // ChannelConfig.setAutoRead(false) was called in the meantime + if (isInputShutdown() || !readPending) { + // We have to check readPending here because the Runnable to read could have been scheduled and later + // during the same read loop readPending was set to false. return; } - // OIO reads are scheduled as a runnable object, the read is not pending as soon as the runnable is run. - setReadPending(false); + // In OIO we should set readPending to false even if the read was not successful so we can schedule + // another read on the event loop if no reads are done. + readPending = false; final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); @@ -123,19 +125,22 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { allocHandle.reset(config); ByteBuf byteBuf = null; - boolean read = false; + boolean close = false; + boolean readData = false; try { byteBuf = allocHandle.allocate(allocator); do { allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { - if (!read) { // nothing was read. release the buffer. + if (!byteBuf.isReadable()) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; + close = allocHandle.lastBytesRead() < 0; } break; + } else { + readData = true; } - read = true; final int available = available(); if (available <= 0) { @@ -148,7 +153,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { final int maxCapacity = byteBuf.maxCapacity(); if (capacity == maxCapacity) { allocHandle.incMessagesRead(1); - read = false; + readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = allocHandle.allocate(allocator); } else { @@ -162,27 +167,32 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { } } while (allocHandle.continueReading()); - if (read) { - pipeline.fireChannelRead(byteBuf); + if (byteBuf != null) { + // It is possible we allocated a buffer because the previous one was not writable, but then didn't use + // it because allocHandle.continueReading() returned false. + if (byteBuf.isReadable()) { + readPending = false; + pipeline.fireChannelRead(byteBuf); + } else { + byteBuf.release(); + } byteBuf = null; } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); + if (readData) { + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + } - if (allocHandle.lastBytesRead() < 0) { + if (close) { closeOnRead(pipeline); } } catch (Throwable t) { - handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle); + handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { - if (allocHandle.lastBytesRead() == 0 && isActive()) { - // If the read amount was 0 and the channel is still active we need to trigger a new read() - // as otherwise we will never try to read again and the user will never know. - // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are - // able to process the rest of the tasks in the queue first. - // - // See https://github.com/netty/netty/issues/2404 + if (readPending || config.isAutoRead() || !readData && isActive()) { + // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we + // should execute read() again because no data may have been read. read(); } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index 553eb3156a..f23f58f0c8 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -20,6 +20,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.ThreadPerChannelEventLoop; +import io.netty.util.internal.OneTimeTask; import java.net.SocketAddress; @@ -30,14 +31,19 @@ public abstract class AbstractOioChannel extends AbstractChannel { protected static final int SO_TIMEOUT = 1000; - private volatile boolean readPending; - + boolean readPending; private final Runnable readTask = new Runnable() { @Override public void run() { doRead(); } }; + private final Runnable clearReadPendingRunnable = new Runnable() { + @Override + public void run() { + readPending = false; + } + }; /** * @see AbstractChannel#AbstractChannel(Channel) @@ -87,21 +93,62 @@ public abstract class AbstractOioChannel extends AbstractChannel { @Override protected void doBeginRead() throws Exception { - if (isReadPending()) { + if (readPending) { return; } - setReadPending(true); + readPending = true; eventLoop().execute(readTask); } protected abstract void doRead(); + /** + * @deprecated No longer supported. + * No longer supported. + */ + @Deprecated protected boolean isReadPending() { return readPending; } - protected void setReadPending(boolean readPending) { - this.readPending = readPending; + /** + * @deprecated Use {@link #clearReadPending()} if appropriate instead. + * No longer supported. + */ + @Deprecated + protected void setReadPending(final boolean readPending) { + if (isRegistered()) { + EventLoop eventLoop = eventLoop(); + if (eventLoop.inEventLoop()) { + this.readPending = readPending; + } else { + eventLoop.execute(new OneTimeTask() { + @Override + public void run() { + AbstractOioChannel.this.readPending = readPending; + } + }); + } + } else { + this.readPending = readPending; + } + } + + /** + * Set read pending to {@code false}. + */ + protected final void clearReadPending() { + if (isRegistered()) { + EventLoop eventLoop = eventLoop(); + if (eventLoop.inEventLoop()) { + readPending = false; + } else { + eventLoop.execute(clearReadPendingRunnable); + } + } else { + // Best effort if we are not registered yet clear readPending. This happens during channel initialization. + readPending = false; + } } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java index e9a30cd456..0543b83c13 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java @@ -37,14 +37,16 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { @Override protected void doRead() { - final ChannelConfig config = config(); - if (!config.isAutoRead() && !isReadPending()) { - // ChannelConfig.setAutoRead(false) was called in the meantime + if (!readPending) { + // We have to check readPending here because the Runnable to read could have been scheduled and later + // during the same read loop readPending was set to false. return; } - // OIO reads are scheduled as a runnable object, the read is not pending as soon as the runnable is run. - setReadPending(false); + // In OIO we should set readPending to false even if the read was not successful so we can schedule + // another read on the event loop if no reads are done. + readPending = false; + final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); @@ -69,13 +71,18 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { exception = t; } + boolean readData = false; int size = readBuf.size(); - for (int i = 0; i < size; i ++) { - pipeline.fireChannelRead(readBuf.get(i)); + if (size > 0) { + readData = true; + for (int i = 0; i < size; i++) { + readPending = false; + pipeline.fireChannelRead(readBuf.get(i)); + } + readBuf.clear(); + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); } - readBuf.clear(); - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); if (exception != null) { if (exception instanceof IOException) { @@ -89,13 +96,9 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { if (isOpen()) { unsafe().close(unsafe().voidPromise()); } - } else if (allocHandle.lastBytesRead() == 0 && isActive()) { - // If the read amount was 0 and the channel is still active we need to trigger a new read() - // as otherwise we will never try to read again and the user will never know. - // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are - // able to process the rest of the tasks in the queue first. - // - // See https://github.com/netty/netty/issues/2404 + } else if (readPending || config.isAutoRead() || !readData && isActive()) { + // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we + // should execute read() again because no data may have been read. read(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 125de05f5b..6fb114fe0f 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -579,7 +579,12 @@ public final class NioDatagramChannel } @Override + @Deprecated protected void setReadPending(boolean readPending) { super.setReadPending(readPending); } + + void clearReadPending0() { + clearReadPending(); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java index c85c962402..21b9e375c5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java @@ -169,7 +169,7 @@ class NioDatagramChannelConfig extends DefaultDatagramChannelConfig { @Override protected void autoReadCleared() { - ((NioDatagramChannel) channel).setReadPending(false); + ((NioDatagramChannel) channel).clearReadPending0(); } private Object getOption0(Object option) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index c7466b2124..eda4b6abb4 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -191,7 +191,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 48ab64503f..86253ab851 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -361,7 +361,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java index 59453fd879..556ccabe0c 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java @@ -155,7 +155,7 @@ public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChan @Override protected void autoReadCleared() { if (channel instanceof OioServerSocketChannel) { - ((OioServerSocketChannel) channel).setReadPending(false); + ((OioServerSocketChannel) channel).clearReadPending0(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java index 6ef55a2275..ed6e7ca4b2 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java @@ -183,7 +183,7 @@ public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig im @Override protected void autoReadCleared() { if (channel instanceof OioSocketChannel) { - ((OioSocketChannel) channel).setReadPending(false); + ((OioSocketChannel) channel).clearReadPending0(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 4dcb7ee9f5..14c07847a1 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -195,8 +195,13 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel throw new UnsupportedOperationException(); } + @Deprecated @Override protected void setReadPending(boolean readPending) { super.setReadPending(readPending); } + + final void clearReadPending0() { + super.clearReadPending(); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index a96c0255b3..3b34e213ec 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -234,8 +234,13 @@ public class OioSocketChannel extends OioByteStreamChannel return false; } + @Deprecated @Override protected void setReadPending(boolean readPending) { super.setReadPending(readPending); } + + final void clearReadPending0() { + clearReadPending(); + } }