No HTTP/2 RST_STREAM if no prior HEADERS were sent

Motivation:

Because we flow control HEADERS frames, it's possible that an intermediate error can result in a RST_STREAM frame being sent for a frame that the other endpoint is not yet aware of. This is a violation of the spec and will either result in spammy logs at the other endpoint or broken connections.

Modifications:

Modified the HTTP/2 handler so that it only sends RST_STREAM if it has sent at least one HEADERS frame to the remote endpoint for the stream.

Result:

Fixes #4465
This commit is contained in:
nmittler 2015-11-20 12:12:16 -08:00
parent 227e67900e
commit 8cd259896e
9 changed files with 435 additions and 297 deletions

View File

@ -296,6 +296,7 @@ public class DefaultHttp2Connection implements Http2Connection {
private int totalChildWeights;
private int prioritizableForTree = 1;
private boolean resetSent;
private boolean headerSent;
DefaultStream(int id, State state) {
this.id = id;
@ -323,6 +324,17 @@ public class DefaultHttp2Connection implements Http2Connection {
return this;
}
@Override
public boolean isHeaderSent() {
return headerSent;
}
@Override
public Http2Stream headerSent() {
headerSent = true;
return this;
}
@Override
public final <V> V setProperty(PropertyKey key, V value) {
return properties.add(verifyKey(key), value);
@ -432,6 +444,9 @@ public class DefaultHttp2Connection implements Http2Connection {
@Override
public Http2Stream open(boolean halfClosed) throws Http2Exception {
state = activeState(id, state, isLocal(), halfClosed);
if (!createdBy().canOpenStream()) {
throw connectionError(PROTOCOL_ERROR, "Maximum active streams violated for this endpoint.");
}
activate();
return this;
}
@ -548,17 +563,7 @@ public class DefaultHttp2Connection implements Http2Connection {
children = new IntObjectHashMap<DefaultStream>(INITIAL_CHILDREN_MAP_SIZE);
}
@Override
public final boolean remoteSideOpen() {
return state == HALF_CLOSED_LOCAL || state == OPEN || state == RESERVED_REMOTE;
}
@Override
public final boolean localSideOpen() {
return state == HALF_CLOSED_REMOTE || state == OPEN || state == RESERVED_LOCAL;
}
final DefaultEndpoint<? extends Http2FlowController> createdBy() {
DefaultEndpoint<? extends Http2FlowController> createdBy() {
return localEndpoint.isValidStreamId(id) ? localEndpoint : remoteEndpoint;
}
@ -792,11 +797,26 @@ public class DefaultHttp2Connection implements Http2Connection {
return false;
}
@Override
DefaultEndpoint<? extends Http2FlowController> createdBy() {
return null;
}
@Override
public Http2Stream resetSent() {
throw new UnsupportedOperationException();
}
@Override
public boolean isHeaderSent() {
return false;
}
@Override
public Http2Stream headerSent() {
throw new UnsupportedOperationException();
}
@Override
public Http2Stream setPriority(int parentStreamId, short weight, boolean exclusive) {
throw new UnsupportedOperationException();
@ -869,12 +889,17 @@ public class DefaultHttp2Connection implements Http2Connection {
}
@Override
public boolean canCreateStream() {
return nextStreamId() > 0 && numActiveStreams + 1 <= maxActiveStreams;
public boolean isExhausted() {
return nextStreamId() <= 0;
}
@Override
public boolean canOpenStream() {
return numActiveStreams + 1 <= maxActiveStreams;
}
private DefaultStream createStream(int streamId, State state) throws Http2Exception {
checkNewStreamAllowed(streamId);
checkNewStreamAllowed(streamId, state);
// Create and initialize the stream.
DefaultStream stream = new DefaultStream(streamId, state);
@ -899,6 +924,11 @@ public class DefaultHttp2Connection implements Http2Connection {
return stream;
}
@Override
public boolean created(Http2Stream stream) {
return stream instanceof DefaultStream && ((DefaultStream) stream).createdBy() == this;
}
@Override
public boolean isServer() {
return server;
@ -909,16 +939,17 @@ public class DefaultHttp2Connection implements Http2Connection {
if (parent == null) {
throw connectionError(PROTOCOL_ERROR, "Parent stream missing");
}
if (isLocal() ? !parent.localSideOpen() : !parent.remoteSideOpen()) {
if (isLocal() ? !parent.state().localSideOpen() : !parent.state().remoteSideOpen()) {
throw connectionError(PROTOCOL_ERROR, "Stream %d is not open for sending push promise", parent.id());
}
if (!opposite().allowPushTo()) {
throw connectionError(PROTOCOL_ERROR, "Server push not allowed to opposite endpoint.");
}
checkNewStreamAllowed(streamId);
State state = isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE;
checkNewStreamAllowed(streamId, state);
// Create and initialize the stream.
DefaultStream stream = new DefaultStream(streamId, isLocal() ? RESERVED_LOCAL : RESERVED_REMOTE);
DefaultStream stream = new DefaultStream(streamId, state);
// Update the next and last stream IDs.
nextStreamId = streamId + 2;
@ -1005,7 +1036,7 @@ public class DefaultHttp2Connection implements Http2Connection {
return isLocal() ? remoteEndpoint : localEndpoint;
}
private void checkNewStreamAllowed(int streamId) throws Http2Exception {
private void checkNewStreamAllowed(int streamId, State state) throws Http2Exception {
if (goAwayReceived() && streamId > localEndpoint.lastStreamKnownByPeer()) {
throw connectionError(PROTOCOL_ERROR, "Cannot create stream %d since this endpoint has received a " +
"GOAWAY frame with last stream id %d.", streamId,
@ -1024,8 +1055,11 @@ public class DefaultHttp2Connection implements Http2Connection {
throw closedStreamError(PROTOCOL_ERROR, "Request stream %d is behind the next expected stream %d",
streamId, nextStreamId);
}
if (!canCreateStream()) {
throw connectionError(REFUSED_STREAM, "Maximum streams exceeded for this endpoint.");
if (isExhausted()) {
throw connectionError(REFUSED_STREAM, "Stream IDs are exhausted for this endpoint.");
}
if ((state.localSideOpen() || state.remoteSideOpen()) && !canOpenStream()) {
throw connectionError(REFUSED_STREAM, "Maximum active streams violated for this endpoint.");
}
}

View File

@ -409,7 +409,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
promise.addListener(this);
frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
stream.headerSent();
frameWriter.writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
padding, endOfStream, promise);
}

View File

@ -158,21 +158,34 @@ public interface Http2Connection {
boolean mayHaveCreatedStream(int streamId);
/**
* Indicates whether or not this endpoint is currently allowed to create new streams. This will be
* be false if {@link #numActiveStreams()} + 1 >= {@link #maxActiveStreams()} or if the stream IDs
* for this endpoint have been exhausted (i.e. {@link #nextStreamId()} < 0).
* Indicates whether or not this endpoint created the given stream.
*/
boolean canCreateStream();
boolean created(Http2Stream stream);
/**
* Indicates whether or not the stream IDs for this endpoint have been exhausted
* (i.e. {@link #nextStreamId()} < 0). If {@code true}, any attempt to create new streams
* on this endpoint will fail.
*/
boolean isExhausted();
/**
* Indicates whether or a stream created by this endpoint can be opened without violating
* {@link #maxActiveStreams()}.
*/
boolean canOpenStream();
/**
* Creates a stream initiated by this endpoint. This could fail for the following reasons:
* <ul>
* <li>The requested stream ID is not the next sequential ID for this endpoint.</li>
* <li>The stream already exists.</li>
* <li>{@link #canCreateStream()} is {@code false}.</li>
* <li>The connection is marked as going away.</li>
* </ul>
* <p>
* Note that IDLE streams can always be created so long as there are stream IDs available.
* The {@link #numActiveStreams()} will be enforced upon attempting to open the stream.
* <p>
* If the stream is intended to initialized to {@link Http2Stream.State#OPEN} then use
* {@link #createStream(int, boolean)} otherwise optimizations in {@link Listener}s may not work
* and memory may be thrashed. The caller is expected to {@link Http2Stream#open(boolean)} the stream.
@ -186,7 +199,8 @@ public interface Http2Connection {
* <ul>
* <li>The requested stream ID is not the next sequential ID for this endpoint.</li>
* <li>The stream already exists.</li>
* <li>{@link #canCreateStream()} is {@code false}.</li>
* <li>{@link #isExhausted()} is {@code true}</li>
* <li>{@link #canOpenStream()} is {@code false}.</li>
* <li>The connection is marked as going away.</li>
* </ul>
* <p>

View File

@ -43,6 +43,7 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
import static io.netty.handler.codec.http2.Http2FrameTypes.SETTINGS;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.util.CharsetUtil.UTF_8;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.min;
@ -790,7 +791,14 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
return promise.setSuccess();
}
ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
final ChannelFuture future;
if (stream.state() == IDLE || (connection().local().created(stream) && !stream.isHeaderSent())) {
// The other endpoint doesn't know about the stream yet, so we can't actually send
// the RST_STREAM frame. The HTTP/2 spec also disallows sending RST_STREAM for IDLE streams.
future = promise.setSuccess();
} else {
future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
}
// Synchronously set the resetSent flag to prevent any subsequent calls
// from resulting in multiple reset frames being sent.

View File

@ -27,17 +27,17 @@ public interface Http2LifecycleManager {
/**
* Closes the local side of the {@code stream}. Depending on the {@code stream} state this may result in
* {@code stream} being closed. See {@link closeStream(Http2Stream, ChannelFuture)}.
* {@code stream} being closed. See {@link #closeStream(Http2Stream, ChannelFuture)}.
* @param stream the stream to be half closed.
* @param future See {@link closeStream(Http2Stream, ChannelFuture)}.
* @param future See {@link #closeStream(Http2Stream, ChannelFuture)}.
*/
void closeStreamLocal(Http2Stream stream, ChannelFuture future);
/**
* Closes the remote side of the {@code stream}. Depending on the {@code stream} state this may result in
* {@code stream} being closed. See {@link closeStream(Http2Stream, ChannelFuture)}.
* {@code stream} being closed. See {@link #closeStream(Http2Stream, ChannelFuture)}.
* @param stream the stream to be half closed.
* @param future See {@link closeStream(Http2Stream, ChannelFuture)}.
* @param future See {@link #closeStream(Http2Stream, ChannelFuture)}.
*/
void closeStreamRemote(Http2Stream stream, ChannelFuture future);

View File

@ -24,13 +24,37 @@ public interface Http2Stream {
* The allowed states of an HTTP2 stream.
*/
enum State {
IDLE,
RESERVED_LOCAL,
RESERVED_REMOTE,
OPEN,
HALF_CLOSED_LOCAL,
HALF_CLOSED_REMOTE,
CLOSED
IDLE(false, false),
RESERVED_LOCAL(false, false),
RESERVED_REMOTE(false, false),
OPEN(true, true),
HALF_CLOSED_LOCAL(false, true),
HALF_CLOSED_REMOTE(true, false),
CLOSED(false, false);
private final boolean localSideOpen;
private final boolean remoteSideOpen;
State(boolean localSideOpen, boolean remoteSideOpen) {
this.localSideOpen = localSideOpen;
this.remoteSideOpen = remoteSideOpen;
}
/**
* Indicates whether the local side of this stream is open (i.e. the state is either
* {@link State#OPEN} or {@link State#HALF_CLOSED_REMOTE}).
*/
public boolean localSideOpen() {
return localSideOpen;
}
/**
* Indicates whether the remote side of this stream is open (i.e. the state is either
* {@link State#OPEN} or {@link State#HALF_CLOSED_LOCAL}).
*/
public boolean remoteSideOpen() {
return remoteSideOpen;
}
}
/**
@ -87,16 +111,16 @@ public interface Http2Stream {
Http2Stream resetSent();
/**
* Indicates whether the remote side of this stream is open (i.e. the state is either
* {@link State#OPEN} or {@link State#HALF_CLOSED_LOCAL}).
* Indicates whether or not at least one {@code HEADERS} frame has been sent from the local endpoint
* for this stream.
*/
boolean remoteSideOpen();
boolean isHeaderSent();
/**
* Indicates whether the local side of this stream is open (i.e. the state is either
* {@link State#OPEN} or {@link State#HALF_CLOSED_REMOTE}).
* Sets the flag indicating that a {@code HEADERS} frame has been sent from the local endpoint
* for this stream. This does not affect the stream state.
*/
boolean localSideOpen();
Http2Stream headerSent();
/**
* Associates the application-defined data with this stream.

View File

@ -14,20 +14,18 @@
*/
package io.netty.handler.codec.http2;
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Stream.State.CLOSED;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_LOCAL;
import static io.netty.handler.codec.http2.Http2Stream.State.HALF_CLOSED_REMOTE;
import static io.netty.handler.codec.http2.Http2Stream.State.IDLE;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_LOCAL;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -44,54 +42,38 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import io.netty.handler.codec.http2.Http2RemoteFlowController.FlowControlled;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.ArrayList;
import java.util.List;
import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.List;
/**
* Tests for {@link DefaultHttp2ConnectionEncoder}
*/
public class DefaultHttp2ConnectionEncoderTest {
private static final int STREAM_ID = 1;
private static final int PUSH_STREAM_ID = 2;
private Http2ConnectionEncoder encoder;
private ChannelPromise promise;
@Mock
private Http2Connection connection;
@Mock
private Http2Connection.Endpoint<Http2RemoteFlowController> remote;
@Mock
private Http2Connection.Endpoint<Http2LocalFlowController> local;
private static final int STREAM_ID = 2;
private static final int PUSH_STREAM_ID = 4;
@Mock
private Http2RemoteFlowController remoteFlow;
@ -102,18 +84,6 @@ public class DefaultHttp2ConnectionEncoderTest {
@Mock
private Channel channel;
@Mock
private ChannelPromise voidPromise;
@Mock
private ChannelFuture future;
@Mock
private Http2Stream stream;
@Mock
private Http2Stream pushStream;
@Mock
private Http2FrameListener listener;
@ -123,167 +93,196 @@ public class DefaultHttp2ConnectionEncoderTest {
@Mock
private Http2FrameWriter.Configuration writerConfig;
@Mock
private Http2FrameSizePolicy frameSizePolicy;
@Mock
private Http2LifecycleManager lifecycleManager;
private DefaultHttp2ConnectionEncoder encoder;
private Http2Connection connection;
private ArgumentCaptor<Http2RemoteFlowController.FlowControlled> payloadCaptor;
private List<String> writtenData;
private List<Integer> writtenPadding;
private boolean streamClosed;
@SuppressWarnings("unchecked")
@Before
public void setup() throws Exception {
MockitoAnnotations.initMocks(this);
promise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
when(voidPromise.addListener(Matchers.<GenericFutureListener<Future<? super Void>>>any())).thenThrow(
new AssertionFailedError());
when(voidPromise.addListeners(Matchers.<GenericFutureListener<Future<? super Void>>>any())).thenThrow(
new AssertionFailedError());
when(voidPromise.channel()).thenReturn(channel);
when(channel.isActive()).thenReturn(true);
when(stream.id()).thenReturn(STREAM_ID);
when(stream.state()).thenReturn(OPEN);
when(stream.open(anyBoolean())).thenReturn(stream);
when(pushStream.id()).thenReturn(PUSH_STREAM_ID);
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
Http2StreamVisitor visitor = in.getArgumentAt(0, Http2StreamVisitor.class);
if (!visitor.visit(stream)) {
return stream;
}
return null;
}
}).when(connection).forEachActiveStream(any(Http2StreamVisitor.class));
when(connection.stream(STREAM_ID)).thenReturn(stream);
when(connection.local()).thenReturn(local);
when(connection.remote()).thenReturn(remote);
when(remote.flowController()).thenReturn(remoteFlow);
when(writer.configuration()).thenReturn(writerConfig);
when(local.createIdleStream(eq(STREAM_ID))).thenReturn(stream);
when(local.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(remote.createIdleStream(eq(STREAM_ID))).thenReturn(stream);
when(remote.reservePushStream(eq(PUSH_STREAM_ID), eq(stream))).thenReturn(pushStream);
when(writer.writeSettings(eq(ctx), any(Http2Settings.class), eq(promise))).thenReturn(future);
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise)))
.thenReturn(future);
when(writerConfig.frameSizePolicy()).thenReturn(frameSizePolicy);
when(frameSizePolicy.maxFrameSize()).thenReturn(64);
doAnswer(new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock in) throws Throwable {
return ((ChannelPromise) in.getArguments()[2]).setSuccess();
}
}).when(writer).writeSettings(eq(ctx), any(Http2Settings.class), any(ChannelPromise.class));
doAnswer(new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock in) throws Throwable {
((ByteBuf) in.getArguments()[3]).release();
return ((ChannelPromise) in.getArguments()[4]).setSuccess();
}
}).when(writer).writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), any(ChannelPromise.class));
writtenData = new ArrayList<String>();
writtenPadding = new ArrayList<Integer>();
when(writer.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(), any(ChannelPromise.class)))
.then(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
// Make sure we only receive stream closure on the last frame and that void promises are used for
// all writes except the last one.
ChannelPromise receivedPromise = (ChannelPromise) invocationOnMock.getArguments()[5];
if (streamClosed) {
fail("Stream already closed");
} else {
streamClosed = (Boolean) invocationOnMock.getArguments()[4];
}
writtenPadding.add((Integer) invocationOnMock.getArguments()[3]);
ByteBuf data = (ByteBuf) invocationOnMock.getArguments()[2];
writtenData.add(data.toString(UTF_8));
// Release the buffer just as DefaultHttp2FrameWriter does
data.release();
// Let the promise succeed to trigger listeners.
receivedPromise.trySuccess();
return future;
}
});
when(writer.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean(),
any(ChannelPromise.class))).then(new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock in) throws Throwable {
// Make sure we only receive stream closure on the last frame and that void promises
// are used for all writes except the last one.
ChannelPromise promise = (ChannelPromise) in.getArguments()[5];
if (streamClosed) {
fail("Stream already closed");
} else {
streamClosed = (Boolean) in.getArguments()[4];
}
writtenPadding.add((Integer) in.getArguments()[3]);
ByteBuf data = (ByteBuf) in.getArguments()[2];
writtenData.add(data.toString(UTF_8));
// Release the buffer just as DefaultHttp2FrameWriter does
data.release();
// Let the promise succeed to trigger listeners.
return promise.setSuccess();
}
});
when(writer.writeHeaders(eq(ctx), anyInt(), any(Http2Headers.class), anyInt(), anyShort(), anyBoolean(),
anyInt(), anyBoolean(), any(ChannelPromise.class)))
.then(new Answer<Object>() {
.then(new Answer<ChannelFuture>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
ChannelPromise receivedPromise = (ChannelPromise) invocationOnMock.getArguments()[8];
public ChannelFuture answer(InvocationOnMock invocationOnMock) throws Throwable {
ChannelPromise promise = (ChannelPromise) invocationOnMock.getArguments()[8];
if (streamClosed) {
fail("Stream already closed");
} else {
streamClosed = (Boolean) invocationOnMock.getArguments()[5];
}
receivedPromise.trySuccess();
return future;
return promise.setSuccess();
}
});
payloadCaptor = ArgumentCaptor.forClass(Http2RemoteFlowController.FlowControlled.class);
doNothing().when(remoteFlow).addFlowControlled(eq(stream), payloadCaptor.capture());
doNothing().when(remoteFlow).addFlowControlled(any(Http2Stream.class), payloadCaptor.capture());
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.channel()).thenReturn(channel);
when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
doAnswer(new Answer<ChannelPromise>() {
@Override
public ChannelPromise answer(InvocationOnMock in) throws Throwable {
return newPromise();
}
}).when(ctx).newPromise();
doAnswer(new Answer<ChannelFuture>() {
@Override
public ChannelFuture answer(InvocationOnMock in) throws Throwable {
return newSucceededFuture();
}
}).when(ctx).newSucceededFuture();
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
when(channel.alloc()).thenReturn(PooledByteBufAllocator.DEFAULT);
// Use a server-side connection so we can test server push.
connection = new DefaultHttp2Connection(true);
connection.remote().flowController(remoteFlow);
encoder = new DefaultHttp2ConnectionEncoder(connection, writer);
encoder.lifecycleManager(lifecycleManager);
}
@Test
public void dataWriteShouldSucceed() throws Exception {
createStream(STREAM_ID, false);
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
ChannelPromise p = newPromise();
encoder.writeData(ctx, STREAM_ID, data, 0, true, p);
assertEquals(payloadCaptor.getValue().size(), 8);
payloadCaptor.getValue().write(ctx, 8);
assertEquals(0, payloadCaptor.getValue().size());
assertEquals("abcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
assertTrue(p.isSuccess());
}
@Test
public void dataFramesShouldMerge() throws Exception {
createStream(STREAM_ID, false);
final ByteBuf data = dummyData().retain();
DefaultChannelPromise secondPromise = new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
encoder.writeData(ctx, STREAM_ID, data, 0, true, secondPromise);
ChannelPromise promise1 = newPromise();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise1);
ChannelPromise promise2 = newPromise();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise2);
// Now merge the two payloads.
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
FlowControlled mergedPayload = capturedWrites.get(0);
mergedPayload.merge(ctx, capturedWrites.get(1));
assertEquals(16, mergedPayload.size());
assertFalse(secondPromise.isSuccess());
assertFalse(promise1.isDone());
assertFalse(promise2.isDone());
// Write the merged payloads and verify it was written correctly.
mergedPayload.write(ctx, 16);
assertEquals(0, mergedPayload.size());
assertEquals("abcdefghabcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
// Second promise is notified after write of the merged payload completes
assertTrue(secondPromise.isSuccess());
assertTrue(promise1.isSuccess());
assertTrue(promise2.isSuccess());
}
@Test
public void dataFramesShouldMergeUseVoidPromise() throws Exception {
createStream(STREAM_ID, false);
final ByteBuf data = dummyData().retain();
when(voidPromise.isVoid()).thenReturn(true);
encoder.writeData(ctx, STREAM_ID, data, 0, true, voidPromise);
encoder.writeData(ctx, STREAM_ID, data, 0, true, voidPromise);
ChannelPromise promise1 = newVoidPromise();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise1);
ChannelPromise promise2 = newVoidPromise();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise2);
// Now merge the two payloads.
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
FlowControlled mergedPayload = capturedWrites.get(0);
assertTrue(mergedPayload.merge(ctx, capturedWrites.get(1)));
mergedPayload.merge(ctx, capturedWrites.get(1));
assertEquals(16, mergedPayload.size());
assertFalse(promise1.isSuccess());
assertFalse(promise2.isSuccess());
// Write the merged payloads and verify it was written correctly.
mergedPayload.write(ctx, 16);
assertEquals(0, mergedPayload.size());
assertEquals("abcdefghabcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
// The promises won't be set since there are no listeners.
assertFalse(promise1.isSuccess());
assertFalse(promise2.isSuccess());
}
@Test
public void dataFramesDontMergeWithHeaders() throws Exception {
createStream(STREAM_ID, false);
final ByteBuf data = dummyData().retain();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
encoder.writeData(ctx, STREAM_ID, data, 0, true, newPromise());
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, newPromise());
List<FlowControlled> capturedWrites = payloadCaptor.getAllValues();
assertFalse(capturedWrites.get(0).merge(ctx, capturedWrites.get(1)));
}
@Test
public void emptyFrameShouldWritePadding() throws Exception {
public void emptyFrameShouldSplitPadding() throws Exception {
ByteBuf data = Unpooled.buffer(0);
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
assertSplitPaddingOnEmptyBuffer(data);
assertEquals(0, data.refCnt());
}
private void assertSplitPaddingOnEmptyBuffer(ByteBuf data) throws Exception {
createStream(STREAM_ID, false);
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
ChannelPromise p = newPromise();
encoder.writeData(ctx, STREAM_ID, data, 10, true, p);
assertEquals(payloadCaptor.getValue().size(), 10);
payloadCaptor.getValue().write(ctx, 10);
// writer was called 2 times
@ -291,128 +290,136 @@ public class DefaultHttp2ConnectionEncoderTest {
assertEquals("", writtenData.get(0));
assertEquals(10, (int) writtenPadding.get(0));
assertEquals(0, data.refCnt());
assertTrue(p.isSuccess());
}
@Test
public void headersWriteForUnknownStreamShouldCreateStream() throws Exception {
int streamId = 5;
when(stream.id()).thenReturn(streamId);
when(stream.state()).thenReturn(IDLE);
mockFutureAddListener(true);
when(local.createStream(eq(streamId), anyBoolean())).thenReturn(stream);
writeAllFlowControlledFrames();
final int streamId = 6;
ChannelPromise promise = newPromise();
encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(local).createStream(eq(streamId), eq(false));
assertNotNull(payloadCaptor.getValue());
payloadCaptor.getValue().write(ctx, 0);
verify(writer).writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
assertTrue(promise.isSuccess());
}
@Test
public void headersWriteShouldOpenStreamForPush() throws Exception {
mockFutureAddListener(true);
when(stream.state()).thenReturn(RESERVED_LOCAL);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(stream).open(false);
verify(stream, never()).closeLocalSide();
assertNotNull(payloadCaptor.getValue());
payloadCaptor.getValue().write(ctx, 0);
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
writeAllFlowControlledFrames();
Http2Stream parent = createStream(STREAM_ID, false);
reservePushStream(PUSH_STREAM_ID, parent);
ChannelPromise promise = newPromise();
encoder.writeHeaders(ctx, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
assertEquals(HALF_CLOSED_REMOTE, stream(PUSH_STREAM_ID).state());
verify(writer).writeHeaders(eq(ctx), eq(PUSH_STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@Test
public void pushPromiseWriteAfterGoAwayReceivedShouldFail() throws Exception {
when(connection.goAwayReceived()).thenReturn(true);
createStream(STREAM_ID, false);
goAwayReceived(0);
ChannelFuture future = encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0,
promise);
assertTrue(future.awaitUninterruptibly().cause() instanceof Http2Exception);
newPromise());
assertTrue(future.isDone());
assertFalse(future.isSuccess());
}
@Test
public void pushPromiseWriteShouldReserveStream() throws Exception {
createStream(STREAM_ID, false);
ChannelPromise promise = newPromise();
encoder.writePushPromise(ctx, STREAM_ID, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, promise);
verify(local).reservePushStream(eq(PUSH_STREAM_ID), eq(stream));
assertEquals(RESERVED_LOCAL, stream(PUSH_STREAM_ID).state());
verify(writer).writePushPromise(eq(ctx), eq(STREAM_ID), eq(PUSH_STREAM_ID),
eq(EmptyHttp2Headers.INSTANCE), eq(0), eq(promise));
}
@Test
public void priorityWriteAfterGoAwayShouldSucceed() throws Exception {
when(connection.goAwayReceived()).thenReturn(true);
createStream(STREAM_ID, false);
goAwayReceived(Integer.MAX_VALUE);
ChannelPromise promise = newPromise();
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
}
@Test
public void priorityWriteShouldSetPriorityForStream() throws Exception {
when(connection.stream(STREAM_ID)).thenReturn(null);
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
ChannelPromise promise = newPromise();
short weight = 255;
encoder.writePriority(ctx, STREAM_ID, 0, weight, true, promise);
// Verify that this created an idle stream with the desired weight.
Http2Stream stream = stream(STREAM_ID);
assertEquals(IDLE, stream.state());
assertEquals(weight, stream.weight());
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
verify(local).createIdleStream(STREAM_ID);
verify(stream, never()).open(anyBoolean());
}
@Test
public void priorityWriteOnPreviouslyExistingStreamShouldSucceed() throws Exception {
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(local).createIdleStream(eq(STREAM_ID));
when(connection.stream(STREAM_ID)).thenReturn(null);
// Just return the stream object as the connection stream to ensure the dependent stream "exists"
when(connection.stream(0)).thenReturn(stream);
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream, never()).setPriority(anyInt(), anyShort(), anyBoolean());
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
verify(local).createIdleStream(STREAM_ID);
createStream(STREAM_ID, false).close();
ChannelPromise promise = newPromise();
short weight = 255;
encoder.writePriority(ctx, STREAM_ID, 0, weight, true, promise);
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq(weight), eq(true), eq(promise));
}
@Test
public void priorityWriteOnPreviouslyExistingParentStreamShouldSucceed() throws Exception {
doAnswer(new Answer<Http2Stream>() {
@Override
public Http2Stream answer(InvocationOnMock in) throws Throwable {
throw new ClosedStreamCreationException(Http2Error.INTERNAL_ERROR);
}
}).when(stream).setPriority(eq(0), eq((short) 255), eq(true));
when(connection.stream(STREAM_ID)).thenReturn(stream);
encoder.writePriority(ctx, STREAM_ID, 0, (short) 255, true, promise);
verify(stream).setPriority(eq(0), eq((short) 255), eq(true));
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(0), eq((short) 255), eq(true), eq(promise));
final int parentStreamId = STREAM_ID + 2;
createStream(STREAM_ID, false);
createStream(parentStreamId, false).close();
ChannelPromise promise = newPromise();
short weight = 255;
encoder.writePriority(ctx, STREAM_ID, parentStreamId, weight, true, promise);
verify(writer).writePriority(eq(ctx), eq(STREAM_ID), eq(parentStreamId), eq(weight), eq(true), eq(promise));
}
@Test
public void rstStreamWriteForUnknownStreamShouldIgnore() throws Exception {
ChannelPromise promise = newPromise();
encoder.writeRstStream(ctx, 5, PROTOCOL_ERROR.code(), promise);
verify(writer, never()).writeRstStream(eq(ctx), anyInt(), anyLong(), eq(promise));
}
@Test
public void rstStreamWriteShouldCloseStream() throws Exception {
public void rstStreamShouldCloseStream() throws Exception {
// Create the stream and send headers.
writeAllFlowControlledFrames();
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, newPromise());
// Now verify that a stream reset is performed.
stream(STREAM_ID);
ChannelPromise promise = newPromise();
encoder.writeRstStream(ctx, STREAM_ID, PROTOCOL_ERROR.code(), promise);
verify(lifecycleManager).resetStream(eq(ctx), eq(STREAM_ID), eq(PROTOCOL_ERROR.code()), eq(promise));
verify(lifecycleManager).resetStream(eq(ctx), eq(STREAM_ID), anyInt(), eq(promise));
}
@Test
public void pingWriteAfterGoAwayShouldSucceed() throws Exception {
when(connection.goAwayReceived()).thenReturn(true);
ChannelPromise promise = newPromise();
goAwayReceived(0);
encoder.writePing(ctx, false, emptyPingBuf(), promise);
verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise));
}
@Test
public void pingWriteShouldSucceed() throws Exception {
ChannelPromise promise = newPromise();
encoder.writePing(ctx, false, emptyPingBuf(), promise);
verify(writer).writePing(eq(ctx), eq(false), eq(emptyPingBuf()), eq(promise));
}
@Test
public void settingsWriteAfterGoAwayShouldSucceed() throws Exception {
when(connection.goAwayReceived()).thenReturn(true);
goAwayReceived(0);
ChannelPromise promise = newPromise();
encoder.writeSettings(ctx, new Http2Settings(), promise);
verify(writer).writeSettings(eq(ctx), any(Http2Settings.class), eq(promise));
}
@ -421,18 +428,23 @@ public class DefaultHttp2ConnectionEncoderTest {
public void settingsWriteShouldNotUpdateSettings() throws Exception {
Http2Settings settings = new Http2Settings();
settings.initialWindowSize(100);
settings.pushEnabled(false);
settings.maxConcurrentStreams(1000);
settings.headerTableSize(2000);
ChannelPromise promise = newPromise();
encoder.writeSettings(ctx, settings, promise);
verify(writer).writeSettings(eq(ctx), eq(settings), eq(promise));
}
@Test
public void dataWriteShouldCreateHalfClosedStream() {
mockSendFlowControlledWriteEverything();
public void dataWriteShouldCreateHalfClosedStream() throws Exception {
writeAllFlowControlledFrames();
Http2Stream stream = createStream(STREAM_ID, false);
ByteBuf data = dummyData();
ChannelPromise promise = newPromise();
encoder.writeData(ctx, STREAM_ID, data.retain(), 0, true, promise);
assertTrue(promise.isSuccess());
verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class));
verify(lifecycleManager).closeStreamLocal(stream, promise);
assertEquals(data.toString(UTF_8), writtenData.get(0));
@ -441,51 +453,53 @@ public class DefaultHttp2ConnectionEncoderTest {
@Test
public void headersWriteShouldHalfCloseStream() throws Exception {
mockSendFlowControlledWriteEverything();
int streamId = 5;
when(stream.id()).thenReturn(streamId);
when(stream.state()).thenReturn(HALF_CLOSED_LOCAL);
mockFutureAddListener(true);
when(local.createStream(eq(streamId), anyBoolean())).thenReturn(stream);
when(writer.writeHeaders(eq(ctx), eq(streamId), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise)))
.thenReturn(future);
encoder.writeHeaders(ctx, streamId, EmptyHttp2Headers.INSTANCE, 0, true, promise);
verify(local).createStream(eq(streamId), eq(true));
// Trigger the write and mark the promise successful to trigger listeners
assertNotNull(payloadCaptor.getValue());
payloadCaptor.getValue().write(ctx, 0);
promise.trySuccess();
verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise));
writeAllFlowControlledFrames();
createStream(STREAM_ID, false);
ChannelPromise promise = newPromise();
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise);
assertTrue(promise.isSuccess());
verify(lifecycleManager).closeStreamLocal(eq(stream(STREAM_ID)), eq(promise));
}
@Test
public void headersWriteShouldHalfClosePushStream() throws Exception {
mockSendFlowControlledWriteEverything();
mockFutureAddListener(true);
when(stream.state()).thenReturn(RESERVED_LOCAL).thenReturn(HALF_CLOSED_LOCAL);
when(writer.writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(true), eq(promise)))
.thenReturn(future);
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise);
verify(stream).open(true);
promise.trySuccess();
writeAllFlowControlledFrames();
Http2Stream parent = createStream(STREAM_ID, false);
Http2Stream stream = reservePushStream(PUSH_STREAM_ID, parent);
ChannelPromise promise = newPromise();
encoder.writeHeaders(ctx, PUSH_STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, true, promise);
assertEquals(HALF_CLOSED_REMOTE, stream.state());
assertTrue(promise.isSuccess());
verify(lifecycleManager).closeStreamLocal(eq(stream), eq(promise));
}
@Test
public void encoderDelegatesGoAwayToLifeCycleManager() {
ChannelPromise promise = newPromise();
encoder.writeGoAway(ctx, STREAM_ID, Http2Error.INTERNAL_ERROR.code(), null, promise);
verify(lifecycleManager).goAway(eq(ctx), eq(STREAM_ID), eq(Http2Error.INTERNAL_ERROR.code()),
eq((ByteBuf) null), eq(promise));
eq((ByteBuf) null), eq(promise));
verifyNoMoreInteractions(writer);
}
@Test
public void dataWriteToClosedStreamShouldFail() {
when(stream.state()).thenReturn(CLOSED);
public void dataWriteToClosedStreamShouldFail() throws Exception {
createStream(STREAM_ID, false).close();
ByteBuf data = mock(ByteBuf.class);
ChannelPromise promise = newPromise();
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
assertTrue(promise.isDone());
assertFalse(promise.isSuccess());
assertThat(promise.cause(), instanceOf(IllegalArgumentException.class));
verify(data).release();
}
@Test
public void dataWriteToHalfClosedLocalStreamShouldFail() throws Exception {
createStream(STREAM_ID, true);
ByteBuf data = mock(ByteBuf.class);
ChannelPromise promise = newPromise();
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
assertTrue(promise.isDone());
assertFalse(promise.isSuccess());
@ -494,51 +508,45 @@ public class DefaultHttp2ConnectionEncoderTest {
}
@Test
public void dataWriteToHalfClosedLocalStreamShouldFail() {
when(stream.state()).thenReturn(HALF_CLOSED_LOCAL);
public void canWriteDataFrameAfterGoAwaySent() throws Exception {
Http2Stream stream = createStream(STREAM_ID, false);
connection.goAwaySent(0, 0, EMPTY_BUFFER);
ByteBuf data = mock(ByteBuf.class);
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
assertTrue(promise.isDone());
assertFalse(promise.isSuccess());
assertThat(promise.cause(), instanceOf(IllegalStateException.class));
verify(data).release();
}
@Test
public void canWriteDataFrameAfterGoAwaySent() {
when(connection.goAwaySent()).thenReturn(true);
when(remote.lastStreamKnownByPeer()).thenReturn(0);
ByteBuf data = mock(ByteBuf.class);
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
encoder.writeData(ctx, STREAM_ID, data, 0, false, newPromise());
verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class));
}
@Test
public void canWriteHeaderFrameAfterGoAwaySent() {
when(connection.goAwaySent()).thenReturn(true);
when(remote.lastStreamKnownByPeer()).thenReturn(0);
public void canWriteHeaderFrameAfterGoAwaySent() throws Exception {
writeAllFlowControlledFrames();
createStream(STREAM_ID, false);
goAwaySent(0);
ChannelPromise promise = newPromise();
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class));
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
@Test
public void canWriteDataFrameAfterGoAwayReceived() {
when(connection.goAwayReceived()).thenReturn(true);
when(local.lastStreamKnownByPeer()).thenReturn(STREAM_ID);
public void canWriteDataFrameAfterGoAwayReceived() throws Exception {
Http2Stream stream = createStream(STREAM_ID, false);
goAwayReceived(STREAM_ID);
ByteBuf data = mock(ByteBuf.class);
encoder.writeData(ctx, STREAM_ID, data, 0, false, promise);
encoder.writeData(ctx, STREAM_ID, data, 0, false, newPromise());
verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class));
}
@Test
public void canWriteHeaderFrameAfterGoAwayReceived() {
when(connection.goAwayReceived()).thenReturn(true);
when(local.lastStreamKnownByPeer()).thenReturn(STREAM_ID);
writeAllFlowControlledFrames();
goAwayReceived(STREAM_ID);
ChannelPromise promise = newPromise();
encoder.writeHeaders(ctx, STREAM_ID, EmptyHttp2Headers.INSTANCE, 0, false, promise);
verify(remoteFlow).addFlowControlled(eq(stream), any(FlowControlled.class));
verify(writer).writeHeaders(eq(ctx), eq(STREAM_ID), eq(EmptyHttp2Headers.INSTANCE), eq(0),
eq(DEFAULT_PRIORITY_WEIGHT), eq(false), eq(0), eq(false), eq(promise));
}
private void mockSendFlowControlledWriteEverything() {
private void writeAllFlowControlledFrames() {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
@ -547,22 +555,56 @@ public class DefaultHttp2ConnectionEncoderTest {
flowControlled.writeComplete();
return null;
}
}).when(remoteFlow).addFlowControlled(eq(stream), payloadCaptor.capture());
}).when(remoteFlow).addFlowControlled(any(Http2Stream.class), payloadCaptor.capture());
}
private void mockFutureAddListener(boolean success) {
when(future.isSuccess()).thenReturn(success);
if (!success) {
when(future.cause()).thenReturn(new Exception("Fake Exception"));
}
doAnswer(new Answer<Void>() {
private Http2Stream createStream(int streamId, boolean halfClosed) throws Http2Exception {
return connection.local().createStream(streamId, halfClosed);
}
private Http2Stream reservePushStream(int pushStreamId, Http2Stream parent) throws Http2Exception {
return connection.local().reservePushStream(pushStreamId, parent);
}
private Http2Stream stream(int streamId) {
return connection.stream(streamId);
}
private void goAwayReceived(int lastStreamId) {
connection.goAwayReceived(lastStreamId, 0, EMPTY_BUFFER);
}
private void goAwaySent(int lastStreamId) {
connection.goAwaySent(lastStreamId, 0, EMPTY_BUFFER);
}
private ChannelPromise newPromise() {
return new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
}
private ChannelPromise newVoidPromise() {
return new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE) {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
ChannelFutureListener listener = (ChannelFutureListener) invocation.getArguments()[0];
listener.operationComplete(future);
return null;
public ChannelPromise addListener(
GenericFutureListener<? extends Future<? super Void>> listener) {
throw new AssertionFailedError();
}
}).when(future).addListener(any(ChannelFutureListener.class));
@Override
public ChannelPromise addListeners(
GenericFutureListener<? extends Future<? super Void>>... listeners) {
throw new AssertionFailedError();
}
@Override
public boolean isVoid() {
return true;
}
};
}
private ChannelFuture newSucceededFuture() {
return newPromise().setSuccess();
}
private static ByteBuf dummyData() {

View File

@ -33,28 +33,32 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.Http2Connection.Endpoint;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.internal.PlatformDependent;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
/**
* Tests for {@link DefaultHttp2Connection}.
*/
public class DefaultHttp2ConnectionTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
private DefaultHttp2Connection server;
private DefaultHttp2Connection client;
@ -208,11 +212,22 @@ public class DefaultHttp2ConnectionTest {
}
@Test(expected = Http2Exception.class)
public void maxAllowedStreamsExceededShouldThrow() throws Http2Exception {
public void createShouldThrowWhenMaxAllowedStreamsExceeded() throws Http2Exception {
server.local().maxActiveStreams(0);
server.local().createStream(2, true);
}
@Test
public void createIdleShouldSucceedWhenMaxAllowedStreamsExceeded() throws Http2Exception {
server.local().maxActiveStreams(0);
Http2Stream stream = server.local().createIdleStream(2);
// Opening should fail, however.
thrown.expect(Http2Exception.class);
thrown.expectMessage("Maximum active streams violated for this endpoint.");
stream.open(false);
}
@Test(expected = Http2Exception.class)
public void reserveWithPushDisallowedShouldThrow() throws Http2Exception {
Http2Stream stream = server.remote().createStream(3, true);

View File

@ -316,11 +316,11 @@ public class Http2ConnectionHandlerTest {
when(frameWriter.writeRstStream(eq(ctx), eq(STREAM_ID),
anyLong(), any(ChannelPromise.class))).thenReturn(future);
when(stream.state()).thenReturn(CLOSED);
when(stream.isHeaderSent()).thenReturn(true);
// The stream is "closed" but is still known about by the connection (connection().stream(..)
// will return the stream). We should still write a RST_STREAM frame in this scenario.
handler.resetStream(ctx, STREAM_ID, STREAM_CLOSED.code(), promise);
verify(frameWriter).writeRstStream(eq(ctx), eq(STREAM_ID), anyLong(),
any(ChannelPromise.class));
verify(frameWriter).writeRstStream(eq(ctx), eq(STREAM_ID), anyLong(), any(ChannelPromise.class));
}
@SuppressWarnings("unchecked")