diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java index d62aa1c17d..5ad39a5588 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2Connection.java @@ -296,6 +296,7 @@ public class DefaultHttp2Connection implements Http2Connection { private int totalChildWeights; private int prioritizableForTree = 1; private boolean resetSent; + private boolean headerSent; DefaultStream(int id, State state) { this.id = id; @@ -323,6 +324,17 @@ public class DefaultHttp2Connection implements Http2Connection { return this; } + @Override + public boolean isHeaderSent() { + return headerSent; + } + + @Override + public Http2Stream headerSent() { + headerSent = true; + return this; + } + @Override public final V setProperty(PropertyKey key, V value) { return properties.add(verifyKey(key), value); @@ -432,6 +444,9 @@ public class DefaultHttp2Connection implements Http2Connection { @Override public Http2Stream open(boolean halfClosed) throws Http2Exception { state = activeState(id, state, isLocal(), halfClosed); + if (!createdBy().canOpenStream()) { + throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint."); + } activate(); return this; } @@ -548,17 +563,7 @@ public class DefaultHttp2Connection implements Http2Connection { children = new IntObjectHashMap(INITIAL_CHILDREN_MAP_SIZE); } - @Override - public final boolean remoteSideOpen() { - return state == HALF_CLOSED_LOCAL || state == OPEN || state == RESERVED_REMOTE; - } - - @Override - public final boolean localSideOpen() { - return state == HALF_CLOSED_REMOTE || state == OPEN || state == RESERVED_LOCAL; - } - - final DefaultEndpoint createdBy() { + DefaultEndpoint createdBy() { return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint; } @@ -792,11 +797,26 @@ public class DefaultHttp2Connection implements Http2Connection { return false; } + @Override + DefaultEndpoint createdBy() { + return null; + } + @Override public Http2Stream resetSent() { throw new UnsupportedOperationException(); } + @Override + public boolean isHeaderSent() { + return false; + } + + @Override + public Http2Stream headerSent() { + throw new UnsupportedOperationException(); + } + @Override public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive) { throw new UnsupportedOperationException(); @@ -869,12 +889,17 @@ public class DefaultHttp2Connection implements Http2Connection { } @Override - public boolean canCreateStream() { - return nextStreamId() > 0 && numActiveStreams + 1 <= maxActiveStreams; + public boolean isExhausted() { + return nextStreamId() <= 0; + } + + @Override + public boolean canOpenStream() { + return numActiveStreams + 1 <= maxActiveStreams; } private DefaultStream createStream(int streamId, State state) throws Http2Exception { - checkNewStreamAllowed(streamId); + checkNewStreamAllowed(streamId, state); // Create and initialize the stream. DefaultStream stream = new DefaultStream(streamId, state); @@ -899,6 +924,11 @@ public class DefaultHttp2Connection implements Http2Connection { return stream; } + @Override + public boolean created(Http2Stream stream) { + return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this; + } + @Override public boolean isServer() { return server; @@ -909,16 +939,17 @@ public class DefaultHttp2Connection implements Http2Connection { if (parent == null) { throw connectionError(PROTOCOL_ERROR, "Parent stream missing"); } - if (isLocal() ? !parent.localSideOpen() : !parent.remoteSideOpen()) { + if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) { throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id()); } if (!opposite().allowPushTo()) { throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint."); } - checkNewStreamAllowed(streamId); + State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE; + checkNewStreamAllowed(streamId, state); // Create and initialize the stream. - DefaultStream stream = new DefaultStream(streamId, isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE); + DefaultStream stream = new DefaultStream(streamId, state); // Update the next and last stream IDs. nextStreamId = streamId + 2; @@ -1005,7 +1036,7 @@ public class DefaultHttp2Connection implements Http2Connection { return isLocal() ? remoteEndpoint : localEndpoint; } - private void checkNewStreamAllowed(int streamId) throws Http2Exception { + private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception { if (goAwayReceived() && streamId > localEndpoint.lastStreamKnownByPeer()) { throw connectionError(PROTOCOL_ERROR, "Cannot create stream %d since this endpoint has received a " + "GOAWAY frame with last stream id %d.", streamId, @@ -1024,8 +1055,11 @@ public class DefaultHttp2Connection implements Http2Connection { throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d", streamId, nextStreamId); } - if (!canCreateStream()) { - throw connectionError(REFUSED_STREAM, "Maximum streams exceeded for this endpoint."); + if (isExhausted()) { + throw connectionError(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint."); + } + if ((state.localSideOpen() || state.remoteSideOpen()) && !canOpenStream()) { + throw connectionError(REFUSED_STREAM, "Maximum active streams violated for this endpoint."); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index e1b3b7f40d..f343506136 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -409,7 +409,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { } promise.addListener(this); - frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive, + stream.headerSent(); + frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive, padding, endOfStream, promise); } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java index 178a501c97..b57583c328 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Connection.java @@ -158,21 +158,34 @@ public interface Http2Connection { boolean mayHaveCreatedStream(int streamId); /** - * Indicates whether or not this endpoint is currently allowed to create new streams. This will be - * be false if {@link #numActiveStreams()} + 1 >= {@link #maxActiveStreams()} or if the stream IDs - * for this endpoint have been exhausted (i.e. {@link #nextStreamId()} < 0). + * Indicates whether or not this endpoint created the given stream. */ - boolean canCreateStream(); + boolean created(Http2Stream stream); + + /** + * Indicates whether or not the stream IDs for this endpoint have been exhausted + * (i.e. {@link #nextStreamId()} < 0). If {@code true}, any attempt to create new streams + * on this endpoint will fail. + */ + boolean isExhausted(); + + /** + * Indicates whether or a stream created by this endpoint can be opened without violating + * {@link #maxActiveStreams()}. + */ + boolean canOpenStream(); /** * Creates a stream initiated by this endpoint. This could fail for the following reasons: * *

+ * Note that IDLE streams can always be created so long as there are stream IDs available. + * The {@link #numActiveStreams()} will be enforced upon attempting to open the stream. + *

* If the stream is intended to initialized to {@link Http2Stream.State#OPEN} then use * {@link #createStream(int, boolean)} otherwise optimizations in {@link Listener}s may not work * and memory may be thrashed. The caller is expected to {@link Http2Stream#open(boolean)} the stream. @@ -186,7 +199,8 @@ public interface Http2Connection { *

*

diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java index 8b2a25e787..78c41ab923 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2ConnectionHandler.java @@ -43,6 +43,7 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.isStreamError; import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS; +import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; import static io.netty.util.CharsetUtil.UTF_8; import static io.netty.util.internal.ObjectUtil.checkNotNull; import static java.lang.Math.min; @@ -790,7 +791,14 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http return promise.setSuccess(); } - ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise); + final ChannelFuture future; + if (stream.state() == IDLE || (connection().local().created(stream) && !stream.isHeaderSent())) { + // The other endpoint doesn't know about the stream yet, so we can't actually send + // the RST_STREAM frame. The HTTP/2 spec also disallows sending RST_STREAM for IDLE streams. + future = promise.setSuccess(); + } else { + future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise); + } // Synchronously set the resetSent flag to prevent any subsequent calls // from resulting in multiple reset frames being sent. diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LifecycleManager.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LifecycleManager.java index 605a25fe12..c815efa905 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LifecycleManager.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2LifecycleManager.java @@ -27,17 +27,17 @@ public interface Http2LifecycleManager { /** * Closes the local side of the {@code stream}. Depending on the {@code stream} state this may result in - * {@code stream} being closed. See {@link closeStream(Http2Stream, ChannelFuture)}. + * {@code stream} being closed. See {@link #closeStream(Http2Stream, ChannelFuture)}. * @param stream the stream to be half closed. - * @param future See {@link closeStream(Http2Stream, ChannelFuture)}. + * @param future See {@link #closeStream(Http2Stream, ChannelFuture)}. */ void closeStreamLocal(Http2Stream stream, ChannelFuture future); /** * Closes the remote side of the {@code stream}. Depending on the {@code stream} state this may result in - * {@code stream} being closed. See {@link closeStream(Http2Stream, ChannelFuture)}. + * {@code stream} being closed. See {@link #closeStream(Http2Stream, ChannelFuture)}. * @param stream the stream to be half closed. - * @param future See {@link closeStream(Http2Stream, ChannelFuture)}. + * @param future See {@link #closeStream(Http2Stream, ChannelFuture)}. */ void closeStreamRemote(Http2Stream stream, ChannelFuture future); diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java index 5916f28782..3ded9c7088 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2Stream.java @@ -24,13 +24,37 @@ public interface Http2Stream { * The allowed states of an HTTP2 stream. */ enum State { - IDLE, - RESERVED_LOCAL, - RESERVED_REMOTE, - OPEN, - HALF_CLOSED_LOCAL, - HALF_CLOSED_REMOTE, - CLOSED + IDLE(false, false), + RESERVED_LOCAL(false, false), + RESERVED_REMOTE(false, false), + OPEN(true, true), + HALF_CLOSED_LOCAL(false, true), + HALF_CLOSED_REMOTE(true, false), + CLOSED(false, false); + + private final boolean localSideOpen; + private final boolean remoteSideOpen; + + State(boolean localSideOpen, boolean remoteSideOpen) { + this.localSideOpen = localSideOpen; + this.remoteSideOpen = remoteSideOpen; + } + + /** + * Indicates whether the local side of this stream is open (i.e. the state is either + * {@link State#OPEN} or {@link State#HALF_CLOSED_REMOTE}). + */ + public boolean localSideOpen() { + return localSideOpen; + } + + /** + * Indicates whether the remote side of this stream is open (i.e. the state is either + * {@link State#OPEN} or {@link State#HALF_CLOSED_LOCAL}). + */ + public boolean remoteSideOpen() { + return remoteSideOpen; + } } /** @@ -87,16 +111,16 @@ public interface Http2Stream { Http2Stream resetSent(); /** - * Indicates whether the remote side of this stream is open (i.e. the state is either - * {@link State#OPEN} or {@link State#HALF_CLOSED_LOCAL}). + * Indicates whether or not at least one {@code HEADERS} frame has been sent from the local endpoint + * for this stream. */ - boolean remoteSideOpen(); + boolean isHeaderSent(); /** - * Indicates whether the local side of this stream is open (i.e. the state is either - * {@link State#OPEN} or {@link State#HALF_CLOSED_REMOTE}). + * Sets the flag indicating that a {@code HEADERS} frame has been sent from the local endpoint + * for this stream. This does not affect the stream state. */ - boolean localSideOpen(); + Http2Stream headerSent(); /** * Associates the application-defined data with this stream. diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java index ebd9dd77c7..9158d2b3b2 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java @@ -14,20 +14,18 @@ */ package io.netty.handler.codec.http2; +import static io.netty.buffer.Unpooled.EMPTY_BUFFER; import static io.netty.buffer.Unpooled.wrappedBuffer; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT; import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf; import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; -import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED; -import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL; +import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE; import static io.netty.handler.codec.http2.Http2Stream.State.IDLE; -import static io.netty.handler.codec.http2.Http2Stream.State.OPEN; import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL; import static io.netty.util.CharsetUtil.UTF_8; import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -44,54 +42,38 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; + import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.DefaultChannelPromise; -import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; import io.netty.handler.codec.http2.Http2RemoteFlowController.FlowControlled; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; - -import java.util.ArrayList; -import java.util.List; - import junit.framework.AssertionFailedError; - import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.ArrayList; +import java.util.List; + /** * Tests for {@link DefaultHttp2ConnectionEncoder} */ public class DefaultHttp2ConnectionEncoderTest { - private static final int STREAM_ID = 1; - private static final int PUSH_STREAM_ID = 2; - - private Http2ConnectionEncoder encoder; - private ChannelPromise promise; - - @Mock - private Http2Connection connection; - - @Mock - private Http2Connection.Endpoint remote; - - @Mock - private Http2Connection.Endpoint local; + private static final int STREAM_ID = 2; + private static final int PUSH_STREAM_ID = 4; @Mock private Http2RemoteFlowController remoteFlow; @@ -102,18 +84,6 @@ public class DefaultHttp2ConnectionEncoderTest { @Mock private Channel channel; - @Mock - private ChannelPromise voidPromise; - - @Mock - private ChannelFuture future; - - @Mock - private Http2Stream stream; - - @Mock - private Http2Stream pushStream; - @Mock private Http2FrameListener listener; @@ -123,167 +93,196 @@ public class DefaultHttp2ConnectionEncoderTest { @Mock private Http2FrameWriter.Configuration writerConfig; + @Mock + private Http2FrameSizePolicy frameSizePolicy; + @Mock private Http2LifecycleManager lifecycleManager; + private DefaultHttp2ConnectionEncoder encoder; + private Http2Connection connection; private ArgumentCaptor payloadCaptor; private List writtenData; private List writtenPadding; private boolean streamClosed; - @SuppressWarnings("unchecked") @Before public void setup() throws Exception { MockitoAnnotations.initMocks(this); - promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); - when(voidPromise.addListener(Matchers.>>any())).thenThrow( - new AssertionFailedError()); - when(voidPromise.addListeners(Matchers.>>any())).thenThrow( - new AssertionFailedError()); - when(voidPromise.channel()).thenReturn(channel); - when(channel.isActive()).thenReturn(true); - when(stream.id()).thenReturn(STREAM_ID); - when(stream.state()).thenReturn(OPEN); - when(stream.open(anyBoolean())).thenReturn(stream); - when(pushStream.id()).thenReturn(PUSH_STREAM_ID); - doAnswer(new Answer() { - @Override - public Http2Stream answer(InvocationOnMock in) throws Throwable { - Http2StreamVisitor visitor = in.getArgumentAt(0, Http2StreamVisitor.class); - if (!visitor.visit(stream)) { - return stream; - } - return null; - } - }).when(connection).forEachActiveStream(any(Http2StreamVisitor.class)); - when(connection.stream(STREAM_ID)).thenReturn(stream); - when(connection.local()).thenReturn(local); - when(connection.remote()).thenReturn(remote); - when(remote.flowController()).thenReturn(remoteFlow); when(writer.configuration()).thenReturn(writerConfig); - when(local.createIdleStream(eq(STREAM_ID))).thenReturn(stream); - when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); - when(remote.createIdleStream(eq(STREAM_ID))).thenReturn(stream); - when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream); - when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future); - when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise))) - .thenReturn(future); + when(writerConfig.frameSizePolicy()).thenReturn(frameSizePolicy); + when(frameSizePolicy.maxFrameSize()).thenReturn(64); + doAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock in) throws Throwable { + return ((ChannelPromise) in.getArguments()[2]).setSuccess(); + } + }).when(writer).writeSettings(eq(ctx), any(Http2Settings.class), any(ChannelPromise.class)); + doAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock in) throws Throwable { + ((ByteBuf) in.getArguments()[3]).release(); + return ((ChannelPromise) in.getArguments()[4]).setSuccess(); + } + }).when(writer).writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), any(ChannelPromise.class)); writtenData = new ArrayList(); writtenPadding = new ArrayList(); - when(writer.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), any(ChannelPromise.class))) - .then(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - // Make sure we only receive stream closure on the last frame and that void promises are used for - // all writes except the last one. - ChannelPromise receivedPromise = (ChannelPromise) invocationOnMock.getArguments()[5]; - if (streamClosed) { - fail("Stream already closed"); - } else { - streamClosed = (Boolean) invocationOnMock.getArguments()[4]; - } - writtenPadding.add((Integer) invocationOnMock.getArguments()[3]); - ByteBuf data = (ByteBuf) invocationOnMock.getArguments()[2]; - writtenData.add(data.toString(UTF_8)); - // Release the buffer just as DefaultHttp2FrameWriter does - data.release(); - // Let the promise succeed to trigger listeners. - receivedPromise.trySuccess(); - return future; - } - }); + when(writer.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), + any(ChannelPromise.class))).then(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock in) throws Throwable { + // Make sure we only receive stream closure on the last frame and that void promises + // are used for all writes except the last one. + ChannelPromise promise = (ChannelPromise) in.getArguments()[5]; + if (streamClosed) { + fail("Stream already closed"); + } else { + streamClosed = (Boolean) in.getArguments()[4]; + } + writtenPadding.add((Integer) in.getArguments()[3]); + ByteBuf data = (ByteBuf) in.getArguments()[2]; + writtenData.add(data.toString(UTF_8)); + // Release the buffer just as DefaultHttp2FrameWriter does + data.release(); + // Let the promise succeed to trigger listeners. + return promise.setSuccess(); + } + }); when(writer.writeHeaders(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyShort(), anyBoolean(), anyInt(), anyBoolean(), any(ChannelPromise.class))) - .then(new Answer() { + .then(new Answer() { @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - ChannelPromise receivedPromise = (ChannelPromise) invocationOnMock.getArguments()[8]; + public ChannelFuture answer(InvocationOnMock invocationOnMock) throws Throwable { + ChannelPromise promise = (ChannelPromise) invocationOnMock.getArguments()[8]; if (streamClosed) { fail("Stream already closed"); } else { streamClosed = (Boolean) invocationOnMock.getArguments()[5]; } - receivedPromise.trySuccess(); - return future; + return promise.setSuccess(); } }); payloadCaptor = ArgumentCaptor.forClass(Http2RemoteFlowController.FlowControlled.class); - doNothing().when(remoteFlow).addFlowControlled(eq(stream), payloadCaptor.capture()); + doNothing().when(remoteFlow).addFlowControlled(any(Http2Stream.class), payloadCaptor.capture()); when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); when(ctx.channel()).thenReturn(channel); - when(ctx.newSucceededFuture()).thenReturn(future); - when(ctx.newPromise()).thenReturn(promise); - when(ctx.write(any())).thenReturn(future); + doAnswer(new Answer() { + @Override + public ChannelPromise answer(InvocationOnMock in) throws Throwable { + return newPromise(); + } + }).when(ctx).newPromise(); + doAnswer(new Answer() { + @Override + public ChannelFuture answer(InvocationOnMock in) throws Throwable { + return newSucceededFuture(); + } + }).when(ctx).newSucceededFuture(); when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); when(channel.alloc()).thenReturn(PooledByteBufAllocator.DEFAULT); + // Use a server-side connection so we can test server push. + connection = new DefaultHttp2Connection(true); + connection.remote().flowController(remoteFlow); + encoder = new DefaultHttp2ConnectionEncoder(connection, writer); encoder.lifecycleManager(lifecycleManager); } @Test public void dataWriteShouldSucceed() throws Exception { + createStream(STREAM_ID, false); final ByteBuf data = dummyData(); - encoder.writeData(ctx, STREAM_ID, data, 0, true, promise); + ChannelPromise p = newPromise(); + encoder.writeData(ctx, STREAM_ID, data, 0, true, p); assertEquals(payloadCaptor.getValue().size(), 8); payloadCaptor.getValue().write(ctx, 8); assertEquals(0, payloadCaptor.getValue().size()); assertEquals("abcdefgh", writtenData.get(0)); assertEquals(0, data.refCnt()); + assertTrue(p.isSuccess()); } @Test public void dataFramesShouldMerge() throws Exception { + createStream(STREAM_ID, false); final ByteBuf data = dummyData().retain(); - DefaultChannelPromise secondPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); - encoder.writeData(ctx, STREAM_ID, data, 0, true, promise); - encoder.writeData(ctx, STREAM_ID, data, 0, true, secondPromise); + + ChannelPromise promise1 = newPromise(); + encoder.writeData(ctx, STREAM_ID, data, 0, true, promise1); + ChannelPromise promise2 = newPromise(); + encoder.writeData(ctx, STREAM_ID, data, 0, true, promise2); + + // Now merge the two payloads. List capturedWrites = payloadCaptor.getAllValues(); FlowControlled mergedPayload = capturedWrites.get(0); mergedPayload.merge(ctx, capturedWrites.get(1)); - assertEquals(16, mergedPayload.size()); - assertFalse(secondPromise.isSuccess()); + assertFalse(promise1.isDone()); + assertFalse(promise2.isDone()); + + // Write the merged payloads and verify it was written correctly. mergedPayload.write(ctx, 16); assertEquals(0, mergedPayload.size()); assertEquals("abcdefghabcdefgh", writtenData.get(0)); assertEquals(0, data.refCnt()); - // Second promise is notified after write of the merged payload completes - assertTrue(secondPromise.isSuccess()); + assertTrue(promise1.isSuccess()); + assertTrue(promise2.isSuccess()); } @Test public void dataFramesShouldMergeUseVoidPromise() throws Exception { + createStream(STREAM_ID, false); final ByteBuf data = dummyData().retain(); - when(voidPromise.isVoid()).thenReturn(true); - encoder.writeData(ctx, STREAM_ID, data, 0, true, voidPromise); - encoder.writeData(ctx, STREAM_ID, data, 0, true, voidPromise); + + ChannelPromise promise1 = newVoidPromise(); + encoder.writeData(ctx, STREAM_ID, data, 0, true, promise1); + ChannelPromise promise2 = newVoidPromise(); + encoder.writeData(ctx, STREAM_ID, data, 0, true, promise2); + + // Now merge the two payloads. List capturedWrites = payloadCaptor.getAllValues(); FlowControlled mergedPayload = capturedWrites.get(0); - assertTrue(mergedPayload.merge(ctx, capturedWrites.get(1))); - + mergedPayload.merge(ctx, capturedWrites.get(1)); assertEquals(16, mergedPayload.size()); + assertFalse(promise1.isSuccess()); + assertFalse(promise2.isSuccess()); + + // Write the merged payloads and verify it was written correctly. mergedPayload.write(ctx, 16); assertEquals(0, mergedPayload.size()); assertEquals("abcdefghabcdefgh", writtenData.get(0)); assertEquals(0, data.refCnt()); + + // The promises won't be set since there are no listeners. + assertFalse(promise1.isSuccess()); + assertFalse(promise2.isSuccess()); } @Test public void dataFramesDontMergeWithHeaders() throws Exception { + createStream(STREAM_ID, false); final ByteBuf data = dummyData().retain(); - encoder.writeData(ctx, STREAM_ID, data, 0, true, promise); - encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise); + encoder.writeData(ctx, STREAM_ID, data, 0, true, newPromise()); + encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, newPromise()); List capturedWrites = payloadCaptor.getAllValues(); assertFalse(capturedWrites.get(0).merge(ctx, capturedWrites.get(1))); } @Test - public void emptyFrameShouldWritePadding() throws Exception { + public void emptyFrameShouldSplitPadding() throws Exception { ByteBuf data = Unpooled.buffer(0); - encoder.writeData(ctx, STREAM_ID, data, 10, true, promise); + assertSplitPaddingOnEmptyBuffer(data); + assertEquals(0, data.refCnt()); + } + + private void assertSplitPaddingOnEmptyBuffer(ByteBuf data) throws Exception { + createStream(STREAM_ID, false); + when(frameSizePolicy.maxFrameSize()).thenReturn(5); + ChannelPromise p = newPromise(); + encoder.writeData(ctx, STREAM_ID, data, 10, true, p); assertEquals(payloadCaptor.getValue().size(), 10); payloadCaptor.getValue().write(ctx, 10); // writer was called 2 times @@ -291,128 +290,136 @@ public class DefaultHttp2ConnectionEncoderTest { assertEquals("", writtenData.get(0)); assertEquals(10, (int) writtenPadding.get(0)); assertEquals(0, data.refCnt()); + assertTrue(p.isSuccess()); } @Test public void headersWriteForUnknownStreamShouldCreateStream() throws Exception { - int streamId = 5; - when(stream.id()).thenReturn(streamId); - when(stream.state()).thenReturn(IDLE); - mockFutureAddListener(true); - when(local.createStream(eq(streamId), anyBoolean())).thenReturn(stream); + writeAllFlowControlledFrames(); + final int streamId = 6; + ChannelPromise promise = newPromise(); encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, false, promise); - verify(local).createStream(eq(streamId), eq(false)); - assertNotNull(payloadCaptor.getValue()); - payloadCaptor.getValue().write(ctx, 0); verify(writer).writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); + assertTrue(promise.isSuccess()); } @Test public void headersWriteShouldOpenStreamForPush() throws Exception { - mockFutureAddListener(true); - when(stream.state()).thenReturn(RESERVED_LOCAL); - encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise); - verify(stream).open(false); - verify(stream, never()).closeLocalSide(); - assertNotNull(payloadCaptor.getValue()); - payloadCaptor.getValue().write(ctx, 0); - verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); + writeAllFlowControlledFrames(); + Http2Stream parent = createStream(STREAM_ID, false); + reservePushStream(PUSH_STREAM_ID, parent); + + ChannelPromise promise = newPromise(); + encoder.writeHeaders(ctx, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise); + assertEquals(HALF_CLOSED_REMOTE, stream(PUSH_STREAM_ID).state()); + verify(writer).writeHeaders(eq(ctx), eq(PUSH_STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); } @Test public void pushPromiseWriteAfterGoAwayReceivedShouldFail() throws Exception { - when(connection.goAwayReceived()).thenReturn(true); + createStream(STREAM_ID, false); + goAwayReceived(0); ChannelFuture future = encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, - promise); - assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception); + newPromise()); + assertTrue(future.isDone()); + assertFalse(future.isSuccess()); } @Test public void pushPromiseWriteShouldReserveStream() throws Exception { + createStream(STREAM_ID, false); + ChannelPromise promise = newPromise(); encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, promise); - verify(local).reservePushStream(eq(PUSH_STREAM_ID), eq(stream)); + assertEquals(RESERVED_LOCAL, stream(PUSH_STREAM_ID).state()); verify(writer).writePushPromise(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(promise)); } @Test public void priorityWriteAfterGoAwayShouldSucceed() throws Exception { - when(connection.goAwayReceived()).thenReturn(true); + createStream(STREAM_ID, false); + goAwayReceived(Integer.MAX_VALUE); + ChannelPromise promise = newPromise(); encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); } @Test public void priorityWriteShouldSetPriorityForStream() throws Exception { - when(connection.stream(STREAM_ID)).thenReturn(null); - encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); - verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); + ChannelPromise promise = newPromise(); + short weight = 255; + encoder.writePriority(ctx, STREAM_ID, 0, weight, true, promise); + + // Verify that this created an idle stream with the desired weight. + Http2Stream stream = stream(STREAM_ID); + assertEquals(IDLE, stream.state()); + assertEquals(weight, stream.weight()); + verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); - verify(local).createIdleStream(STREAM_ID); - verify(stream, never()).open(anyBoolean()); } @Test public void priorityWriteOnPreviouslyExistingStreamShouldSucceed() throws Exception { - doAnswer(new Answer() { - @Override - public Http2Stream answer(InvocationOnMock in) throws Throwable { - throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR); - } - }).when(local).createIdleStream(eq(STREAM_ID)); - when(connection.stream(STREAM_ID)).thenReturn(null); - // Just return the stream object as the connection stream to ensure the dependent stream "exists" - when(connection.stream(0)).thenReturn(stream); - encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); - verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean()); - verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); - verify(local).createIdleStream(STREAM_ID); + createStream(STREAM_ID, false).close(); + ChannelPromise promise = newPromise(); + short weight = 255; + encoder.writePriority(ctx, STREAM_ID, 0, weight, true, promise); + verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq(weight), eq(true), eq(promise)); } @Test public void priorityWriteOnPreviouslyExistingParentStreamShouldSucceed() throws Exception { - doAnswer(new Answer() { - @Override - public Http2Stream answer(InvocationOnMock in) throws Throwable { - throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR); - } - }).when(stream).setPriority(eq(0), eq((short) 255), eq(true)); - when(connection.stream(STREAM_ID)).thenReturn(stream); - encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise); - verify(stream).setPriority(eq(0), eq((short) 255), eq(true)); - verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise)); + final int parentStreamId = STREAM_ID + 2; + createStream(STREAM_ID, false); + createStream(parentStreamId, false).close(); + + ChannelPromise promise = newPromise(); + short weight = 255; + encoder.writePriority(ctx, STREAM_ID, parentStreamId, weight, true, promise); + verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(parentStreamId), eq(weight), eq(true), eq(promise)); } @Test public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception { + ChannelPromise promise = newPromise(); encoder.writeRstStream(ctx, 5, PROTOCOL_ERROR.code(), promise); verify(writer, never()).writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise)); } @Test - public void rstStreamWriteShouldCloseStream() throws Exception { + public void rstStreamShouldCloseStream() throws Exception { + // Create the stream and send headers. + writeAllFlowControlledFrames(); + encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, newPromise()); + + // Now verify that a stream reset is performed. + stream(STREAM_ID); + ChannelPromise promise = newPromise(); encoder.writeRstStream(ctx, STREAM_ID, PROTOCOL_ERROR.code(), promise); - verify(lifecycleManager).resetStream(eq(ctx), eq(STREAM_ID), eq(PROTOCOL_ERROR.code()), eq(promise)); + verify(lifecycleManager).resetStream(eq(ctx), eq(STREAM_ID), anyInt(), eq(promise)); } @Test public void pingWriteAfterGoAwayShouldSucceed() throws Exception { - when(connection.goAwayReceived()).thenReturn(true); + ChannelPromise promise = newPromise(); + goAwayReceived(0); encoder.writePing(ctx, false, emptyPingBuf(), promise); verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise)); } @Test public void pingWriteShouldSucceed() throws Exception { + ChannelPromise promise = newPromise(); encoder.writePing(ctx, false, emptyPingBuf(), promise); verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise)); } @Test public void settingsWriteAfterGoAwayShouldSucceed() throws Exception { - when(connection.goAwayReceived()).thenReturn(true); + goAwayReceived(0); + ChannelPromise promise = newPromise(); encoder.writeSettings(ctx, new Http2Settings(), promise); verify(writer).writeSettings(eq(ctx), any(Http2Settings.class), eq(promise)); } @@ -421,18 +428,23 @@ public class DefaultHttp2ConnectionEncoderTest { public void settingsWriteShouldNotUpdateSettings() throws Exception { Http2Settings settings = new Http2Settings(); settings.initialWindowSize(100); - settings.pushEnabled(false); settings.maxConcurrentStreams(1000); settings.headerTableSize(2000); + + ChannelPromise promise = newPromise(); encoder.writeSettings(ctx, settings, promise); verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise)); } @Test - public void dataWriteShouldCreateHalfClosedStream() { - mockSendFlowControlledWriteEverything(); + public void dataWriteShouldCreateHalfClosedStream() throws Exception { + writeAllFlowControlledFrames(); + + Http2Stream stream = createStream(STREAM_ID, false); ByteBuf data = dummyData(); + ChannelPromise promise = newPromise(); encoder.writeData(ctx, STREAM_ID, data.retain(), 0, true, promise); + assertTrue(promise.isSuccess()); verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class)); verify(lifecycleManager).closeStreamLocal(stream, promise); assertEquals(data.toString(UTF_8), writtenData.get(0)); @@ -441,51 +453,53 @@ public class DefaultHttp2ConnectionEncoderTest { @Test public void headersWriteShouldHalfCloseStream() throws Exception { - mockSendFlowControlledWriteEverything(); - int streamId = 5; - when(stream.id()).thenReturn(streamId); - when(stream.state()).thenReturn(HALF_CLOSED_LOCAL); - mockFutureAddListener(true); - when(local.createStream(eq(streamId), anyBoolean())).thenReturn(stream); - when(writer.writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise))) - .thenReturn(future); - encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, true, promise); - verify(local).createStream(eq(streamId), eq(true)); - // Trigger the write and mark the promise successful to trigger listeners - assertNotNull(payloadCaptor.getValue()); - payloadCaptor.getValue().write(ctx, 0); - promise.trySuccess(); - verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise)); + writeAllFlowControlledFrames(); + createStream(STREAM_ID, false); + ChannelPromise promise = newPromise(); + encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise); + + assertTrue(promise.isSuccess()); + verify(lifecycleManager).closeStreamLocal(eq(stream(STREAM_ID)), eq(promise)); } @Test public void headersWriteShouldHalfClosePushStream() throws Exception { - mockSendFlowControlledWriteEverything(); - mockFutureAddListener(true); - when(stream.state()).thenReturn(RESERVED_LOCAL).thenReturn(HALF_CLOSED_LOCAL); - when(writer.writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), - eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise))) - .thenReturn(future); - encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise); - verify(stream).open(true); - - promise.trySuccess(); + writeAllFlowControlledFrames(); + Http2Stream parent = createStream(STREAM_ID, false); + Http2Stream stream = reservePushStream(PUSH_STREAM_ID, parent); + ChannelPromise promise = newPromise(); + encoder.writeHeaders(ctx, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise); + assertEquals(HALF_CLOSED_REMOTE, stream.state()); + assertTrue(promise.isSuccess()); verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise)); } @Test public void encoderDelegatesGoAwayToLifeCycleManager() { + ChannelPromise promise = newPromise(); encoder.writeGoAway(ctx, STREAM_ID, Http2Error.INTERNAL_ERROR.code(), null, promise); verify(lifecycleManager).goAway(eq(ctx), eq(STREAM_ID), eq(Http2Error.INTERNAL_ERROR.code()), - eq((ByteBuf) null), eq(promise)); + eq((ByteBuf) null), eq(promise)); verifyNoMoreInteractions(writer); } @Test - public void dataWriteToClosedStreamShouldFail() { - when(stream.state()).thenReturn(CLOSED); + public void dataWriteToClosedStreamShouldFail() throws Exception { + createStream(STREAM_ID, false).close(); ByteBuf data = mock(ByteBuf.class); + ChannelPromise promise = newPromise(); + encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); + assertTrue(promise.isDone()); + assertFalse(promise.isSuccess()); + assertThat(promise.cause(), instanceOf(IllegalArgumentException.class)); + verify(data).release(); + } + + @Test + public void dataWriteToHalfClosedLocalStreamShouldFail() throws Exception { + createStream(STREAM_ID, true); + ByteBuf data = mock(ByteBuf.class); + ChannelPromise promise = newPromise(); encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); assertTrue(promise.isDone()); assertFalse(promise.isSuccess()); @@ -494,51 +508,45 @@ public class DefaultHttp2ConnectionEncoderTest { } @Test - public void dataWriteToHalfClosedLocalStreamShouldFail() { - when(stream.state()).thenReturn(HALF_CLOSED_LOCAL); + public void canWriteDataFrameAfterGoAwaySent() throws Exception { + Http2Stream stream = createStream(STREAM_ID, false); + connection.goAwaySent(0, 0, EMPTY_BUFFER); ByteBuf data = mock(ByteBuf.class); - encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); - assertTrue(promise.isDone()); - assertFalse(promise.isSuccess()); - assertThat(promise.cause(), instanceOf(IllegalStateException.class)); - verify(data).release(); - } - - @Test - public void canWriteDataFrameAfterGoAwaySent() { - when(connection.goAwaySent()).thenReturn(true); - when(remote.lastStreamKnownByPeer()).thenReturn(0); - ByteBuf data = mock(ByteBuf.class); - encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); + encoder.writeData(ctx, STREAM_ID, data, 0, false, newPromise()); verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class)); } @Test - public void canWriteHeaderFrameAfterGoAwaySent() { - when(connection.goAwaySent()).thenReturn(true); - when(remote.lastStreamKnownByPeer()).thenReturn(0); + public void canWriteHeaderFrameAfterGoAwaySent() throws Exception { + writeAllFlowControlledFrames(); + createStream(STREAM_ID, false); + goAwaySent(0); + ChannelPromise promise = newPromise(); encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise); - verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class)); + verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); } @Test - public void canWriteDataFrameAfterGoAwayReceived() { - when(connection.goAwayReceived()).thenReturn(true); - when(local.lastStreamKnownByPeer()).thenReturn(STREAM_ID); + public void canWriteDataFrameAfterGoAwayReceived() throws Exception { + Http2Stream stream = createStream(STREAM_ID, false); + goAwayReceived(STREAM_ID); ByteBuf data = mock(ByteBuf.class); - encoder.writeData(ctx, STREAM_ID, data, 0, false, promise); + encoder.writeData(ctx, STREAM_ID, data, 0, false, newPromise()); verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class)); } @Test public void canWriteHeaderFrameAfterGoAwayReceived() { - when(connection.goAwayReceived()).thenReturn(true); - when(local.lastStreamKnownByPeer()).thenReturn(STREAM_ID); + writeAllFlowControlledFrames(); + goAwayReceived(STREAM_ID); + ChannelPromise promise = newPromise(); encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise); - verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class)); + verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0), + eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise)); } - private void mockSendFlowControlledWriteEverything() { + private void writeAllFlowControlledFrames() { doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocationOnMock) throws Throwable { @@ -547,22 +555,56 @@ public class DefaultHttp2ConnectionEncoderTest { flowControlled.writeComplete(); return null; } - }).when(remoteFlow).addFlowControlled(eq(stream), payloadCaptor.capture()); + }).when(remoteFlow).addFlowControlled(any(Http2Stream.class), payloadCaptor.capture()); } - private void mockFutureAddListener(boolean success) { - when(future.isSuccess()).thenReturn(success); - if (!success) { - when(future.cause()).thenReturn(new Exception("Fake Exception")); - } - doAnswer(new Answer() { + private Http2Stream createStream(int streamId, boolean halfClosed) throws Http2Exception { + return connection.local().createStream(streamId, halfClosed); + } + + private Http2Stream reservePushStream(int pushStreamId, Http2Stream parent) throws Http2Exception { + return connection.local().reservePushStream(pushStreamId, parent); + } + + private Http2Stream stream(int streamId) { + return connection.stream(streamId); + } + + private void goAwayReceived(int lastStreamId) { + connection.goAwayReceived(lastStreamId, 0, EMPTY_BUFFER); + } + + private void goAwaySent(int lastStreamId) { + connection.goAwaySent(lastStreamId, 0, EMPTY_BUFFER); + } + + private ChannelPromise newPromise() { + return new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE); + } + + private ChannelPromise newVoidPromise() { + return new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE) { @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - ChannelFutureListener listener = (ChannelFutureListener) invocation.getArguments()[0]; - listener.operationComplete(future); - return null; + public ChannelPromise addListener( + GenericFutureListener> listener) { + throw new AssertionFailedError(); } - }).when(future).addListener(any(ChannelFutureListener.class)); + + @Override + public ChannelPromise addListeners( + GenericFutureListener>... listeners) { + throw new AssertionFailedError(); + } + + @Override + public boolean isVoid() { + return true; + } + }; + } + + private ChannelFuture newSucceededFuture() { + return newPromise().setSuccess(); } private static ByteBuf dummyData() { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java index 91be041130..39fe6af6aa 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionTest.java @@ -33,28 +33,32 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http2.Http2Connection.Endpoint; import io.netty.handler.codec.http2.Http2Stream.State; import io.netty.util.internal.PlatformDependent; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicReference; - import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + /** * Tests for {@link DefaultHttp2Connection}. */ public class DefaultHttp2ConnectionTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); private DefaultHttp2Connection server; private DefaultHttp2Connection client; @@ -208,11 +212,22 @@ public class DefaultHttp2ConnectionTest { } @Test(expected = Http2Exception.class) - public void maxAllowedStreamsExceededShouldThrow() throws Http2Exception { + public void createShouldThrowWhenMaxAllowedStreamsExceeded() throws Http2Exception { server.local().maxActiveStreams(0); server.local().createStream(2, true); } + @Test + public void createIdleShouldSucceedWhenMaxAllowedStreamsExceeded() throws Http2Exception { + server.local().maxActiveStreams(0); + Http2Stream stream = server.local().createIdleStream(2); + + // Opening should fail, however. + thrown.expect(Http2Exception.class); + thrown.expectMessage("Maximum active streams violated for this endpoint."); + stream.open(false); + } + @Test(expected = Http2Exception.class) public void reserveWithPushDisallowedShouldThrow() throws Http2Exception { Http2Stream stream = server.remote().createStream(3, true); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java index a4fcf6e441..6d002cfcdc 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2ConnectionHandlerTest.java @@ -316,11 +316,11 @@ public class Http2ConnectionHandlerTest { when(frameWriter.writeRstStream(eq(ctx), eq(STREAM_ID), anyLong(), any(ChannelPromise.class))).thenReturn(future); when(stream.state()).thenReturn(CLOSED); + when(stream.isHeaderSent()).thenReturn(true); // The stream is "closed" but is still known about by the connection (connection().stream(..) // will return the stream). We should still write a RST_STREAM frame in this scenario. handler.resetStream(ctx, STREAM_ID, STREAM_CLOSED.code(), promise); - verify(frameWriter).writeRstStream(eq(ctx), eq(STREAM_ID), anyLong(), - any(ChannelPromise.class)); + verify(frameWriter).writeRstStream(eq(ctx), eq(STREAM_ID), anyLong(), any(ChannelPromise.class)); } @SuppressWarnings("unchecked")