[#2705] Call fireChannelReadComplete() if channelActive(...) decodes messages in ReplayingDecoder / ByteToMessageDecoder

Motivation:

In ReplayingDecoder / ByteToMessageDecoder channelInactive(...) method we try to decode a last time and fire all decoded messages throw the pipeline before call ctx.fireChannelInactive(...). To keep the correct order of events we also need to call ctx.fireChannelReadComplete() if we read anything.

Modifications:

- Channel channelInactive(...) to call ctx.fireChannelReadComplete() if something was decoded
- Move out.recycle() to finally block

Result:

Correct order of events.
This commit is contained in:
Norman Maurer 2014-07-24 14:33:56 +02:00
parent 48b7c25e00
commit 3bcd243ffc
4 changed files with 117 additions and 19 deletions

View File

@ -213,6 +213,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null) {
cumulation.release();
cumulation = null;
@ -221,10 +222,17 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.get(i));
}
if (size > 0) {
// Something was read, call fireChannelReadComplete()
ctx.fireChannelReadComplete();
}
ctx.fireChannelInactive();
} finally {
// recycle in all cases
out.recycle();
}
}
}
/**
* Called once data should be decoded from the given {@link ByteBuf}. This method will call

View File

@ -336,19 +336,26 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null) {
cumulation.release();
cumulation = null;
}
int size = out.size();
for (int i = 0; i < size; i++) {
ctx.fireChannelRead(out.get(i));
}
if (size > 0) {
// Something was read, call fireChannelReadComplete()
ctx.fireChannelReadComplete();
}
ctx.fireChannelInactive();
} finally {
// recycle in all cases
out.recycle();
}
}
}
@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {

View File

@ -18,12 +18,15 @@ package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.ReferenceCountUtil;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
public class ByteToMessageDecoderTest {
@ -119,4 +122,42 @@ public class ByteToMessageDecoderTest {
Assert.assertEquals(channel.readInbound(), Unpooled.wrappedBuffer(new byte[] {'b'}));
Assert.assertNull(channel.readInbound());
}
@Test
public void testFireChannelReadCompleteOnInactive() throws InterruptedException {
final BlockingQueue<Integer> queue = new LinkedBlockingDeque<Integer>();
final ByteBuf buf = ReferenceCountUtil.releaseLater(Unpooled.buffer().writeBytes(new byte[]{'a', 'b'}));
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
in.skipBytes(in.readableBytes());
if (!ctx.channel().isActive()) {
out.add("data");
}
}
}, new ChannelInboundHandlerAdapter() {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
queue.add(3);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
queue.add(1);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!ctx.channel().isActive()) {
queue.add(2);
}
}
});
Assert.assertFalse(channel.writeInbound(buf));
channel.finish();
Assert.assertEquals(1, (int) queue.take());
Assert.assertEquals(2, (int) queue.take());
Assert.assertEquals(3, (int) queue.take());
Assert.assertTrue(queue.isEmpty());
}
}

View File

@ -23,6 +23,8 @@ import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import static io.netty.util.ReferenceCountUtil.*;
import static org.junit.Assert.*;
@ -177,4 +179,44 @@ public class ReplayingDecoderTest {
b.release();
buf.release();
}
@Test
public void testFireChannelReadCompleteOnInactive() throws InterruptedException {
final BlockingQueue<Integer> queue = new LinkedBlockingDeque<Integer>();
final ByteBuf buf = releaseLater(Unpooled.buffer().writeBytes(new byte[]{'a', 'b'}));
EmbeddedChannel channel = new EmbeddedChannel(new ReplayingDecoder<Integer>() {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
in.skipBytes(in.readableBytes());
if (!ctx.channel().isActive()) {
out.add("data");
}
}
}, new ChannelInboundHandlerAdapter() {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
queue.add(3);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
queue.add(1);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!ctx.channel().isActive()) {
queue.add(2);
}
}
});
assertFalse(channel.writeInbound(buf));
channel.finish();
assertEquals(1, (int) queue.take());
assertEquals(1, (int) queue.take());
assertEquals(2, (int) queue.take());
assertEquals(3, (int) queue.take());
assertTrue(queue.isEmpty());
}
}