Make sure the inbound/outbound buffer of the ChannelHandlerContext is only modified within the EventLoop

This commit is contained in:
Norman Maurer 2013-02-05 16:19:04 +01:00
parent fd40df9033
commit 86b4cde82f
4 changed files with 36 additions and 25 deletions

View File

@ -788,7 +788,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (ctx.hasInboundByteBuffer()) {
if (ctx.executor().inEventLoop()) {
return ctx.inboundByteBuffer();
} else {
}
if (executor().inEventLoop()) {
ByteBridge bridge = ctx.inByteBridge;
if (bridge == null) {
bridge = new ByteBridge(ctx);
@ -798,6 +799,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
return bridge.byteBuf;
}
throw new IllegalStateException("nextInboundByteBuffer() called from outside the eventLoop");
}
ctx = ctx.next;
}
@ -824,7 +826,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (ctx.hasInboundMessageBuffer()) {
if (ctx.executor().inEventLoop()) {
return ctx.inboundMessageBuffer();
} else {
}
if (executor().inEventLoop()) {
MessageBridge bridge = ctx.inMsgBridge;
if (bridge == null) {
bridge = new MessageBridge();
@ -834,6 +837,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
return bridge.msgBuf;
}
throw new IllegalStateException("nextInboundMessageBuffer() called from outside the eventLoop");
}
ctx = ctx.next;
}
@ -847,7 +851,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (ctx.hasOutboundByteBuffer()) {
if (ctx.executor().inEventLoop()) {
return ctx.outboundByteBuffer();
} else {
}
if (executor().inEventLoop()) {
ByteBridge bridge = ctx.outByteBridge;
if (bridge == null) {
bridge = new ByteBridge(ctx);
@ -857,6 +862,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
return bridge.byteBuf;
}
throw new IllegalStateException("nextOutboundByteBuffer() called from outside the eventLoop");
}
ctx = ctx.prev;
@ -884,7 +890,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (ctx.hasOutboundMessageBuffer()) {
if (ctx.executor().inEventLoop()) {
return ctx.outboundMessageBuffer();
} else {
}
if (executor().inEventLoop()) {
MessageBridge bridge = ctx.outMsgBridge;
if (bridge == null) {
bridge = new MessageBridge();
@ -894,6 +901,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
}
return bridge.msgBuf;
}
throw new IllegalStateException("nextOutboundMessageBuffer() called from outside the eventLoop");
}
ctx = ctx.prev;

View File

@ -32,7 +32,7 @@ public class DefaultChannelPipelineTest {
public void testFreeCalled() throws InterruptedException{
final CountDownLatch free = new CountDownLatch(1);
Freeable holder = new Freeable() {
final Freeable holder = new Freeable() {
@Override
public void free() {
free.countDown();
@ -46,13 +46,18 @@ public class DefaultChannelPipelineTest {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
StringInboundHandler handler = new StringInboundHandler();
pipeline.addLast(handler);
pipeline.fireChannelActive();
pipeline.inboundMessageBuffer().add(holder);
pipeline.fireInboundBufferUpdated();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
pipeline.inboundMessageBuffer().add(holder);
pipeline.fireInboundBufferUpdated();
}
});
assertTrue(free.await(10, TimeUnit.SECONDS));
assertTrue(handler.called);

View File

@ -25,6 +25,8 @@ import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import static org.junit.Assert.*;
public class LocalChannelRegistryTest {
@ -59,12 +61,19 @@ public class LocalChannelRegistryTest {
// Start server
Channel sc = sb.bind(addr).sync().channel();
final CountDownLatch latch = new CountDownLatch(1);
// Connect to the server
Channel cc = cb.connect(addr).sync().channel();
// Send a message event up the pipeline.
cc.pipeline().inboundMessageBuffer().add("Hello, World");
cc.pipeline().fireInboundBufferUpdated();
final Channel cc = cb.connect(addr).sync().channel();
cc.eventLoop().execute(new Runnable() {
@Override
public void run() {
// Send a message event up the pipeline.
cc.pipeline().inboundMessageBuffer().add("Hello, World");
cc.pipeline().fireInboundBufferUpdated();
latch.countDown();
}
});
latch.await();
// Close the channel
cc.close().sync();

View File

@ -111,17 +111,6 @@ public class LocalTransportThreadModelTest2 {
// Wait until the connection is closed or the connection attempt fails.
localChannel.closeFuture().awaitUninterruptibly();
MessageBuf<Object> inboundMessageBuffer = localChannel.pipeline().inboundMessageBuffer();
if (!inboundMessageBuffer.isEmpty()) {
// sometimes we close the pipeline before everything on it has been notified/received.
// we want these messages, since they are in our queue.
Iterator<Object> iterator = inboundMessageBuffer.iterator();
while (iterator.hasNext()) {
Object next = iterator.next();
System.err.println("DEFERRED on close: " + next);
iterator.remove();
}
}
}
@Sharable