[#2705] Call fireChannelReadComplete() if channelActive(...) decodes messages in ReplayingDecoder / ByteToMessageDecoder
Motivation: In ReplayingDecoder / ByteToMessageDecoder channelInactive(...) method we try to decode a last time and fire all decoded messages throw the pipeline before call ctx.fireChannelInactive(...). To keep the correct order of events we also need to call ctx.fireChannelReadComplete() if we read anything. Modifications: - Channel channelInactive(...) to call ctx.fireChannelReadComplete() if something was decoded - Move out.recycle() to finally block Result: Correct order of events.
This commit is contained in:
parent
ce069e2dc4
commit
f5faada77c
@ -212,16 +212,24 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
} catch (Exception e) {
|
||||
throw new DecoderException(e);
|
||||
} finally {
|
||||
if (cumulation != null) {
|
||||
cumulation.release();
|
||||
cumulation = null;
|
||||
try {
|
||||
if (cumulation != null) {
|
||||
cumulation.release();
|
||||
cumulation = null;
|
||||
}
|
||||
int size = out.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
ctx.fireChannelRead(out.get(i));
|
||||
}
|
||||
if (size > 0) {
|
||||
// Something was read, call fireChannelReadComplete()
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
ctx.fireChannelInactive();
|
||||
} finally {
|
||||
// recycle in all cases
|
||||
out.recycle();
|
||||
}
|
||||
int size = out.size();
|
||||
for (int i = 0; i < size; i ++) {
|
||||
ctx.fireChannelRead(out.get(i));
|
||||
}
|
||||
ctx.fireChannelInactive();
|
||||
out.recycle();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -336,17 +336,24 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
|
||||
} catch (Exception e) {
|
||||
throw new DecoderException(e);
|
||||
} finally {
|
||||
if (cumulation != null) {
|
||||
cumulation.release();
|
||||
cumulation = null;
|
||||
try {
|
||||
if (cumulation != null) {
|
||||
cumulation.release();
|
||||
cumulation = null;
|
||||
}
|
||||
int size = out.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
ctx.fireChannelRead(out.get(i));
|
||||
}
|
||||
if (size > 0) {
|
||||
// Something was read, call fireChannelReadComplete()
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
ctx.fireChannelInactive();
|
||||
} finally {
|
||||
// recycle in all cases
|
||||
out.recycle();
|
||||
}
|
||||
|
||||
int size = out.size();
|
||||
for (int i = 0; i < size; i ++) {
|
||||
ctx.fireChannelRead(out.get(i));
|
||||
}
|
||||
ctx.fireChannelInactive();
|
||||
out.recycle();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -18,12 +18,15 @@ package io.netty.handler.codec;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
||||
public class ByteToMessageDecoderTest {
|
||||
|
||||
@ -119,4 +122,42 @@ public class ByteToMessageDecoderTest {
|
||||
Assert.assertEquals(channel.readInbound(), Unpooled.wrappedBuffer(new byte[] {'b'}));
|
||||
Assert.assertNull(channel.readInbound());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFireChannelReadCompleteOnInactive() throws InterruptedException {
|
||||
final BlockingQueue<Integer> queue = new LinkedBlockingDeque<Integer>();
|
||||
final ByteBuf buf = ReferenceCountUtil.releaseLater(Unpooled.buffer().writeBytes(new byte[]{'a', 'b'}));
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||
in.skipBytes(in.readableBytes());
|
||||
if (!ctx.channel().isActive()) {
|
||||
out.add("data");
|
||||
}
|
||||
}
|
||||
}, new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
queue.add(3);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
queue.add(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!ctx.channel().isActive()) {
|
||||
queue.add(2);
|
||||
}
|
||||
}
|
||||
});
|
||||
Assert.assertFalse(channel.writeInbound(buf));
|
||||
channel.finish();
|
||||
Assert.assertEquals(1, (int) queue.take());
|
||||
Assert.assertEquals(2, (int) queue.take());
|
||||
Assert.assertEquals(3, (int) queue.take());
|
||||
Assert.assertTrue(queue.isEmpty());
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,8 @@ import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
||||
import static io.netty.util.ReferenceCountUtil.releaseLater;
|
||||
import static org.junit.Assert.*;
|
||||
@ -177,4 +179,44 @@ public class ReplayingDecoderTest {
|
||||
b.release();
|
||||
buf.release();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFireChannelReadCompleteOnInactive() throws InterruptedException {
|
||||
final BlockingQueue<Integer> queue = new LinkedBlockingDeque<Integer>();
|
||||
final ByteBuf buf = releaseLater(Unpooled.buffer().writeBytes(new byte[]{'a', 'b'}));
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new ReplayingDecoder<Integer>() {
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||
in.skipBytes(in.readableBytes());
|
||||
if (!ctx.channel().isActive()) {
|
||||
out.add("data");
|
||||
}
|
||||
}
|
||||
}, new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
queue.add(3);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
queue.add(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!ctx.channel().isActive()) {
|
||||
queue.add(2);
|
||||
}
|
||||
}
|
||||
});
|
||||
assertFalse(channel.writeInbound(buf));
|
||||
channel.finish();
|
||||
assertEquals(1, (int) queue.take());
|
||||
assertEquals(1, (int) queue.take());
|
||||
assertEquals(2, (int) queue.take());
|
||||
assertEquals(3, (int) queue.take());
|
||||
assertTrue(queue.isEmpty());
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user