From cc0ad9f1cce1101a841e45f79120718da923bd20 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 23 Apr 2013 22:38:28 +0900 Subject: [PATCH] Fix hanging SocketBufReleaseTest / Make sure AioServerSocketChannel closes the accepted channel when the server socket is being shut down --- .../socket/SocketBufReleaseTest.java | 33 ++++++++++++++++++- .../socket/aio/AioServerSocketChannel.java | 24 +++++++++++--- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java index 1e1af13294..ea22d69169 100644 --- a/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java +++ b/testsuite/src/test/java/io/netty/testsuite/transport/socket/SocketBufReleaseTest.java @@ -18,20 +18,29 @@ package io.netty.testsuite.transport.socket; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundMessageHandlerAdapter; +import io.netty.util.concurrent.DefaultEventExecutorGroup; +import io.netty.util.concurrent.DefaultPromise; +import io.netty.util.concurrent.DefaultThreadFactory; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.Promise; import org.junit.Test; import java.util.Random; import java.util.concurrent.CountDownLatch; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; public class SocketBufReleaseTest extends AbstractSocketTest { + private static final EventExecutor executor = + new DefaultEventExecutorGroup(1, new DefaultThreadFactory(SocketBufReleaseTest.class, true)).next(); + @Test public void testBufRelease() throws Throwable { run(); @@ -47,8 +56,13 @@ public class SocketBufReleaseTest extends AbstractSocketTest { Channel sc = sb.bind().sync().channel(); Channel cc = cb.connect().sync().channel(); + // Ensure the server socket accepted the client connection *and* initialized pipeline successfully. + serverHandler.channelFuture.sync(); + + // and then close all sockets. sc.close().sync(); cc.close().sync(); + serverHandler.check(); clientHandler.check(); } @@ -58,6 +72,23 @@ public class SocketBufReleaseTest extends AbstractSocketTest { private final Random random = new Random(); private final CountDownLatch latch = new CountDownLatch(1); private ByteBuf buf; + private final Promise channelFuture = new DefaultPromise(executor); + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + channelFuture.setSuccess(ctx.channel()); + } + + @Override + public MessageBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { + return super.newInboundBuffer(ctx); + } + + @Override + public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception { + super.freeInboundBuffer(ctx); + } + @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { byte[] data = new byte[1024]; diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index 5b31709831..7976428a19 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -16,8 +16,10 @@ package io.netty.channel.socket.aio; import io.netty.buffer.BufType; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelException; import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoop; import io.netty.channel.aio.AbstractAioChannel; @@ -181,11 +183,25 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server @Override protected void completed0(AsynchronousSocketChannel ch, AioServerSocketChannel channel) { channel.acceptInProgress = false; + + ChannelPipeline pipeline = channel.pipeline(); + MessageBuf buffer = pipeline.inboundMessageBuffer(); + + if (buffer.refCnt() == 0) { + try { + ch.close(); + } catch (IOException e) { + logger.warn( + "Failed to close a socket which was accepted while its server socket is being closed", + e); + } + return; + } + // create the socket add it to the buffer and fire the event - channel.pipeline().inboundMessageBuffer().add( - new AioSocketChannel(channel, null, ch)); - channel.pipeline().fireInboundBufferUpdated(); - channel.pipeline().fireChannelReadSuspended(); + buffer.add(new AioSocketChannel(channel, null, ch)); + pipeline.fireInboundBufferUpdated(); + pipeline.fireChannelReadSuspended(); } @Override