Clean up DefaultChannelPipelineTest

- Use the local transport in a correct way (i.e. no need to trigger channelActive et al by ourselves)
- Use Promise/Future instead of CountDownLatch where they simplifies
This commit is contained in:
Trustin Lee 2013-05-17 10:54:20 +09:00
parent f841056752
commit e1a378aa03

View File

@ -16,13 +16,19 @@
package io.netty.channel; package io.netty.channel;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf; import io.netty.buffer.MessageBuf;
import io.netty.buffer.ReferenceCounted; import io.netty.buffer.ReferenceCounted;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup; import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
@ -31,19 +37,71 @@ import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public class DefaultChannelPipelineTest { public class DefaultChannelPipelineTest {
private static final EventLoopGroup group = new LocalEventLoopGroup(1);
private Channel self;
private Channel peer;
@AfterClass
public static void afterClass() {
group.shutdownGracefully();
}
private void setUp(final ChannelHandler... handlers) throws Exception {
final AtomicReference<Channel> peerRef = new AtomicReference<Channel>();
ServerBootstrap sb = new ServerBootstrap();
sb.group(group).channel(LocalServerChannel.class);
sb.childHandler(new ChannelInboundMessageHandlerAdapter<Object>() {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
peerRef.set(ctx.channel());
}
@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
// Swallow.
}
});
ChannelFuture bindFuture = sb.bind(LocalAddress.ANY).sync();
Bootstrap b = new Bootstrap();
b.group(group).channel(LocalChannel.class);
b.handler(new ChannelInitializer<LocalChannel>() {
@Override
protected void initChannel(LocalChannel ch) throws Exception {
ch.pipeline().addLast(handlers);
}
});
self = b.connect(bindFuture.channel().localAddress()).sync().channel();
peer = peerRef.get();
bindFuture.channel().close().sync();
}
@After
public void tearDown() throws Exception {
if (peer != null) {
peer.close();
peer = null;
}
if (self != null) {
self = null;
}
}
@Test @Test
public void testMessageCatchAllInboundSink() throws Exception { public void testMessageCatchAllInboundSink() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final AtomicBoolean forwarded = new AtomicBoolean(); final AtomicBoolean forwarded = new AtomicBoolean();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
pipeline.addLast(new ChannelInboundMessageHandlerAdapter<Object>() { setUp(new ChannelInboundMessageHandlerAdapter<Object>() {
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
forwarded.set(ctx.nextInboundMessageBuffer().add(msg)); forwarded.set(ctx.nextInboundMessageBuffer().add(msg));
@ -54,26 +112,16 @@ public class DefaultChannelPipelineTest {
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} }
}); });
channel.eventLoop().submit(new Runnable() {
@Override peer.write(new Object()).sync();
public void run() {
pipeline.fireChannelActive();
pipeline.inboundMessageBuffer().add(new Object());
pipeline.fireInboundBufferUpdated();
}
}).get();
assertTrue(forwarded.get()); assertTrue(forwarded.get());
} }
@Test @Test
public void testByteCatchAllInboundSink() throws Exception { public void testByteCatchAllInboundSink() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final AtomicBoolean forwarded = new AtomicBoolean(); final AtomicBoolean forwarded = new AtomicBoolean();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); setUp(new ChannelInboundByteHandlerAdapter() {
pipeline.addLast(new ChannelInboundByteHandlerAdapter() {
@Override @Override
protected void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception { protected void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf out = ctx.nextInboundByteBuffer(); ByteBuf out = ctx.nextInboundByteBuffer();
@ -82,26 +130,23 @@ public class DefaultChannelPipelineTest {
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} }
}); });
channel.eventLoop().submit(new Runnable() {
// Not using peer.write() because the pipeline will convert the bytes into a message automatically.
self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.fireChannelActive(); self.pipeline().inboundByteBuffer().writeByte(0);
pipeline.inboundByteBuffer().writeByte(0); self.pipeline().fireInboundBufferUpdated();
pipeline.fireInboundBufferUpdated();
} }
}).get(); }).sync();
assertTrue(forwarded.get()); assertTrue(forwarded.get());
} }
@Test @Test
public void testByteCatchAllOutboundSink() throws Exception { public void testByteCatchAllOutboundSink() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final AtomicBoolean forwarded = new AtomicBoolean(); final AtomicBoolean forwarded = new AtomicBoolean();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel); setUp(new ChannelOutboundByteHandlerAdapter() {
pipeline.addLast(new ChannelOutboundByteHandlerAdapter() {
@Override @Override
protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception { protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception {
ByteBuf out = ctx.nextOutboundByteBuffer(); ByteBuf out = ctx.nextOutboundByteBuffer();
@ -110,21 +155,20 @@ public class DefaultChannelPipelineTest {
ctx.flush(promise); ctx.flush(promise);
} }
}); });
channel.eventLoop().submit(new Runnable() {
self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.fireChannelActive(); self.pipeline().outboundByteBuffer().writeByte(0);
pipeline.outboundByteBuffer().writeByte(0); self.pipeline().flush();
pipeline.flush();
} }
}).get(); }).sync();
Thread.sleep(1000);
assertTrue(forwarded.get()); assertTrue(forwarded.get());
} }
@Test @Test
public void testFreeCalled() throws InterruptedException { public void testFreeCalled() throws Exception {
final CountDownLatch free = new CountDownLatch(1); final CountDownLatch free = new CountDownLatch(1);
final ReferenceCounted holder = new ReferenceCounted() { final ReferenceCounted holder = new ReferenceCounted() {
@ -160,21 +204,11 @@ public class DefaultChannelPipelineTest {
return true; return true;
} }
}; };
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
StringInboundHandler handler = new StringInboundHandler(); StringInboundHandler handler = new StringInboundHandler();
pipeline.addLast(handler); setUp(handler);
channel.eventLoop().execute(new Runnable() {
@Override peer.write(holder).sync();
public void run() {
pipeline.fireChannelActive();
pipeline.inboundMessageBuffer().add(holder);
pipeline.fireInboundBufferUpdated();
}
});
assertTrue(free.await(10, TimeUnit.SECONDS)); assertTrue(free.await(10, TimeUnit.SECONDS));
assertTrue(handler.called); assertTrue(handler.called);
@ -197,7 +231,7 @@ public class DefaultChannelPipelineTest {
@Test @Test
public void testRemoveChannelHandler() { public void testRemoveChannelHandler() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); ChannelPipeline pipeline = new LocalChannel().pipeline();
ChannelHandler handler1 = newHandler(); ChannelHandler handler1 = newHandler();
ChannelHandler handler2 = newHandler(); ChannelHandler handler2 = newHandler();
@ -217,7 +251,7 @@ public class DefaultChannelPipelineTest {
@Test @Test
public void testReplaceChannelHandler() { public void testReplaceChannelHandler() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); ChannelPipeline pipeline = new LocalChannel().pipeline();
ChannelHandler handler1 = newHandler(); ChannelHandler handler1 = newHandler();
pipeline.addLast("handler1", handler1); pipeline.addLast("handler1", handler1);
@ -242,7 +276,7 @@ public class DefaultChannelPipelineTest {
@Test @Test
public void testChannelHandlerContextNavigation() { public void testChannelHandlerContextNavigation() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); ChannelPipeline pipeline = new LocalChannel().pipeline();
final int HANDLER_ARRAY_LEN = 5; final int HANDLER_ARRAY_LEN = 5;
ChannelHandler[] firstHandlers = newHandlers(HANDLER_ARRAY_LEN); ChannelHandler[] firstHandlers = newHandlers(HANDLER_ARRAY_LEN);
@ -256,7 +290,8 @@ public class DefaultChannelPipelineTest {
@Test @Test
public void testPipelineOperation() { public void testPipelineOperation() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); ChannelPipeline pipeline = new LocalChannel().pipeline();
final int handlerNum = 5; final int handlerNum = 5;
ChannelHandler[] handlers1 = newHandlers(handlerNum); ChannelHandler[] handlers1 = newHandlers(handlerNum);
ChannelHandler[] handlers2 = newHandlers(handlerNum); ChannelHandler[] handlers2 = newHandlers(handlerNum);
@ -283,7 +318,8 @@ public class DefaultChannelPipelineTest {
@Test @Test
public void testChannelHandlerContextOrder() { public void testChannelHandlerContextOrder() {
DefaultChannelPipeline pipeline = new DefaultChannelPipeline(new LocalChannel()); ChannelPipeline pipeline = new LocalChannel().pipeline();
pipeline.addFirst("1", newHandler()); pipeline.addFirst("1", newHandler());
pipeline.addLast("10", newHandler()); pipeline.addLast("10", newHandler());
@ -311,342 +347,309 @@ public class DefaultChannelPipelineTest {
verifyContextNumber(pipeline, 8); verifyContextNumber(pipeline, 8);
} }
@Test @Test(timeout = 100000)
public void testRemoveAndForwardInboundByte() throws Exception { public void testRemoveAndForwardInboundByte() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ChannelInboundByteHandlerImpl handler1 = new ChannelInboundByteHandlerImpl(); final ChannelInboundByteHandlerImpl handler1 = new ChannelInboundByteHandlerImpl();
final ChannelInboundByteHandlerImpl handler2 = new ChannelInboundByteHandlerImpl(); final ChannelInboundByteHandlerImpl handler2 = new ChannelInboundByteHandlerImpl();
pipeline.addLast("handler1", handler1);
pipeline.addLast("handler2", handler2); setUp(handler1, handler2);
final CountDownLatch latch = new CountDownLatch(1);
channel.eventLoop().execute(new Runnable() { self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler1).inboundByteBuffer().writeLong(8); ChannelPipeline p = self.pipeline();
assertEquals(8, pipeline.context(handler1).inboundByteBuffer().readableBytes()); p.context(handler1).inboundByteBuffer().writeLong(8);
assertEquals(0, pipeline.context(handler2).inboundByteBuffer().readableBytes()); assertEquals(8, p.context(handler1).inboundByteBuffer().readableBytes());
pipeline.remove(handler1); assertEquals(0, p.context(handler2).inboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes()); p.remove(handler1);
latch.countDown(); assertEquals(8, p.context(handler2).inboundByteBuffer().readableBytes());
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(handler2.updated); assertTrue(handler2.updated);
} }
@Test @Test(timeout = 100000)
public void testReplaceAndForwardInboundByte() throws Exception { public void testReplaceAndForwardInboundByte() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ChannelInboundByteHandlerImpl handler1 = new ChannelInboundByteHandlerImpl(); final ChannelInboundByteHandlerImpl handler1 = new ChannelInboundByteHandlerImpl();
final ChannelInboundByteHandlerImpl handler2 = new ChannelInboundByteHandlerImpl(); final ChannelInboundByteHandlerImpl handler2 = new ChannelInboundByteHandlerImpl();
pipeline.addLast("handler1", handler1);
final CountDownLatch latch = new CountDownLatch(1); setUp(handler1);
channel.eventLoop().execute(new Runnable() {
self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler1).inboundByteBuffer().writeLong(8); ChannelPipeline p = self.pipeline();
assertEquals(8, pipeline.context(handler1).inboundByteBuffer().readableBytes()); p.context(handler1).inboundByteBuffer().writeLong(8);
pipeline.replace(handler1, "handler2", handler2); assertEquals(8, p.context(handler1).inboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes()); p.replace(handler1, "handler2", handler2);
latch.countDown(); assertEquals(8, p.context(handler2).inboundByteBuffer().readableBytes());
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(handler2.updated); assertTrue(handler2.updated);
} }
@Test @Test(timeout = 10000)
public void testRemoveAndForwardOutboundByte() throws Exception { public void testRemoveAndForwardOutboundByte() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ChannelOutboundByteHandlerImpl handler1 = new ChannelOutboundByteHandlerImpl(); final ChannelOutboundByteHandlerImpl handler1 = new ChannelOutboundByteHandlerImpl();
final ChannelOutboundByteHandlerImpl handler2 = new ChannelOutboundByteHandlerImpl(); final ChannelOutboundByteHandlerImpl handler2 = new ChannelOutboundByteHandlerImpl();
pipeline.addLast("handler1", handler1);
pipeline.addLast("handler2", handler2); setUp(handler1, handler2);
final CountDownLatch latch = new CountDownLatch(1);
channel.eventLoop().execute(new Runnable() { self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler2).outboundByteBuffer().writeLong(8); ChannelPipeline p = self.pipeline();
assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes()); p.context(handler2).outboundByteBuffer().writeLong(8);
assertEquals(0, pipeline.context(handler1).outboundByteBuffer().readableBytes()); assertEquals(8, p.context(handler2).outboundByteBuffer().readableBytes());
pipeline.remove(handler2); assertEquals(0, p.context(handler1).outboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes()); self.pipeline().remove(handler2);
latch.countDown(); assertEquals(8, p.context(handler1).outboundByteBuffer().readableBytes());
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(handler1.flushed); assertTrue(handler1.flushed);
} }
@Test @Test(timeout = 10000)
public void testReplaceAndForwardOutboundByte() throws Exception { public void testReplaceAndForwardOutboundByte() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ChannelOutboundByteHandlerImpl handler1 = new ChannelOutboundByteHandlerImpl(); final ChannelOutboundByteHandlerImpl handler1 = new ChannelOutboundByteHandlerImpl();
final ChannelOutboundByteHandlerImpl handler2 = new ChannelOutboundByteHandlerImpl(); final ChannelOutboundByteHandlerImpl handler2 = new ChannelOutboundByteHandlerImpl();
pipeline.addLast("handler1", handler1);
final CountDownLatch latch = new CountDownLatch(1); setUp(handler1);
channel.eventLoop().execute(new Runnable() {
self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler1).outboundByteBuffer().writeLong(8); ChannelPipeline p = self.pipeline();
assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes()); p.context(handler1).outboundByteBuffer().writeLong(8);
pipeline.replace(handler1, "handler2", handler2); assertEquals(8, p.context(handler1).outboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes()); p.replace(handler1, "handler2", handler2);
latch.countDown(); assertEquals(8, p.context(handler2).outboundByteBuffer().readableBytes());
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(handler2.flushed); assertTrue(handler2.flushed);
} }
@Test @Test(timeout = 10000)
public void testReplaceAndForwardDuplexByte() throws Exception { public void testReplaceAndForwardDuplexByte() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ByteHandlerImpl handler1 = new ByteHandlerImpl(); final ByteHandlerImpl handler1 = new ByteHandlerImpl();
final ByteHandlerImpl handler2 = new ByteHandlerImpl(); final ByteHandlerImpl handler2 = new ByteHandlerImpl();
pipeline.addLast("handler1", handler1);
final CountDownLatch latch = new CountDownLatch(1); setUp(handler1);
channel.eventLoop().execute(new Runnable() {
self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler1).outboundByteBuffer().writeLong(8); ChannelPipeline p = self.pipeline();
pipeline.context(handler1).inboundByteBuffer().writeLong(8); p.context(handler1).outboundByteBuffer().writeLong(8);
p.context(handler1).inboundByteBuffer().writeLong(8);
assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes()); assertEquals(8, p.context(handler1).outboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler1).inboundByteBuffer().readableBytes()); assertEquals(8, p.context(handler1).inboundByteBuffer().readableBytes());
pipeline.replace(handler1, "handler2", handler2); p.replace(handler1, "handler2", handler2);
assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes()); assertEquals(8, p.context(handler2).outboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes()); assertEquals(8, p.context(handler2).inboundByteBuffer().readableBytes());
latch.countDown();
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(((ChannelInboundByteHandlerImpl) handler2.stateHandler()).updated); assertTrue(((ChannelInboundByteHandlerImpl) handler2.stateHandler()).updated);
assertTrue(((ChannelOutboundByteHandlerImpl) handler2.operationHandler()).flushed); assertTrue(((ChannelOutboundByteHandlerImpl) handler2.operationHandler()).flushed);
} }
@Test @Test(timeout = 10000)
public void testRemoveAndForwardDuplexByte() throws Exception { public void testRemoveAndForwardDuplexByte() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ChannelOutboundByteHandlerImpl handler1 = new ChannelOutboundByteHandlerImpl(); final ChannelOutboundByteHandlerImpl handler1 = new ChannelOutboundByteHandlerImpl();
final ByteHandlerImpl handler2 = new ByteHandlerImpl(); final ByteHandlerImpl handler2 = new ByteHandlerImpl();
final ChannelInboundByteHandlerImpl handler3 = new ChannelInboundByteHandlerImpl(); final ChannelInboundByteHandlerImpl handler3 = new ChannelInboundByteHandlerImpl();
pipeline.addLast("handler1", handler1);
pipeline.addLast("handler2", handler2);
pipeline.addLast("handler3", handler3);
final CountDownLatch latch = new CountDownLatch(1); setUp(handler1, handler2, handler3);
channel.eventLoop().execute(new Runnable() {
self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler2).outboundByteBuffer().writeLong(8); ChannelPipeline p = self.pipeline();
pipeline.context(handler2).inboundByteBuffer().writeLong(8); p.context(handler2).outboundByteBuffer().writeLong(8);
p.context(handler2).inboundByteBuffer().writeLong(8);
assertEquals(8, pipeline.context(handler2).outboundByteBuffer().readableBytes()); assertEquals(8, p.context(handler2).outboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler2).inboundByteBuffer().readableBytes()); assertEquals(8, p.context(handler2).inboundByteBuffer().readableBytes());
assertEquals(0, pipeline.context(handler1).outboundByteBuffer().readableBytes()); assertEquals(0, p.context(handler1).outboundByteBuffer().readableBytes());
assertEquals(0, pipeline.context(handler3).inboundByteBuffer().readableBytes()); assertEquals(0, p.context(handler3).inboundByteBuffer().readableBytes());
pipeline.remove(handler2); p.remove(handler2);
assertEquals(8, pipeline.context(handler1).outboundByteBuffer().readableBytes()); assertEquals(8, p.context(handler1).outboundByteBuffer().readableBytes());
assertEquals(8, pipeline.context(handler3).inboundByteBuffer().readableBytes()); assertEquals(8, p.context(handler3).inboundByteBuffer().readableBytes());
latch.countDown();
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(handler1.flushed); assertTrue(handler1.flushed);
assertTrue(handler3.updated); assertTrue(handler3.updated);
} }
@Test @Test(timeout = 10000)
public void testRemoveAndForwardInboundMessage() throws Exception { public void testRemoveAndForwardInboundMessage() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ChannelInboundMessageHandlerImpl handler1 = new ChannelInboundMessageHandlerImpl(); final ChannelInboundMessageHandlerImpl handler1 = new ChannelInboundMessageHandlerImpl();
final ChannelInboundMessageHandlerImpl handler2 = new ChannelInboundMessageHandlerImpl(); final ChannelInboundMessageHandlerImpl handler2 = new ChannelInboundMessageHandlerImpl();
pipeline.addLast("handler1", handler1);
pipeline.addLast("handler2", handler2); setUp(handler1, handler2);
final CountDownLatch latch = new CountDownLatch(1);
channel.eventLoop().execute(new Runnable() { self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler1).inboundMessageBuffer().add(new Object()); ChannelPipeline p = self.pipeline();
assertEquals(1, pipeline.context(handler1).inboundMessageBuffer().size()); p.context(handler1).inboundMessageBuffer().add(new Object());
assertEquals(0, pipeline.context(handler2).inboundMessageBuffer().size()); assertEquals(1, p.context(handler1).inboundMessageBuffer().size());
pipeline.remove(handler1); assertEquals(0, p.context(handler2).inboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size()); p.remove(handler1);
latch.countDown(); assertEquals(1, p.context(handler2).inboundMessageBuffer().size());
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(handler2.updated); assertTrue(handler2.updated);
} }
@Test @Test(timeout = 10000)
public void testReplaceAndForwardInboundMessage() throws Exception { public void testReplaceAndForwardInboundMessage() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ChannelInboundMessageHandlerImpl handler1 = new ChannelInboundMessageHandlerImpl(); final ChannelInboundMessageHandlerImpl handler1 = new ChannelInboundMessageHandlerImpl();
final ChannelInboundMessageHandlerImpl handler2 = new ChannelInboundMessageHandlerImpl(); final ChannelInboundMessageHandlerImpl handler2 = new ChannelInboundMessageHandlerImpl();
pipeline.addLast("handler1", handler1);
final CountDownLatch latch = new CountDownLatch(1); setUp(handler1);
channel.eventLoop().execute(new Runnable() {
self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler1).inboundMessageBuffer().add(new Object()); ChannelPipeline p = self.pipeline();
assertEquals(1, pipeline.context(handler1).inboundMessageBuffer().size()); p.context(handler1).inboundMessageBuffer().add(new Object());
pipeline.replace(handler1, "handler2", handler2); assertEquals(1, p.context(handler1).inboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size()); p.replace(handler1, "handler2", handler2);
latch.countDown(); assertEquals(1, p.context(handler2).inboundMessageBuffer().size());
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(handler2.updated); assertTrue(handler2.updated);
} }
@Test @Test(timeout = 10000)
public void testRemoveAndForwardOutboundMessage() throws Exception { public void testRemoveAndForwardOutboundMessage() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ChannelOutboundMessageHandlerImpl handler1 = new ChannelOutboundMessageHandlerImpl(); final ChannelOutboundMessageHandlerImpl handler1 = new ChannelOutboundMessageHandlerImpl();
final ChannelOutboundMessageHandlerImpl handler2 = new ChannelOutboundMessageHandlerImpl(); final ChannelOutboundMessageHandlerImpl handler2 = new ChannelOutboundMessageHandlerImpl();
pipeline.addLast("handler1", handler1);
pipeline.addLast("handler2", handler2); setUp(handler1, handler2);
final CountDownLatch latch = new CountDownLatch(1);
channel.eventLoop().execute(new Runnable() { self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler2).outboundMessageBuffer().add(new Object()); ChannelPipeline p = self.pipeline();
assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size()); p.context(handler2).outboundMessageBuffer().add(new Object());
assertEquals(0, pipeline.context(handler1).outboundMessageBuffer().size()); assertEquals(1, p.context(handler2).outboundMessageBuffer().size());
pipeline.remove(handler2); assertEquals(0, p.context(handler1).outboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size()); p.remove(handler2);
latch.countDown(); assertEquals(1, p.context(handler1).outboundMessageBuffer().size());
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(handler1.flushed); assertTrue(handler1.flushed);
} }
@Test @Test(timeout = 10000)
public void testReplaceAndForwardOutboundMessage() throws Exception { public void testReplaceAndForwardOutboundMessage() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ChannelOutboundMessageHandlerImpl handler1 = new ChannelOutboundMessageHandlerImpl(); final ChannelOutboundMessageHandlerImpl handler1 = new ChannelOutboundMessageHandlerImpl();
final ChannelOutboundMessageHandlerImpl handler2 = new ChannelOutboundMessageHandlerImpl(); final ChannelOutboundMessageHandlerImpl handler2 = new ChannelOutboundMessageHandlerImpl();
pipeline.addLast("handler1", handler1);
final CountDownLatch latch = new CountDownLatch(1); setUp(handler1);
channel.eventLoop().execute(new Runnable() {
self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler1).outboundMessageBuffer().add(new Object()); ChannelPipeline p = self.pipeline();
assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size()); p.context(handler1).outboundMessageBuffer().add(new Object());
pipeline.replace(handler1, "handler2", handler2); assertEquals(1, p.context(handler1).outboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size()); p.replace(handler1, "handler2", handler2);
latch.countDown(); assertEquals(1, p.context(handler2).outboundMessageBuffer().size());
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(handler2.flushed); assertTrue(handler2.flushed);
} }
@Test @Test(timeout = 10000)
public void testReplaceAndForwardDuplexMessage() throws Exception { public void testReplaceAndForwardDuplexMessage() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final MessageHandlerImpl handler1 = new MessageHandlerImpl(); final MessageHandlerImpl handler1 = new MessageHandlerImpl();
final MessageHandlerImpl handler2 = new MessageHandlerImpl(); final MessageHandlerImpl handler2 = new MessageHandlerImpl();
pipeline.addLast("handler1", handler1);
final CountDownLatch latch = new CountDownLatch(1); setUp(handler1);
channel.eventLoop().execute(new Runnable() {
self.eventLoop().submit(new Runnable() {
@Override @Override
public void run() { public void run() {
pipeline.context(handler1).outboundMessageBuffer().add(new Object()); ChannelPipeline p = self.pipeline();
pipeline.context(handler1).inboundMessageBuffer().add(new Object()); p.context(handler1).outboundMessageBuffer().add(new Object());
p.context(handler1).inboundMessageBuffer().add(new Object());
assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size()); assertEquals(1, p.context(handler1).outboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler1).inboundMessageBuffer().size()); assertEquals(1, p.context(handler1).inboundMessageBuffer().size());
pipeline.replace(handler1, "handler2", handler2); p.replace(handler1, "handler2", handler2);
assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size()); assertEquals(1, p.context(handler2).outboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size()); assertEquals(1, p.context(handler2).inboundMessageBuffer().size());
latch.countDown();
} }
}); }).sync();
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(((ChannelInboundMessageHandlerImpl) handler2.stateHandler()).updated); assertTrue(((ChannelInboundMessageHandlerImpl) handler2.stateHandler()).updated);
assertTrue(((ChannelOutboundMessageHandlerImpl) handler2.operationHandler()).flushed); assertTrue(((ChannelOutboundMessageHandlerImpl) handler2.operationHandler()).flushed);
} }
@Test(timeout = 10000)
public void testRemoveAndForwardDuplexMessage() throws Exception {
final ChannelOutboundMessageHandlerImpl handler1 = new ChannelOutboundMessageHandlerImpl();
final MessageHandlerImpl handler2 = new MessageHandlerImpl();
final ChannelInboundMessageHandlerImpl handler3 = new ChannelInboundMessageHandlerImpl();
setUp(handler1, handler2, handler3);
self.eventLoop().submit(new Runnable() {
@Override
public void run() {
ChannelPipeline p = self.pipeline();
p.context(handler2).outboundMessageBuffer().add(new Object());
p.context(handler2).inboundMessageBuffer().add(new Object());
assertEquals(1, p.context(handler2).outboundMessageBuffer().size());
assertEquals(1, p.context(handler2).inboundMessageBuffer().size());
assertEquals(0, p.context(handler1).outboundMessageBuffer().size());
assertEquals(0, p.context(handler3).inboundMessageBuffer().size());
p.remove(handler2);
assertEquals(1, p.context(handler1).outboundMessageBuffer().size());
assertEquals(1, p.context(handler3).inboundMessageBuffer().size());
}
}).sync();
assertTrue(handler1.flushed);
assertTrue(handler3.updated);
}
@Test(timeout = 20000) @Test(timeout = 20000)
public void testLifeCycleAware() throws Exception { public void testLifeCycleAwareness() throws Exception {
LocalChannel channel = new LocalChannel(); setUp();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly(); ChannelPipeline p = self.pipeline();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final List<LifeCycleAwareTestHandler> handlers = new ArrayList<LifeCycleAwareTestHandler>(); final List<LifeCycleAwareTestHandler> handlers = new ArrayList<LifeCycleAwareTestHandler>();
final CountDownLatch addLatch = new CountDownLatch(20); final int COUNT = 20;
for (int i = 0; i < 20; i++) { final CountDownLatch addLatch = new CountDownLatch(COUNT);
for (int i = 0; i < COUNT; i++) {
final LifeCycleAwareTestHandler handler = new LifeCycleAwareTestHandler("handler-" + i); final LifeCycleAwareTestHandler handler = new LifeCycleAwareTestHandler("handler-" + i);
// Add handler. // Add handler.
pipeline.addFirst(handler.name, handler); p.addFirst(handler.name, handler);
channel.eventLoop().execute(new Runnable() { self.eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
// Validate handler life-cycle methods called. // Validate handler life-cycle methods called.
@ -664,12 +667,12 @@ public class DefaultChannelPipelineTest {
// Change the order of remove operations over all handlers in the pipeline. // Change the order of remove operations over all handlers in the pipeline.
Collections.shuffle(handlers); Collections.shuffle(handlers);
final CountDownLatch removeLatch = new CountDownLatch(20); final CountDownLatch removeLatch = new CountDownLatch(COUNT);
for (final LifeCycleAwareTestHandler handler : handlers) { for (final LifeCycleAwareTestHandler handler : handlers) {
assertSame(handler, pipeline.remove(handler.name)); assertSame(handler, p.remove(handler.name));
channel.eventLoop().execute(new Runnable() { self.eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void run() {
// Validate handler life-cycle methods called. // Validate handler life-cycle methods called.
@ -681,44 +684,6 @@ public class DefaultChannelPipelineTest {
removeLatch.await(); removeLatch.await();
} }
@Test
public void testRemoveAndForwardDuplexMessage() throws Exception {
LocalChannel channel = new LocalChannel();
LocalEventLoopGroup group = new LocalEventLoopGroup();
group.register(channel).awaitUninterruptibly();
final DefaultChannelPipeline pipeline = new DefaultChannelPipeline(channel);
final ChannelOutboundMessageHandlerImpl handler1 = new ChannelOutboundMessageHandlerImpl();
final MessageHandlerImpl handler2 = new MessageHandlerImpl();
final ChannelInboundMessageHandlerImpl handler3 = new ChannelInboundMessageHandlerImpl();
pipeline.addLast("handler1", handler1);
pipeline.addLast("handler2", handler2);
pipeline.addLast("handler3", handler3);
final CountDownLatch latch = new CountDownLatch(1);
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.context(handler2).outboundMessageBuffer().add(new Object());
pipeline.context(handler2).inboundMessageBuffer().add(new Object());
assertEquals(1, pipeline.context(handler2).outboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler2).inboundMessageBuffer().size());
assertEquals(0, pipeline.context(handler1).outboundMessageBuffer().size());
assertEquals(0, pipeline.context(handler3).inboundMessageBuffer().size());
pipeline.remove(handler2);
assertEquals(1, pipeline.context(handler1).outboundMessageBuffer().size());
assertEquals(1, pipeline.context(handler3).inboundMessageBuffer().size());
latch.countDown();
}
});
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertTrue(handler1.flushed);
assertTrue(handler3.updated);
}
private static int next(DefaultChannelHandlerContext ctx) { private static int next(DefaultChannelHandlerContext ctx) {
DefaultChannelHandlerContext next = ctx.next; DefaultChannelHandlerContext next = ctx.next;
if (next == null) { if (next == null) {
@ -736,10 +701,10 @@ public class DefaultChannelPipelineTest {
} }
} }
private static void verifyContextNumber(DefaultChannelPipeline pipeline, int expectedNumber) { private static void verifyContextNumber(ChannelPipeline pipeline, int expectedNumber) {
DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext(); DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) pipeline.firstContext();
int handlerNumber = 0; int handlerNumber = 0;
while (ctx != pipeline.tail) { while (ctx != ((DefaultChannelPipeline) pipeline).tail) {
handlerNumber++; handlerNumber++;
ctx = ctx.next; ctx = ctx.next;
} }