From a1b5b5dcca8542e51b69bcd4d7ccbc51fe7ce7f4 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Thu, 2 Feb 2017 10:15:10 -0800 Subject: [PATCH] EpollRecvByteAllocatorHandle doesn't inform delegate of more data Motivation: EpollRecvByteAllocatorHandle intends to override the meaning of "maybe more data to read" which is a concept also used in all existing implementations of RecvByteBufAllocator$Handle but the interface doesn't support overriding. Because the interfaces lack the ability to propagate this computation EpollRecvByteAllocatorHandle attempts to implement a heuristic on top of the delegate which may lead to reading when we shouldn't or not reading data. Modifications: - Create a new interface ExtendedRecvByteBufAllocator and ExtendedHandle which allows the "maybe more data to read" between interfaces - Deprecate RecvByteBufAllocator and change all existing implementations to extend ExtendedRecvByteBufAllocator - transport-native-epoll should require ExtendedRecvByteBufAllocator so the "maybe more data to read" can be propagated to the ExtendedHandle Result: Fixes https://github.com/netty/netty/issues/6303. --- .../netty/util/UncheckedBooleanSupplier.java | 48 ++++ .../transport/socket/SocketAutoReadTest.java | 10 +- .../socket/SocketHalfClosedTest.java | 225 ++++++++++++++++++ .../socket/SocketReadPendingTest.java | 14 +- .../channel/epoll/AbstractEpollChannel.java | 68 +++--- .../epoll/AbstractEpollServerChannel.java | 4 +- .../epoll/AbstractEpollStreamChannel.java | 15 +- .../channel/epoll/EpollChannelConfig.java | 4 + .../channel/epoll/EpollDatagramChannel.java | 4 +- .../epoll/EpollRecvByteAllocatorHandle.java | 92 +++++-- ...EpollRecvByteAllocatorStreamingHandle.java | 7 +- .../epoll/EpollETSocketHalfClosed.java | 39 +++ .../epoll/EpollLTSocketHalfClosed.java | 39 +++ .../epoll/EpollSocketChannelConfigTest.java | 2 +- .../channel/AdaptiveRecvByteBufAllocator.java | 1 + .../netty/channel/DefaultChannelConfig.java | 4 +- .../DefaultMaxBytesRecvByteBufAllocator.java | 23 +- ...efaultMaxMessagesRecvByteBufAllocator.java | 27 ++- .../channel/FixedRecvByteBufAllocator.java | 1 + .../netty/channel/RecvByteBufAllocator.java | 18 +- .../channel/nio/AbstractNioByteChannel.java | 11 +- .../ChannelInputShutdownReadComplete.java | 27 +++ .../channel/socket/nio/NioSocketChannel.java | 5 + 23 files changed, 600 insertions(+), 88 deletions(-) create mode 100644 common/src/main/java/io/netty/util/UncheckedBooleanSupplier.java create mode 100644 testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java create mode 100644 transport/src/main/java/io/netty/channel/socket/ChannelInputShutdownReadComplete.java diff --git a/common/src/main/java/io/netty/util/UncheckedBooleanSupplier.java b/common/src/main/java/io/netty/util/UncheckedBooleanSupplier.java new file mode 100644 index 0000000000..57be6de747 --- /dev/null +++ b/common/src/main/java/io/netty/util/UncheckedBooleanSupplier.java @@ -0,0 +1,48 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util; + +/** + * Represents a supplier of {@code boolean}-valued results which doesn't throw any checked exceptions. + */ +public interface UncheckedBooleanSupplier extends BooleanSupplier { + /** + * Gets a boolean value. + * @return a boolean value. + */ + @Override + boolean get(); + + /** + * A supplier which always returns {@code false} and never throws. + */ + UncheckedBooleanSupplier FALSE_SUPPLIER = new UncheckedBooleanSupplier() { + @Override + public boolean get() { + return false; + } + }; + + /** + * A supplier which always returns {@code true} and never throws. + */ + UncheckedBooleanSupplier TRUE_SUPPLIER = new UncheckedBooleanSupplier() { + @Override + public boolean get() { + return true; + } + }; +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java index 09c51c0d2c..cc7bb90eec 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketAutoReadTest.java @@ -28,6 +28,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.RecvByteBufAllocator; import io.netty.util.ReferenceCountUtil; +import io.netty.util.UncheckedBooleanSupplier; import org.junit.Test; import java.util.concurrent.CountDownLatch; @@ -161,8 +162,8 @@ public class SocketAutoReadTest extends AbstractSocketTest { */ private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator { @Override - public Handle newHandle() { - return new Handle() { + public ExtendedHandle newHandle() { + return new ExtendedHandle() { private ChannelConfig config; private int attemptedBytesRead; private int lastBytesRead; @@ -211,6 +212,11 @@ public class SocketAutoReadTest extends AbstractSocketTest { return config.isAutoRead(); } + @Override + public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { + return config.isAutoRead(); + } + @Override public void readComplete() { // Nothing needs to be done or adjusted after each read cycle is completed. diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java new file mode 100644 index 0000000000..006402d437 --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java @@ -0,0 +1,225 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.socket.ChannelInputShutdownEvent; +import io.netty.channel.socket.ChannelInputShutdownReadComplete; +import io.netty.channel.socket.DuplexChannel; +import io.netty.util.UncheckedBooleanSupplier; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertTrue; + +public class SocketHalfClosedTest extends AbstractSocketTest { + @Test + public void testAllDataReadAfterHalfClosure() throws Throwable { + run(); + } + + public void testAllDataReadAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable { + testAllDataReadAfterHalfClosure(true, sb, cb); + testAllDataReadAfterHalfClosure(false, sb, cb); + } + + public void testAllDataReadAfterHalfClosure(final boolean autoRead, + ServerBootstrap sb, Bootstrap cb) throws Throwable { + final int totalServerBytesWritten = 1024 * 16; + final int numReadsPerReadLoop = 2; + final CountDownLatch serverInitializedLatch = new CountDownLatch(1); + final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1); + final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1); + final AtomicInteger clientReadCompletes = new AtomicInteger(); + Channel serverChannel = null; + Channel clientChannel = null; + try { + cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true) + .option(ChannelOption.AUTO_READ, autoRead) + .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop)); + + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten); + buf.writerIndex(buf.capacity()); + ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + ((DuplexChannel) future.channel()).shutdownOutput(); + } + }); + serverInitializedLatch.countDown(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + }); + } + }); + + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + private int bytesRead; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf buf = (ByteBuf) msg; + bytesRead += buf.readableBytes(); + buf.release(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt == ChannelInputShutdownEvent.INSTANCE) { + clientHalfClosedLatch.countDown(); + } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) { + ctx.close(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + clientReadCompletes.incrementAndGet(); + if (bytesRead == totalServerBytesWritten) { + clientReadAllDataLatch.countDown(); + } + if (!autoRead) { + ctx.read(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + }); + } + }); + + serverChannel = sb.bind().sync().channel(); + clientChannel = cb.connect(serverChannel.localAddress()).sync().channel(); + clientChannel.read(); + + serverInitializedLatch.await(); + clientReadAllDataLatch.await(); + clientHalfClosedLatch.await(); + assertTrue("too many read complete events: " + clientReadCompletes.get(), + totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get()); + } finally { + if (clientChannel != null) { + clientChannel.close().sync(); + } + if (serverChannel != null) { + serverChannel.close().sync(); + } + } + } + + /** + * Designed to read a single byte at a time to control the number of reads done at a fine granularity. + */ + private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator { + private final int numReads; + TestNumReadsRecvByteBufAllocator(int numReads) { + this.numReads = numReads; + } + + @Override + public ExtendedHandle newHandle() { + return new ExtendedHandle() { + private int attemptedBytesRead; + private int lastBytesRead; + private int numMessagesRead; + @Override + public ByteBuf allocate(ByteBufAllocator alloc) { + return alloc.ioBuffer(guess(), guess()); + } + + @Override + public int guess() { + return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled. + } + + @Override + public void reset(ChannelConfig config) { + numMessagesRead = 0; + } + + @Override + public void incMessagesRead(int numMessages) { + numMessagesRead += numMessages; + } + + @Override + public void lastBytesRead(int bytes) { + lastBytesRead = bytes; + } + + @Override + public int lastBytesRead() { + return lastBytesRead; + } + + @Override + public void attemptedBytesRead(int bytes) { + attemptedBytesRead = bytes; + } + + @Override + public int attemptedBytesRead() { + return attemptedBytesRead; + } + + @Override + public boolean continueReading() { + return numMessagesRead < numReads; + } + + @Override + public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { + return continueReading() && maybeMoreDataSupplier.get(); + } + + @Override + public void readComplete() { + // Nothing needs to be done or adjusted after each read cycle is completed. + } + }; + } + } +} diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java index e3f2846520..84d0e85949 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketReadPendingTest.java @@ -28,6 +28,7 @@ import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.RecvByteBufAllocator; import io.netty.util.ReferenceCountUtil; +import io.netty.util.UncheckedBooleanSupplier; import org.junit.Test; import java.util.concurrent.CountDownLatch; @@ -39,7 +40,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class SocketReadPendingTest extends AbstractSocketTest { - @Test + @Test(timeout = 30000) public void testReadPendingIsResetAfterEachRead() throws Throwable { run(); } @@ -130,7 +131,7 @@ public class SocketReadPendingTest extends AbstractSocketTest { } /** - * Designed to keep reading as long as autoread is enabled. + * Designed to read a single byte at a time to control the number of reads done at a fine granularity. */ private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator { private final int numReads; @@ -139,8 +140,8 @@ public class SocketReadPendingTest extends AbstractSocketTest { } @Override - public Handle newHandle() { - return new Handle() { + public ExtendedHandle newHandle() { + return new ExtendedHandle() { private int attemptedBytesRead; private int lastBytesRead; private int numMessagesRead; @@ -189,6 +190,11 @@ public class SocketReadPendingTest extends AbstractSocketTest { return numMessagesRead < numReads; } + @Override + public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { + return continueReading(); + } + @Override public void readComplete() { // Nothing needs to be done or adjusted after each read cycle is completed. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 79b8c052ea..f62cfda049 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -24,10 +24,10 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; -import io.netty.channel.ChannelOption; import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; +import io.netty.channel.socket.ChannelInputShutdownReadComplete; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; @@ -45,6 +45,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann private final int readFlag; private final Socket fileDescriptor; protected int flags = Native.EPOLLET; + boolean inputClosedSeenErrorOnRead; + boolean epollInReadyRunnablePending; protected volatile boolean active; @@ -107,6 +109,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann @Override protected void doClose() throws Exception { active = false; + // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a + // socket which has not even been connected yet. This has been observed to block during unit tests. + inputClosedSeenErrorOnRead = true; try { doDeregister(); } finally { @@ -148,10 +153,19 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified // again if we didn't consume all the data. So we force a read operation here if there maybe more data. if (unsafe.maybeMoreDataToRead) { - unsafe.executeEpollInReadyRunnable(); + unsafe.executeEpollInReadyRunnable(config()); } } + final boolean shouldBreakEpollInReady(ChannelConfig config) { + return fileDescriptor.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config)); + } + + final boolean isAllowHalfClosure(ChannelConfig config) { + return config instanceof EpollSocketChannelConfig && + ((EpollSocketChannelConfig) config).isAllowHalfClosure(); + } + final void clearEpollIn() { // Only clear if registered with an EventLoop as otherwise if (isRegistered()) { @@ -186,12 +200,11 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann @Override protected void doRegister() throws Exception { - EpollEventLoop loop = (EpollEventLoop) eventLoop(); // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop // make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the // new EventLoop. - ((AbstractEpollUnsafe) unsafe()).epollInReadyRunnablePending = false; - loop.add(this); + epollInReadyRunnablePending = false; + ((EpollEventLoop) eventLoop()).add(this); } @Override @@ -314,9 +327,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { boolean readPending; boolean maybeMoreDataToRead; - boolean epollInReadyRunnablePending; private EpollRecvByteAllocatorHandle allocHandle; - private Runnable epollInReadyRunnable; + private final Runnable epollInReadyRunnable = new Runnable() { + @Override + public void run() { + epollInReadyRunnablePending = false; + epollInReady(); + } + }; /** * Called once EPOLLIN event is ready to be processed @@ -326,7 +344,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann final void epollInBefore() { maybeMoreDataToRead = false; } final void epollInFinally(ChannelConfig config) { - maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); + maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead(); // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method @@ -335,7 +353,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { clearEpollIn(); - } else if (readPending && maybeMoreDataToRead && !fd().isInputShutdown()) { + } else if (readPending && maybeMoreDataToRead) { // trigger a read again as there may be something left to read and because of epoll ET we // will not get notified again until we read everything from the socket // @@ -343,24 +361,15 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set // to false before every read operation to prevent re-entry into epollInReady() we will not read from // the underlying OS again unless the user happens to call read again. - executeEpollInReadyRunnable(); + executeEpollInReadyRunnable(config); } } - final void executeEpollInReadyRunnable() { - if (epollInReadyRunnablePending) { + final void executeEpollInReadyRunnable(ChannelConfig config) { + if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) { return; } epollInReadyRunnablePending = true; - if (epollInReadyRunnable == null) { - epollInReadyRunnable = new Runnable() { - @Override - public void run() { - epollInReadyRunnablePending = false; - epollInReady(); - } - }; - } eventLoop().execute(epollInReadyRunnable); } @@ -380,9 +389,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event. clearEpollRdHup(); } - // epollInReady may call this, but we should ensure that it gets called. - shutdownInput(); + shutdownInput(true); } /** @@ -400,12 +408,11 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann /** * Shutdown the input side of the channel. */ - void shutdownInput() { + void shutdownInput(boolean rdHup) { if (!fd().isInputShutdown()) { - if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + if (isAllowHalfClosure(config())) { try { fd().shutdown(true, false); - clearEpollIn0(); } catch (IOException ignored) { // We attempted to shutdown and failed, which means the input has already effectively been // shutdown. @@ -421,6 +428,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } else { close(voidPromise()); } + } else if (!rdHup) { + inputClosedSeenErrorOnRead = true; + pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE); } } @@ -432,7 +442,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann @Override public EpollRecvByteAllocatorHandle recvBufAllocHandle() { if (allocHandle == null) { - allocHandle = newEpollHandle(super.recvBufAllocHandle()); + allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle()); } return allocHandle; } @@ -441,8 +451,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann * Create a new {@link EpollRecvByteAllocatorHandle} instance. * @param handle The handle to wrap with EPOLL specific logic. */ - EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { - return new EpollRecvByteAllocatorHandle(handle, config()); + EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) { + return new EpollRecvByteAllocatorHandle(handle); } @Override diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index 888375b472..02401a58e5 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -107,11 +107,11 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im @Override void epollInReady() { assert eventLoop().inEventLoop(); - if (fd().isInputShutdown()) { + final ChannelConfig config = config(); + if (shouldBreakEpollInReady(config)) { clearEpollIn0(); return; } - final ChannelConfig config = config(); final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index f34d23eeb9..52dbd152f3 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -795,7 +795,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } class EpollStreamUnsafe extends AbstractEpollUnsafe { - // Overridden here just to be able to access this method from AbstractEpollStreamChannel @Override protected Executor prepareToClose() { @@ -816,7 +815,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { - shutdownInput(); + shutdownInput(false); } } @@ -963,17 +962,17 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im } @Override - EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.Handle handle) { - return new EpollRecvByteAllocatorStreamingHandle(handle, config()); + EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) { + return new EpollRecvByteAllocatorStreamingHandle(handle); } @Override void epollInReady() { - if (fd().isInputShutdown()) { + final ChannelConfig config = config(); + if (shouldBreakEpollInReady(config)) { clearEpollIn0(); return; } - final ChannelConfig config = config(); final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); @@ -1018,7 +1017,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im pipeline.fireChannelRead(byteBuf); byteBuf = null; - if (fd().isInputShutdown()) { + if (shouldBreakEpollInReady(config)) { // We need to do this for two reasons: // // - If the input was shutdown in between (which may be the case when the user did it in the @@ -1038,7 +1037,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im pipeline.fireChannelReadComplete(); if (close) { - shutdownInput(); + shutdownInput(false); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java index 215af453a6..e18e8c19c1 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollChannelConfig.java @@ -86,6 +86,10 @@ public class EpollChannelConfig extends DefaultChannelConfig { @Override public EpollChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + if (!(allocator.newHandle() instanceof RecvByteBufAllocator.ExtendedHandle)) { + throw new IllegalArgumentException("allocator.newHandle() must return an object of type: " + + RecvByteBufAllocator.ExtendedHandle.class); + } super.setRecvByteBufAllocator(allocator); return this; } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 7ff4a7a442..874437ee38 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -523,11 +523,11 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements @Override void epollInReady() { assert eventLoop().inEventLoop(); - if (fd().isInputShutdown()) { + DatagramChannelConfig config = config(); + if (shouldBreakEpollInReady(config)) { clearEpollIn0(); return; } - DatagramChannelConfig config = config(); final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET)); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java index e33b42ce99..658cd3c6fb 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorHandle.java @@ -15,17 +15,26 @@ */ package io.netty.channel.epoll; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelConfig; import io.netty.channel.RecvByteBufAllocator; +import io.netty.util.UncheckedBooleanSupplier; +import io.netty.util.internal.ObjectUtil; -class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle { +class EpollRecvByteAllocatorHandle implements RecvByteBufAllocator.ExtendedHandle { + private final RecvByteBufAllocator.ExtendedHandle delegate; private boolean isEdgeTriggered; - private final ChannelConfig config; private boolean receivedRdHup; + private final UncheckedBooleanSupplier defaultMaybeMoreDataSupplier = new UncheckedBooleanSupplier() { + @Override + public boolean get() { + return maybeMoreDataToRead(); + } + }; - EpollRecvByteAllocatorHandle(RecvByteBufAllocator.Handle handle, ChannelConfig config) { - super(handle); - this.config = config; + EpollRecvByteAllocatorHandle(RecvByteBufAllocator.ExtendedHandle handle) { + this.delegate = ObjectUtil.checkNotNull(handle, "handle"); } final void receivedRdHup() { @@ -37,8 +46,16 @@ class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle } boolean maybeMoreDataToRead() { - // If EPOLLRDHUP has been received we must read until we get a read error. - return isEdgeTriggered && (lastBytesRead() > 0 || receivedRdHup); + /** + * EPOLL ET requires that we read until we get an EAGAIN + * (see Q9 in epoll man). However in order to + * respect auto read we supporting reading to stop if auto read is off. It is expected that the + * {@link #EpollSocketChannel} implementations will track if we are in edgeTriggered mode and all data was not + * read, and will force a EPOLLIN ready event. + */ + return (isEdgeTriggered && lastBytesRead() > 0) || + (!isEdgeTriggered && lastBytesRead() == attemptedBytesRead()) || + receivedRdHup; } final void edgeTriggered(boolean edgeTriggered) { @@ -49,16 +66,59 @@ class EpollRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle return isEdgeTriggered; } + @Override + public final ByteBuf allocate(ByteBufAllocator alloc) { + return delegate.allocate(alloc); + } + + @Override + public final int guess() { + return delegate.guess(); + } + + @Override + public final void reset(ChannelConfig config) { + delegate.reset(config); + } + + @Override + public final void incMessagesRead(int numMessages) { + delegate.incMessagesRead(numMessages); + } + + @Override + public final void lastBytesRead(int bytes) { + delegate.lastBytesRead(bytes); + } + + @Override + public final int lastBytesRead() { + return delegate.lastBytesRead(); + } + + @Override + public final int attemptedBytesRead() { + return delegate.attemptedBytesRead(); + } + + @Override + public final void attemptedBytesRead(int bytes) { + delegate.attemptedBytesRead(bytes); + } + + @Override + public final void readComplete() { + delegate.readComplete(); + } + + @Override + public final boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { + return delegate.continueReading(maybeMoreDataSupplier); + } + @Override public final boolean continueReading() { - /** - * EPOLL ET requires that we read until we get an EAGAIN - * (see Q9 in epoll man). However in order to - * respect auto read we supporting reading to stop if auto read is off. If auto read is on we force reading to - * continue to avoid a {@link StackOverflowError} between channelReadComplete and reading from the - * channel. It is expected that the {@link #EpollSocketChannel} implementations will track if we are in - * edgeTriggered mode and all data was not read, and will force a EPOLLIN ready event. - */ - return maybeMoreDataToRead() && config.isAutoRead() || super.continueReading(); + // We must override the supplier which determines if there maybe more data to read. + return delegate.continueReading(defaultMaybeMoreDataSupplier); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java index 393ab41e6f..f6ba5f58b5 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollRecvByteAllocatorStreamingHandle.java @@ -15,12 +15,11 @@ */ package io.netty.channel.epoll; -import io.netty.channel.ChannelConfig; import io.netty.channel.RecvByteBufAllocator; final class EpollRecvByteAllocatorStreamingHandle extends EpollRecvByteAllocatorHandle { - public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.Handle handle, ChannelConfig config) { - super(handle, config); + public EpollRecvByteAllocatorStreamingHandle(RecvByteBufAllocator.ExtendedHandle handle) { + super(handle); } @Override @@ -31,6 +30,6 @@ final class EpollRecvByteAllocatorStreamingHandle extends EpollRecvByteAllocator * * If EPOLLRDHUP has been received we must read until we get a read error. */ - return isEdgeTriggered() && (lastBytesRead() == attemptedBytesRead() || isReceivedRdHup()); + return lastBytesRead() == attemptedBytesRead() || isReceivedRdHup(); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java new file mode 100644 index 0000000000..4a5356043f --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollETSocketHalfClosed.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketHalfClosedTest; + +import java.util.List; + +public class EpollETSocketHalfClosed extends SocketHalfClosedTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java new file mode 100644 index 0000000000..d544651b2f --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollLTSocketHalfClosed.java @@ -0,0 +1,39 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketHalfClosedTest; + +import java.util.List; + +public class EpollLTSocketHalfClosed extends SocketHalfClosedTest { + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.socket(); + } + + @Override + protected void configure(ServerBootstrap bootstrap, Bootstrap bootstrap2, ByteBufAllocator allocator) { + super.configure(bootstrap, bootstrap2, allocator); + bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED) + .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + bootstrap2.option(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED); + } +} diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelConfigTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelConfigTest.java index 5e8f69e4cb..19ba3f84b8 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelConfigTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelConfigTest.java @@ -140,7 +140,7 @@ public class EpollSocketChannelConfigTest { public void testGetOptionWhenClosed() { ch.close().syncUninterruptibly(); try { - ch.config().getSoLinger(); + ch.config().getSoLinger(); fail(); } catch (ChannelException e) { assertTrue(e.getCause() instanceof ClosedChannelException); diff --git a/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java index 11750108eb..b45e6e960d 100644 --- a/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java +++ b/transport/src/main/java/io/netty/channel/AdaptiveRecvByteBufAllocator.java @@ -175,6 +175,7 @@ public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufA this.initial = initial; } + @SuppressWarnings("deprecation") @Override public Handle newHandle() { return new HandleImpl(minIndex, maxIndex, initial); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java index 9311b84c0a..aa05b84fb1 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelConfig.java @@ -29,8 +29,8 @@ import static io.netty.channel.ChannelOption.AUTO_READ; import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; import static io.netty.channel.ChannelOption.MAX_MESSAGES_PER_READ; import static io.netty.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR; -import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP; import static io.netty.channel.ChannelOption.RCVBUF_ALLOCATOR; +import static io.netty.channel.ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP; import static io.netty.channel.ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK; import static io.netty.channel.ChannelOption.WRITE_BUFFER_LOW_WATER_MARK; import static io.netty.channel.ChannelOption.WRITE_BUFFER_WATER_MARK; @@ -307,7 +307,7 @@ public class DefaultChannelConfig implements ChannelConfig { } else if (allocator == null) { throw new NullPointerException("allocator"); } - rcvBufAllocator = allocator; + setRecvByteBufAllocator(allocator); } @Override diff --git a/transport/src/main/java/io/netty/channel/DefaultMaxBytesRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/DefaultMaxBytesRecvByteBufAllocator.java index 430b5f14b0..65f7b068dc 100644 --- a/transport/src/main/java/io/netty/channel/DefaultMaxBytesRecvByteBufAllocator.java +++ b/transport/src/main/java/io/netty/channel/DefaultMaxBytesRecvByteBufAllocator.java @@ -15,11 +15,12 @@ */ package io.netty.channel; -import java.util.AbstractMap; -import java.util.Map.Entry; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.util.UncheckedBooleanSupplier; + +import java.util.AbstractMap; +import java.util.Map.Entry; /** * The {@link RecvByteBufAllocator} that yields a buffer size prediction based upon decrementing the value from @@ -29,11 +30,17 @@ public class DefaultMaxBytesRecvByteBufAllocator implements MaxBytesRecvByteBufA private volatile int maxBytesPerRead; private volatile int maxBytesPerIndividualRead; - private final class HandleImpl implements Handle { + private final class HandleImpl implements ExtendedHandle { private int individualReadMax; private int bytesToRead; private int lastBytesRead; private int attemptBytesRead; + private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() { + @Override + public boolean get() { + return attemptBytesRead == lastBytesRead; + } + }; @Override public ByteBuf allocate(ByteBufAllocator alloc) { @@ -70,8 +77,13 @@ public class DefaultMaxBytesRecvByteBufAllocator implements MaxBytesRecvByteBufA @Override public boolean continueReading() { + return continueReading(defaultMaybeMoreSupplier); + } + + @Override + public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { // Keep reading if we are allowed to read more bytes, and our last read filled up the buffer we provided. - return bytesToRead > 0 && attemptBytesRead == lastBytesRead; + return bytesToRead > 0 && maybeMoreDataSupplier.get(); } @Override @@ -99,6 +111,7 @@ public class DefaultMaxBytesRecvByteBufAllocator implements MaxBytesRecvByteBufA this.maxBytesPerIndividualRead = maxBytesPerIndividualRead; } + @SuppressWarnings("deprecation") @Override public Handle newHandle() { return new HandleImpl(); diff --git a/transport/src/main/java/io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator.java index 28d89e2909..ead81b09cf 100644 --- a/transport/src/main/java/io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator.java +++ b/transport/src/main/java/io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator.java @@ -17,6 +17,7 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.util.UncheckedBooleanSupplier; /** * Default implementation of {@link MaxMessagesRecvByteBufAllocator} which respects {@link ChannelConfig#isAutoRead()} @@ -50,13 +51,19 @@ public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessa /** * Focuses on enforcing the maximum messages per read condition for {@link #continueReading()}. */ - public abstract class MaxMessageHandle implements Handle { + public abstract class MaxMessageHandle implements ExtendedHandle { private ChannelConfig config; private int maxMessagePerRead; private int totalMessages; private int totalBytesRead; private int attemptedBytesRead; private int lastBytesRead; + private final UncheckedBooleanSupplier defaultMaybeMoreSupplier = new UncheckedBooleanSupplier() { + @Override + public boolean get() { + return attemptedBytesRead == lastBytesRead; + } + }; /** * Only {@link ChannelConfig#getMaxMessagesPerRead()} is used. @@ -81,11 +88,8 @@ public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessa @Override public final void lastBytesRead(int bytes) { lastBytesRead = bytes; - // Ignore if bytes is negative, the interface contract states it will be detected externally after call. - // The value may be "invalid" after this point, but it doesn't matter because reading will be stopped. - totalBytesRead += bytes; - if (totalBytesRead < 0) { - totalBytesRead = Integer.MAX_VALUE; + if (bytes > 0) { + totalBytesRead += bytes; } } @@ -96,10 +100,15 @@ public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessa @Override public boolean continueReading() { + return continueReading(defaultMaybeMoreSupplier); + } + + @Override + public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { return config.isAutoRead() && - attemptedBytesRead == lastBytesRead && + maybeMoreDataSupplier.get() && totalMessages < maxMessagePerRead && - totalBytesRead < Integer.MAX_VALUE; + totalBytesRead > 0; } @Override @@ -117,7 +126,7 @@ public abstract class DefaultMaxMessagesRecvByteBufAllocator implements MaxMessa } protected final int totalBytesRead() { - return totalBytesRead; + return totalBytesRead < 0 ? Integer.MAX_VALUE : totalBytesRead; } } } diff --git a/transport/src/main/java/io/netty/channel/FixedRecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/FixedRecvByteBufAllocator.java index 09d891bfe5..86f0bbfd23 100644 --- a/transport/src/main/java/io/netty/channel/FixedRecvByteBufAllocator.java +++ b/transport/src/main/java/io/netty/channel/FixedRecvByteBufAllocator.java @@ -48,6 +48,7 @@ public class FixedRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllo this.bufferSize = bufferSize; } + @SuppressWarnings("deprecation") @Override public Handle newHandle() { return new HandleImpl(bufferSize); diff --git a/transport/src/main/java/io/netty/channel/RecvByteBufAllocator.java b/transport/src/main/java/io/netty/channel/RecvByteBufAllocator.java index ff65718dfe..4b537ada89 100644 --- a/transport/src/main/java/io/netty/channel/RecvByteBufAllocator.java +++ b/transport/src/main/java/io/netty/channel/RecvByteBufAllocator.java @@ -17,6 +17,9 @@ package io.netty.channel; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.util.UncheckedBooleanSupplier; +import io.netty.util.internal.UnstableApi; + import static io.netty.util.internal.ObjectUtil.checkNotNull; /** @@ -24,13 +27,16 @@ import static io.netty.util.internal.ObjectUtil.checkNotNull; * not to waste its space. */ public interface RecvByteBufAllocator { - /** * Creates a new handle. The handle provides the actual operations and keeps the internal information which is * required for predicting an optimal buffer capacity. */ Handle newHandle(); + /** + * @Deprecated Use {@link ExtendedHandle}. + */ + @Deprecated interface Handle { /** * Creates a new receive buffer whose capacity is probably large enough to read all inbound data and small @@ -101,6 +107,16 @@ public interface RecvByteBufAllocator { void readComplete(); } + @SuppressWarnings("deprecation") + @UnstableApi + interface ExtendedHandle extends Handle { + /** + * Same as {@link Handle#continueReading()} except "more data" is determined by the supplier parameter. + * @param maybeMoreDataSupplier A supplier that determines if there maybe more data to read. + */ + boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier); + } + /** * A {@link Handle} which delegates all call to some other {@link Handle}. */ diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 7d16ed4ae6..cb7b6ba0fe 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.FileRegion; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; +import io.netty.channel.socket.ChannelInputShutdownReadComplete; import io.netty.util.internal.StringUtil; import java.io.IOException; @@ -59,6 +60,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { */ protected abstract ChannelFuture shutdownInput(); + protected boolean isInputShutdown0() { + return false; + } + @Override protected AbstractNioUnsafe newUnsafe() { return new NioByteUnsafe(); @@ -72,15 +77,15 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { protected class NioByteUnsafe extends AbstractNioUnsafe { private void closeOnRead(ChannelPipeline pipeline) { - if (isOpen()) { + if (!isInputShutdown0()) { if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { shutdownInput(); - SelectionKey key = selectionKey(); - key.interestOps(key.interestOps() & ~readInterestOp); pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { close(voidPromise()); } + } else { + pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE); } } diff --git a/transport/src/main/java/io/netty/channel/socket/ChannelInputShutdownReadComplete.java b/transport/src/main/java/io/netty/channel/socket/ChannelInputShutdownReadComplete.java new file mode 100644 index 0000000000..a91da1a980 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/ChannelInputShutdownReadComplete.java @@ -0,0 +1,27 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket; + +/** + * User event that signifies the channel's input side is shutdown, and we tried to shut it down again. This typically + * indicates that there is no more data to read. + */ +public final class ChannelInputShutdownReadComplete { + public static final ChannelInputShutdownReadComplete INSTANCE = new ChannelInputShutdownReadComplete(); + + private ChannelInputShutdownReadComplete() { + } +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 43f8a06716..e29f4e34ab 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -182,6 +182,11 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty return shutdownInput(newPromise()); } + @Override + protected boolean isInputShutdown0() { + return isInputShutdown(); + } + @Override public ChannelFuture shutdownInput(final ChannelPromise promise) { Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose();