Http2Connection stream id generation to support queueing

Motivation:
StreamBufferingEncoder provides queueing so that MAX_CONCURRENT_STREAMS is not violated. However the stream id generation provided by Http2Connection.nextStreamId() only returns the next stream id that is expected on the connection and does not account for queueing. The codec should provide a way to generate the next stream id for a given endpoint that functions with or without queueing.

Modifications:
- Change Http2Connection.nextStreamId to Http2Connection.incrementAndGetNextStreamId

Result:
Http2Connection can generate the next stream id in queued and non-queued scenarios.
Fixes https://github.com/netty/netty/issues/4704
This commit is contained in:
Scott Mitchell 2016-01-19 19:36:39 -08:00
parent 8ba4b63cb6
commit 11bcb8790c
4 changed files with 103 additions and 34 deletions

View File

@ -51,6 +51,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import static java.lang.Math.max;
/**
* Simple implementation of {@link Http2Connection}.
@ -71,7 +72,7 @@ public class DefaultHttp2Connection implements Http2Connection {
* dependencies to load).
*/
private static final int INITIAL_CHILDREN_MAP_SIZE =
Math.max(1, SystemPropertyUtil.getInt("io.netty.http2.childrenMapSize", 4));
max(1, SystemPropertyUtil.getInt("io.netty.http2.childrenMapSize", 4));
/**
* We chose a {@link List} over a {@link Set} to avoid allocating an {@link Iterator} objects when iterating over
@ -833,8 +834,19 @@ public class DefaultHttp2Connection implements Http2Connection {
*/
private final class DefaultEndpoint<F extends Http2FlowController> implements Endpoint<F> {
private final boolean server;
private int nextStreamId;
private int lastStreamCreated;
/**
* The minimum stream ID allowed when creating the next stream. This only applies at the time the stream is
* created. If the ID of the stream being created is less than this value, stream creation will fail. Upon
* successful creation of a stream, this value is incremented to the next valid stream ID.
*/
private int nextStreamIdToCreate;
/**
* Used for reservation of stream IDs. Stream IDs can be reserved in advance by applications before the streams
* are actually created. For example, applications may choose to buffer stream creation attempts as a way of
* working around {@code SETTINGS_MAX_CONCURRENT_STREAMS}, in which case they will reserve stream IDs for each
* buffered stream.
*/
private int nextReservationStreamId;
private int lastStreamKnownByPeer = -1;
private boolean pushToAllowed = true;
private F flowController;
@ -849,7 +861,14 @@ public class DefaultHttp2Connection implements Http2Connection {
// are odd and server-initiated streams are even. Zero is reserved for the
// connection. Stream 1 is reserved client-initiated stream for responding to an
// upgrade from HTTP 1.1.
nextStreamId = server ? 2 : 1;
if (server) {
nextStreamIdToCreate = 2;
nextReservationStreamId = 0;
} else {
nextStreamIdToCreate = 1;
// For manually created client-side streams, 1 is reserved for HTTP upgrade, so start at 3.
nextReservationStreamId = 1;
}
// Push is disallowed by default for servers and allowed for clients.
pushToAllowed = !server;
@ -857,9 +876,15 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public int nextStreamId() {
// For manually created client-side streams, 1 is reserved for HTTP upgrade, so start at 3.
return nextStreamId > 1 ? nextStreamId : nextStreamId + 2;
public int incrementAndGetNextStreamId() {
return nextReservationStreamId >= 0 ? nextReservationStreamId += 2 : nextReservationStreamId;
}
private void incrementExpectedStreamId(int streamId) {
if (streamId > nextReservationStreamId && nextReservationStreamId >= 0) {
nextReservationStreamId = streamId;
}
nextStreamIdToCreate = streamId + 2;
}
@Override
@ -870,12 +895,7 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public boolean mayHaveCreatedStream(int streamId) {
return isValidStreamId(streamId) && streamId <= lastStreamCreated;
}
@Override
public boolean isExhausted() {
return nextStreamId() <= 0;
return isValidStreamId(streamId) && streamId <= lastStreamCreated();
}
@Override
@ -889,9 +909,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// Create and initialize the stream.
DefaultStream stream = new DefaultStream(streamId, state);
// Update the next and last stream IDs.
nextStreamId = streamId + 2;
lastStreamCreated = streamId;
incrementExpectedStreamId(streamId);
addStream(stream);
return stream;
@ -936,9 +954,7 @@ public class DefaultHttp2Connection implements Http2Connection {
// Create and initialize the stream.
DefaultStream stream = new DefaultStream(streamId, state);
// Update the next and last stream IDs.
nextStreamId = streamId + 2;
lastStreamCreated = streamId;
incrementExpectedStreamId(streamId);
// Register the stream.
addStream(stream);
@ -994,7 +1010,7 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public int lastStreamCreated() {
return lastStreamCreated;
return nextStreamIdToCreate > 1 ? nextStreamIdToCreate - 2 : 0;
}
@Override
@ -1036,11 +1052,11 @@ public class DefaultHttp2Connection implements Http2Connection {
}
// This check must be after all id validated checks, but before the max streams check because it may be
// recoverable to some degree for handling frames which can be sent on closed streams.
if (streamId < nextStreamId) {
if (streamId < nextStreamIdToCreate) {
throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
streamId, nextStreamId);
streamId, nextStreamIdToCreate);
}
if (isExhausted()) {
if (nextStreamIdToCreate <= 0) {
throw connectionError(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.");
}
if ((state.localSideOpen() || state.remoteSideOpen()) && !canOpenStream()) {

View File

@ -138,12 +138,11 @@ public interface Http2Connection {
* A view of the connection from one endpoint (local or remote).
*/
interface Endpoint<F extends Http2FlowController> {
/**
* Returns the next valid streamId for this endpoint. If negative, the stream IDs are
* Increment and get the next generated stream id this endpoint. If negative, the stream IDs are
* exhausted for this endpoint an no further streams may be created.
*/
int nextStreamId();
int incrementAndGetNextStreamId();
/**
* Indicates whether the given streamId is from the set of IDs used by this endpoint to
@ -162,13 +161,6 @@ public interface Http2Connection {
*/
boolean created(Http2Stream stream);
/**
* Indicates whether or not the stream IDs for this endpoint have been exhausted
* (i.e. {@link #nextStreamId()} < 0). If {@code true}, any attempt to create new streams
* on this endpoint will fail.
*/
boolean isExhausted();
/**
* Indicates whether or a stream created by this endpoint can be opened without violating
* {@link #maxActiveStreams()}.

View File

@ -51,7 +51,7 @@ public class HttpToHttp2ConnectionHandler extends Http2ConnectionHandler {
*/
private int getStreamId(HttpHeaders httpHeaders) throws Exception {
return httpHeaders.getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text(),
connection().local().nextStreamId());
connection().local().incrementAndGetNextStreamId());
}
/**

View File

@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
@ -196,6 +197,46 @@ public class DefaultHttp2ConnectionTest {
assertEquals(4, server.local().lastStreamCreated());
}
@Test
public void serverRemoteIncrementAndGetStreamShouldSucceed() throws Http2Exception {
incrementAndGetStreamShouldSucceed(server.remote());
}
@Test
public void serverLocalIncrementAndGetStreamShouldSucceed() throws Http2Exception {
incrementAndGetStreamShouldSucceed(server.local());
}
@Test
public void clientRemoteIncrementAndGetStreamShouldSucceed() throws Http2Exception {
incrementAndGetStreamShouldSucceed(client.remote());
}
@Test
public void clientLocalIncrementAndGetStreamShouldSucceed() throws Http2Exception {
incrementAndGetStreamShouldSucceed(client.local());
}
@Test(expected = Http2NoMoreStreamIdsException.class)
public void serverRemoteIncrementAndGetStreamShouldRespectOverflow() throws Http2Exception {
incrementAndGetStreamShouldRespectOverflow(server.remote(), Integer.MAX_VALUE);
}
@Test(expected = Http2NoMoreStreamIdsException.class)
public void serverLocalIncrementAndGetStreamShouldRespectOverflow() throws Http2Exception {
incrementAndGetStreamShouldRespectOverflow(server.local(), Integer.MAX_VALUE - 1);
}
@Test(expected = Http2NoMoreStreamIdsException.class)
public void clientRemoteIncrementAndGetStreamShouldRespectOverflow() throws Http2Exception {
incrementAndGetStreamShouldRespectOverflow(client.remote(), Integer.MAX_VALUE - 1);
}
@Test(expected = Http2NoMoreStreamIdsException.class)
public void clientLocalIncrementAndGetStreamShouldRespectOverflow() throws Http2Exception {
incrementAndGetStreamShouldRespectOverflow(client.local(), Integer.MAX_VALUE);
}
@Test(expected = Http2Exception.class)
public void newStreamBehindExpectedShouldThrow() throws Http2Exception {
server.local().createStream(0, true);
@ -1066,6 +1107,26 @@ public class DefaultHttp2ConnectionTest {
}
}
private void incrementAndGetStreamShouldRespectOverflow(Endpoint<?> endpoint, int streamId) throws Http2Exception {
assertTrue(streamId > 0);
try {
endpoint.createStream(streamId, true);
streamId = endpoint.incrementAndGetNextStreamId();
} catch (Throwable t) {
fail();
}
assertTrue(streamId < 0);
endpoint.createStream(streamId, true);
}
private void incrementAndGetStreamShouldSucceed(Endpoint<?> endpoint) throws Http2Exception {
Http2Stream streamA = endpoint.createStream(endpoint.incrementAndGetNextStreamId(), true);
Http2Stream streamB = endpoint.createStream(streamA.id() + 2, true);
Http2Stream streamC = endpoint.createStream(endpoint.incrementAndGetNextStreamId(), true);
assertEquals(streamB.id() + 2, streamC.id());
endpoint.createStream(streamC.id() + 2, true);
}
private static final class ListenerExceptionThrower implements Answer<Void> {
private static final RuntimeException FAKE_EXCEPTION = new RuntimeException("Fake Exception");
private final boolean[] array;