Defer channelInactive and channelUnregistered events in Http2MultiplexCodec (#8021)
Motivation: There is an inconsistency between the order of events in the StreamChannel implementation in Http2MultiplexCodec and other Channel implementations that extend AbstractChannel where channelInactive and channelUnregistered events are not performed 'later'. This can cause an unexected order of events for ChannelHandler implementations that call Channel.close() in response to some event. Modification: The Http2MultiplexCodec.DefaultHttp2StreamChannel.Http2ChannelUnsafe was modified to bounce the deregistration and channelInactive events through the parent channels EventLoop. Result: Stream events are now in the proper order. Fixes #8018.
This commit is contained in:
parent
3e3e5155b9
commit
c7c8e6a3ec
@ -43,11 +43,14 @@ import io.netty.util.ReferenceCounted;
|
|||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
import io.netty.util.internal.ThrowableUtil;
|
import io.netty.util.internal.ThrowableUtil;
|
||||||
import io.netty.util.internal.UnstableApi;
|
import io.netty.util.internal.UnstableApi;
|
||||||
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.ArrayDeque;
|
import java.util.ArrayDeque;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
|
||||||
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
|
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
|
||||||
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
|
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
|
||||||
@ -104,6 +107,8 @@ import static java.lang.Math.min;
|
|||||||
@UnstableApi
|
@UnstableApi
|
||||||
public class Http2MultiplexCodec extends Http2FrameCodec {
|
public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||||
|
|
||||||
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultHttp2StreamChannel.class);
|
||||||
|
|
||||||
private static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
|
private static final ChannelFutureListener CHILD_CHANNEL_REGISTRATION_LISTENER = new ChannelFutureListener() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(ChannelFuture future) throws Exception {
|
public void operationComplete(ChannelFuture future) throws Exception {
|
||||||
@ -899,6 +904,8 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
|||||||
closePending = false;
|
closePending = false;
|
||||||
fireChannelReadPending = false;
|
fireChannelReadPending = false;
|
||||||
|
|
||||||
|
final boolean wasActive = isActive();
|
||||||
|
|
||||||
// Only ever send a reset frame if the connection is still alive as otherwise it makes no sense at
|
// Only ever send a reset frame if the connection is still alive as otherwise it makes no sense at
|
||||||
// all anyway.
|
// all anyway.
|
||||||
if (parent().isActive() && !streamClosedWithoutError && isStreamIdValid(stream().id())) {
|
if (parent().isActive() && !streamClosedWithoutError && isStreamIdValid(stream().id())) {
|
||||||
@ -922,10 +929,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
|||||||
closePromise.setSuccess();
|
closePromise.setSuccess();
|
||||||
promise.setSuccess();
|
promise.setSuccess();
|
||||||
|
|
||||||
pipeline().fireChannelInactive();
|
fireChannelInactiveAndDeregister(voidPromise(), wasActive);
|
||||||
if (isRegistered()) {
|
|
||||||
deregister(unsafe().voidPromise());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -935,16 +939,71 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deregister(ChannelPromise promise) {
|
public void deregister(ChannelPromise promise) {
|
||||||
|
fireChannelInactiveAndDeregister(promise, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fireChannelInactiveAndDeregister(final ChannelPromise promise,
|
||||||
|
final boolean fireChannelInactive) {
|
||||||
if (!promise.setUncancellable()) {
|
if (!promise.setUncancellable()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (registered) {
|
|
||||||
registered = true;
|
if (!registered) {
|
||||||
promise.setSuccess();
|
promise.setSuccess();
|
||||||
pipeline().fireChannelUnregistered();
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
|
||||||
|
// we need to ensure we do the actual deregister operation later. This is necessary to preserve the
|
||||||
|
// behavior of the AbstractChannel, which always invokes channelUnregistered and channelInactive
|
||||||
|
// events 'later' to ensure the current events in the handler are completed before these events.
|
||||||
|
//
|
||||||
|
// See:
|
||||||
|
// https://github.com/netty/netty/issues/4435
|
||||||
|
invokeLater(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
if (fireChannelInactive) {
|
||||||
|
pipeline.fireChannelInactive();
|
||||||
|
}
|
||||||
|
// Some transports like local and AIO does not allow the deregistration of
|
||||||
|
// an open channel. Their doDeregister() calls close(). Consequently,
|
||||||
|
// close() calls deregister() again - no need to fire channelUnregistered, so check
|
||||||
|
// if it was registered.
|
||||||
|
if (registered) {
|
||||||
|
registered = false;
|
||||||
|
pipeline.fireChannelUnregistered();
|
||||||
} else {
|
} else {
|
||||||
promise.setFailure(new IllegalStateException("Not registered"));
|
promise.setFailure(new IllegalStateException("Not registered"));
|
||||||
}
|
}
|
||||||
|
safeSetSuccess(promise);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void safeSetSuccess(ChannelPromise promise) {
|
||||||
|
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
|
||||||
|
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void invokeLater(Runnable task) {
|
||||||
|
try {
|
||||||
|
// This method is used by outbound operation implementations to trigger an inbound event later.
|
||||||
|
// They do not trigger an inbound event immediately because an outbound operation might have been
|
||||||
|
// triggered by another inbound event handler method. If fired immediately, the call stack
|
||||||
|
// will look like this for example:
|
||||||
|
//
|
||||||
|
// handlerA.inboundBufferUpdated() - (1) an inbound handler method closes a connection.
|
||||||
|
// -> handlerA.ctx.close()
|
||||||
|
// -> channel.unsafe.close()
|
||||||
|
// -> handlerA.channelInactive() - (2) another inbound handler method called while in (1) yet
|
||||||
|
//
|
||||||
|
// which means the execution of two inbound handler methods of the same handler overlap undesirably.
|
||||||
|
eventLoop().execute(task);
|
||||||
|
} catch (RejectedExecutionException e) {
|
||||||
|
logger.warn("Can't invoke task later as EventLoop rejected it", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -34,6 +34,7 @@ import io.netty.util.AttributeKey;
|
|||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -583,6 +584,54 @@ public class Http2MultiplexCodecTest {
|
|||||||
assertFalse(channelActive.get());
|
assertFalse(channelActive.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void channelInactiveHappensAfterExceptionCaughtEvents() throws Exception {
|
||||||
|
final AtomicInteger count = new AtomicInteger(0);
|
||||||
|
final AtomicInteger exceptionCaught = new AtomicInteger(-1);
|
||||||
|
final AtomicInteger channelInactive = new AtomicInteger(-1);
|
||||||
|
final AtomicInteger channelUnregistered = new AtomicInteger(-1);
|
||||||
|
Http2StreamChannel childChannel = newOutboundStream();
|
||||||
|
|
||||||
|
childChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||||
|
ctx.close();
|
||||||
|
throw new Exception("exception");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
childChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
channelInactive.set(count.getAndIncrement());
|
||||||
|
super.channelInactive(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
|
exceptionCaught.set(count.getAndIncrement());
|
||||||
|
super.exceptionCaught(ctx, cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
channelUnregistered.set(count.getAndIncrement());
|
||||||
|
super.channelUnregistered(ctx);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
childChannel.pipeline().fireUserEventTriggered(new Object());
|
||||||
|
parentChannel.runPendingTasks();
|
||||||
|
|
||||||
|
// The events should have happened in this order because the inactive and deregistration events
|
||||||
|
// get deferred as they do in the AbstractChannel.
|
||||||
|
assertEquals(0, exceptionCaught.get());
|
||||||
|
assertEquals(1, channelInactive.get());
|
||||||
|
assertEquals(2, channelUnregistered.get());
|
||||||
|
}
|
||||||
|
|
||||||
@Ignore("not supported anymore atm")
|
@Ignore("not supported anymore atm")
|
||||||
@Test
|
@Test
|
||||||
public void cancellingWritesBeforeFlush() {
|
public void cancellingWritesBeforeFlush() {
|
||||||
|
Loading…
Reference in New Issue
Block a user