diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index d2def281fc..e35ad26206 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -16,13 +16,19 @@ package io.netty.channel; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; import io.netty.buffer.ReferenceCounted; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalEventLoopGroup; +import io.netty.channel.local.LocalServerChannel; +import org.junit.After; +import org.junit.AfterClass; import org.junit.Test; import java.util.ArrayList; @@ -31,19 +37,71 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; public class DefaultChannelPipelineTest { + + private static final EventLoopGroup group = new LocalEventLoopGroup(1); + + private Channel self; + private Channel peer; + + @AfterClass + public static void afterClass() { + group.shutdownGracefully(); + } + + private void setUp(final ChannelHandler... handlers) throws Exception { + final AtomicReference peerRef = new AtomicReference(); + ServerBootstrap sb = new ServerBootstrap(); + sb.group(group).channel(LocalServerChannel.class); + sb.childHandler(new ChannelInboundMessageHandlerAdapter() { + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + peerRef.set(ctx.channel()); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { + // Swallow. + } + }); + + ChannelFuture bindFuture = sb.bind(LocalAddress.ANY).sync(); + + Bootstrap b = new Bootstrap(); + b.group(group).channel(LocalChannel.class); + b.handler(new ChannelInitializer() { + @Override + protected void initChannel(LocalChannel ch) throws Exception { + ch.pipeline().addLast(handlers); + } + }); + + self = b.connect(bindFuture.channel().localAddress()).sync().channel(); + peer = peerRef.get(); + + bindFuture.channel().close().sync(); + } + + @After + public void tearDown() throws Exception { + if (peer != null) { + peer.close(); + peer = null; + } + if (self != null) { + self = null; + } + } + @Test public void testMessageCatchAllInboundSink() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - - group.register(channel).awaitUninterruptibly(); final AtomicBoolean forwarded = new AtomicBoolean(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - pipeline.addLast(new ChannelInboundMessageHandlerAdapter() { + + setUp(new ChannelInboundMessageHandlerAdapter() { @Override public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { forwarded.set(ctx.nextInboundMessageBuffer().add(msg)); @@ -54,26 +112,16 @@ public class DefaultChannelPipelineTest { ctx.fireInboundBufferUpdated(); } }); - channel.eventLoop().submit(new Runnable() { - @Override - public void run() { - pipeline.fireChannelActive(); - pipeline.inboundMessageBuffer().add(new Object()); - pipeline.fireInboundBufferUpdated(); - } - }).get(); + + peer.write(new Object()).sync(); assertTrue(forwarded.get()); } @Test public void testByteCatchAllInboundSink() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); final AtomicBoolean forwarded = new AtomicBoolean(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - pipeline.addLast(new ChannelInboundByteHandlerAdapter() { + setUp(new ChannelInboundByteHandlerAdapter() { @Override protected void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf out = ctx.nextInboundByteBuffer(); @@ -82,26 +130,23 @@ public class DefaultChannelPipelineTest { ctx.fireInboundBufferUpdated(); } }); - channel.eventLoop().submit(new Runnable() { + + // Not using peer.write() because the pipeline will convert the bytes into a message automatically. + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.fireChannelActive(); - pipeline.inboundByteBuffer().writeByte(0); - pipeline.fireInboundBufferUpdated(); + self.pipeline().inboundByteBuffer().writeByte(0); + self.pipeline().fireInboundBufferUpdated(); } - }).get(); + }).sync(); assertTrue(forwarded.get()); } @Test public void testByteCatchAllOutboundSink() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); final AtomicBoolean forwarded = new AtomicBoolean(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - pipeline.addLast(new ChannelOutboundByteHandlerAdapter() { + setUp(new ChannelOutboundByteHandlerAdapter() { @Override protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception { ByteBuf out = ctx.nextOutboundByteBuffer(); @@ -110,21 +155,20 @@ public class DefaultChannelPipelineTest { ctx.flush(promise); } }); - channel.eventLoop().submit(new Runnable() { + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.fireChannelActive(); - pipeline.outboundByteBuffer().writeByte(0); - pipeline.flush(); + self.pipeline().outboundByteBuffer().writeByte(0); + self.pipeline().flush(); } - }).get(); + }).sync(); - Thread.sleep(1000); assertTrue(forwarded.get()); } @Test - public void testFreeCalled() throws InterruptedException { + public void testFreeCalled() throws Exception { final CountDownLatch free = new CountDownLatch(1); final ReferenceCounted holder = new ReferenceCounted() { @@ -160,21 +204,11 @@ public class DefaultChannelPipelineTest { return true; } }; - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); StringInboundHandler handler = new StringInboundHandler(); - pipeline.addLast(handler); - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { - pipeline.fireChannelActive(); - pipeline.inboundMessageBuffer().add(holder); - pipeline.fireInboundBufferUpdated(); - } - }); + setUp(handler); + + peer.write(holder).sync(); assertTrue(free.await(10, TimeUnit.SECONDS)); assertTrue(handler.called); @@ -197,7 +231,7 @@ public class DefaultChannelPipelineTest { @Test public void testRemoveChannelHandler() { - DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); + ChannelPipeline pipeline = new LocalChannel().pipeline(); ChannelHandler handler1 = newHandler(); ChannelHandler handler2 = newHandler(); @@ -217,7 +251,7 @@ public class DefaultChannelPipelineTest { @Test public void testReplaceChannelHandler() { - DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); + ChannelPipeline pipeline = new LocalChannel().pipeline(); ChannelHandler handler1 = newHandler(); pipeline.addLast("handler1", handler1); @@ -242,7 +276,7 @@ public class DefaultChannelPipelineTest { @Test public void testChannelHandlerContextNavigation() { - DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); + ChannelPipeline pipeline = new LocalChannel().pipeline(); final int HANDLER_ARRAY_LEN = 5; ChannelHandler[] firstHandlers = newHandlers(HANDLER_ARRAY_LEN); @@ -256,7 +290,8 @@ public class DefaultChannelPipelineTest { @Test public void testPipelineOperation() { - DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + final int handlerNum = 5; ChannelHandler[] handlers1 = newHandlers(handlerNum); ChannelHandler[] handlers2 = newHandlers(handlerNum); @@ -283,7 +318,8 @@ public class DefaultChannelPipelineTest { @Test public void testChannelHandlerContextOrder() { - DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); + ChannelPipeline pipeline = new LocalChannel().pipeline(); + pipeline.addFirst("1", newHandler()); pipeline.addLast("10", newHandler()); @@ -311,342 +347,309 @@ public class DefaultChannelPipelineTest { verifyContextNumber(pipeline, 8); } - @Test + @Test(timeout = 100000) public void testRemoveAndForwardInboundByte() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final ChannelInboundByteHandlerImpl handler1 = new ChannelInboundByteHandlerImpl(); final ChannelInboundByteHandlerImpl handler2 = new ChannelInboundByteHandlerImpl(); - pipeline.addLast("handler1", handler1); - pipeline.addLast("handler2", handler2); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + + setUp(handler1, handler2); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler1).inboundByteBuffer().writeLong(8); - assertEquals(8, pipeline.context(handler1).inboundByteBuffer().readableBytes()); - assertEquals(0, pipeline.context(handler2).inboundByteBuffer().readableBytes()); - pipeline.remove(handler1); - assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes()); - latch.countDown(); + ChannelPipeline p = self.pipeline(); + p.context(handler1).inboundByteBuffer().writeLong(8); + assertEquals(8, p.context(handler1).inboundByteBuffer().readableBytes()); + assertEquals(0, p.context(handler2).inboundByteBuffer().readableBytes()); + p.remove(handler1); + assertEquals(8, p.context(handler2).inboundByteBuffer().readableBytes()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(handler2.updated); } - @Test + @Test(timeout = 100000) public void testReplaceAndForwardInboundByte() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final ChannelInboundByteHandlerImpl handler1 = new ChannelInboundByteHandlerImpl(); final ChannelInboundByteHandlerImpl handler2 = new ChannelInboundByteHandlerImpl(); - pipeline.addLast("handler1", handler1); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + + setUp(handler1); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler1).inboundByteBuffer().writeLong(8); - assertEquals(8, pipeline.context(handler1).inboundByteBuffer().readableBytes()); - pipeline.replace(handler1, "handler2", handler2); - assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes()); - latch.countDown(); + ChannelPipeline p = self.pipeline(); + p.context(handler1).inboundByteBuffer().writeLong(8); + assertEquals(8, p.context(handler1).inboundByteBuffer().readableBytes()); + p.replace(handler1, "handler2", handler2); + assertEquals(8, p.context(handler2).inboundByteBuffer().readableBytes()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(handler2.updated); } - @Test + @Test(timeout = 10000) public void testRemoveAndForwardOutboundByte() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final ChannelOutboundByteHandlerImpl handler1 = new ChannelOutboundByteHandlerImpl(); final ChannelOutboundByteHandlerImpl handler2 = new ChannelOutboundByteHandlerImpl(); - pipeline.addLast("handler1", handler1); - pipeline.addLast("handler2", handler2); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + + setUp(handler1, handler2); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler2).outboundByteBuffer().writeLong(8); - assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes()); - assertEquals(0, pipeline.context(handler1).outboundByteBuffer().readableBytes()); - pipeline.remove(handler2); - assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes()); - latch.countDown(); + ChannelPipeline p = self.pipeline(); + p.context(handler2).outboundByteBuffer().writeLong(8); + assertEquals(8, p.context(handler2).outboundByteBuffer().readableBytes()); + assertEquals(0, p.context(handler1).outboundByteBuffer().readableBytes()); + self.pipeline().remove(handler2); + assertEquals(8, p.context(handler1).outboundByteBuffer().readableBytes()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(handler1.flushed); } - @Test + @Test(timeout = 10000) public void testReplaceAndForwardOutboundByte() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final ChannelOutboundByteHandlerImpl handler1 = new ChannelOutboundByteHandlerImpl(); final ChannelOutboundByteHandlerImpl handler2 = new ChannelOutboundByteHandlerImpl(); - pipeline.addLast("handler1", handler1); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + + setUp(handler1); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler1).outboundByteBuffer().writeLong(8); - assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes()); - pipeline.replace(handler1, "handler2", handler2); - assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes()); - latch.countDown(); + ChannelPipeline p = self.pipeline(); + p.context(handler1).outboundByteBuffer().writeLong(8); + assertEquals(8, p.context(handler1).outboundByteBuffer().readableBytes()); + p.replace(handler1, "handler2", handler2); + assertEquals(8, p.context(handler2).outboundByteBuffer().readableBytes()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(handler2.flushed); } - @Test + @Test(timeout = 10000) public void testReplaceAndForwardDuplexByte() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final ByteHandlerImpl handler1 = new ByteHandlerImpl(); final ByteHandlerImpl handler2 = new ByteHandlerImpl(); - pipeline.addLast("handler1", handler1); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + + setUp(handler1); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler1).outboundByteBuffer().writeLong(8); - pipeline.context(handler1).inboundByteBuffer().writeLong(8); + ChannelPipeline p = self.pipeline(); + p.context(handler1).outboundByteBuffer().writeLong(8); + p.context(handler1).inboundByteBuffer().writeLong(8); - assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes()); - assertEquals(8, pipeline.context(handler1).inboundByteBuffer().readableBytes()); + assertEquals(8, p.context(handler1).outboundByteBuffer().readableBytes()); + assertEquals(8, p.context(handler1).inboundByteBuffer().readableBytes()); - pipeline.replace(handler1, "handler2", handler2); - assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes()); - assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes()); - - latch.countDown(); + p.replace(handler1, "handler2", handler2); + assertEquals(8, p.context(handler2).outboundByteBuffer().readableBytes()); + assertEquals(8, p.context(handler2).inboundByteBuffer().readableBytes()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(((ChannelInboundByteHandlerImpl) handler2.stateHandler()).updated); assertTrue(((ChannelOutboundByteHandlerImpl) handler2.operationHandler()).flushed); } - @Test + @Test(timeout = 10000) public void testRemoveAndForwardDuplexByte() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final ChannelOutboundByteHandlerImpl handler1 = new ChannelOutboundByteHandlerImpl(); final ByteHandlerImpl handler2 = new ByteHandlerImpl(); final ChannelInboundByteHandlerImpl handler3 = new ChannelInboundByteHandlerImpl(); - pipeline.addLast("handler1", handler1); - pipeline.addLast("handler2", handler2); - pipeline.addLast("handler3", handler3); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + setUp(handler1, handler2, handler3); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler2).outboundByteBuffer().writeLong(8); - pipeline.context(handler2).inboundByteBuffer().writeLong(8); + ChannelPipeline p = self.pipeline(); + p.context(handler2).outboundByteBuffer().writeLong(8); + p.context(handler2).inboundByteBuffer().writeLong(8); - assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes()); - assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes()); + assertEquals(8, p.context(handler2).outboundByteBuffer().readableBytes()); + assertEquals(8, p.context(handler2).inboundByteBuffer().readableBytes()); - assertEquals(0, pipeline.context(handler1).outboundByteBuffer().readableBytes()); - assertEquals(0, pipeline.context(handler3).inboundByteBuffer().readableBytes()); + assertEquals(0, p.context(handler1).outboundByteBuffer().readableBytes()); + assertEquals(0, p.context(handler3).inboundByteBuffer().readableBytes()); - pipeline.remove(handler2); - assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes()); - assertEquals(8, pipeline.context(handler3).inboundByteBuffer().readableBytes()); - latch.countDown(); + p.remove(handler2); + assertEquals(8, p.context(handler1).outboundByteBuffer().readableBytes()); + assertEquals(8, p.context(handler3).inboundByteBuffer().readableBytes()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(handler1.flushed); assertTrue(handler3.updated); } - @Test + @Test(timeout = 10000) public void testRemoveAndForwardInboundMessage() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final ChannelInboundMessageHandlerImpl handler1 = new ChannelInboundMessageHandlerImpl(); final ChannelInboundMessageHandlerImpl handler2 = new ChannelInboundMessageHandlerImpl(); - pipeline.addLast("handler1", handler1); - pipeline.addLast("handler2", handler2); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + + setUp(handler1, handler2); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler1).inboundMessageBuffer().add(new Object()); - assertEquals(1, pipeline.context(handler1).inboundMessageBuffer().size()); - assertEquals(0, pipeline.context(handler2).inboundMessageBuffer().size()); - pipeline.remove(handler1); - assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size()); - latch.countDown(); + ChannelPipeline p = self.pipeline(); + p.context(handler1).inboundMessageBuffer().add(new Object()); + assertEquals(1, p.context(handler1).inboundMessageBuffer().size()); + assertEquals(0, p.context(handler2).inboundMessageBuffer().size()); + p.remove(handler1); + assertEquals(1, p.context(handler2).inboundMessageBuffer().size()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(handler2.updated); } - @Test + @Test(timeout = 10000) public void testReplaceAndForwardInboundMessage() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final ChannelInboundMessageHandlerImpl handler1 = new ChannelInboundMessageHandlerImpl(); final ChannelInboundMessageHandlerImpl handler2 = new ChannelInboundMessageHandlerImpl(); - pipeline.addLast("handler1", handler1); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + + setUp(handler1); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler1).inboundMessageBuffer().add(new Object()); - assertEquals(1, pipeline.context(handler1).inboundMessageBuffer().size()); - pipeline.replace(handler1, "handler2", handler2); - assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size()); - latch.countDown(); + ChannelPipeline p = self.pipeline(); + p.context(handler1).inboundMessageBuffer().add(new Object()); + assertEquals(1, p.context(handler1).inboundMessageBuffer().size()); + p.replace(handler1, "handler2", handler2); + assertEquals(1, p.context(handler2).inboundMessageBuffer().size()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(handler2.updated); } - @Test + @Test(timeout = 10000) public void testRemoveAndForwardOutboundMessage() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final ChannelOutboundMessageHandlerImpl handler1 = new ChannelOutboundMessageHandlerImpl(); final ChannelOutboundMessageHandlerImpl handler2 = new ChannelOutboundMessageHandlerImpl(); - pipeline.addLast("handler1", handler1); - pipeline.addLast("handler2", handler2); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + + setUp(handler1, handler2); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler2).outboundMessageBuffer().add(new Object()); - assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size()); - assertEquals(0, pipeline.context(handler1).outboundMessageBuffer().size()); - pipeline.remove(handler2); - assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size()); - latch.countDown(); + ChannelPipeline p = self.pipeline(); + p.context(handler2).outboundMessageBuffer().add(new Object()); + assertEquals(1, p.context(handler2).outboundMessageBuffer().size()); + assertEquals(0, p.context(handler1).outboundMessageBuffer().size()); + p.remove(handler2); + assertEquals(1, p.context(handler1).outboundMessageBuffer().size()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(handler1.flushed); } - @Test + @Test(timeout = 10000) public void testReplaceAndForwardOutboundMessage() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final ChannelOutboundMessageHandlerImpl handler1 = new ChannelOutboundMessageHandlerImpl(); final ChannelOutboundMessageHandlerImpl handler2 = new ChannelOutboundMessageHandlerImpl(); - pipeline.addLast("handler1", handler1); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + + setUp(handler1); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler1).outboundMessageBuffer().add(new Object()); - assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size()); - pipeline.replace(handler1, "handler2", handler2); - assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size()); - latch.countDown(); + ChannelPipeline p = self.pipeline(); + p.context(handler1).outboundMessageBuffer().add(new Object()); + assertEquals(1, p.context(handler1).outboundMessageBuffer().size()); + p.replace(handler1, "handler2", handler2); + assertEquals(1, p.context(handler2).outboundMessageBuffer().size()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(handler2.flushed); } - @Test + @Test(timeout = 10000) public void testReplaceAndForwardDuplexMessage() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - final MessageHandlerImpl handler1 = new MessageHandlerImpl(); final MessageHandlerImpl handler2 = new MessageHandlerImpl(); - pipeline.addLast("handler1", handler1); - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { + + setUp(handler1); + + self.eventLoop().submit(new Runnable() { @Override public void run() { - pipeline.context(handler1).outboundMessageBuffer().add(new Object()); - pipeline.context(handler1).inboundMessageBuffer().add(new Object()); + ChannelPipeline p = self.pipeline(); + p.context(handler1).outboundMessageBuffer().add(new Object()); + p.context(handler1).inboundMessageBuffer().add(new Object()); - assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size()); - assertEquals(1, pipeline.context(handler1).inboundMessageBuffer().size()); + assertEquals(1, p.context(handler1).outboundMessageBuffer().size()); + assertEquals(1, p.context(handler1).inboundMessageBuffer().size()); - pipeline.replace(handler1, "handler2", handler2); - assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size()); - assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size()); - - latch.countDown(); + p.replace(handler1, "handler2", handler2); + assertEquals(1, p.context(handler2).outboundMessageBuffer().size()); + assertEquals(1, p.context(handler2).inboundMessageBuffer().size()); } - }); + }).sync(); - assertTrue(latch.await(10, TimeUnit.SECONDS)); assertTrue(((ChannelInboundMessageHandlerImpl) handler2.stateHandler()).updated); assertTrue(((ChannelOutboundMessageHandlerImpl) handler2.operationHandler()).flushed); } + @Test(timeout = 10000) + public void testRemoveAndForwardDuplexMessage() throws Exception { + final ChannelOutboundMessageHandlerImpl handler1 = new ChannelOutboundMessageHandlerImpl(); + final MessageHandlerImpl handler2 = new MessageHandlerImpl(); + final ChannelInboundMessageHandlerImpl handler3 = new ChannelInboundMessageHandlerImpl(); + + setUp(handler1, handler2, handler3); + + self.eventLoop().submit(new Runnable() { + @Override + public void run() { + ChannelPipeline p = self.pipeline(); + p.context(handler2).outboundMessageBuffer().add(new Object()); + p.context(handler2).inboundMessageBuffer().add(new Object()); + + assertEquals(1, p.context(handler2).outboundMessageBuffer().size()); + assertEquals(1, p.context(handler2).inboundMessageBuffer().size()); + + assertEquals(0, p.context(handler1).outboundMessageBuffer().size()); + assertEquals(0, p.context(handler3).inboundMessageBuffer().size()); + + p.remove(handler2); + assertEquals(1, p.context(handler1).outboundMessageBuffer().size()); + assertEquals(1, p.context(handler3).inboundMessageBuffer().size()); + } + }).sync(); + + assertTrue(handler1.flushed); + assertTrue(handler3.updated); + } + @Test(timeout = 20000) - public void testLifeCycleAware() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); + public void testLifeCycleAwareness() throws Exception { + setUp(); + + ChannelPipeline p = self.pipeline(); final List handlers = new ArrayList(); - final CountDownLatch addLatch = new CountDownLatch(20); - for (int i = 0; i < 20; i++) { + final int COUNT = 20; + final CountDownLatch addLatch = new CountDownLatch(COUNT); + for (int i = 0; i < COUNT; i++) { final LifeCycleAwareTestHandler handler = new LifeCycleAwareTestHandler("handler-" + i); // Add handler. - pipeline.addFirst(handler.name, handler); - channel.eventLoop().execute(new Runnable() { + p.addFirst(handler.name, handler); + self.eventLoop().execute(new Runnable() { @Override public void run() { // Validate handler life-cycle methods called. @@ -664,12 +667,12 @@ public class DefaultChannelPipelineTest { // Change the order of remove operations over all handlers in the pipeline. Collections.shuffle(handlers); - final CountDownLatch removeLatch = new CountDownLatch(20); + final CountDownLatch removeLatch = new CountDownLatch(COUNT); for (final LifeCycleAwareTestHandler handler : handlers) { - assertSame(handler, pipeline.remove(handler.name)); + assertSame(handler, p.remove(handler.name)); - channel.eventLoop().execute(new Runnable() { + self.eventLoop().execute(new Runnable() { @Override public void run() { // Validate handler life-cycle methods called. @@ -681,44 +684,6 @@ public class DefaultChannelPipelineTest { removeLatch.await(); } - @Test - public void testRemoveAndForwardDuplexMessage() throws Exception { - LocalChannel channel = new LocalChannel(); - LocalEventLoopGroup group = new LocalEventLoopGroup(); - group.register(channel).awaitUninterruptibly(); - final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); - - final ChannelOutboundMessageHandlerImpl handler1 = new ChannelOutboundMessageHandlerImpl(); - final MessageHandlerImpl handler2 = new MessageHandlerImpl(); - final ChannelInboundMessageHandlerImpl handler3 = new ChannelInboundMessageHandlerImpl(); - pipeline.addLast("handler1", handler1); - pipeline.addLast("handler2", handler2); - pipeline.addLast("handler3", handler3); - - final CountDownLatch latch = new CountDownLatch(1); - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { - pipeline.context(handler2).outboundMessageBuffer().add(new Object()); - pipeline.context(handler2).inboundMessageBuffer().add(new Object()); - - assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size()); - assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size()); - - assertEquals(0, pipeline.context(handler1).outboundMessageBuffer().size()); - assertEquals(0, pipeline.context(handler3).inboundMessageBuffer().size()); - - pipeline.remove(handler2); - assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size()); - assertEquals(1, pipeline.context(handler3).inboundMessageBuffer().size()); - latch.countDown(); - } - }); - - assertTrue(latch.await(10, TimeUnit.SECONDS)); - assertTrue(handler1.flushed); - assertTrue(handler3.updated); - } private static int next(DefaultChannelHandlerContext ctx) { DefaultChannelHandlerContext next = ctx.next; if (next == null) { @@ -736,10 +701,10 @@ public class DefaultChannelPipelineTest { } } - private static void verifyContextNumber(DefaultChannelPipeline pipeline, int expectedNumber) { + private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) { DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext(); int handlerNumber = 0; - while (ctx != pipeline.tail) { + while (ctx != ((DefaultChannelPipeline) pipeline).tail) { handlerNumber++; ctx = ctx.next; }