Correctly fix problem in ByteToMessageDecoder and ReplayingDecoder which could let to have a released buffer passed to the decode methods.
This fixes #1664 and revert also the original commit which was meant to fix it 3b1881b523
. The problem with the original commit was that it could delay handlerRemove(..) calls and so mess up the order or forward bytes to late.
This commit is contained in:
parent
e3410680de
commit
a06295fe0a
@ -229,6 +229,15 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
|||||||
int outSize = out.size();
|
int outSize = out.size();
|
||||||
int oldInputLength = in.readableBytes();
|
int oldInputLength = in.readableBytes();
|
||||||
decode(ctx, in, out);
|
decode(ctx, in, out);
|
||||||
|
|
||||||
|
// Check if this handler was removed before try to continue the loop.
|
||||||
|
// If it was removed it is not safe to continue to operate on the buffer
|
||||||
|
//
|
||||||
|
// See https://github.com/netty/netty/issues/1664
|
||||||
|
if (ctx.isRemoved()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (outSize == out.size()) {
|
if (outSize == out.size()) {
|
||||||
if (oldInputLength == in.readableBytes()) {
|
if (oldInputLength == in.readableBytes()) {
|
||||||
break;
|
break;
|
||||||
|
@ -358,6 +358,15 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
|
|||||||
int oldInputLength = in.readableBytes();
|
int oldInputLength = in.readableBytes();
|
||||||
try {
|
try {
|
||||||
decode(ctx, replayable, out);
|
decode(ctx, replayable, out);
|
||||||
|
|
||||||
|
// Check if this handler was removed before try to continue the loop.
|
||||||
|
// If it was removed it is not safe to continue to operate on the buffer
|
||||||
|
//
|
||||||
|
// See https://github.com/netty/netty/issues/1664
|
||||||
|
if (ctx.isRemoved()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (outSize == out.size()) {
|
if (outSize == out.size()) {
|
||||||
if (oldInputLength == in.readableBytes() && oldState == state) {
|
if (oldInputLength == in.readableBytes() && oldState == state) {
|
||||||
throw new DecoderException(
|
throw new DecoderException(
|
||||||
@ -371,6 +380,15 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
|
|||||||
}
|
}
|
||||||
} catch (Signal replay) {
|
} catch (Signal replay) {
|
||||||
replay.expect(REPLAY);
|
replay.expect(REPLAY);
|
||||||
|
|
||||||
|
// Check if this handler was removed before try to continue the loop.
|
||||||
|
// If it was removed it is not safe to continue to operate on the buffer
|
||||||
|
//
|
||||||
|
// See https://github.com/netty/netty/issues/1664
|
||||||
|
if (ctx.isRemoved()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// Return to the checkpoint (or oldPosition) and retry.
|
// Return to the checkpoint (or oldPosition) and retry.
|
||||||
int checkpoint = this.checkpoint;
|
int checkpoint = this.checkpoint;
|
||||||
if (checkpoint >= 0) {
|
if (checkpoint >= 0) {
|
||||||
|
@ -34,7 +34,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
private final DefaultChannelPipeline pipeline;
|
private final DefaultChannelPipeline pipeline;
|
||||||
private final String name;
|
private final String name;
|
||||||
private final ChannelHandler handler;
|
private final ChannelHandler handler;
|
||||||
private int callDepth;
|
|
||||||
private boolean removed;
|
private boolean removed;
|
||||||
|
|
||||||
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
// Will be set to null if no child executor should be used, otherwise it will be set to the
|
||||||
@ -78,10 +77,6 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean isRunning() {
|
|
||||||
return callDepth != 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */
|
/** Invocation initiated by {@link DefaultChannelPipeline#teardownAll()}}. */
|
||||||
void teardown() {
|
void teardown() {
|
||||||
EventExecutor executor = executor();
|
EventExecutor executor = executor();
|
||||||
@ -159,13 +154,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeChannelRegistered() {
|
private void invokeChannelRegistered() {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelInboundHandler) handler).channelRegistered(this);
|
((ChannelInboundHandler) handler).channelRegistered(this);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
}
|
}
|
||||||
@ -189,13 +179,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeChannelUnregistered() {
|
private void invokeChannelUnregistered() {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelInboundHandler) handler).channelUnregistered(this);
|
((ChannelInboundHandler) handler).channelUnregistered(this);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
}
|
}
|
||||||
@ -219,13 +204,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeChannelActive() {
|
private void invokeChannelActive() {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelInboundHandler) handler).channelActive(this);
|
((ChannelInboundHandler) handler).channelActive(this);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
}
|
}
|
||||||
@ -249,13 +229,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeChannelInactive() {
|
private void invokeChannelInactive() {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelInboundHandler) handler).channelInactive(this);
|
((ChannelInboundHandler) handler).channelInactive(this);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
}
|
}
|
||||||
@ -292,13 +267,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeExceptionCaught(final Throwable cause) {
|
private void invokeExceptionCaught(final Throwable cause) {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
handler.exceptionCaught(this, cause);
|
handler.exceptionCaught(this, cause);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
if (logger.isWarnEnabled()) {
|
if (logger.isWarnEnabled()) {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
@ -330,13 +300,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeUserEventTriggered(Object event) {
|
private void invokeUserEventTriggered(Object event) {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelInboundHandler) handler).userEventTriggered(this, event);
|
((ChannelInboundHandler) handler).userEventTriggered(this, event);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
}
|
}
|
||||||
@ -364,13 +329,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeChannelRead(Object msg) {
|
private void invokeChannelRead(Object msg) {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelInboundHandler) handler).channelRead(this, msg);
|
((ChannelInboundHandler) handler).channelRead(this, msg);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
}
|
}
|
||||||
@ -398,13 +358,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeChannelReadComplete() {
|
private void invokeChannelReadComplete() {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelInboundHandler) handler).channelReadComplete(this);
|
((ChannelInboundHandler) handler).channelReadComplete(this);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
}
|
}
|
||||||
@ -432,13 +387,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeChannelWritabilityChanged() {
|
private void invokeChannelWritabilityChanged() {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelInboundHandler) handler).channelWritabilityChanged(this);
|
((ChannelInboundHandler) handler).channelWritabilityChanged(this);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
}
|
}
|
||||||
@ -498,13 +448,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
|
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
|
((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyOutboundHandlerException(t, promise);
|
notifyOutboundHandlerException(t, promise);
|
||||||
}
|
}
|
||||||
@ -541,13 +486,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
|
((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyOutboundHandlerException(t, promise);
|
notifyOutboundHandlerException(t, promise);
|
||||||
}
|
}
|
||||||
@ -581,13 +521,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeDisconnect(ChannelPromise promise) {
|
private void invokeDisconnect(ChannelPromise promise) {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelOutboundHandler) handler).disconnect(this, promise);
|
((ChannelOutboundHandler) handler).disconnect(this, promise);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyOutboundHandlerException(t, promise);
|
notifyOutboundHandlerException(t, promise);
|
||||||
}
|
}
|
||||||
@ -614,13 +549,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeClose(ChannelPromise promise) {
|
private void invokeClose(ChannelPromise promise) {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelOutboundHandler) handler).close(this, promise);
|
((ChannelOutboundHandler) handler).close(this, promise);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyOutboundHandlerException(t, promise);
|
notifyOutboundHandlerException(t, promise);
|
||||||
}
|
}
|
||||||
@ -647,13 +577,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeDeregister(ChannelPromise promise) {
|
private void invokeDeregister(ChannelPromise promise) {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelOutboundHandler) handler).deregister(this, promise);
|
((ChannelOutboundHandler) handler).deregister(this, promise);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyOutboundHandlerException(t, promise);
|
notifyOutboundHandlerException(t, promise);
|
||||||
}
|
}
|
||||||
@ -682,13 +607,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeRead() {
|
private void invokeRead() {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelOutboundHandler) handler).read(this);
|
((ChannelOutboundHandler) handler).read(this);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
}
|
}
|
||||||
@ -723,13 +643,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeWrite(Object msg, ChannelPromise promise) {
|
private void invokeWrite(Object msg, ChannelPromise promise) {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelOutboundHandler) handler).write(this, msg, promise);
|
((ChannelOutboundHandler) handler).write(this, msg, promise);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyOutboundHandlerException(t, promise);
|
notifyOutboundHandlerException(t, promise);
|
||||||
}
|
}
|
||||||
@ -758,13 +673,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void invokeFlush() {
|
private void invokeFlush() {
|
||||||
try {
|
|
||||||
callDepth ++;
|
|
||||||
try {
|
try {
|
||||||
((ChannelOutboundHandler) handler).flush(this);
|
((ChannelOutboundHandler) handler).flush(this);
|
||||||
} finally {
|
|
||||||
callDepth --;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
notifyHandlerException(t);
|
notifyHandlerException(t);
|
||||||
}
|
}
|
||||||
|
@ -506,8 +506,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void callHandlerRemoved(final DefaultChannelHandlerContext ctx) {
|
private void callHandlerRemoved(final DefaultChannelHandlerContext ctx) {
|
||||||
if (ctx.channel().isRegistered()) {
|
if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
|
||||||
if (!ctx.executor().inEventLoop() || ctx.isRunning()) {
|
|
||||||
ctx.executor().execute(new Runnable() {
|
ctx.executor().execute(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
@ -516,7 +515,6 @@ final class DefaultChannelPipeline implements ChannelPipeline {
|
|||||||
});
|
});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
callHandlerRemoved0(ctx);
|
callHandlerRemoved0(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user