Revert "Ensure ctx.fireChannelReadComplete() is only called when something was produced"
This reverts commit 375b9e1307
.
This commit is contained in:
parent
76dcda8259
commit
b35f2b4cb4
@ -133,6 +133,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
ByteBuf cumulation;
|
ByteBuf cumulation;
|
||||||
private Cumulator cumulator = MERGE_CUMULATOR;
|
private Cumulator cumulator = MERGE_CUMULATOR;
|
||||||
private boolean singleDecode;
|
private boolean singleDecode;
|
||||||
|
private boolean decodeWasNull;
|
||||||
private boolean first;
|
private boolean first;
|
||||||
|
|
||||||
protected ByteToMessageDecoder() {
|
protected ByteToMessageDecoder() {
|
||||||
@ -237,6 +238,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
cumulation = null;
|
cumulation = null;
|
||||||
}
|
}
|
||||||
int size = out.size();
|
int size = out.size();
|
||||||
|
decodeWasNull = size == 0;
|
||||||
|
|
||||||
for (int i = 0; i < size; i ++) {
|
for (int i = 0; i < size; i ++) {
|
||||||
ctx.fireChannelRead(out.get(i));
|
ctx.fireChannelRead(out.get(i));
|
||||||
@ -260,6 +262,12 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
// - https://github.com/netty/netty/issues/1764
|
// - https://github.com/netty/netty/issues/1764
|
||||||
cumulation.discardSomeReadBytes();
|
cumulation.discardSomeReadBytes();
|
||||||
}
|
}
|
||||||
|
if (decodeWasNull) {
|
||||||
|
decodeWasNull = false;
|
||||||
|
if (!ctx.channel().config().isAutoRead()) {
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
}
|
||||||
ctx.fireChannelReadComplete();
|
ctx.fireChannelReadComplete();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,7 +20,6 @@ import io.netty.buffer.Unpooled;
|
|||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.util.CharsetUtil;
|
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -28,7 +27,6 @@ import org.junit.Test;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingDeque;
|
import java.util.concurrent.LinkedBlockingDeque;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
public class ByteToMessageDecoderTest {
|
public class ByteToMessageDecoderTest {
|
||||||
|
|
||||||
@ -162,61 +160,4 @@ public class ByteToMessageDecoderTest {
|
|||||||
Assert.assertEquals(3, (int) queue.take());
|
Assert.assertEquals(3, (int) queue.take());
|
||||||
Assert.assertTrue(queue.isEmpty());
|
Assert.assertTrue(queue.isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
// See https://github.com/netty/netty/pull/3263
|
|
||||||
@Test
|
|
||||||
public void testFireChannelReadCompleteOnlyWhenDecoded() {
|
|
||||||
final AtomicInteger readComplete = new AtomicInteger();
|
|
||||||
EmbeddedChannel ch = new EmbeddedChannel(new ByteToMessageDecoder() {
|
|
||||||
@Override
|
|
||||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
|
||||||
// Do nothing
|
|
||||||
}
|
|
||||||
}, new ChannelInboundHandlerAdapter() {
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
readComplete.incrementAndGet();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Assert.assertFalse(ch.writeInbound(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)));
|
|
||||||
Assert.assertFalse(ch.finish());
|
|
||||||
Assert.assertEquals(0, readComplete.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
// See https://github.com/netty/netty/pull/3263
|
|
||||||
@Test
|
|
||||||
public void testFireChannelReadCompleteWhenDecodeOnce() {
|
|
||||||
final AtomicInteger readComplete = new AtomicInteger();
|
|
||||||
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
ctx.fireChannelRead(msg);
|
|
||||||
ctx.fireChannelRead(Unpooled.EMPTY_BUFFER);
|
|
||||||
}
|
|
||||||
}, new ByteToMessageDecoder() {
|
|
||||||
private boolean first = true;
|
|
||||||
@Override
|
|
||||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
|
||||||
if (first) {
|
|
||||||
first = false;
|
|
||||||
out.add(in.readSlice(in.readableBytes()).retain());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}, new ChannelInboundHandlerAdapter() {
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
readComplete.incrementAndGet();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Assert.assertTrue(ch.writeInbound(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)));
|
|
||||||
Assert.assertTrue(ch.finish());
|
|
||||||
Assert.assertEquals(1, readComplete.get());
|
|
||||||
for (;;) {
|
|
||||||
ByteBuf buf = (ByteBuf) ch.readInbound();
|
|
||||||
if (buf == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
buf.release();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -40,12 +40,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
private final DefaultChannelPipeline pipeline;
|
private final DefaultChannelPipeline pipeline;
|
||||||
private final String name;
|
private final String name;
|
||||||
private boolean removed;
|
private boolean removed;
|
||||||
// This does not need to be volatile as we always check and set this flag from EventExecutor thread. This means
|
|
||||||
// that at worse we will submit a task for channelReadComplete() that may do nothing if nextChannelReadInvoked
|
|
||||||
// is false. This is prefered to introduce another volatile flag because often fireChannelRead(...) and
|
|
||||||
// fireChannelReadComplete() are triggered from the EventExecutor thread anyway.
|
|
||||||
private boolean nextChannelReadInvoked;
|
|
||||||
private boolean readInvoked;
|
|
||||||
|
|
||||||
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
||||||
// child executor.
|
// child executor.
|
||||||
@ -297,12 +291,12 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
final AbstractChannelHandlerContext next = findContextInbound();
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
invokeNextChannelRead(next, msg);
|
next.invokeChannelRead(msg);
|
||||||
} else {
|
} else {
|
||||||
executor.execute(new OneTimeTask() {
|
executor.execute(new OneTimeTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
invokeNextChannelRead(next, msg);
|
next.invokeChannelRead(msg);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -317,24 +311,19 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void invokeNextChannelRead(AbstractChannelHandlerContext next, Object msg) {
|
|
||||||
nextChannelReadInvoked = true;
|
|
||||||
next.invokeChannelRead(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext fireChannelReadComplete() {
|
public ChannelHandlerContext fireChannelReadComplete() {
|
||||||
final AbstractChannelHandlerContext next = findContextInbound();
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
if (executor.inEventLoop()) {
|
if (executor.inEventLoop()) {
|
||||||
invokeNextChannelReadComplete(next);
|
next.invokeChannelReadComplete();
|
||||||
} else {
|
} else {
|
||||||
Runnable task = next.invokeChannelReadCompleteTask;
|
Runnable task = next.invokeChannelReadCompleteTask;
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
next.invokeChannelReadCompleteTask = task = new Runnable() {
|
next.invokeChannelReadCompleteTask = task = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
invokeNextChannelReadComplete(next);
|
next.invokeChannelReadComplete();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@ -351,24 +340,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void invokeNextChannelReadComplete(AbstractChannelHandlerContext next) {
|
|
||||||
if (nextChannelReadInvoked) {
|
|
||||||
nextChannelReadInvoked = false;
|
|
||||||
readInvoked = false;
|
|
||||||
|
|
||||||
next.invokeChannelReadComplete();
|
|
||||||
} else if (readInvoked && !channel().config().isAutoRead()) {
|
|
||||||
// As this context not belongs to the last handler in the pipeline and autoRead is false we need to
|
|
||||||
// trigger read again as otherwise we may stop reading before a full message was passed on to the
|
|
||||||
// pipeline. This is especially true for all kind of decoders that usually buffer bytes/messages until
|
|
||||||
// they are able to compose a full message that is passed via fireChannelRead(...) and so be consumed
|
|
||||||
// be the rest of the handlers in the pipeline.
|
|
||||||
read();
|
|
||||||
} else {
|
|
||||||
readInvoked = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelHandlerContext fireChannelWritabilityChanged() {
|
public ChannelHandlerContext fireChannelWritabilityChanged() {
|
||||||
final AbstractChannelHandlerContext next = findContextInbound();
|
final AbstractChannelHandlerContext next = findContextInbound();
|
||||||
@ -630,7 +601,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
|||||||
|
|
||||||
private void invokeRead() {
|
private void invokeRead() {
|
||||||
try {
|
try {
|
||||||
readInvoked = true;
|
|
||||||
((ChannelOutboundHandler) handler()).read(this);
|
((ChannelOutboundHandler) handler()).read(this);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
|
@ -21,7 +21,6 @@ import io.netty.bootstrap.ServerBootstrap;
|
|||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandler.Sharable;
|
import io.netty.channel.ChannelHandler.Sharable;
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
|
||||||
import io.netty.channel.local.LocalAddress;
|
import io.netty.channel.local.LocalAddress;
|
||||||
import io.netty.channel.local.LocalChannel;
|
import io.netty.channel.local.LocalChannel;
|
||||||
import io.netty.channel.local.LocalEventLoopGroup;
|
import io.netty.channel.local.LocalEventLoopGroup;
|
||||||
@ -29,8 +28,6 @@ import io.netty.channel.local.LocalServerChannel;
|
|||||||
import io.netty.util.AbstractReferenceCounted;
|
import io.netty.util.AbstractReferenceCounted;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.ReferenceCounted;
|
import io.netty.util.ReferenceCounted;
|
||||||
import io.netty.util.concurrent.DefaultEventExecutorGroup;
|
|
||||||
import io.netty.util.concurrent.EventExecutorGroup;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -42,7 +39,6 @@ import java.util.List;
|
|||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
@ -539,249 +535,6 @@ public class DefaultChannelPipelineTest {
|
|||||||
assertNull(pipeline.last());
|
assertNull(pipeline.last());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSurpressChannelReadComplete() throws Exception {
|
|
||||||
testSurpressChannelReadComplete0(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSurpressChannelReadCompleteDifferentExecutors() throws Exception {
|
|
||||||
testSurpressChannelReadComplete0(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
// See:
|
|
||||||
// https://github.com/netty/netty/pull/3263
|
|
||||||
// https://github.com/netty/netty/pull/3272
|
|
||||||
private static void testSurpressChannelReadComplete0(boolean executors) throws Exception {
|
|
||||||
final AtomicInteger read1 = new AtomicInteger();
|
|
||||||
final AtomicInteger read2 = new AtomicInteger();
|
|
||||||
final AtomicInteger readComplete1 = new AtomicInteger();
|
|
||||||
final AtomicInteger readComplete2 = new AtomicInteger();
|
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(1);
|
|
||||||
|
|
||||||
final EventExecutorGroup group = executors ? new DefaultEventExecutorGroup(2) : null;
|
|
||||||
|
|
||||||
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInitializer<Channel>() {
|
|
||||||
@Override
|
|
||||||
protected void initChannel(Channel ch) throws Exception {
|
|
||||||
ch.pipeline().addLast(group, new ChannelInboundHandlerAdapter() {
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
if (read1.incrementAndGet() == 1) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ctx.fireChannelRead(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
readComplete1.incrementAndGet();
|
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
ch.pipeline().addLast(group, new ChannelInboundHandlerAdapter() {
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
read2.incrementAndGet();
|
|
||||||
ctx.fireChannelRead(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
readComplete2.incrementAndGet();
|
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
latch.countDown();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
ch.writeInbound(1);
|
|
||||||
ch.writeInbound(2);
|
|
||||||
ch.writeInbound(3);
|
|
||||||
ch.finish();
|
|
||||||
latch.await();
|
|
||||||
assertEquals(3, read1.get());
|
|
||||||
assertEquals(3, readComplete1.get());
|
|
||||||
|
|
||||||
assertEquals(2, read2.get());
|
|
||||||
assertEquals(2, readComplete2.get());
|
|
||||||
|
|
||||||
assertEquals(2, ch.readInbound());
|
|
||||||
assertEquals(3, ch.readInbound());
|
|
||||||
assertNull(ch.readInbound());
|
|
||||||
|
|
||||||
if (group != null) {
|
|
||||||
group.shutdownGracefully();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testChannelReadTriggered() {
|
|
||||||
final AtomicInteger read1 = new AtomicInteger();
|
|
||||||
final AtomicInteger channelRead1 = new AtomicInteger();
|
|
||||||
final AtomicInteger channelReadComplete1 = new AtomicInteger();
|
|
||||||
final AtomicInteger read2 = new AtomicInteger();
|
|
||||||
final AtomicInteger channelRead2 = new AtomicInteger();
|
|
||||||
final AtomicInteger channelReadComplete2 = new AtomicInteger();
|
|
||||||
final AtomicInteger read3 = new AtomicInteger();
|
|
||||||
final AtomicInteger channelRead3 = new AtomicInteger();
|
|
||||||
final AtomicInteger channelReadComplete3 = new AtomicInteger();
|
|
||||||
|
|
||||||
EmbeddedChannel ch = new EmbeddedChannel(new ChannelDuplexHandler() {
|
|
||||||
@Override
|
|
||||||
public void read(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
read1.incrementAndGet();
|
|
||||||
ctx.read();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
channelRead1.incrementAndGet();
|
|
||||||
ctx.fireChannelRead(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
channelReadComplete1.incrementAndGet();
|
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
|
||||||
}, new ChannelDuplexHandler() {
|
|
||||||
@Override
|
|
||||||
public void read(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
read2.incrementAndGet();
|
|
||||||
ctx.read();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
// Consume
|
|
||||||
channelRead2.incrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
channelReadComplete2.incrementAndGet();
|
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
|
||||||
}, new ChannelDuplexHandler() {
|
|
||||||
@Override
|
|
||||||
public void read(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
read3.incrementAndGet();
|
|
||||||
ctx.read();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
channelRead3.incrementAndGet();
|
|
||||||
ctx.fireChannelRead(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
channelReadComplete3.incrementAndGet();
|
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
ch.config().setAutoRead(false);
|
|
||||||
|
|
||||||
ch.writeInbound(1);
|
|
||||||
ch.writeInbound(2);
|
|
||||||
ch.writeInbound(3);
|
|
||||||
ch.finish();
|
|
||||||
assertEquals(4, read1.get());
|
|
||||||
assertEquals(3, channelRead1.get());
|
|
||||||
assertEquals(3, channelReadComplete1.get());
|
|
||||||
|
|
||||||
assertEquals(1, read2.get());
|
|
||||||
assertEquals(3, channelRead2.get());
|
|
||||||
assertEquals(3, channelReadComplete1.get());
|
|
||||||
|
|
||||||
assertEquals(1, read3.get());
|
|
||||||
assertEquals(0, channelRead3.get());
|
|
||||||
assertEquals(0, channelReadComplete3.get());
|
|
||||||
|
|
||||||
assertNull(ch.readInbound());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testChannelReadNotTriggeredWhenLast() throws Exception {
|
|
||||||
final AtomicInteger read1 = new AtomicInteger();
|
|
||||||
final AtomicInteger channelRead1 = new AtomicInteger();
|
|
||||||
final AtomicInteger channelReadComplete1 = new AtomicInteger();
|
|
||||||
final AtomicInteger read2 = new AtomicInteger();
|
|
||||||
final AtomicInteger channelRead2 = new AtomicInteger();
|
|
||||||
final AtomicInteger channelReadComplete2 = new AtomicInteger();
|
|
||||||
|
|
||||||
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
|
|
||||||
|
|
||||||
// Ensure pipeline is really empty
|
|
||||||
ChannelPipeline pipeline = ch.pipeline();
|
|
||||||
while (pipeline.first() != null) {
|
|
||||||
pipeline.removeFirst();
|
|
||||||
}
|
|
||||||
|
|
||||||
pipeline.addLast(new ChannelDuplexHandler() {
|
|
||||||
@Override
|
|
||||||
public void read(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
read1.incrementAndGet();
|
|
||||||
ctx.read();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
channelRead1.incrementAndGet();
|
|
||||||
ctx.fireChannelRead(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
channelReadComplete1.incrementAndGet();
|
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
|
||||||
}, new ChannelDuplexHandler() {
|
|
||||||
@Override
|
|
||||||
public void read(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
read2.incrementAndGet();
|
|
||||||
ctx.read();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
||||||
// Consume
|
|
||||||
channelRead2.incrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
channelReadComplete2.incrementAndGet();
|
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
ch.config().setAutoRead(false);
|
|
||||||
|
|
||||||
ch.writeInbound(1);
|
|
||||||
ch.writeInbound(2);
|
|
||||||
ch.writeInbound(3);
|
|
||||||
ch.finish();
|
|
||||||
assertEquals(0, read1.get());
|
|
||||||
assertEquals(3, channelRead1.get());
|
|
||||||
assertEquals(3, channelReadComplete1.get());
|
|
||||||
|
|
||||||
assertEquals(0, read2.get());
|
|
||||||
assertEquals(3, channelRead2.get());
|
|
||||||
assertEquals(3, channelReadComplete1.get());
|
|
||||||
|
|
||||||
assertNull(ch.readInbound());
|
|
||||||
}
|
|
||||||
|
|
||||||
private static int next(AbstractChannelHandlerContext ctx) {
|
private static int next(AbstractChannelHandlerContext ctx) {
|
||||||
AbstractChannelHandlerContext next = ctx.next;
|
AbstractChannelHandlerContext next = ctx.next;
|
||||||
if (next == null) {
|
if (next == null) {
|
||||||
|
Loading…
Reference in New Issue
Block a user