diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index c29b07db64..7f4c3b1c4b 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -497,6 +497,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public final void close(final ChannelPromise promise) { + if (inFlush0) { + invokeLater(new Runnable() { + @Override + public void run() { + close(promise); + } + }); + return; + } + boolean wasActive = isActive(); if (closeFuture.setClosed()) { try { diff --git a/transport/src/test/java/io/netty/channel/nio/NioSocketChannelTest.java b/transport/src/test/java/io/netty/channel/nio/NioSocketChannelTest.java new file mode 100644 index 0000000000..2436802784 --- /dev/null +++ b/transport/src/test/java/io/netty/channel/nio/NioSocketChannelTest.java @@ -0,0 +1,95 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.nio; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.junit.Test; + +import java.io.InputStream; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + + +public class NioSocketChannelTest { + + /** + * Test try to reproduce issue #1600 + */ + @Test + public void testFlushCloseReentrance() throws Exception { + NioEventLoopGroup group = new NioEventLoopGroup(1); + try { + final Queue futures = new LinkedBlockingQueue(); + + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group).channel(NioServerSocketChannel.class); + sb.childHandler(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + // Write a large enough data so that it is split into two loops. + futures.add( + ctx.write(ctx.alloc().buffer().writeZero(1048576)).addListener(ChannelFutureListener.CLOSE)); + futures.add(ctx.write(ctx.alloc().buffer().writeZero(1048576))); + ctx.flush(); + futures.add(ctx.write(ctx.alloc().buffer().writeZero(1048576))); + ctx.flush(); + } + }); + + SocketAddress address = sb.bind(0).sync().channel().localAddress(); + + Socket s = new Socket(); + s.connect(address); + + InputStream in = s.getInputStream(); + byte[] buf = new byte[8192]; + for (;;) { + if (in.read(buf) == -1) { + break; + } + + // Wait a little bit so that the write attempts are split into multiple flush attempts. + Thread.sleep(10); + } + s.close(); + + assertThat(futures.size(), is(3)); + ChannelFuture f1 = futures.poll(); + ChannelFuture f2 = futures.poll(); + ChannelFuture f3 = futures.poll(); + assertThat(f1.isSuccess(), is(true)); + assertThat(f2.isDone(), is(true)); + assertThat(f2.isSuccess(), is(false)); + assertThat(f2.cause(), is(instanceOf(ClosedChannelException.class))); + assertThat(f3.isDone(), is(true)); + assertThat(f3.isSuccess(), is(false)); + assertThat(f3.cause(), is(instanceOf(ClosedChannelException.class))); + } finally { + group.shutdownGracefully().sync(); + } + } +}