Fix hanging SocketBufReleaseTest / Make sure AioServerSocketChannel closes the accepted channel when the server socket is being shut down

This commit is contained in:
Trustin Lee 2013-04-23 22:38:28 +09:00
parent f03b2cde62
commit cc0ad9f1cc
2 changed files with 52 additions and 5 deletions

View File

@ -18,20 +18,29 @@ package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.Test;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
public class SocketBufReleaseTest extends AbstractSocketTest {
private static final EventExecutor executor =
new DefaultEventExecutorGroup(1, new DefaultThreadFactory(SocketBufReleaseTest.class, true)).next();
@Test
public void testBufRelease() throws Throwable {
run();
@ -47,8 +56,13 @@ public class SocketBufReleaseTest extends AbstractSocketTest {
Channel sc = sb.bind().sync().channel();
Channel cc = cb.connect().sync().channel();
// Ensure the server socket accepted the client connection *and* initialized pipeline successfully.
serverHandler.channelFuture.sync();
// and then close all sockets.
sc.close().sync();
cc.close().sync();
serverHandler.check();
clientHandler.check();
}
@ -58,6 +72,23 @@ public class SocketBufReleaseTest extends AbstractSocketTest {
private final Random random = new Random();
private final CountDownLatch latch = new CountDownLatch(1);
private ByteBuf buf;
private final Promise<Channel> channelFuture = new DefaultPromise<Channel>(executor);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
channelFuture.setSuccess(ctx.channel());
}
@Override
public MessageBuf<Object> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return super.newInboundBuffer(ctx);
}
@Override
public void freeInboundBuffer(ChannelHandlerContext ctx) throws Exception {
super.freeInboundBuffer(ctx);
}
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
byte[] data = new byte[1024];

View File

@ -16,8 +16,10 @@
package io.netty.channel.socket.aio;
import io.netty.buffer.BufType;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.aio.AbstractAioChannel;
@ -181,11 +183,25 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server
@Override
protected void completed0(AsynchronousSocketChannel ch, AioServerSocketChannel channel) {
channel.acceptInProgress = false;
ChannelPipeline pipeline = channel.pipeline();
MessageBuf<Object> buffer = pipeline.inboundMessageBuffer();
if (buffer.refCnt() == 0) {
try {
ch.close();
} catch (IOException e) {
logger.warn(
"Failed to close a socket which was accepted while its server socket is being closed",
e);
}
return;
}
// create the socket add it to the buffer and fire the event
channel.pipeline().inboundMessageBuffer().add(
new AioSocketChannel(channel, null, ch));
channel.pipeline().fireInboundBufferUpdated();
channel.pipeline().fireChannelReadSuspended();
buffer.add(new AioSocketChannel(channel, null, ch));
pipeline.fireInboundBufferUpdated();
pipeline.fireChannelReadSuspended();
}
@Override