[#980] Automatically trigger a read operation if isAutoRead() == false but we only had a partial decode

This commit is contained in:
Norman Maurer 2013-01-27 09:03:29 +01:00
parent 3843cfd702
commit 291293a6dc
2 changed files with 46 additions and 1 deletions

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelHandlerUtil;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
@ -42,6 +43,7 @@ public abstract class ByteToMessageDecoder
extends ChannelInboundHandlerAdapter implements ChannelInboundByteHandler { extends ChannelInboundHandlerAdapter implements ChannelInboundByteHandler {
private volatile boolean singleDecode; private volatile boolean singleDecode;
private boolean decodeWasNull;
/** /**
* If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call. * If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call.
@ -62,7 +64,6 @@ public abstract class ByteToMessageDecoder
public boolean isSingleDecode() { public boolean isSingleDecode() {
return singleDecode; return singleDecode;
} }
@Override @Override
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ctx.alloc().buffer(); return ctx.alloc().buffer();
@ -78,6 +79,17 @@ public abstract class ByteToMessageDecoder
callDecode(ctx); callDecode(ctx);
} }
@Override
public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception {
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
super.channelReadSuspended(ctx);
}
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ByteBuf in = ctx.inboundByteBuffer(); ByteBuf in = ctx.inboundByteBuffer();
@ -101,6 +113,8 @@ public abstract class ByteToMessageDecoder
} }
protected void callDecode(ChannelHandlerContext ctx) { protected void callDecode(ChannelHandlerContext ctx) {
boolean wasNull = false;
ByteBuf in = ctx.inboundByteBuffer(); ByteBuf in = ctx.inboundByteBuffer();
boolean decoded = false; boolean decoded = false;
@ -109,12 +123,14 @@ public abstract class ByteToMessageDecoder
int oldInputLength = in.readableBytes(); int oldInputLength = in.readableBytes();
Object o = decode(ctx, in); Object o = decode(ctx, in);
if (o == null) { if (o == null) {
wasNull = true;
if (oldInputLength == in.readableBytes()) { if (oldInputLength == in.readableBytes()) {
break; break;
} else { } else {
continue; continue;
} }
} }
wasNull = false;
if (oldInputLength == in.readableBytes()) { if (oldInputLength == in.readableBytes()) {
throw new IllegalStateException( throw new IllegalStateException(
"decode() did not read anything but decoded a message."); "decode() did not read anything but decoded a message.");
@ -143,7 +159,12 @@ public abstract class ByteToMessageDecoder
} }
if (decoded) { if (decoded) {
decodeWasNull = false;
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} else {
if (wasNull) {
decodeWasNull = true;
}
} }
} }

View File

@ -272,6 +272,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
private ReplayingDecoderBuffer replayable; private ReplayingDecoderBuffer replayable;
private S state; private S state;
private int checkpoint = -1; private int checkpoint = -1;
private boolean decodeWasNull;
/** /**
* Creates a new instance with no initial state (i.e: {@code null}). * Creates a new instance with no initial state (i.e: {@code null}).
@ -385,6 +386,8 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
@Override @Override
protected void callDecode(ChannelHandlerContext ctx) { protected void callDecode(ChannelHandlerContext ctx) {
boolean wasNull = false;
ByteBuf in = cumulation; ByteBuf in = cumulation;
boolean decoded = false; boolean decoded = false;
while (in.readable()) { while (in.readable()) {
@ -417,10 +420,13 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
} }
if (result == null) { if (result == null) {
wasNull = true;
// Seems like more data is required. // Seems like more data is required.
// Let us wait for the next notification. // Let us wait for the next notification.
break; break;
} }
wasNull = false;
if (oldReaderIndex == in.readerIndex() && oldState == state) { if (oldReaderIndex == in.readerIndex() && oldState == state) {
throw new IllegalStateException( throw new IllegalStateException(
@ -451,7 +457,25 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
} }
if (decoded) { if (decoded) {
decodeWasNull = false;
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} else {
if (wasNull) {
decodeWasNull = true;
}
} }
} }
@Override
public void channelReadSuspended(ChannelHandlerContext ctx) throws Exception {
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
super.channelReadSuspended(ctx);
}
} }