diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java index b5600d4252..40b5e86ff7 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketHalfClosedTest.java @@ -40,10 +40,82 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class SocketHalfClosedTest extends AbstractSocketTest { + @Test(timeout = 10000) + public void testHalfClosureOnlyOneEventWhenAutoRead() throws Throwable { + run(); + } + + public void testHalfClosureOnlyOneEventWhenAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable { + Channel serverChannel = null; + try { + cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true) + .option(ChannelOption.AUTO_READ, true); + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) { + ((DuplexChannel) ctx).shutdownOutput(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + }); + } + }); + + final AtomicInteger shutdownEventReceivedCounter = new AtomicInteger(); + final AtomicInteger shutdownReadCompleteEventReceivedCounter = new AtomicInteger(); + + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + + @Override + public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) { + if (evt == ChannelInputShutdownEvent.INSTANCE) { + shutdownEventReceivedCounter.incrementAndGet(); + } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) { + shutdownReadCompleteEventReceivedCounter.incrementAndGet(); + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + ctx.close(); + } + }, 100, MILLISECONDS); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + } + }); + } + }); + + serverChannel = sb.bind().sync().channel(); + Channel clientChannel = cb.connect(serverChannel.localAddress()).sync().channel(); + clientChannel.closeFuture().await(); + assertEquals(1, shutdownEventReceivedCounter.get()); + assertEquals(1, shutdownReadCompleteEventReceivedCounter.get()); + } finally { + if (serverChannel != null) { + serverChannel.close().sync(); + } + } + } + @Test public void testAllDataReadAfterHalfClosure() throws Throwable { run(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 8d3dc5bbcb..c1af9975cd 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -33,6 +33,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownReadComplete; +import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; @@ -241,8 +242,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann } private static boolean isAllowHalfClosure(ChannelConfig config) { - return config instanceof EpollSocketChannelConfig && - ((EpollSocketChannelConfig) config).isAllowHalfClosure(); + return config instanceof SocketChannelConfig && + ((SocketChannelConfig) config).isAllowHalfClosure(); } final void clearEpollIn() { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index f10417ebe1..698e59c7f1 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -33,6 +33,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownReadComplete; +import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; @@ -322,8 +323,8 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan } private static boolean isAllowHalfClosure(ChannelConfig config) { - return config instanceof KQueueSocketChannelConfig && - ((KQueueSocketChannelConfig) config).isAllowHalfClosure(); + return config instanceof SocketChannelConfig && + ((SocketChannelConfig) config).isAllowHalfClosure(); } final void clearReadFilter() { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index ffa0d67d47..db87b100b0 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -21,7 +21,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelMetadata; -import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelPipeline; import io.netty.channel.FileRegion; @@ -29,6 +28,7 @@ import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.internal.ChannelUtils; import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownReadComplete; +import io.netty.channel.socket.SocketChannelConfig; import io.netty.util.internal.StringUtil; import java.io.IOException; @@ -54,6 +54,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { ((AbstractNioUnsafe) unsafe()).flush0(); } }; + private boolean inputClosedSeenErrorOnRead; /** * Create a new instance @@ -84,17 +85,27 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { return METADATA; } + final boolean shouldBreakReadReady(ChannelConfig config) { + return isInputShutdown0() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config)); + } + + private static boolean isAllowHalfClosure(ChannelConfig config) { + return config instanceof SocketChannelConfig && + ((SocketChannelConfig) config).isAllowHalfClosure(); + } + protected class NioByteUnsafe extends AbstractNioUnsafe { private void closeOnRead(ChannelPipeline pipeline) { if (!isInputShutdown0()) { - if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + if (isAllowHalfClosure(config())) { shutdownInput(); pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); } else { close(voidPromise()); } } else { + inputClosedSeenErrorOnRead = true; pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE); } } @@ -120,6 +131,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { @Override public final void read() { final ChannelConfig config = config(); + if (shouldBreakReadReady(config)) { + clearReadPending(); + return; + } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index 477f39ac67..6eda848f82 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -27,6 +27,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.FileRegion; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.socket.ChannelInputShutdownEvent; +import io.netty.channel.socket.ChannelInputShutdownReadComplete; import io.netty.util.internal.StringUtil; import java.io.IOException; @@ -73,6 +74,7 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { } else { unsafe().close(unsafe().voidPromise()); } + pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE); } }