diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index c7f1a7c536..1038f0a65c 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; +import java.nio.channels.NotYetConnectedException; import java.util.Random; import java.util.concurrent.ConcurrentMap; @@ -671,7 +672,20 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } inFlushNow = true; + final ChannelOutboundBuffer outboundBuffer = AbstractChannel.this.outboundBuffer; + + // Mark all pending write requests as failure if the channel is inactive. + if (!isActive()) { + if (isOpen()) { + outboundBuffer.fail(new NotYetConnectedException()); + } else { + outboundBuffer.fail(new ClosedChannelException()); + } + inFlushNow = false; + return; + } + try { for (;;) { ChannelPromise promise = outboundBuffer.currentPromise; diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java similarity index 61% rename from transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java rename to transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index 13f43c7089..b501ebab59 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelRegistryTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -17,6 +17,7 @@ package io.netty.channel.local; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -27,20 +28,20 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import org.junit.Test; +import java.nio.channels.ClosedChannelException; import java.util.concurrent.CountDownLatch; +import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; -public class LocalChannelRegistryTest { +public class LocalChannelTest { - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(LocalChannelRegistryTest.class); + private static final InternalLogger logger = InternalLoggerFactory.getInstance(LocalChannelTest.class); private static final String LOCAL_ADDR_ID = "test.id"; @Test public void testLocalAddressReuse() throws Exception { - for (int i = 0; i < 2; i ++) { EventLoopGroup clientGroup = new LocalEventLoopGroup(); EventLoopGroup serverGroup = new LocalEventLoopGroup(); @@ -94,10 +95,57 @@ public class LocalChannelRegistryTest { } } + @Test + public void testWriteFailsFastOnClosedChannel() throws Exception { + EventLoopGroup clientGroup = new LocalEventLoopGroup(); + EventLoopGroup serverGroup = new LocalEventLoopGroup(); + LocalAddress addr = new LocalAddress(LOCAL_ADDR_ID); + Bootstrap cb = new Bootstrap(); + ServerBootstrap sb = new ServerBootstrap(); + + cb.group(clientGroup) + .channel(LocalChannel.class) + .handler(new TestHandler()); + + sb.group(serverGroup) + .channel(LocalServerChannel.class) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(new TestHandler()); + } + }); + + // Start server + sb.bind(addr).sync(); + + // Connect to the server + final Channel cc = cb.connect(addr).sync().channel(); + + // Close the channel and write something. + cc.close().sync(); + try { + cc.write(new Object()).sync(); + fail("must raise a ClosedChannelException"); + } catch (Exception e) { + assertThat(e, is(instanceOf(ClosedChannelException.class))); + // Ensure that the actual write attempt on a closed channel was never made by asserting that + // the ClosedChannelException has been created by AbstractUnsafe rather than transport implementations. + assertThat(e.getStackTrace()[0].getClassName(), is(AbstractChannel.class.getName() + "$AbstractUnsafe")); + e.printStackTrace(); + } + + serverGroup.shutdownGracefully(); + clientGroup.shutdownGracefully(); + serverGroup.terminationFuture().sync(); + clientGroup.terminationFuture().sync(); + } + static class TestHandler extends ChannelInboundHandlerAdapter { @Override public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { - for (int i = 0; i < msgs.size(); i ++) { + final int size = msgs.size(); + for (int i = 0; i < size; i ++) { logger.info(String.format("Received mesage: %s", msgs.get(i))); } }