[#977] Stop processing messages/bytes in a loop when the handler was removed from the ChannelPipeline

This commit is contained in:
Norman Maurer 2013-01-23 07:35:44 +01:00
parent 3f72add89a
commit b9aaf9a167
7 changed files with 56 additions and 7 deletions

View File

@ -45,6 +45,7 @@ import io.netty.channel.ChannelInboundByteHandlerAdapter;
public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter { public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter {
private volatile boolean singleDecode; private volatile boolean singleDecode;
private boolean removed;
/** /**
* If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call. * If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call.
@ -102,7 +103,7 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter
*/ */
private void callDecode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) { private void callDecode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) {
int oldOutSize = out.readableBytes(); int oldOutSize = out.readableBytes();
while (in.readable()) { while (!removed && in.readable()) {
int oldInSize = in.readableBytes(); int oldInSize = in.readableBytes();
try { try {
decode(ctx, in, out); decode(ctx, in, out);
@ -144,4 +145,10 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
decode(ctx, in, out); decode(ctx, in, out);
} }
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
} }

View File

@ -45,6 +45,7 @@ import io.netty.channel.PartialFlushException;
* </pre> * </pre>
*/ */
public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapter { public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapter {
private boolean removed;
@Override @Override
public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
@ -52,7 +53,7 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte
ByteBuf out = ctx.nextOutboundByteBuffer(); ByteBuf out = ctx.nextOutboundByteBuffer();
boolean encoded = false; boolean encoded = false;
while (in.readable()) { while (!removed && in.readable()) {
int oldInSize = in.readableBytes(); int oldInSize = in.readableBytes();
try { try {
encode(ctx, in, out); encode(ctx, in, out);
@ -89,4 +90,10 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte
* @throws Exception is thrown if an error accour * @throws Exception is thrown if an error accour
*/ */
protected abstract void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception; protected abstract void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception;
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
} }

View File

@ -45,6 +45,7 @@ public abstract class ByteToMessageDecoder
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private volatile boolean singleDecode; private volatile boolean singleDecode;
private boolean removed;
/** /**
* If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call. * If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call.
@ -113,7 +114,7 @@ public abstract class ByteToMessageDecoder
ByteBuf in = ctx.inboundByteBuffer(); ByteBuf in = ctx.inboundByteBuffer();
boolean decoded = false; boolean decoded = false;
while (in.readable()) { while (!removed && in.readable()) {
try { try {
int oldInputLength = in.readableBytes(); int oldInputLength = in.readableBytes();
Object o = decode(ctx, in); Object o = decode(ctx, in);
@ -204,4 +205,10 @@ public abstract class ByteToMessageDecoder
protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
return decode(ctx, in); return decode(ctx, in);
} }
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
} }

View File

@ -45,6 +45,7 @@ import io.netty.channel.ChannelPromise;
public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> { public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
private final Class<?>[] acceptedMsgTypes; private final Class<?>[] acceptedMsgTypes;
private boolean removed;
/** /**
* The types which will be accepted by the encoder. If a received message is an other type it will be just forwared * The types which will be accepted by the encoder. If a received message is an other type it will be just forwared
@ -59,7 +60,7 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHand
MessageBuf<I> in = ctx.outboundMessageBuffer(); MessageBuf<I> in = ctx.outboundMessageBuffer();
ByteBuf out = ctx.nextOutboundByteBuffer(); ByteBuf out = ctx.nextOutboundByteBuffer();
for (;;) { while (!removed) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
break; break;
@ -105,4 +106,10 @@ public abstract class MessageToByteEncoder<I> extends ChannelOutboundMessageHand
* @throws Exception is thrown if an error accour * @throws Exception is thrown if an error accour
*/ */
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
} }

View File

@ -49,6 +49,7 @@ public abstract class MessageToMessageDecoder<I>
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> { extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> {
private final Class<?>[] acceptedMsgTypes; private final Class<?>[] acceptedMsgTypes;
private boolean removed;
/** /**
* The types which will be accepted by the decoder. If a received message is an other type it will be just forwarded * The types which will be accepted by the decoder. If a received message is an other type it will be just forwarded
@ -68,7 +69,7 @@ public abstract class MessageToMessageDecoder<I>
throws Exception { throws Exception {
MessageBuf<I> in = ctx.inboundMessageBuffer(); MessageBuf<I> in = ctx.inboundMessageBuffer();
boolean notify = false; boolean notify = false;
for (;;) { while (!removed) {
try { try {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
@ -141,4 +142,10 @@ public abstract class MessageToMessageDecoder<I>
protected void freeInboundMessage(I msg) throws Exception { protected void freeInboundMessage(I msg) throws Exception {
ChannelHandlerUtil.freeMessage(msg); ChannelHandlerUtil.freeMessage(msg);
} }
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
} }

View File

@ -48,6 +48,7 @@ import io.netty.channel.PartialFlushException;
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> { public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageHandlerAdapter<I> {
private final Class<?>[] acceptedMsgTypes; private final Class<?>[] acceptedMsgTypes;
private boolean removed;
/** /**
* The types which will be accepted by the decoder. If a received message is an other type it will be just forwared * The types which will be accepted by the decoder. If a received message is an other type it will be just forwared
@ -62,7 +63,7 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
MessageBuf<I> in = ctx.outboundMessageBuffer(); MessageBuf<I> in = ctx.outboundMessageBuffer();
boolean encoded = false; boolean encoded = false;
for (;;) { while (!removed) {
try { try {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
@ -140,4 +141,10 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
protected void freeOutboundMessage(I msg) throws Exception { protected void freeOutboundMessage(I msg) throws Exception {
ChannelHandlerUtil.freeMessage(msg); ChannelHandlerUtil.freeMessage(msg);
} }
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
} }

View File

@ -44,6 +44,7 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> { extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler<I> {
private final Class<?>[] acceptedMsgTypes; private final Class<?>[] acceptedMsgTypes;
private boolean removed;
/** /**
* The types which will be accepted by the message handler. If a received message is an other type it will be just * The types which will be accepted by the message handler. If a received message is an other type it will be just
@ -73,7 +74,7 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
try { try {
MessageBuf<I> in = ctx.inboundMessageBuffer(); MessageBuf<I> in = ctx.inboundMessageBuffer();
for (;;) { while (!removed) {
Object msg = in.poll(); Object msg = in.poll();
if (msg == null) { if (msg == null) {
break; break;
@ -164,4 +165,10 @@ public abstract class ChannelInboundMessageHandlerAdapter<I>
protected void freeInboundMessage(I msg) throws Exception { protected void freeInboundMessage(I msg) throws Exception {
ChannelHandlerUtil.freeMessage(msg); ChannelHandlerUtil.freeMessage(msg);
} }
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
super.afterRemove(ctx);
removed = true;
}
} }