Consolidating HTTP/2 stream state
Motivation: Http2Stream has several methods that provide state information. We need to simplify how state is used and consolidate as many of these fields as possible. Modifications: Since we already have a concept of a stream being active or inactive, I'm now separating the deactivation of a stream from the act of closing it. The reason for this is the case of sending a frame with endOfStream=true. In this case we want to close the stream immediately in order to disallow further writing, but we don't want to mark the stream as inactive until the write has completed since the inactive event triggers the flow controller to cancel any pending writes on the stream. With deactivation separated out, we are able to eliminate most of the additional state methods with the exception of `isResetSent`. This is still required because we need to ignore inbound frames in this case (as per the spec), since the remote endpoint may not yet know that the stream has been closed. Result: Fixes #3382
This commit is contained in:
parent
cabecee127
commit
a833fbe9b9
@ -139,6 +139,11 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
return Collections.unmodifiableSet(activeStreams);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deactivate(Http2Stream stream) {
|
||||
deactivateInternal((DefaultStream) stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Endpoint<Http2LocalFlowController> local() {
|
||||
return localEndpoint;
|
||||
@ -195,7 +200,7 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
stream.parent().removeChild(stream);
|
||||
}
|
||||
|
||||
private void activate(DefaultStream stream) {
|
||||
private void activateInternal(DefaultStream stream) {
|
||||
if (activeStreams.add(stream)) {
|
||||
// Update the number of active streams initiated by the endpoint.
|
||||
stream.createdBy().numActiveStreams++;
|
||||
@ -207,6 +212,21 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
}
|
||||
}
|
||||
|
||||
private void deactivateInternal(DefaultStream stream) {
|
||||
if (activeStreams.remove(stream)) {
|
||||
// Update the number of active streams initiated by the endpoint.
|
||||
stream.createdBy().numActiveStreams--;
|
||||
|
||||
// Notify the listeners.
|
||||
for (Listener listener : listeners) {
|
||||
listener.streamInactive(stream);
|
||||
}
|
||||
|
||||
// Mark this stream for removal.
|
||||
removalPolicy.markForRemoval(stream);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple stream implementation. Streams can be compared to each other by priority.
|
||||
*/
|
||||
@ -218,9 +238,6 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
private IntObjectMap<DefaultStream> children = newChildMap();
|
||||
private int totalChildWeights;
|
||||
private boolean resetSent;
|
||||
private boolean resetReceived;
|
||||
private boolean endOfStreamSent;
|
||||
private boolean endOfStreamReceived;
|
||||
private PropertyMap data;
|
||||
|
||||
DefaultStream(int id) {
|
||||
@ -238,39 +255,6 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
return state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEndOfStreamReceived() {
|
||||
return endOfStreamReceived;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2Stream endOfStreamReceived() {
|
||||
endOfStreamReceived = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEndOfStreamSent() {
|
||||
return endOfStreamSent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2Stream endOfStreamSent() {
|
||||
endOfStreamSent = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isResetReceived() {
|
||||
return resetReceived;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2Stream resetReceived() {
|
||||
resetReceived = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isResetSent() {
|
||||
return resetSent;
|
||||
@ -282,11 +266,6 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReset() {
|
||||
return resetSent || resetReceived;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object setProperty(Object key, Object value) {
|
||||
return data.put(key, value);
|
||||
@ -409,7 +388,7 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
throw streamError(id, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: " + state);
|
||||
}
|
||||
|
||||
activate(this);
|
||||
activateInternal(this);
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -420,25 +399,10 @@ public class DefaultHttp2Connection implements Http2Connection {
|
||||
}
|
||||
|
||||
state = CLOSED;
|
||||
deactivate(this);
|
||||
|
||||
// Mark this stream for removal.
|
||||
removalPolicy.markForRemoval(this);
|
||||
deactivateInternal(this);
|
||||
return this;
|
||||
}
|
||||
|
||||
private void deactivate(DefaultStream stream) {
|
||||
if (activeStreams.remove(stream)) {
|
||||
// Update the number of active streams initiated by the endpoint.
|
||||
stream.createdBy().numActiveStreams--;
|
||||
|
||||
// Notify the listeners.
|
||||
for (Listener listener : listeners) {
|
||||
listener.streamInactive(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Http2Stream closeLocalSide() {
|
||||
switch (state) {
|
||||
|
@ -27,12 +27,13 @@ import io.netty.channel.ChannelHandlerContext;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Provides the default implementation for processing inbound frame events
|
||||
* and delegates to a {@link Http2FrameListener}
|
||||
* Provides the default implementation for processing inbound frame events and delegates to a
|
||||
* {@link Http2FrameListener}
|
||||
* <p>
|
||||
* This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
|
||||
* <p>
|
||||
* This interface enforces inbound flow control functionality through {@link Http2InboundFlowController}
|
||||
* This interface enforces inbound flow control functionality through
|
||||
* {@link Http2LocalFlowController}
|
||||
*/
|
||||
public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
private final Http2FrameListener internalFrameListener = new FrameReadListener();
|
||||
@ -215,25 +216,22 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
// Check if we received a data frame for a stream which is half-closed
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
|
||||
verifyEndOfStreamNotReceived(stream);
|
||||
verifyGoAwayNotReceived();
|
||||
verifyRstStreamNotReceived(stream);
|
||||
|
||||
// We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a
|
||||
// lower stream ID.
|
||||
boolean shouldApplyFlowControl = false;
|
||||
boolean shouldIgnore = shouldIgnoreFrame(stream, false);
|
||||
Http2Exception error = null;
|
||||
switch (stream.state()) {
|
||||
case OPEN:
|
||||
case HALF_CLOSED_LOCAL:
|
||||
shouldApplyFlowControl = true;
|
||||
break;
|
||||
case HALF_CLOSED_REMOTE:
|
||||
// Always fail the stream if we've more data after the remote endpoint half-closed.
|
||||
error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
|
||||
stream.id(), stream.state());
|
||||
break;
|
||||
case CLOSED:
|
||||
if (stream.isResetSent()) {
|
||||
shouldApplyFlowControl = true;
|
||||
}
|
||||
if (!shouldIgnore) {
|
||||
error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
|
||||
stream.id(), stream.state());
|
||||
@ -252,11 +250,9 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
Http2LocalFlowController flowController = flowController();
|
||||
try {
|
||||
// If we should apply flow control, do so now.
|
||||
if (shouldApplyFlowControl) {
|
||||
flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
|
||||
// Update the unconsumed bytes after flow control is applied.
|
||||
unconsumedBytes = unconsumedBytes(stream);
|
||||
}
|
||||
flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
|
||||
// Update the unconsumed bytes after flow control is applied.
|
||||
unconsumedBytes = unconsumedBytes(stream);
|
||||
|
||||
// If we should ignore this frame, do so now.
|
||||
if (shouldIgnore) {
|
||||
@ -288,12 +284,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
throw e;
|
||||
} finally {
|
||||
// If appropriate, returned the processed bytes to the flow controller.
|
||||
if (shouldApplyFlowControl && bytesToReturn > 0) {
|
||||
if (bytesToReturn > 0) {
|
||||
flowController.consumeBytes(ctx, stream, bytesToReturn);
|
||||
}
|
||||
|
||||
if (endOfStream) {
|
||||
stream.endOfStreamReceived();
|
||||
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
|
||||
}
|
||||
}
|
||||
@ -321,7 +316,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
verifyGoAwayNotReceived();
|
||||
verifyRstStreamNotReceived(stream);
|
||||
if (shouldIgnoreFrame(stream, false)) {
|
||||
// Ignore this frame.
|
||||
return;
|
||||
@ -330,8 +324,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
if (stream == null) {
|
||||
stream = connection.createRemoteStream(streamId).open(endOfStream);
|
||||
} else {
|
||||
verifyEndOfStreamNotReceived(stream);
|
||||
|
||||
switch (stream.state()) {
|
||||
case RESERVED_REMOTE:
|
||||
case IDLE:
|
||||
@ -360,7 +352,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
|
||||
// If the headers completes this stream, close it.
|
||||
if (endOfStream) {
|
||||
stream.endOfStreamReceived();
|
||||
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
|
||||
}
|
||||
}
|
||||
@ -395,14 +386,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
verifyPrefaceReceived();
|
||||
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
verifyRstStreamNotReceived(stream);
|
||||
if (stream.state() == CLOSED) {
|
||||
// RstStream frames must be ignored for closed streams.
|
||||
return;
|
||||
}
|
||||
|
||||
stream.resetReceived();
|
||||
|
||||
listener.onRstStreamRead(ctx, streamId, errorCode);
|
||||
|
||||
lifecycleManager.closeStream(stream, ctx.newSucceededFuture());
|
||||
@ -503,12 +491,23 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
|
||||
Http2Stream parentStream = connection.requireStream(streamId);
|
||||
verifyGoAwayNotReceived();
|
||||
verifyRstStreamNotReceived(parentStream);
|
||||
if (shouldIgnoreFrame(parentStream, false)) {
|
||||
// Ignore frames for any stream created after we sent a go-away.
|
||||
return;
|
||||
}
|
||||
|
||||
switch (parentStream.state()) {
|
||||
case OPEN:
|
||||
case HALF_CLOSED_LOCAL:
|
||||
// Allowed to receive push promise in these states.
|
||||
break;
|
||||
default:
|
||||
// Connection error.
|
||||
throw connectionError(PROTOCOL_ERROR,
|
||||
"Stream %d in unexpected state for receiving push promise: %s",
|
||||
parentStream.id(), parentStream.state());
|
||||
}
|
||||
|
||||
// Reserve the push stream based with a priority based on the current stream's priority.
|
||||
connection.remote().reservePushStream(promisedStreamId, parentStream);
|
||||
|
||||
@ -531,7 +530,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
|
||||
Http2Stream stream = connection.requireStream(streamId);
|
||||
verifyGoAwayNotReceived();
|
||||
verifyRstStreamNotReceived(stream);
|
||||
if (stream.state() == CLOSED || shouldIgnoreFrame(stream, false)) {
|
||||
// Ignore frames for any stream created after we sent a go-away.
|
||||
return;
|
||||
@ -565,18 +563,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
return stream != null && !allowResetSent && stream.isResetSent();
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that a frame has not been received from remote endpoint with the
|
||||
* {@code END_STREAM} flag set. If it was, throws a connection error.
|
||||
*/
|
||||
private void verifyEndOfStreamNotReceived(Http2Stream stream) throws Http2Exception {
|
||||
if (stream.isEndOfStreamReceived()) {
|
||||
// Connection error.
|
||||
throw new Http2Exception(STREAM_CLOSED, String.format(
|
||||
"Received frame for stream %d after receiving END_STREAM", stream.id()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it was, throws a
|
||||
* connection error.
|
||||
@ -587,17 +573,5 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
|
||||
throw connectionError(PROTOCOL_ERROR, "Received frames after receiving GO_AWAY");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that a RST_STREAM frame was not previously received for the given stream. If it was, throws a
|
||||
* stream error.
|
||||
*/
|
||||
private void verifyRstStreamNotReceived(Http2Stream stream) throws Http2Exception {
|
||||
if (stream != null && stream.isResetReceived()) {
|
||||
// Stream error.
|
||||
throw streamError(stream.id(), STREAM_CLOSED,
|
||||
"Frame received after receiving RST_STREAM for stream: " + stream.id());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -155,12 +155,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
}
|
||||
|
||||
stream = connection.requireStream(streamId);
|
||||
if (stream.isResetSent()) {
|
||||
throw new IllegalStateException("Sending data after sending RST_STREAM.");
|
||||
}
|
||||
if (stream.isEndOfStreamSent()) {
|
||||
throw new IllegalStateException("Sending data after sending END_STREAM.");
|
||||
}
|
||||
|
||||
// Verify that the stream is in the appropriate state for sending DATA frames.
|
||||
switch (stream.state()) {
|
||||
@ -174,8 +168,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
}
|
||||
|
||||
if (endOfStream) {
|
||||
// Indicate that we have sent END_STREAM.
|
||||
stream.endOfStreamSent();
|
||||
lifecycleManager.closeLocalSide(stream, promise);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
data.release();
|
||||
@ -206,10 +199,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
Http2Stream stream = connection.stream(streamId);
|
||||
if (stream == null) {
|
||||
stream = connection.createLocalStream(streamId);
|
||||
} else if (stream.isResetSent()) {
|
||||
throw new IllegalStateException("Sending headers after sending RST_STREAM.");
|
||||
} else if (stream.isEndOfStreamSent()) {
|
||||
throw new IllegalStateException("Sending headers after sending END_STREAM.");
|
||||
}
|
||||
|
||||
switch (stream.state()) {
|
||||
@ -231,9 +220,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
new FlowControlledHeaders(ctx, stream, headers, streamDependency, weight,
|
||||
exclusive, padding, endOfStream, promise));
|
||||
if (endOfStream) {
|
||||
// Flag delivery of EOS synchronously to prevent subsequent frames being enqueued in the flow
|
||||
// controller.
|
||||
stream.endOfStreamSent();
|
||||
lifecycleManager.closeLocalSide(stream, promise);
|
||||
}
|
||||
return promise;
|
||||
} catch (Http2NoMoreStreamIdsException e) {
|
||||
@ -556,10 +543,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future == promise && endOfStream) {
|
||||
// Special case where we're listening to the original promise and need to close the stream.
|
||||
lifecycleManager.closeLocalSide(stream, promise);
|
||||
}
|
||||
if (!future.isSuccess()) {
|
||||
error(future.cause());
|
||||
}
|
||||
|
@ -71,15 +71,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
state(stream).window(initialWindowSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void streamHalfClosed(Http2Stream stream) {
|
||||
if (!stream.localSideOpen()) {
|
||||
// Any pending frames can never be written, clear and
|
||||
// write errors for any pending frames.
|
||||
state(stream).clear();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void streamInactive(Http2Stream stream) {
|
||||
// Any pending frames can never be written, clear and
|
||||
@ -212,7 +203,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
/**
|
||||
* Writes as many pending bytes as possible, according to stream priority.
|
||||
*/
|
||||
private void writePendingBytes() throws Http2Exception {
|
||||
private void writePendingBytes() {
|
||||
Http2Stream connectionStream = connection.connectionStream();
|
||||
int connectionWindow = state(connectionStream).window();
|
||||
|
||||
@ -390,10 +381,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of pending bytes for this node that will fit within the {@link #window}. This is used for
|
||||
* the priority algorithm to determine the aggregate total for {@link #priorityBytes} at each node. Each node
|
||||
* only takes into account it's stream window so that when a change occurs to the connection window, these
|
||||
* values need not change (i.e. no tree traversal is required).
|
||||
* Returns the number of pending bytes for this node that will fit within the
|
||||
* {@link #window}. This is used for the priority algorithm to determine the aggregate
|
||||
* number of bytes that can be written at each node. Each node only takes into account its
|
||||
* stream window so that when a change occurs to the connection window, these values need
|
||||
* not change (i.e. no tree traversal is required).
|
||||
*/
|
||||
int streamableBytes() {
|
||||
return max(0, min(pendingBytes, window));
|
||||
|
@ -121,9 +121,8 @@ public interface Http2Connection {
|
||||
* <li>The connection is marked as going away.</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* The caller is expected to {@link Http2Stream#open()} the stream.
|
||||
* The caller is expected to {@link Http2Stream#open(boolean)} the stream.
|
||||
* @param streamId The ID of the stream
|
||||
* @see Http2Stream#open()
|
||||
* @see Http2Stream#open(boolean)
|
||||
*/
|
||||
Http2Stream createStream(int streamId) throws Http2Exception;
|
||||
@ -232,16 +231,26 @@ public interface Http2Connection {
|
||||
Http2Stream connectionStream();
|
||||
|
||||
/**
|
||||
* Gets the number of streams that are currently either open or half-closed.
|
||||
* Gets the number of streams that actively in use. It is possible for a stream to be closed
|
||||
* but still be considered active (e.g. there is still pending data to be written).
|
||||
*/
|
||||
int numActiveStreams();
|
||||
|
||||
/**
|
||||
* Gets all streams that are currently either open or half-closed. The returned collection is
|
||||
* Gets all streams that are actively in use. The returned collection is
|
||||
* sorted by priority.
|
||||
*/
|
||||
Collection<Http2Stream> activeStreams();
|
||||
|
||||
/**
|
||||
* Indicates that the given stream is no longer actively in use. If this stream was active,
|
||||
* after calling this method it will no longer appear in the list returned by
|
||||
* {@link #activeStreams()} and {@link #numActiveStreams()} will be decremented. In addition,
|
||||
* all listeners will be notified of this event via
|
||||
* {@link Listener#streamInactive(Http2Stream)}.
|
||||
*/
|
||||
void deactivate(Http2Stream stream);
|
||||
|
||||
/**
|
||||
* Indicates whether or not the local endpoint for this connection is the server.
|
||||
*/
|
||||
|
@ -23,6 +23,7 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
@ -36,12 +37,13 @@ import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Provides the default implementation for processing inbound frame events
|
||||
* and delegates to a {@link Http2FrameListener}
|
||||
* Provides the default implementation for processing inbound frame events and delegates to a
|
||||
* {@link Http2FrameListener}
|
||||
* <p>
|
||||
* This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
|
||||
* <p>
|
||||
* This interface enforces inbound flow control functionality through {@link Http2InboundFlowController}
|
||||
* This interface enforces inbound flow control functionality through
|
||||
* {@link Http2LocalFlowController}
|
||||
*/
|
||||
public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager {
|
||||
private final Http2ConnectionDecoder decoder;
|
||||
@ -254,14 +256,22 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
|
||||
* @param future the future after which to close the channel.
|
||||
*/
|
||||
@Override
|
||||
public void closeStream(Http2Stream stream, ChannelFuture future) {
|
||||
public void closeStream(final Http2Stream stream, ChannelFuture future) {
|
||||
stream.close();
|
||||
|
||||
// If this connection is closing and there are no longer any
|
||||
// active streams, close after the current operation completes.
|
||||
if (closeListener != null && connection().numActiveStreams() == 0) {
|
||||
future.addListener(closeListener);
|
||||
}
|
||||
future.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
// Deactivate this stream.
|
||||
connection().deactivate(stream);
|
||||
|
||||
// If this connection is closing and there are no longer any
|
||||
// active streams, close after the current operation completes.
|
||||
if (closeListener != null && connection().numActiveStreams() == 0) {
|
||||
closeListener.operationComplete(future);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -27,7 +27,7 @@ public interface Http2LifecycleManager {
|
||||
|
||||
/**
|
||||
* Closes the local side of the given stream. If this causes the stream to be closed, adds a
|
||||
* hook to close the channel after the given future completes.
|
||||
* hook to deactivate the stream and close the channel after the given future completes.
|
||||
*
|
||||
* @param stream the stream to be half closed.
|
||||
* @param future If closing, the future after which to close the channel.
|
||||
@ -36,7 +36,7 @@ public interface Http2LifecycleManager {
|
||||
|
||||
/**
|
||||
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a
|
||||
* hook to close the channel after the given future completes.
|
||||
* hook to deactivate the stream and close the channel after the given future completes.
|
||||
*
|
||||
* @param stream the stream to be half closed.
|
||||
* @param future If closing, the future after which to close the channel.
|
||||
@ -44,8 +44,8 @@ public interface Http2LifecycleManager {
|
||||
void closeRemoteSide(Http2Stream stream, ChannelFuture future);
|
||||
|
||||
/**
|
||||
* Closes the given stream and adds a hook to close the channel after the given future
|
||||
* completes.
|
||||
* Closes the given stream and adds a hook to deactivate the stream and close the channel after
|
||||
* the given future completes.
|
||||
*
|
||||
* @param stream the stream to be closed.
|
||||
* @param future the future after which to close the channel.
|
||||
|
@ -76,41 +76,6 @@ public interface Http2Stream {
|
||||
*/
|
||||
Http2Stream closeRemoteSide();
|
||||
|
||||
/**
|
||||
* Indicates whether a frame with {@code END_STREAM} set was received from the remote endpoint
|
||||
* for this stream.
|
||||
*/
|
||||
boolean isEndOfStreamReceived();
|
||||
|
||||
/**
|
||||
* Sets the flag indicating that a frame with {@code END_STREAM} set was received from the
|
||||
* remote endpoint for this stream.
|
||||
*/
|
||||
Http2Stream endOfStreamReceived();
|
||||
|
||||
/**
|
||||
* Indicates whether a frame with {@code END_STREAM} set was sent to the remote endpoint for
|
||||
* this stream.
|
||||
*/
|
||||
boolean isEndOfStreamSent();
|
||||
|
||||
/**
|
||||
* Sets the flag indicating that a frame with {@code END_STREAM} set was sent to the remote
|
||||
* endpoint for this stream.
|
||||
*/
|
||||
Http2Stream endOfStreamSent();
|
||||
|
||||
/**
|
||||
* Indicates whether a {@code RST_STREAM} frame has been received from the remote endpoint for this stream.
|
||||
*/
|
||||
boolean isResetReceived();
|
||||
|
||||
/**
|
||||
* Sets the flag indicating that a {@code RST_STREAM} frame has been received from the remote endpoint
|
||||
* for this stream. This does not affect the stream state.
|
||||
*/
|
||||
Http2Stream resetReceived();
|
||||
|
||||
/**
|
||||
* Indicates whether a {@code RST_STREAM} frame has been sent from the local endpoint for this stream.
|
||||
*/
|
||||
@ -122,12 +87,6 @@ public interface Http2Stream {
|
||||
*/
|
||||
Http2Stream resetSent();
|
||||
|
||||
/**
|
||||
* Indicates whether or not this stream has been reset. This is a short form for
|
||||
* {@link #isResetSent()} || {@link #isResetReceived()}.
|
||||
*/
|
||||
boolean isReset();
|
||||
|
||||
/**
|
||||
* Indicates whether the remote side of this stream is open (i.e. the state is either
|
||||
* {@link State#OPEN} or {@link State#HALF_CLOSED_LOCAL}).
|
||||
|
@ -225,7 +225,7 @@ public class DefaultHttp2ConnectionDecoderTest {
|
||||
final ByteBuf data = dummyData();
|
||||
try {
|
||||
decode().onDataRead(ctx, STREAM_ID, data, 10, true);
|
||||
verify(localFlow, never()).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true));
|
||||
verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true));
|
||||
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
|
||||
} finally {
|
||||
data.release();
|
||||
|
@ -39,6 +39,7 @@ import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
@ -50,11 +51,6 @@ import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelPromise;
|
||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
@ -63,6 +59,10 @@ import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Tests for {@link DefaultHttp2ConnectionEncoder}
|
||||
*/
|
||||
@ -239,7 +239,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void dataLargerThanMaxFrameSizeShouldBeSplit() throws Http2Exception {
|
||||
public void dataLargerThanMaxFrameSizeShouldBeSplit() {
|
||||
when(frameSizePolicy.maxFrameSize()).thenReturn(3);
|
||||
final ByteBuf data = dummyData();
|
||||
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
|
||||
@ -254,7 +254,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void paddingSplitOverFrame() throws Http2Exception {
|
||||
public void paddingSplitOverFrame() {
|
||||
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
|
||||
final ByteBuf data = dummyData();
|
||||
encoder.writeData(ctx, STREAM_ID, data, 5, true, promise);
|
||||
@ -272,7 +272,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void frameShouldSplitPadding() throws Http2Exception {
|
||||
public void frameShouldSplitPadding() {
|
||||
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
|
||||
ByteBuf data = dummyData();
|
||||
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
|
||||
@ -292,18 +292,18 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyFrameShouldSplitPadding() throws Http2Exception {
|
||||
public void emptyFrameShouldSplitPadding() {
|
||||
ByteBuf data = Unpooled.buffer(0);
|
||||
assertSplitPaddingOnEmptyBuffer(data);
|
||||
assertEquals(0, data.refCnt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void singletonEmptyBufferShouldSplitPadding() throws Http2Exception {
|
||||
public void singletonEmptyBufferShouldSplitPadding() {
|
||||
assertSplitPaddingOnEmptyBuffer(Unpooled.EMPTY_BUFFER);
|
||||
}
|
||||
|
||||
private void assertSplitPaddingOnEmptyBuffer(ByteBuf data) throws Http2Exception {
|
||||
private void assertSplitPaddingOnEmptyBuffer(ByteBuf data) {
|
||||
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
|
||||
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
|
||||
assertEquals(payloadCaptor.getValue().size(), 10);
|
||||
|
@ -273,7 +273,7 @@ public class DefaultHttp2LocalFlowControllerTest {
|
||||
controller.consumeBytes(ctx, stream(streamId), numBytes);
|
||||
}
|
||||
|
||||
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) throws Http2Exception {
|
||||
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) {
|
||||
verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise));
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user