Reorder channel state changes in Http2MultiplexCodec child channel

Motivation:

If a write fails for a Http2MultiplexChannel stream channel, the channel
may be forcibly closed, but only after the promise has been failed. That
means continuations attached to the promise may see the channel in an
inconsistent state of still being open and active.

Modifications:

Move the satisfaction of the promise to after the channel cleanup logic
runs.

Result:

Listeners attached to the future that resulted in a Failed write will
see the stream channel in the correct state.
This commit is contained in:
Bryce Anderson 2018-06-27 13:28:41 -06:00 committed by Norman Maurer
parent 05e5ab1ecb
commit d5d1b898d5
2 changed files with 46 additions and 3 deletions

View File

@ -1145,9 +1145,9 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
writabilityChanged(Http2MultiplexCodec.this.isWritable(stream));
promise.setSuccess();
} else {
promise.setFailure(wrapStreamClosedError(cause));
// If the first write fails there is not much we can do, just close
closeForcibly();
promise.setFailure(wrapStreamClosedError(cause));
}
}
@ -1157,8 +1157,6 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
promise.setSuccess();
} else {
Throwable error = wrapStreamClosedError(cause);
promise.setFailure(error);
if (error instanceof ClosedChannelException) {
if (config.isAutoClose()) {
// Close channel if needed.
@ -1167,6 +1165,7 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
outboundClosed = true;
}
}
promise.setFailure(error);
}
}

View File

@ -33,9 +33,12 @@ import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.util.ReferenceCountUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@ -464,6 +467,7 @@ public class Http2MultiplexCodecTest {
childChannel.close(p).syncUninterruptibly();
assertFalse(channelOpen.get());
assertFalse(channelActive.get());
assertFalse(childChannel.isActive());
}
@ -488,6 +492,46 @@ public class Http2MultiplexCodecTest {
childChannel.close().syncUninterruptibly();
assertFalse(channelOpen.get());
assertFalse(channelActive.get());
assertFalse(childChannel.isActive());
}
@Test
public void channelClosedWhenWriteFutureFails() {
final Queue<ChannelPromise> writePromises = new ArrayDeque<ChannelPromise>();
writer = new Writer() {
@Override
void write(Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg);
writePromises.offer(promise);
}
};
LastInboundHandler inboundHandler = streamActiveAndWriteHeaders(inboundStream);
Http2StreamChannel childChannel = (Http2StreamChannel) inboundHandler.channel();
assertTrue(childChannel.isOpen());
assertTrue(childChannel.isActive());
final AtomicBoolean channelOpen = new AtomicBoolean(true);
final AtomicBoolean channelActive = new AtomicBoolean(true);
ChannelFuture f = childChannel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()));
assertFalse(f.isDone());
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
channelOpen.set(future.channel().isOpen());
channelActive.set(future.channel().isActive());
}
});
ChannelPromise first = writePromises.poll();
first.setFailure(new ClosedChannelException());
f.awaitUninterruptibly();
assertFalse(channelOpen.get());
assertFalse(channelActive.get());
assertFalse(childChannel.isActive());
}