Correctly handle multiple calls to DefaultHttp2StreamChannel.Unsafe.close(...)
Motivation: Calling DefaultHttp2StreamChannel.Unsafe.close(...) multiple times should not fail. Modification: - Correctly handle multiple calls to DefaultHttp2StreamChannel.Unsafe.close(...) - Complete closePromise and promise that is given to close(...) in the correct order. - Add unit test Result: Fixes [#7628] and [#7641]
This commit is contained in:
parent
b1695fe17d
commit
b423a35783
@ -32,7 +32,6 @@ import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.DefaultChannelPipeline;
|
||||
import io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator;
|
||||
import io.netty.channel.DelegatingChannelPromiseNotifier;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.MessageSizeEstimator;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
@ -768,7 +767,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
@SuppressWarnings("deprecation")
|
||||
private RecvByteBufAllocator.ExtendedHandle recvHandle;
|
||||
private boolean writeDoneAndNoFlush;
|
||||
private ChannelPromise pendingClosePromise;
|
||||
private boolean closeInitiated;
|
||||
|
||||
@Override
|
||||
public void connect(final SocketAddress remoteAddress,
|
||||
@ -831,58 +830,59 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
|
||||
|
||||
@Override
|
||||
public void disconnect(ChannelPromise promise) {
|
||||
if (!promise.setUncancellable()) {
|
||||
return;
|
||||
}
|
||||
close(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(ChannelPromise promise) {
|
||||
public void close(final ChannelPromise promise) {
|
||||
if (!promise.setUncancellable()) {
|
||||
return;
|
||||
}
|
||||
if (closePromise.isDone()) {
|
||||
promise.setFailure(new ClosedChannelException());
|
||||
return;
|
||||
}
|
||||
if (pendingClosePromise != null) {
|
||||
pendingClosePromise.addListener(new DelegatingChannelPromiseNotifier(promise));
|
||||
return;
|
||||
}
|
||||
pendingClosePromise = promise;
|
||||
try {
|
||||
closePending = false;
|
||||
fireChannelReadPending = false;
|
||||
|
||||
// Only ever send a reset frame if the connection is still alive as otherwise it makes no sense at
|
||||
// all anyway.
|
||||
if (parent().isActive() && !streamClosedWithoutError && isStreamIdValid(stream().id())) {
|
||||
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
|
||||
write(resetFrame, unsafe().voidPromise());
|
||||
flush();
|
||||
}
|
||||
|
||||
if (inboundBuffer != null) {
|
||||
for (;;) {
|
||||
Object msg = inboundBuffer.poll();
|
||||
if (msg == null) {
|
||||
break;
|
||||
if (closeInitiated) {
|
||||
if (closePromise.isDone()) {
|
||||
// Closed already.
|
||||
promise.setSuccess();
|
||||
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
|
||||
// This means close() was called before so we just register a listener and return
|
||||
closePromise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
promise.setSuccess();
|
||||
}
|
||||
ReferenceCountUtil.release(msg);
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
closeInitiated = true;
|
||||
|
||||
closePending = false;
|
||||
fireChannelReadPending = false;
|
||||
|
||||
// Only ever send a reset frame if the connection is still alive as otherwise it makes no sense at
|
||||
// all anyway.
|
||||
if (parent().isActive() && !streamClosedWithoutError && isStreamIdValid(stream().id())) {
|
||||
Http2StreamFrame resetFrame = new DefaultHttp2ResetFrame(Http2Error.CANCEL).stream(stream());
|
||||
write(resetFrame, unsafe().voidPromise());
|
||||
flush();
|
||||
}
|
||||
|
||||
if (inboundBuffer != null) {
|
||||
for (;;) {
|
||||
Object msg = inboundBuffer.poll();
|
||||
if (msg == null) {
|
||||
break;
|
||||
}
|
||||
ReferenceCountUtil.release(msg);
|
||||
}
|
||||
}
|
||||
|
||||
// The promise should be notified before we call fireChannelInactive().
|
||||
promise.setSuccess();
|
||||
closePromise.setSuccess();
|
||||
// The promise should be notified before we call fireChannelInactive().
|
||||
closePromise.setSuccess();
|
||||
promise.setSuccess();
|
||||
|
||||
pipeline().fireChannelInactive();
|
||||
if (isRegistered()) {
|
||||
deregister(unsafe().voidPromise());
|
||||
}
|
||||
} finally {
|
||||
pendingClosePromise = null;
|
||||
pipeline().fireChannelInactive();
|
||||
if (isRegistered()) {
|
||||
deregister(unsafe().voidPromise());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -464,6 +464,18 @@ public class Http2MultiplexCodecTest {
|
||||
assertSame(headers, headers2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void callUnsafeCloseMultipleTimes() {
|
||||
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
|
||||
Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
|
||||
childChannel.unsafe().close(childChannel.voidPromise());
|
||||
|
||||
ChannelPromise promise = childChannel.newPromise();
|
||||
childChannel.unsafe().close(promise);
|
||||
promise.syncUninterruptibly();
|
||||
childChannel.closeFuture().syncUninterruptibly();
|
||||
}
|
||||
|
||||
private LastInboundHandler streamActiveAndWriteHeaders(Http2FrameStream stream) {
|
||||
LastInboundHandler inboundHandler = new LastInboundHandler();
|
||||
childChannelInitializer.handler = inboundHandler;
|
||||
|
Loading…
Reference in New Issue
Block a user