Correctly buffer multiple outbound streams if needed. (#8694)
Motivation: In Http2FrameCodec we made the incorrect assumption that we can only have 1 buffered outboundstream as maximum. This is not correct and we need to account for multiple buffered streams. Modifications: - Use a map to allow buffer multiple streams - Add unit test. Result: Fixes https://github.com/netty/netty/issues/8692.
This commit is contained in:
parent
d0891d08d7
commit
d06babf02a
|
@ -29,6 +29,8 @@ import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedExc
|
|||
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import io.netty.util.collection.IntObjectHashMap;
|
||||
import io.netty.util.collection.IntObjectMap;
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
@ -152,7 +154,8 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
|
|||
|
||||
/** Number of buffered streams if the {@link StreamBufferingEncoder} is used. **/
|
||||
private int numBufferedStreams;
|
||||
private DefaultHttp2FrameStream frameStreamToInitialize;
|
||||
private final IntObjectMap<DefaultHttp2FrameStream> frameStreamToInitializeMap =
|
||||
new IntObjectHashMap<DefaultHttp2FrameStream>(8);
|
||||
|
||||
Http2FrameCodec(Http2ConnectionEncoder encoder, Http2ConnectionDecoder decoder, Http2Settings initialSettings) {
|
||||
super(decoder, encoder, initialSettings);
|
||||
|
@ -358,12 +361,17 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
|
|||
}
|
||||
stream.id = streamId;
|
||||
|
||||
// TODO: This depends on the fact that the connection based API will create Http2Stream objects
|
||||
// synchronously. We should investigate how to refactor this later on when we consolidate some layers.
|
||||
assert frameStreamToInitialize == null;
|
||||
frameStreamToInitialize = stream;
|
||||
// Use a Map to store all pending streams as we may have multiple. This is needed as if we would store the
|
||||
// stream in a field directly we may override the stored field before onStreamAdded(...) was called
|
||||
// and so not correctly set the property for the buffered stream.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/8692
|
||||
Object old = frameStreamToInitializeMap.put(streamId, stream);
|
||||
|
||||
// TODO(buchgr): Once Http2Stream2 and Http2Stream are merged this is no longer necessary.
|
||||
// We should not re-use ids.
|
||||
assert old == null;
|
||||
|
||||
// TODO(buchgr): Once Http2FrameStream and Http2Stream are merged this is no longer necessary.
|
||||
final ChannelPromise writePromise = ctx.newPromise();
|
||||
|
||||
encoder().writeHeaders(ctx, streamId, headersFrame.headers(), headersFrame.padding(),
|
||||
|
@ -399,7 +407,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
|
|||
return;
|
||||
}
|
||||
|
||||
DefaultHttp2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
|
||||
Http2FrameStream stream2 = newStream().setStreamAndProperty(streamKey, stream);
|
||||
onHttp2StreamStateChanged(ctx, stream2);
|
||||
}
|
||||
|
||||
|
@ -407,9 +415,10 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
|
|||
|
||||
@Override
|
||||
public void onStreamAdded(Http2Stream stream) {
|
||||
if (frameStreamToInitialize != null && stream.id() == frameStreamToInitialize.id()) {
|
||||
frameStreamToInitialize.setStreamAndProperty(streamKey, stream);
|
||||
frameStreamToInitialize = null;
|
||||
DefaultHttp2FrameStream frameStream = frameStreamToInitializeMap.remove(stream.id());
|
||||
|
||||
if (frameStream != null) {
|
||||
frameStream.setStreamAndProperty(streamKey, stream);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -420,7 +429,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
|
|||
|
||||
@Override
|
||||
public void onStreamClosed(Http2Stream stream) {
|
||||
DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
|
||||
Http2FrameStream stream2 = stream.getProperty(streamKey);
|
||||
if (stream2 != null) {
|
||||
onHttp2StreamStateChanged(ctx, stream2);
|
||||
}
|
||||
|
@ -428,7 +437,7 @@ public class Http2FrameCodec extends Http2ConnectionHandler {
|
|||
|
||||
@Override
|
||||
public void onStreamHalfClosed(Http2Stream stream) {
|
||||
DefaultHttp2FrameStream stream2 = stream.getProperty(streamKey);
|
||||
Http2FrameStream stream2 = stream.getProperty(streamKey);
|
||||
if (stream2 != null) {
|
||||
onHttp2StreamStateChanged(ctx, stream2);
|
||||
}
|
||||
|
|
|
@ -619,6 +619,49 @@ public class Http2FrameCodecTest {
|
|||
assertTrue(promise2.syncUninterruptibly().isSuccess());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void multipleNewOutboundStreamsShouldBeBuffered() throws Exception {
|
||||
// We use a limit of 1 and then increase it step by step.
|
||||
setUp(Http2FrameCodecBuilder.forServer().encoderEnforceMaxConcurrentStreams(true),
|
||||
new Http2Settings().maxConcurrentStreams(1));
|
||||
|
||||
Http2FrameStream stream1 = frameCodec.newStream();
|
||||
Http2FrameStream stream2 = frameCodec.newStream();
|
||||
Http2FrameStream stream3 = frameCodec.newStream();
|
||||
|
||||
ChannelPromise promise1 = channel.newPromise();
|
||||
ChannelPromise promise2 = channel.newPromise();
|
||||
ChannelPromise promise3 = channel.newPromise();
|
||||
|
||||
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream1), promise1);
|
||||
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream2), promise2);
|
||||
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream3), promise3);
|
||||
|
||||
assertTrue(isStreamIdValid(stream1.id()));
|
||||
channel.runPendingTasks();
|
||||
assertTrue(isStreamIdValid(stream2.id()));
|
||||
|
||||
assertTrue(promise1.syncUninterruptibly().isSuccess());
|
||||
assertFalse(promise2.isDone());
|
||||
assertFalse(promise3.isDone());
|
||||
|
||||
// Increase concurrent streams limit to 2
|
||||
frameInboundWriter.writeInboundSettings(new Http2Settings().maxConcurrentStreams(2));
|
||||
channel.flush();
|
||||
|
||||
// As we increased the limit to 2 we should have also succeed the second frame.
|
||||
assertTrue(promise2.syncUninterruptibly().isSuccess());
|
||||
assertFalse(promise3.isDone());
|
||||
|
||||
frameInboundWriter.writeInboundSettings(new Http2Settings().maxConcurrentStreams(3));
|
||||
channel.flush();
|
||||
|
||||
// With the max streams of 3 all streams should be succeed now.
|
||||
assertTrue(promise3.syncUninterruptibly().isSuccess());
|
||||
|
||||
assertFalse(channel.finishAndReleaseAll());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void streamIdentifiersExhausted() throws Http2Exception {
|
||||
int maxServerStreamId = Integer.MAX_VALUE - 1;
|
||||
|
|
Loading…
Reference in New Issue
Block a user