diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java index 2ec708e4b4..319a133600 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java @@ -17,10 +17,16 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.RecvByteBufAllocator; import io.netty.util.ReferenceCountUtil; import org.junit.Test; @@ -28,174 +34,193 @@ import java.io.IOException; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class SocketAutoReadTest extends AbstractSocketTest { - private static final Random random = new Random(); - static final byte[] data = new byte[1024]; - - static { - random.nextBytes(data); - } - - // See https://github.com/netty/netty/pull/2375 - @Test(timeout = 30000) - public void testAutoReadDisableOutsideChannelRead() throws Throwable { + @Test + public void testAutoReadOffDuringReadOnlyReadsOneTime() throws Throwable { run(); } - public void testAutoReadDisableOutsideChannelRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - TestHandler sh = new TestHandler() { - private boolean allBytesReceived; - @Override - public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - assertFalse(allBytesReceived); - ctx.writeAndFlush(msg); - ctx.channel().eventLoop().execute(new Runnable() { - @Override - public void run() { - ctx.channel().config().setAutoRead(false); - allBytesReceived = true; - } - }); + public void testAutoReadOffDuringReadOnlyReadsOneTime(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testAutoReadOffDuringReadOnlyReadsOneTime(true, sb, cb); + testAutoReadOffDuringReadOnlyReadsOneTime(false, sb, cb); + } + + private void testAutoReadOffDuringReadOnlyReadsOneTime(boolean readOutsideEventLoopThread, + ServerBootstrap sb, Bootstrap cb) throws Throwable { + Channel serverChannel = null; + Channel clientChannel = null; + try { + AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread); + AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread); + sb.option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.AUTO_READ, true) + .childOption(ChannelOption.AUTO_READ, true) + // We want to ensure that we attempt multiple individual read operations per read loop so we can + // test the auto read feature being turned off when data is first read. + .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator()) + .childHandler(serverInitializer); + + serverChannel = sb.bind().syncUninterruptibly().channel(); + + cb.remoteAddress(serverChannel.localAddress()) + .option(ChannelOption.AUTO_READ, true) + // We want to ensure that we attempt multiple individual read operations per read loop so we can + // test the auto read feature being turned off when data is first read. + .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator()) + .handler(clientInitializer); + + clientChannel = cb.connect().syncUninterruptibly().channel(); + + // 3 bytes means 3 independent reads for TestRecvByteBufAllocator + clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3])); + serverInitializer.autoReadHandler.assertSingleRead(); + + // 3 bytes means 3 independent reads for TestRecvByteBufAllocator + serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3])); + clientInitializer.autoReadHandler.assertSingleRead(); + + if (readOutsideEventLoopThread) { + serverInitializer.channel.read(); } - }; - sb.childHandler(sh); + serverInitializer.autoReadHandler.assertSingleReadSecondTry(); - TestHandler ch = new TestHandler(); - cb.handler(ch); - Channel sc = sb.bind().sync().channel(); - Channel cc = cb.connect().sync().channel(); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - - cc.close().sync(); - sc.close().sync(); - - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); - } - if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { - throw ch.exception.get(); - } - if (sh.exception.get() != null) { - throw sh.exception.get(); - } - if (ch.exception.get() != null) { - throw ch.exception.get(); + if (readOutsideEventLoopThread) { + clientChannel.read(); + } + clientInitializer.autoReadHandler.assertSingleReadSecondTry(); + } finally { + if (clientChannel != null) { + clientChannel.close().sync(); + } + if (serverChannel != null) { + serverChannel.close().sync(); + } } } - // See https://github.com/netty/netty/pull/2375 - @Test(timeout = 30000) - public void testAutoReadDisableOutsideChannelReadManualRead() throws Throwable { - run(); - } + private static class AutoReadInitializer extends ChannelInitializer { + final AutoReadHandler autoReadHandler; + volatile Channel channel; - public void testAutoReadDisableOutsideChannelReadManualRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { - - ServerTestHandler sh = new ServerTestHandler(); - sb.childHandler(sh); - - TestHandler ch = new TestHandler(); - cb.handler(ch); - Channel sc = sb.bind().sync().channel(); - Channel cc = cb.connect().sync().channel(); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - cc.writeAndFlush(Unpooled.wrappedBuffer(data)).sync(); - Thread.sleep(500); - sh.await(); - cc.close().sync(); - sc.close().sync(); - - if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) { - throw sh.exception.get(); + AutoReadInitializer(boolean readInEventLoop) { + autoReadHandler = new AutoReadHandler(readInEventLoop); } - if (ch.exception.get() != null && !(ch.exception.get() instanceof IOException)) { - throw ch.exception.get(); - } - if (sh.exception.get() != null) { - throw sh.exception.get(); - } - if (ch.exception.get() != null) { - throw ch.exception.get(); + + @Override + protected void initChannel(Channel ch) throws Exception { + channel = ch; + ch.pipeline().addLast(autoReadHandler); } } - public static class ServerTestHandler extends TestHandler { - enum State { - AUTO_READ, - SCHEDULED, - BYTES_RECEIVED, - READ_SCHEDULED - } + private static final class AutoReadHandler extends ChannelInboundHandlerAdapter { + private final AtomicInteger count = new AtomicInteger(); private final CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch2; + private final boolean callRead; - private State state = State.AUTO_READ; - - @Override - public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - ctx.writeAndFlush(msg); - switch (state) { - case READ_SCHEDULED: - latch.countDown(); - break; - case AUTO_READ: - state = State.SCHEDULED; - ctx.channel().eventLoop().execute(new Runnable() { - @Override - public void run() { - ctx.channel().config().setAutoRead(false); - state = State.BYTES_RECEIVED; - ctx.channel().eventLoop().schedule(new Runnable() { - @Override - public void run() { - state = State.READ_SCHEDULED; - ctx.channel().read(); - } - }, 2, TimeUnit.SECONDS); - } - }); - break; - case BYTES_RECEIVED: - // Once the state is BYTES_RECEIVED we should not receive anymore data. - fail(); - break; - case SCHEDULED: - // nothing to do - break; - } - } - - public void await() throws InterruptedException { - latch.await(); - } - } - - private static class TestHandler extends ChannelInboundHandlerAdapter { - final AtomicReference exception = new AtomicReference(); - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, - Throwable cause) throws Exception { - if (exception.compareAndSet(null, cause)) { - cause.printStackTrace(); - ctx.close(); - } + AutoReadHandler(boolean callRead) { + this.callRead = callRead; + latch2 = new CountDownLatch(callRead ? 3 : 2); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ReferenceCountUtil.release(msg); + if (count.incrementAndGet() == 1) { + ctx.channel().config().setAutoRead(false); + } + if (callRead) { + // Test calling read in the EventLoop thread to ensure a read is eventually done. + ctx.read(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + latch.countDown(); + latch2.countDown(); + } + + void assertSingleRead() throws InterruptedException { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertTrue(count.get() > 0); + } + + void assertSingleReadSecondTry() throws InterruptedException { + assertTrue(latch2.await(5, TimeUnit.SECONDS)); + assertEquals(callRead ? 3 : 2, count.get()); + } + } + + /** + * Designed to keep reading as long as autoread is enabled. + */ + private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator { + @Override + public Handle newHandle() { + return new Handle() { + private ChannelConfig config; + private int attemptedBytesRead; + private int lastBytesRead; + @Override + public ByteBuf allocate(ByteBufAllocator alloc) { + return alloc.ioBuffer(guess(), guess()); + } + + @Override + public int guess() { + return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. + } + + @Override + public void reset(ChannelConfig config) { + this.config = config; + } + + @Override + public void incMessagesRead(int numMessages) { + // No need to track the number of messages read because it is not used. + } + + @Override + public void lastBytesRead(int bytes) { + lastBytesRead = bytes; + } + + @Override + public int lastBytesRead() { + return lastBytesRead; + } + + @Override + public void attemptedBytesRead(int bytes) { + attemptedBytesRead = bytes; + } + + @Override + public int attemptedBytesRead() { + return attemptedBytesRead; + } + + @Override + public boolean continueReading() { + return config.isAutoRead(); + } + + @Override + public void readComplete() { + // Nothing needs to be done or adjusted after each read cycle is completed. + } + }; } } } diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketExceptionHandlingTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketExceptionHandlingTest.java new file mode 100644 index 0000000000..36aad045a3 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketExceptionHandlingTest.java @@ -0,0 +1,114 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.util.ReferenceCountUtil; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SocketExceptionHandlingTest extends AbstractSocketTest { + @Test + public void testReadPendingIsResetAfterEachRead() throws Throwable { + run(); + } + + public void testReadPendingIsResetAfterEachRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { + Channel serverChannel = null; + Channel clientChannel = null; + try { + MyInitializer serverInitializer = new MyInitializer(); + sb.option(ChannelOption.SO_BACKLOG, 1024); + sb.childHandler(serverInitializer); + + serverChannel = sb.bind().syncUninterruptibly().channel(); + + cb.remoteAddress(serverChannel.localAddress()) + .handler(new MyInitializer()); + clientChannel = cb.connect().syncUninterruptibly().channel(); + + clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[1024])); + + // We expect to get 2 exceptions (1 from BuggyChannelHandler and 1 from ExceptionHandler). + assertTrue(serverInitializer.exceptionHandler.latch1.await(5, TimeUnit.SECONDS)); + + // After we get the first exception, we should get no more, this is expected to timeout. + assertFalse("Encountered " + serverInitializer.exceptionHandler.count.get() + + " exceptions when 1 was expected", + serverInitializer.exceptionHandler.latch2.await(1, TimeUnit.SECONDS)); + } finally { + if (serverChannel != null) { + serverChannel.close().syncUninterruptibly(); + } + if (clientChannel != null) { + clientChannel.close().syncUninterruptibly(); + } + } + } + + private static class MyInitializer extends ChannelInitializer { + final ExceptionHandler exceptionHandler = new ExceptionHandler(); + @Override + protected void initChannel(Channel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + + pipeline.addLast(new BuggyChannelHandler()); + pipeline.addLast(exceptionHandler); + } + } + + private static class BuggyChannelHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.release(msg); + throw new NullPointerException("I am a bug!"); + } + } + + private static class ExceptionHandler extends ChannelInboundHandlerAdapter { + final AtomicLong count = new AtomicLong(); + /** + * We expect to get 1 call to {@link #exceptionCaught(ChannelHandlerContext, Throwable)}. + */ + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (count.incrementAndGet() <= 2) { + latch1.countDown(); + } else { + latch2.countDown(); + } + // This should not throw any exception. + ctx.close(); + } + } +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java new file mode 100644 index 0000000000..e3f2846520 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java @@ -0,0 +1,199 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.util.ReferenceCountUtil; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SocketReadPendingTest extends AbstractSocketTest { + @Test + public void testReadPendingIsResetAfterEachRead() throws Throwable { + run(); + } + + public void testReadPendingIsResetAfterEachRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { + Channel serverChannel = null; + Channel clientChannel = null; + try { + ReadPendingInitializer serverInitializer = new ReadPendingInitializer(); + ReadPendingInitializer clientInitializer = new ReadPendingInitializer(); + sb.option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.AUTO_READ, true) + .childOption(ChannelOption.AUTO_READ, false) + // We intend to do 2 reads per read loop wakeup + .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2)) + .childHandler(serverInitializer); + + serverChannel = sb.bind().syncUninterruptibly().channel(); + + cb.remoteAddress(serverChannel.localAddress()) + .option(ChannelOption.AUTO_READ, false) + // We intend to do 2 reads per read loop wakeup + .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2)) + .handler(clientInitializer); + clientChannel = cb.connect().syncUninterruptibly().channel(); + + // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator + clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4])); + + // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator + assertTrue(serverInitializer.channelInitLatch.await(5, TimeUnit.SECONDS)); + serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4])); + + serverInitializer.channel.read(); + serverInitializer.readPendingHandler.assertAllRead(); + + clientChannel.read(); + clientInitializer.readPendingHandler.assertAllRead(); + } finally { + if (serverChannel != null) { + serverChannel.close().syncUninterruptibly(); + } + if (clientChannel != null) { + clientChannel.close().syncUninterruptibly(); + } + } + } + + private static class ReadPendingInitializer extends ChannelInitializer { + final ReadPendingReadHandler readPendingHandler = new ReadPendingReadHandler(); + final CountDownLatch channelInitLatch = new CountDownLatch(1); + volatile Channel channel; + + @Override + protected void initChannel(Channel ch) throws Exception { + channel = ch; + ch.pipeline().addLast(readPendingHandler); + channelInitLatch.countDown(); + } + } + + private static final class ReadPendingReadHandler extends ChannelInboundHandlerAdapter { + private final AtomicInteger count = new AtomicInteger(); + private final CountDownLatch latch = new CountDownLatch(1); + private final CountDownLatch latch2 = new CountDownLatch(2); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ReferenceCountUtil.release(msg); + if (count.incrementAndGet() == 1) { + // Call read the first time, to ensure it is not reset the second time. + ctx.read(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + latch.countDown(); + latch2.countDown(); + } + + void assertAllRead() throws InterruptedException { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + // We should only do 1 read loop, because we only called read() on the first channelRead. + assertFalse(latch2.await(1, TimeUnit.SECONDS)); + assertEquals(2, count.get()); + } + } + + /** + * Designed to keep reading as long as autoread is enabled. + */ + private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator { + private final int numReads; + TestNumReadsRecvByteBufAllocator(int numReads) { + this.numReads = numReads; + } + + @Override + public Handle newHandle() { + return new Handle() { + private int attemptedBytesRead; + private int lastBytesRead; + private int numMessagesRead; + @Override + public ByteBuf allocate(ByteBufAllocator alloc) { + return alloc.ioBuffer(guess(), guess()); + } + + @Override + public int guess() { + return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. + } + + @Override + public void reset(ChannelConfig config) { + numMessagesRead = 0; + } + + @Override + public void incMessagesRead(int numMessages) { + numMessagesRead += numMessages; + } + + @Override + public void lastBytesRead(int bytes) { + lastBytesRead = bytes; + } + + @Override + public int lastBytesRead() { + return lastBytesRead; + } + + @Override + public void attemptedBytesRead(int bytes) { + attemptedBytesRead = bytes; + } + + @Override + public int attemptedBytesRead() { + return attemptedBytesRead; + } + + @Override + public boolean continueReading() { + return numMessagesRead < numReads; + } + + @Override + public void readComplete() { + // Nothing needs to be done or adjusted after each read cycle is completed. + } + }; + } + } +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java index c5bd0883c0..77f2c4ea5c 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketStringEchoTest.java @@ -52,7 +52,7 @@ public class SocketStringEchoTest extends AbstractSocketTest { } } - @Test + @Test(timeout = 30000) public void testStringEcho() throws Throwable { run(); } @@ -61,7 +61,7 @@ public class SocketStringEchoTest extends AbstractSocketTest { testStringEcho(sb, cb, true); } - @Test + @Test(timeout = 30000) public void testStringEchoNotAutoRead() throws Throwable { run(); } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java new file mode 100644 index 0000000000..8e35662646 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketAutoReadTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketAutoReadTest; + +import java.util.List; + +public class EpollETSocketAutoReadTest extends SocketAutoReadTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java new file mode 100644 index 0000000000..29ccc9a2fa --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketExceptionHandlingTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketExceptionHandlingTest; + +import java.util.List; + +public class EpollETSocketExceptionHandlingTest extends SocketExceptionHandlingTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java new file mode 100644 index 0000000000..38697ded19 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketReadPendingTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketReadPendingTest; + +import java.util.List; + +public class EpollETSocketReadPendingTest extends SocketReadPendingTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java new file mode 100644 index 0000000000..cd4f7c2d0b --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketAutoReadTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketAutoReadTest; + +import java.util.List; + +public class EpollLTSocketAutoReadTest extends SocketAutoReadTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java new file mode 100644 index 0000000000..d9ad6c1eec --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketExceptionHandlingTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketExceptionHandlingTest; + +import java.util.List; + +public class EpollLTSocketExceptionHandlingTest extends SocketExceptionHandlingTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java new file mode 100644 index 0000000000..b12516a318 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketReadPendingTest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketReadPendingTest; + +import java.util.List; + +public class EpollLTSocketReadPendingTest extends SocketReadPendingTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java index d791b90ef2..54b2ded43d 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelTest.java @@ -17,18 +17,14 @@ package io.netty.channel.epoll; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; -import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; -import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.ServerChannel; import io.netty.util.ReferenceCountUtil; import org.junit.Assert; @@ -38,10 +34,8 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -120,471 +114,4 @@ public class EpollSocketChannelTest { Assert.assertTrue(info.rcvSpace() >= 0); Assert.assertTrue(info.totalRetrans() >= 0); } - - @Test - public void testExceptionHandlingDoesNotInfiniteLoop() throws InterruptedException { - EventLoopGroup group = new EpollEventLoopGroup(); - try { - runExceptionHandleFeedbackLoop(group, EpollServerSocketChannel.class, EpollSocketChannel.class, - new InetSocketAddress(0)); - runExceptionHandleFeedbackLoop(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class, - EpollSocketTestPermutation.newSocketAddress()); - } finally { - group.shutdownGracefully(); - } - } - - @Test - public void testAutoReadOffDuringReadOnlyReadsOneTime() throws InterruptedException { - EventLoopGroup group = new EpollEventLoopGroup(); - try { - runAutoReadTest(true, group, EpollServerSocketChannel.class, - EpollSocketChannel.class, new InetSocketAddress(0)); - runAutoReadTest(true, group, EpollServerDomainSocketChannel.class, - EpollDomainSocketChannel.class, EpollSocketTestPermutation.newSocketAddress()); - runAutoReadTest(false, group, EpollServerSocketChannel.class, - EpollSocketChannel.class, new InetSocketAddress(0)); - runAutoReadTest(false, group, EpollServerDomainSocketChannel.class, - EpollDomainSocketChannel.class, EpollSocketTestPermutation.newSocketAddress()); - } finally { - group.shutdownGracefully(); - } - } - - @Test - public void testReadPendingIsResetAfterEachRead() throws InterruptedException { - EventLoopGroup group = new EpollEventLoopGroup(); - try { - runReadPendingTest(group, EpollServerSocketChannel.class, EpollSocketChannel.class, - new InetSocketAddress(0)); - runReadPendingTest(group, EpollServerDomainSocketChannel.class, EpollDomainSocketChannel.class, - EpollSocketTestPermutation.newSocketAddress()); - } finally { - group.shutdownGracefully(); - } - } - - private void runAutoReadTest(boolean readOutsideEventLoopThread, EventLoopGroup group, - Class serverChannelClass, - Class channelClass, SocketAddress bindAddr) - throws InterruptedException { - Channel serverChannel = null; - Channel clientChannel = null; - try { - AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread); - AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread); - ServerBootstrap sb = new ServerBootstrap(); - sb.option(ChannelOption.SO_BACKLOG, 1024) - .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .option(ChannelOption.AUTO_READ, true) - .group(group) - .channel(serverChannelClass) - .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .childOption(ChannelOption.AUTO_READ, true) - // We want to ensure that we attempt multiple individual read operations per read loop so we can - // test the auto read feature being turned off when data is first read. - .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator()) - .childHandler(serverInitializer); - - serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel(); - - Bootstrap b = new Bootstrap() - .group(group) - .channel(channelClass) - .remoteAddress(serverChannel.localAddress()) - .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .option(ChannelOption.AUTO_READ, true) - // We want to ensure that we attempt multiple individual read operations per read loop so we can - // test the auto read feature being turned off when data is first read. - .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator()) - .handler(clientInitializer); - clientChannel = b.connect().syncUninterruptibly().channel(); - - // 3 bytes means 3 independent reads for TestRecvByteBufAllocator - clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3])); - serverInitializer.autoReadHandler.assertSingleRead(); - - // 3 bytes means 3 independent reads for TestRecvByteBufAllocator - serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[3])); - clientInitializer.autoReadHandler.assertSingleRead(); - - if (readOutsideEventLoopThread) { - serverInitializer.channel.read(); - } - serverInitializer.autoReadHandler.assertSingleReadSecondTry(); - - if (readOutsideEventLoopThread) { - clientChannel.read(); - } - clientInitializer.autoReadHandler.assertSingleReadSecondTry(); - } finally { - if (serverChannel != null) { - serverChannel.close().syncUninterruptibly(); - } - if (clientChannel != null) { - clientChannel.close().syncUninterruptibly(); - } - } - } - - private void runReadPendingTest(EventLoopGroup group, - Class serverChannelClass, - Class channelClass, SocketAddress bindAddr) - throws InterruptedException { - Channel serverChannel = null; - Channel clientChannel = null; - try { - ReadPendingInitializer serverInitializer = new ReadPendingInitializer(); - ReadPendingInitializer clientInitializer = new ReadPendingInitializer(); - ServerBootstrap sb = new ServerBootstrap(); - sb.option(ChannelOption.SO_BACKLOG, 1024) - .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .option(ChannelOption.AUTO_READ, true) - .group(group) - .channel(serverChannelClass) - .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .childOption(ChannelOption.AUTO_READ, false) - // We intend to do 2 reads per read loop wakeup - .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2)) - .childHandler(serverInitializer); - - serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel(); - - Bootstrap b = new Bootstrap() - .group(group) - .channel(channelClass) - .remoteAddress(serverChannel.localAddress()) - .option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) - .option(ChannelOption.AUTO_READ, false) - // We intend to do 2 reads per read loop wakeup - .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(2)) - .handler(clientInitializer); - clientChannel = b.connect().syncUninterruptibly().channel(); - - // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator - clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4])); - - // 4 bytes means 2 read loops for TestNumReadsRecvByteBufAllocator - assertTrue(serverInitializer.channelInitLatch.await(5, TimeUnit.SECONDS)); - serverInitializer.channel.writeAndFlush(Unpooled.wrappedBuffer(new byte[4])); - - serverInitializer.channel.read(); - serverInitializer.readPendingHandler.assertAllRead(); - - clientChannel.read(); - clientInitializer.readPendingHandler.assertAllRead(); - } finally { - if (serverChannel != null) { - serverChannel.close().syncUninterruptibly(); - } - if (clientChannel != null) { - clientChannel.close().syncUninterruptibly(); - } - } - } - - private void runExceptionHandleFeedbackLoop(EventLoopGroup group, Class serverChannelClass, - Class channelClass, SocketAddress bindAddr) throws InterruptedException { - Channel serverChannel = null; - Channel clientChannel = null; - try { - MyInitializer serverInitializer = new MyInitializer(); - ServerBootstrap sb = new ServerBootstrap(); - sb.option(ChannelOption.SO_BACKLOG, 1024); - sb.group(group) - .channel(serverChannelClass) - .childHandler(serverInitializer); - - serverChannel = sb.bind(bindAddr).syncUninterruptibly().channel(); - - Bootstrap b = new Bootstrap(); - b.group(group); - b.channel(channelClass); - b.remoteAddress(serverChannel.localAddress()); - b.handler(new MyInitializer()); - clientChannel = b.connect().syncUninterruptibly().channel(); - - clientChannel.writeAndFlush(Unpooled.wrappedBuffer(new byte[1024])); - - // We expect to get 2 exceptions (1 from BuggyChannelHandler and 1 from ExceptionHandler). - assertTrue(serverInitializer.exceptionHandler.latch1.await(2, TimeUnit.SECONDS)); - - // After we get the first exception, we should get no more, this is expected to timeout. - assertFalse("Encountered " + serverInitializer.exceptionHandler.count.get() + - " exceptions when 1 was expected", - serverInitializer.exceptionHandler.latch2.await(2, TimeUnit.SECONDS)); - } finally { - if (serverChannel != null) { - serverChannel.close().syncUninterruptibly(); - } - if (clientChannel != null) { - clientChannel.close().syncUninterruptibly(); - } - } - } - - /** - * Designed to keep reading as long as autoread is enabled. - */ - private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator { - private final int numReads; - TestNumReadsRecvByteBufAllocator(int numReads) { - this.numReads = numReads; - } - - @Override - public Handle newHandle() { - return new Handle() { - private ChannelConfig config; - private int attemptedBytesRead; - private int lastBytesRead; - private int numMessagesRead; - @Override - public ByteBuf allocate(ByteBufAllocator alloc) { - return alloc.ioBuffer(guess()); - } - - @Override - public int guess() { - return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. - } - - @Override - public void reset(ChannelConfig config) { - this.config = config; - numMessagesRead = 0; - } - - @Override - public void incMessagesRead(int numMessages) { - numMessagesRead += numMessages; - } - - @Override - public void lastBytesRead(int bytes) { - lastBytesRead = bytes; - } - - @Override - public int lastBytesRead() { - return lastBytesRead; - } - - @Override - public void attemptedBytesRead(int bytes) { - attemptedBytesRead = bytes; - } - - @Override - public int attemptedBytesRead() { - return attemptedBytesRead; - } - - @Override - public boolean continueReading() { - return numMessagesRead < numReads; - } - - @Override - public void readComplete() { - } - }; - } - } - - /** - * Designed to keep reading as long as autoread is enabled. - */ - private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator { - @Override - public Handle newHandle() { - return new Handle() { - private ChannelConfig config; - private int attemptedBytesRead; - private int lastBytesRead; - @Override - public ByteBuf allocate(ByteBufAllocator alloc) { - return alloc.ioBuffer(guess()); - } - - @Override - public int guess() { - return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. - } - - @Override - public void reset(ChannelConfig config) { - this.config = config; - } - - @Override - public void incMessagesRead(int numMessages) { - } - - @Override - public void lastBytesRead(int bytes) { - lastBytesRead = bytes; - } - - @Override - public int lastBytesRead() { - return lastBytesRead; - } - - @Override - public void attemptedBytesRead(int bytes) { - attemptedBytesRead = bytes; - } - - @Override - public int attemptedBytesRead() { - return attemptedBytesRead; - } - - @Override - public boolean continueReading() { - return config.isAutoRead(); - } - - @Override - public void readComplete() { - } - }; - } - } - - private static class AutoReadInitializer extends ChannelInitializer { - final AutoReadHandler autoReadHandler; - volatile Channel channel; - - AutoReadInitializer(boolean readInEventLoop) { - autoReadHandler = new AutoReadHandler(readInEventLoop); - } - - @Override - protected void initChannel(Channel ch) throws Exception { - channel = ch; - ch.pipeline().addLast(autoReadHandler); - } - } - - private static class ReadPendingInitializer extends ChannelInitializer { - final ReadPendingReadHandler readPendingHandler = new ReadPendingReadHandler(); - final CountDownLatch channelInitLatch = new CountDownLatch(1); - volatile Channel channel; - - @Override - protected void initChannel(Channel ch) throws Exception { - channel = ch; - ch.pipeline().addLast(readPendingHandler); - channelInitLatch.countDown(); - } - } - - private static class MyInitializer extends ChannelInitializer { - final ExceptionHandler exceptionHandler = new ExceptionHandler(); - @Override - protected void initChannel(Channel ch) throws Exception { - ChannelPipeline pipeline = ch.pipeline(); - - pipeline.addLast(new BuggyChannelHandler()); - pipeline.addLast(exceptionHandler); - } - } - - private static class BuggyChannelHandler extends ChannelInboundHandlerAdapter { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ReferenceCountUtil.release(msg); - throw new NullPointerException("I am a bug!"); - } - } - - private static final class ReadPendingReadHandler extends ChannelInboundHandlerAdapter { - private final AtomicInteger count = new AtomicInteger(); - private final CountDownLatch latch = new CountDownLatch(1); - private final CountDownLatch latch2 = new CountDownLatch(2); - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ReferenceCountUtil.release(msg); - if (count.incrementAndGet() == 1) { - // Call read the first time, to ensure it is not reset the second time. - ctx.read(); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - latch.countDown(); - latch2.countDown(); - } - - void assertAllRead() throws InterruptedException { - assertTrue(latch.await(5, TimeUnit.SECONDS)); - // We should only do 1 read loop, because we only called read() on the first channelRead. - assertFalse(latch2.await(1, TimeUnit.SECONDS)); - assertEquals(2, count.get()); - } - } - - private static final class AutoReadHandler extends ChannelInboundHandlerAdapter { - private final AtomicInteger count = new AtomicInteger(); - private final CountDownLatch latch = new CountDownLatch(1); - private final CountDownLatch latch2; - private final boolean callRead; - - AutoReadHandler(boolean callRead) { - this.callRead = callRead; - latch2 = new CountDownLatch(callRead ? 3 : 2); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ReferenceCountUtil.release(msg); - if (count.incrementAndGet() == 1) { - ctx.channel().config().setAutoRead(false); - } - if (callRead) { - // Test calling read in the EventLoop thread to ensure a read is eventually done. - ctx.read(); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - latch.countDown(); - latch2.countDown(); - } - - void assertSingleRead() throws InterruptedException { - assertTrue(latch.await(5, TimeUnit.SECONDS)); - assertTrue(count.get() > 0); - } - - void assertSingleReadSecondTry() throws InterruptedException { - assertTrue(latch2.await(5, TimeUnit.SECONDS)); - assertEquals(callRead ? 3 : 2, count.get()); - } - } - - private static class ExceptionHandler extends ChannelInboundHandlerAdapter { - final AtomicLong count = new AtomicLong(); - /** - * We expect to get 1 call to {@link #exceptionCaught(ChannelHandlerContext, Throwable)}. - */ - final CountDownLatch latch1 = new CountDownLatch(1); - final CountDownLatch latch2 = new CountDownLatch(1); - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (count.incrementAndGet() <= 2) { - latch1.countDown(); - } else { - latch2.countDown(); - } - // This should not throw any exception. - ctx.close(); - } - } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index f8d07a0364..7bca31b035 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -398,7 +398,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index 1e420d49e9..a27a26830d 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -234,7 +234,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java index 830f3aef7c..22b9235edc 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java @@ -464,7 +464,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java index 37c778b68c..6a65213e4f 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java @@ -303,7 +303,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index d5550b0fc8..69f99b1575 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -76,7 +76,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { RecvByteBufAllocator.Handle allocHandle) { if (byteBuf != null) { if (byteBuf.isReadable()) { - setReadPending(false); + readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); @@ -93,11 +93,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { @Override public final void read() { final ChannelConfig config = config(); - if (!config.isAutoRead() && !isReadPending()) { - // ChannelConfig.setAutoRead(false) was called in the meantime - removeReadOp(); - return; - } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); @@ -118,7 +113,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } allocHandle.incMessagesRead(1); - setReadPending(false); + readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); @@ -138,7 +133,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead() && !isReadPending()) { + if (!readPending && !config.isAutoRead()) { removeReadOp(); } } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index ba68408c23..981f7c8b15 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -61,7 +61,13 @@ public abstract class AbstractNioChannel extends AbstractChannel { protected final int readInterestOp; volatile SelectionKey selectionKey; private volatile boolean inputShutdown; - private volatile boolean readPending; + boolean readPending; + private final Runnable clearReadPendingRunnable = new Runnable() { + @Override + public void run() { + readPending = false; + } + }; /** * The future of the current connection attempt. If not null, subsequent @@ -125,12 +131,53 @@ public abstract class AbstractNioChannel extends AbstractChannel { return selectionKey; } + /** + * @deprecated No longer supported. + * No longer supported. + */ + @Deprecated protected boolean isReadPending() { return readPending; } - protected void setReadPending(boolean readPending) { - this.readPending = readPending; + /** + * @deprecated Use {@link #clearReadPending()} if appropriate instead. + * No longer supported. + */ + @Deprecated + protected void setReadPending(final boolean readPending) { + if (isRegistered()) { + EventLoop eventLoop = eventLoop(); + if (eventLoop.inEventLoop()) { + this.readPending = readPending; + } else { + eventLoop.execute(new OneTimeTask() { + @Override + public void run() { + AbstractNioChannel.this.readPending = readPending; + } + }); + } + } else { + this.readPending = readPending; + } + } + + /** + * Set read pending to {@code false}. + */ + protected final void clearReadPending() { + if (isRegistered()) { + EventLoop eventLoop = eventLoop(); + if (eventLoop.inEventLoop()) { + readPending = false; + } else { + eventLoop.execute(clearReadPendingRunnable); + } + } else { + // Best effort if we are not registered yet clear readPending. This happens during channel initialization. + readPending = false; + } } /** diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 7f1c804fd3..1d606e075b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -54,11 +54,6 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); - if (!config.isAutoRead() && !isReadPending()) { - // ChannelConfig.setAutoRead(false) was called in the meantime - removeReadOp(); - return; - } final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); @@ -85,7 +80,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { int size = readBuf.size(); for (int i = 0; i < size; i ++) { - setReadPending(false); + readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); @@ -115,7 +110,7 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 - if (!config.isAutoRead() && !isReadPending()) { + if (!readPending && !config.isAutoRead()) { removeReadOp(); } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index 129a6b69a1..8c10cf2b20 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -93,7 +93,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { RecvByteBufAllocator.Handle allocHandle) { if (byteBuf != null) { if (byteBuf.isReadable()) { - setReadPending(false); + readPending = false; pipeline.fireChannelRead(byteBuf); } else { byteBuf.release(); @@ -110,12 +110,14 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { @Override protected void doRead() { final ChannelConfig config = config(); - if (isInputShutdown() || !config.isAutoRead() && !isReadPending()) { - // ChannelConfig.setAutoRead(false) was called in the meantime + if (isInputShutdown() || !readPending) { + // We have to check readPending here because the Runnable to read could have been scheduled and later + // during the same read loop readPending was set to false. return; } - // OIO reads are scheduled as a runnable object, the read is not pending as soon as the runnable is run. - setReadPending(false); + // In OIO we should set readPending to false even if the read was not successful so we can schedule + // another read on the event loop if no reads are done. + readPending = false; final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); @@ -123,19 +125,22 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { allocHandle.reset(config); ByteBuf byteBuf = null; - boolean read = false; + boolean close = false; + boolean readData = false; try { byteBuf = allocHandle.allocate(allocator); do { allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { - if (!read) { // nothing was read. release the buffer. + if (!byteBuf.isReadable()) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; + close = allocHandle.lastBytesRead() < 0; } break; + } else { + readData = true; } - read = true; final int available = available(); if (available <= 0) { @@ -148,7 +153,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { final int maxCapacity = byteBuf.maxCapacity(); if (capacity == maxCapacity) { allocHandle.incMessagesRead(1); - read = false; + readPending = false; pipeline.fireChannelRead(byteBuf); byteBuf = allocHandle.allocate(allocator); } else { @@ -162,27 +167,32 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { } } while (allocHandle.continueReading()); - if (read) { - pipeline.fireChannelRead(byteBuf); + if (byteBuf != null) { + // It is possible we allocated a buffer because the previous one was not writable, but then didn't use + // it because allocHandle.continueReading() returned false. + if (byteBuf.isReadable()) { + readPending = false; + pipeline.fireChannelRead(byteBuf); + } else { + byteBuf.release(); + } byteBuf = null; } - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); + if (readData) { + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + } - if (allocHandle.lastBytesRead() < 0) { + if (close) { closeOnRead(pipeline); } } catch (Throwable t) { - handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() < 0, allocHandle); + handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { - if (allocHandle.lastBytesRead() == 0 && isActive()) { - // If the read amount was 0 and the channel is still active we need to trigger a new read() - // as otherwise we will never try to read again and the user will never know. - // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are - // able to process the rest of the tasks in the queue first. - // - // See https://github.com/netty/netty/issues/2404 + if (readPending || config.isAutoRead() || !readData && isActive()) { + // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we + // should execute read() again because no data may have been read. read(); } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index 553eb3156a..f23f58f0c8 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -20,6 +20,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.ThreadPerChannelEventLoop; +import io.netty.util.internal.OneTimeTask; import java.net.SocketAddress; @@ -30,14 +31,19 @@ public abstract class AbstractOioChannel extends AbstractChannel { protected static final int SO_TIMEOUT = 1000; - private volatile boolean readPending; - + boolean readPending; private final Runnable readTask = new Runnable() { @Override public void run() { doRead(); } }; + private final Runnable clearReadPendingRunnable = new Runnable() { + @Override + public void run() { + readPending = false; + } + }; /** * @see AbstractChannel#AbstractChannel(Channel) @@ -87,21 +93,62 @@ public abstract class AbstractOioChannel extends AbstractChannel { @Override protected void doBeginRead() throws Exception { - if (isReadPending()) { + if (readPending) { return; } - setReadPending(true); + readPending = true; eventLoop().execute(readTask); } protected abstract void doRead(); + /** + * @deprecated No longer supported. + * No longer supported. + */ + @Deprecated protected boolean isReadPending() { return readPending; } - protected void setReadPending(boolean readPending) { - this.readPending = readPending; + /** + * @deprecated Use {@link #clearReadPending()} if appropriate instead. + * No longer supported. + */ + @Deprecated + protected void setReadPending(final boolean readPending) { + if (isRegistered()) { + EventLoop eventLoop = eventLoop(); + if (eventLoop.inEventLoop()) { + this.readPending = readPending; + } else { + eventLoop.execute(new OneTimeTask() { + @Override + public void run() { + AbstractOioChannel.this.readPending = readPending; + } + }); + } + } else { + this.readPending = readPending; + } + } + + /** + * Set read pending to {@code false}. + */ + protected final void clearReadPending() { + if (isRegistered()) { + EventLoop eventLoop = eventLoop(); + if (eventLoop.inEventLoop()) { + readPending = false; + } else { + eventLoop.execute(clearReadPendingRunnable); + } + } else { + // Best effort if we are not registered yet clear readPending. This happens during channel initialization. + readPending = false; + } } } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java index e9a30cd456..0543b83c13 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioMessageChannel.java @@ -37,14 +37,16 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { @Override protected void doRead() { - final ChannelConfig config = config(); - if (!config.isAutoRead() && !isReadPending()) { - // ChannelConfig.setAutoRead(false) was called in the meantime + if (!readPending) { + // We have to check readPending here because the Runnable to read could have been scheduled and later + // during the same read loop readPending was set to false. return; } - // OIO reads are scheduled as a runnable object, the read is not pending as soon as the runnable is run. - setReadPending(false); + // In OIO we should set readPending to false even if the read was not successful so we can schedule + // another read on the event loop if no reads are done. + readPending = false; + final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); @@ -69,13 +71,18 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { exception = t; } + boolean readData = false; int size = readBuf.size(); - for (int i = 0; i < size; i ++) { - pipeline.fireChannelRead(readBuf.get(i)); + if (size > 0) { + readData = true; + for (int i = 0; i < size; i++) { + readPending = false; + pipeline.fireChannelRead(readBuf.get(i)); + } + readBuf.clear(); + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); } - readBuf.clear(); - allocHandle.readComplete(); - pipeline.fireChannelReadComplete(); if (exception != null) { if (exception instanceof IOException) { @@ -89,13 +96,9 @@ public abstract class AbstractOioMessageChannel extends AbstractOioChannel { if (isOpen()) { unsafe().close(unsafe().voidPromise()); } - } else if (allocHandle.lastBytesRead() == 0 && isActive()) { - // If the read amount was 0 and the channel is still active we need to trigger a new read() - // as otherwise we will never try to read again and the user will never know. - // Just call read() is ok here as it will be submitted to the EventLoop as a task and so we are - // able to process the rest of the tasks in the queue first. - // - // See https://github.com/netty/netty/issues/2404 + } else if (readPending || config.isAutoRead() || !readData && isActive()) { + // Reading 0 bytes could mean there is a SocketTimeout and no data was actually read, so we + // should execute read() again because no data may have been read. read(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 125de05f5b..6fb114fe0f 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -579,7 +579,12 @@ public final class NioDatagramChannel } @Override + @Deprecated protected void setReadPending(boolean readPending) { super.setReadPending(readPending); } + + void clearReadPending0() { + clearReadPending(); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java index c85c962402..21b9e375c5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannelConfig.java @@ -169,7 +169,7 @@ class NioDatagramChannelConfig extends DefaultDatagramChannelConfig { @Override protected void autoReadCleared() { - ((NioDatagramChannel) channel).setReadPending(false); + ((NioDatagramChannel) channel).clearReadPending0(); } private Object getOption0(Object option) { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index c7466b2124..eda4b6abb4 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -191,7 +191,7 @@ public class NioServerSocketChannel extends AbstractNioMessageChannel @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 48ab64503f..86253ab851 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -361,7 +361,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override protected void autoReadCleared() { - setReadPending(false); + clearReadPending(); } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java index 59453fd879..556ccabe0c 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioServerSocketChannelConfig.java @@ -155,7 +155,7 @@ public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChan @Override protected void autoReadCleared() { if (channel instanceof OioServerSocketChannel) { - ((OioServerSocketChannel) channel).setReadPending(false); + ((OioServerSocketChannel) channel).clearReadPending0(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java index 6ef55a2275..ed6e7ca4b2 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/DefaultOioSocketChannelConfig.java @@ -183,7 +183,7 @@ public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig im @Override protected void autoReadCleared() { if (channel instanceof OioSocketChannel) { - ((OioSocketChannel) channel).setReadPending(false); + ((OioSocketChannel) channel).clearReadPending0(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 4dcb7ee9f5..14c07847a1 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -195,8 +195,13 @@ public class OioServerSocketChannel extends AbstractOioMessageChannel throw new UnsupportedOperationException(); } + @Deprecated @Override protected void setReadPending(boolean readPending) { super.setReadPending(readPending); } + + final void clearReadPending0() { + super.clearReadPending(); + } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index a96c0255b3..3b34e213ec 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -234,8 +234,13 @@ public class OioSocketChannel extends OioByteStreamChannel return false; } + @Deprecated @Override protected void setReadPending(boolean readPending) { super.setReadPending(readPending); } + + final void clearReadPending0() { + clearReadPending(); + } }