Consolidating HTTP/2 stream state

Motivation:

Http2Stream has several methods that provide state information. We need
to simplify how state is used and consolidate as many of these fields as
possible.

Modifications:

Since we already have a concept of a stream being active or inactive,
I'm now separating the deactivation of a stream from the act of closing
it.  The reason for this is the case of sending a frame with
endOfStream=true. In this case we want to close the stream immediately
in order to disallow further writing, but we don't want to mark the
stream as inactive until the write has completed since the inactive
event triggers the flow controller to cancel any pending writes on the
stream.

With deactivation separated out, we are able to eliminate most of the
additional state methods with the exception of `isResetSent`.  This is
still required because we need to ignore inbound frames in this case (as
per the spec), since the remote endpoint may not yet know that the
stream has been closed.

Result:

Fixes #3382
This commit is contained in:
nmittler 2015-02-02 10:20:56 -08:00
parent 3030b4afe3
commit bc76bfa199
11 changed files with 104 additions and 213 deletions

View File

@ -139,6 +139,11 @@ public class DefaultHttp2Connection implements Http2Connection {
return Collections.unmodifiableSet(activeStreams); return Collections.unmodifiableSet(activeStreams);
} }
@Override
public void deactivate(Http2Stream stream) {
deactivateInternal((DefaultStream) stream);
}
@Override @Override
public Endpoint<Http2LocalFlowController> local() { public Endpoint<Http2LocalFlowController> local() {
return localEndpoint; return localEndpoint;
@ -195,7 +200,7 @@ public class DefaultHttp2Connection implements Http2Connection {
stream.parent().removeChild(stream); stream.parent().removeChild(stream);
} }
private void activate(DefaultStream stream) { private void activateInternal(DefaultStream stream) {
if (activeStreams.add(stream)) { if (activeStreams.add(stream)) {
// Update the number of active streams initiated by the endpoint. // Update the number of active streams initiated by the endpoint.
stream.createdBy().numActiveStreams++; stream.createdBy().numActiveStreams++;
@ -207,6 +212,21 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
} }
private void deactivateInternal(DefaultStream stream) {
if (activeStreams.remove(stream)) {
// Update the number of active streams initiated by the endpoint.
stream.createdBy().numActiveStreams--;
// Notify the listeners.
for (Listener listener : listeners) {
listener.streamInactive(stream);
}
// Mark this stream for removal.
removalPolicy.markForRemoval(stream);
}
}
/** /**
* Simple stream implementation. Streams can be compared to each other by priority. * Simple stream implementation. Streams can be compared to each other by priority.
*/ */
@ -218,9 +238,6 @@ public class DefaultHttp2Connection implements Http2Connection {
private IntObjectMap<DefaultStream> children = newChildMap(); private IntObjectMap<DefaultStream> children = newChildMap();
private int totalChildWeights; private int totalChildWeights;
private boolean resetSent; private boolean resetSent;
private boolean resetReceived;
private boolean endOfStreamSent;
private boolean endOfStreamReceived;
private PropertyMap data; private PropertyMap data;
DefaultStream(int id) { DefaultStream(int id) {
@ -238,39 +255,6 @@ public class DefaultHttp2Connection implements Http2Connection {
return state; return state;
} }
@Override
public boolean isEndOfStreamReceived() {
return endOfStreamReceived;
}
@Override
public Http2Stream endOfStreamReceived() {
endOfStreamReceived = true;
return this;
}
@Override
public boolean isEndOfStreamSent() {
return endOfStreamSent;
}
@Override
public Http2Stream endOfStreamSent() {
endOfStreamSent = true;
return this;
}
@Override
public boolean isResetReceived() {
return resetReceived;
}
@Override
public Http2Stream resetReceived() {
resetReceived = true;
return this;
}
@Override @Override
public boolean isResetSent() { public boolean isResetSent() {
return resetSent; return resetSent;
@ -282,11 +266,6 @@ public class DefaultHttp2Connection implements Http2Connection {
return this; return this;
} }
@Override
public boolean isReset() {
return resetSent || resetReceived;
}
@Override @Override
public Object setProperty(Object key, Object value) { public Object setProperty(Object key, Object value) {
return data.put(key, value); return data.put(key, value);
@ -409,7 +388,7 @@ public class DefaultHttp2Connection implements Http2Connection {
throw streamError(id, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: " + state); throw streamError(id, PROTOCOL_ERROR, "Attempting to open a stream in an invalid state: " + state);
} }
activate(this); activateInternal(this);
return this; return this;
} }
@ -420,25 +399,10 @@ public class DefaultHttp2Connection implements Http2Connection {
} }
state = CLOSED; state = CLOSED;
deactivate(this); deactivateInternal(this);
// Mark this stream for removal.
removalPolicy.markForRemoval(this);
return this; return this;
} }
private void deactivate(DefaultStream stream) {
if (activeStreams.remove(stream)) {
// Update the number of active streams initiated by the endpoint.
stream.createdBy().numActiveStreams--;
// Notify the listeners.
for (Listener listener : listeners) {
listener.streamInactive(stream);
}
}
}
@Override @Override
public Http2Stream closeLocalSide() { public Http2Stream closeLocalSide() {
switch (state) { switch (state) {

View File

@ -27,12 +27,13 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List; import java.util.List;
/** /**
* Provides the default implementation for processing inbound frame events * Provides the default implementation for processing inbound frame events and delegates to a
* and delegates to a {@link Http2FrameListener} * {@link Http2FrameListener}
* <p> * <p>
* This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener} * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
* <p> * <p>
* This interface enforces inbound flow control functionality through {@link Http2InboundFlowController} * This interface enforces inbound flow control functionality through
* {@link Http2LocalFlowController}
*/ */
public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder { public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
private final Http2FrameListener internalFrameListener = new FrameReadListener(); private final Http2FrameListener internalFrameListener = new FrameReadListener();
@ -215,25 +216,22 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// Check if we received a data frame for a stream which is half-closed // Check if we received a data frame for a stream which is half-closed
Http2Stream stream = connection.requireStream(streamId); Http2Stream stream = connection.requireStream(streamId);
verifyEndOfStreamNotReceived(stream);
verifyGoAwayNotReceived(); verifyGoAwayNotReceived();
verifyRstStreamNotReceived(stream);
// We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a // We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a
// lower stream ID. // lower stream ID.
boolean shouldApplyFlowControl = false;
boolean shouldIgnore = shouldIgnoreFrame(stream, false); boolean shouldIgnore = shouldIgnoreFrame(stream, false);
Http2Exception error = null; Http2Exception error = null;
switch (stream.state()) { switch (stream.state()) {
case OPEN: case OPEN:
case HALF_CLOSED_LOCAL: case HALF_CLOSED_LOCAL:
shouldApplyFlowControl = true;
break; break;
case HALF_CLOSED_REMOTE: case HALF_CLOSED_REMOTE:
// Always fail the stream if we've more data after the remote endpoint half-closed.
error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
stream.id(), stream.state());
break;
case CLOSED: case CLOSED:
if (stream.isResetSent()) {
shouldApplyFlowControl = true;
}
if (!shouldIgnore) { if (!shouldIgnore) {
error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s", error = streamError(stream.id(), STREAM_CLOSED, "Stream %d in unexpected state: %s",
stream.id(), stream.state()); stream.id(), stream.state());
@ -252,11 +250,9 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Http2LocalFlowController flowController = flowController(); Http2LocalFlowController flowController = flowController();
try { try {
// If we should apply flow control, do so now. // If we should apply flow control, do so now.
if (shouldApplyFlowControl) {
flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream); flowController.receiveFlowControlledFrame(ctx, stream, data, padding, endOfStream);
// Update the unconsumed bytes after flow control is applied. // Update the unconsumed bytes after flow control is applied.
unconsumedBytes = unconsumedBytes(stream); unconsumedBytes = unconsumedBytes(stream);
}
// If we should ignore this frame, do so now. // If we should ignore this frame, do so now.
if (shouldIgnore) { if (shouldIgnore) {
@ -288,12 +284,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
throw e; throw e;
} finally { } finally {
// If appropriate, returned the processed bytes to the flow controller. // If appropriate, returned the processed bytes to the flow controller.
if (shouldApplyFlowControl && bytesToReturn > 0) { if (bytesToReturn > 0) {
flowController.consumeBytes(ctx, stream, bytesToReturn); flowController.consumeBytes(ctx, stream, bytesToReturn);
} }
if (endOfStream) { if (endOfStream) {
stream.endOfStreamReceived();
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture()); lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
} }
} }
@ -321,7 +316,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Http2Stream stream = connection.stream(streamId); Http2Stream stream = connection.stream(streamId);
verifyGoAwayNotReceived(); verifyGoAwayNotReceived();
verifyRstStreamNotReceived(stream);
if (shouldIgnoreFrame(stream, false)) { if (shouldIgnoreFrame(stream, false)) {
// Ignore this frame. // Ignore this frame.
return; return;
@ -330,8 +324,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
if (stream == null) { if (stream == null) {
stream = connection.createRemoteStream(streamId).open(endOfStream); stream = connection.createRemoteStream(streamId).open(endOfStream);
} else { } else {
verifyEndOfStreamNotReceived(stream);
switch (stream.state()) { switch (stream.state()) {
case RESERVED_REMOTE: case RESERVED_REMOTE:
case IDLE: case IDLE:
@ -360,7 +352,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// If the headers completes this stream, close it. // If the headers completes this stream, close it.
if (endOfStream) { if (endOfStream) {
stream.endOfStreamReceived();
lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture()); lifecycleManager.closeRemoteSide(stream, ctx.newSucceededFuture());
} }
} }
@ -395,14 +386,11 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
verifyPrefaceReceived(); verifyPrefaceReceived();
Http2Stream stream = connection.requireStream(streamId); Http2Stream stream = connection.requireStream(streamId);
verifyRstStreamNotReceived(stream);
if (stream.state() == CLOSED) { if (stream.state() == CLOSED) {
// RstStream frames must be ignored for closed streams. // RstStream frames must be ignored for closed streams.
return; return;
} }
stream.resetReceived();
listener.onRstStreamRead(ctx, streamId, errorCode); listener.onRstStreamRead(ctx, streamId, errorCode);
lifecycleManager.closeStream(stream, ctx.newSucceededFuture()); lifecycleManager.closeStream(stream, ctx.newSucceededFuture());
@ -503,12 +491,23 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Http2Stream parentStream = connection.requireStream(streamId); Http2Stream parentStream = connection.requireStream(streamId);
verifyGoAwayNotReceived(); verifyGoAwayNotReceived();
verifyRstStreamNotReceived(parentStream);
if (shouldIgnoreFrame(parentStream, false)) { if (shouldIgnoreFrame(parentStream, false)) {
// Ignore frames for any stream created after we sent a go-away. // Ignore frames for any stream created after we sent a go-away.
return; return;
} }
switch (parentStream.state()) {
case OPEN:
case HALF_CLOSED_LOCAL:
// Allowed to receive push promise in these states.
break;
default:
// Connection error.
throw connectionError(PROTOCOL_ERROR,
"Stream %d in unexpected state for receiving push promise: %s",
parentStream.id(), parentStream.state());
}
// Reserve the push stream based with a priority based on the current stream's priority. // Reserve the push stream based with a priority based on the current stream's priority.
connection.remote().reservePushStream(promisedStreamId, parentStream); connection.remote().reservePushStream(promisedStreamId, parentStream);
@ -531,7 +530,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
Http2Stream stream = connection.requireStream(streamId); Http2Stream stream = connection.requireStream(streamId);
verifyGoAwayNotReceived(); verifyGoAwayNotReceived();
verifyRstStreamNotReceived(stream);
if (stream.state() == CLOSED || shouldIgnoreFrame(stream, false)) { if (stream.state() == CLOSED || shouldIgnoreFrame(stream, false)) {
// Ignore frames for any stream created after we sent a go-away. // Ignore frames for any stream created after we sent a go-away.
return; return;
@ -565,18 +563,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
return stream != null && !allowResetSent && stream.isResetSent(); return stream != null && !allowResetSent && stream.isResetSent();
} }
/**
* Verifies that a frame has not been received from remote endpoint with the
* {@code END_STREAM} flag set. If it was, throws a connection error.
*/
private void verifyEndOfStreamNotReceived(Http2Stream stream) throws Http2Exception {
if (stream.isEndOfStreamReceived()) {
// Connection error.
throw new Http2Exception(STREAM_CLOSED, String.format(
"Received frame for stream %d after receiving END_STREAM", stream.id()));
}
}
/** /**
* Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it was, throws a * Verifies that a GO_AWAY frame was not previously received from the remote endpoint. If it was, throws a
* connection error. * connection error.
@ -587,17 +573,5 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
throw connectionError(PROTOCOL_ERROR, "Received frames after receiving GO_AWAY"); throw connectionError(PROTOCOL_ERROR, "Received frames after receiving GO_AWAY");
} }
} }
/**
* Verifies that a RST_STREAM frame was not previously received for the given stream. If it was, throws a
* stream error.
*/
private void verifyRstStreamNotReceived(Http2Stream stream) throws Http2Exception {
if (stream != null && stream.isResetReceived()) {
// Stream error.
throw streamError(stream.id(), STREAM_CLOSED,
"Frame received after receiving RST_STREAM for stream: " + stream.id());
}
}
} }
} }

View File

@ -155,12 +155,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
} }
stream = connection.requireStream(streamId); stream = connection.requireStream(streamId);
if (stream.isResetSent()) {
throw new IllegalStateException("Sending data after sending RST_STREAM.");
}
if (stream.isEndOfStreamSent()) {
throw new IllegalStateException("Sending data after sending END_STREAM.");
}
// Verify that the stream is in the appropriate state for sending DATA frames. // Verify that the stream is in the appropriate state for sending DATA frames.
switch (stream.state()) { switch (stream.state()) {
@ -174,8 +168,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
} }
if (endOfStream) { if (endOfStream) {
// Indicate that we have sent END_STREAM. lifecycleManager.closeLocalSide(stream, promise);
stream.endOfStreamSent();
} }
} catch (Throwable e) { } catch (Throwable e) {
data.release(); data.release();
@ -206,10 +199,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
Http2Stream stream = connection.stream(streamId); Http2Stream stream = connection.stream(streamId);
if (stream == null) { if (stream == null) {
stream = connection.createLocalStream(streamId); stream = connection.createLocalStream(streamId);
} else if (stream.isResetSent()) {
throw new IllegalStateException("Sending headers after sending RST_STREAM.");
} else if (stream.isEndOfStreamSent()) {
throw new IllegalStateException("Sending headers after sending END_STREAM.");
} }
switch (stream.state()) { switch (stream.state()) {
@ -231,9 +220,7 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
new FlowControlledHeaders(ctx, stream, headers, streamDependency, weight, new FlowControlledHeaders(ctx, stream, headers, streamDependency, weight,
exclusive, padding, endOfStream, promise)); exclusive, padding, endOfStream, promise));
if (endOfStream) { if (endOfStream) {
// Flag delivery of EOS synchronously to prevent subsequent frames being enqueued in the flow lifecycleManager.closeLocalSide(stream, promise);
// controller.
stream.endOfStreamSent();
} }
return promise; return promise;
} catch (Http2NoMoreStreamIdsException e) { } catch (Http2NoMoreStreamIdsException e) {
@ -556,10 +543,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
if (future == promise && endOfStream) {
// Special case where we're listening to the original promise and need to close the stream.
lifecycleManager.closeLocalSide(stream, promise);
}
if (!future.isSuccess()) { if (!future.isSuccess()) {
error(future.cause()); error(future.cause());
} }

View File

@ -71,15 +71,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
state(stream).window(initialWindowSize); state(stream).window(initialWindowSize);
} }
@Override
public void streamHalfClosed(Http2Stream stream) {
if (!stream.localSideOpen()) {
// Any pending frames can never be written, clear and
// write errors for any pending frames.
state(stream).clear();
}
}
@Override @Override
public void streamInactive(Http2Stream stream) { public void streamInactive(Http2Stream stream) {
// Any pending frames can never be written, clear and // Any pending frames can never be written, clear and
@ -212,7 +203,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
/** /**
* Writes as many pending bytes as possible, according to stream priority. * Writes as many pending bytes as possible, according to stream priority.
*/ */
private void writePendingBytes() throws Http2Exception { private void writePendingBytes() {
Http2Stream connectionStream = connection.connectionStream(); Http2Stream connectionStream = connection.connectionStream();
int connectionWindow = state(connectionStream).window(); int connectionWindow = state(connectionStream).window();
@ -390,10 +381,11 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
/** /**
* Returns the number of pending bytes for this node that will fit within the {@link #window}. This is used for * Returns the number of pending bytes for this node that will fit within the
* the priority algorithm to determine the aggregate total for {@link #priorityBytes} at each node. Each node * {@link #window}. This is used for the priority algorithm to determine the aggregate
* only takes into account it's stream window so that when a change occurs to the connection window, these * number of bytes that can be written at each node. Each node only takes into account its
* values need not change (i.e. no tree traversal is required). * stream window so that when a change occurs to the connection window, these values need
* not change (i.e. no tree traversal is required).
*/ */
int streamableBytes() { int streamableBytes() {
return max(0, min(pendingBytes, window)); return max(0, min(pendingBytes, window));

View File

@ -121,9 +121,8 @@ public interface Http2Connection {
* <li>The connection is marked as going away.</li> * <li>The connection is marked as going away.</li>
* </ul> * </ul>
* <p> * <p>
* The caller is expected to {@link Http2Stream#open()} the stream. * The caller is expected to {@link Http2Stream#open(boolean)} the stream.
* @param streamId The ID of the stream * @param streamId The ID of the stream
* @see Http2Stream#open()
* @see Http2Stream#open(boolean) * @see Http2Stream#open(boolean)
*/ */
Http2Stream createStream(int streamId) throws Http2Exception; Http2Stream createStream(int streamId) throws Http2Exception;
@ -232,16 +231,26 @@ public interface Http2Connection {
Http2Stream connectionStream(); Http2Stream connectionStream();
/** /**
* Gets the number of streams that are currently either open or half-closed. * Gets the number of streams that actively in use. It is possible for a stream to be closed
* but still be considered active (e.g. there is still pending data to be written).
*/ */
int numActiveStreams(); int numActiveStreams();
/** /**
* Gets all streams that are currently either open or half-closed. The returned collection is * Gets all streams that are actively in use. The returned collection is
* sorted by priority. * sorted by priority.
*/ */
Collection<Http2Stream> activeStreams(); Collection<Http2Stream> activeStreams();
/**
* Indicates that the given stream is no longer actively in use. If this stream was active,
* after calling this method it will no longer appear in the list returned by
* {@link #activeStreams()} and {@link #numActiveStreams()} will be decremented. In addition,
* all listeners will be notified of this event via
* {@link Listener#streamInactive(Http2Stream)}.
*/
void deactivate(Http2Stream stream);
/** /**
* Indicates whether or not the local endpoint for this connection is the server. * Indicates whether or not the local endpoint for this connection is the server.
*/ */

View File

@ -23,6 +23,7 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.handler.codec.http2.Http2Exception.isStreamError; import static io.netty.handler.codec.http2.Http2Exception.isStreamError;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -38,12 +39,13 @@ import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
* Provides the default implementation for processing inbound frame events * Provides the default implementation for processing inbound frame events and delegates to a
* and delegates to a {@link Http2FrameListener} * {@link Http2FrameListener}
* <p> * <p>
* This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener} * This class will read HTTP/2 frames and delegate the events to a {@link Http2FrameListener}
* <p> * <p>
* This interface enforces inbound flow control functionality through {@link Http2InboundFlowController} * This interface enforces inbound flow control functionality through
* {@link Http2LocalFlowController}
*/ */
public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager, public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http2LifecycleManager,
ChannelOutboundHandler { ChannelOutboundHandler {
@ -293,15 +295,23 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
* @param future the future after which to close the channel. * @param future the future after which to close the channel.
*/ */
@Override @Override
public void closeStream(Http2Stream stream, ChannelFuture future) { public void closeStream(final Http2Stream stream, ChannelFuture future) {
stream.close(); stream.close();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Deactivate this stream.
connection().deactivate(stream);
// If this connection is closing and there are no longer any // If this connection is closing and there are no longer any
// active streams, close after the current operation completes. // active streams, close after the current operation completes.
if (closeListener != null && connection().numActiveStreams() == 0) { if (closeListener != null && connection().numActiveStreams() == 0) {
future.addListener(closeListener); closeListener.operationComplete(future);
} }
} }
});
}
/** /**
* Central handler for all exceptions caught during HTTP/2 processing. * Central handler for all exceptions caught during HTTP/2 processing.

View File

@ -27,7 +27,7 @@ public interface Http2LifecycleManager {
/** /**
* Closes the local side of the given stream. If this causes the stream to be closed, adds a * Closes the local side of the given stream. If this causes the stream to be closed, adds a
* hook to close the channel after the given future completes. * hook to deactivate the stream and close the channel after the given future completes.
* *
* @param stream the stream to be half closed. * @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel. * @param future If closing, the future after which to close the channel.
@ -36,7 +36,7 @@ public interface Http2LifecycleManager {
/** /**
* Closes the remote side of the given stream. If this causes the stream to be closed, adds a * Closes the remote side of the given stream. If this causes the stream to be closed, adds a
* hook to close the channel after the given future completes. * hook to deactivate the stream and close the channel after the given future completes.
* *
* @param stream the stream to be half closed. * @param stream the stream to be half closed.
* @param future If closing, the future after which to close the channel. * @param future If closing, the future after which to close the channel.
@ -44,8 +44,8 @@ public interface Http2LifecycleManager {
void closeRemoteSide(Http2Stream stream, ChannelFuture future); void closeRemoteSide(Http2Stream stream, ChannelFuture future);
/** /**
* Closes the given stream and adds a hook to close the channel after the given future * Closes the given stream and adds a hook to deactivate the stream and close the channel after
* completes. * the given future completes.
* *
* @param stream the stream to be closed. * @param stream the stream to be closed.
* @param future the future after which to close the channel. * @param future the future after which to close the channel.

View File

@ -76,41 +76,6 @@ public interface Http2Stream {
*/ */
Http2Stream closeRemoteSide(); Http2Stream closeRemoteSide();
/**
* Indicates whether a frame with {@code END_STREAM} set was received from the remote endpoint
* for this stream.
*/
boolean isEndOfStreamReceived();
/**
* Sets the flag indicating that a frame with {@code END_STREAM} set was received from the
* remote endpoint for this stream.
*/
Http2Stream endOfStreamReceived();
/**
* Indicates whether a frame with {@code END_STREAM} set was sent to the remote endpoint for
* this stream.
*/
boolean isEndOfStreamSent();
/**
* Sets the flag indicating that a frame with {@code END_STREAM} set was sent to the remote
* endpoint for this stream.
*/
Http2Stream endOfStreamSent();
/**
* Indicates whether a {@code RST_STREAM} frame has been received from the remote endpoint for this stream.
*/
boolean isResetReceived();
/**
* Sets the flag indicating that a {@code RST_STREAM} frame has been received from the remote endpoint
* for this stream. This does not affect the stream state.
*/
Http2Stream resetReceived();
/** /**
* Indicates whether a {@code RST_STREAM} frame has been sent from the local endpoint for this stream. * Indicates whether a {@code RST_STREAM} frame has been sent from the local endpoint for this stream.
*/ */
@ -122,12 +87,6 @@ public interface Http2Stream {
*/ */
Http2Stream resetSent(); Http2Stream resetSent();
/**
* Indicates whether or not this stream has been reset. This is a short form for
* {@link #isResetSent()} || {@link #isResetReceived()}.
*/
boolean isReset();
/** /**
* Indicates whether the remote side of this stream is open (i.e. the state is either * Indicates whether the remote side of this stream is open (i.e. the state is either
* {@link State#OPEN} or {@link State#HALF_CLOSED_LOCAL}). * {@link State#OPEN} or {@link State#HALF_CLOSED_LOCAL}).

View File

@ -225,7 +225,7 @@ public class DefaultHttp2ConnectionDecoderTest {
final ByteBuf data = dummyData(); final ByteBuf data = dummyData();
try { try {
decode().onDataRead(ctx, STREAM_ID, data, 10, true); decode().onDataRead(ctx, STREAM_ID, data, 10, true);
verify(localFlow, never()).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true)); verify(localFlow).receiveFlowControlledFrame(eq(ctx), eq(stream), eq(data), eq(10), eq(true));
verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean()); verify(listener, never()).onDataRead(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(), anyBoolean());
} finally { } finally {
data.release(); data.release();

View File

@ -39,6 +39,7 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
@ -50,11 +51,6 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise; import io.netty.channel.DefaultChannelPromise;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
@ -63,6 +59,10 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/** /**
* Tests for {@link DefaultHttp2ConnectionEncoder} * Tests for {@link DefaultHttp2ConnectionEncoder}
*/ */
@ -239,7 +239,7 @@ public class DefaultHttp2ConnectionEncoderTest {
} }
@Test @Test
public void dataLargerThanMaxFrameSizeShouldBeSplit() throws Http2Exception { public void dataLargerThanMaxFrameSizeShouldBeSplit() {
when(frameSizePolicy.maxFrameSize()).thenReturn(3); when(frameSizePolicy.maxFrameSize()).thenReturn(3);
final ByteBuf data = dummyData(); final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise); encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
@ -254,7 +254,7 @@ public class DefaultHttp2ConnectionEncoderTest {
} }
@Test @Test
public void paddingSplitOverFrame() throws Http2Exception { public void paddingSplitOverFrame() {
when(frameSizePolicy.maxFrameSize()).thenReturn(5); when(frameSizePolicy.maxFrameSize()).thenReturn(5);
final ByteBuf data = dummyData(); final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 5, true, promise); encoder.writeData(ctx, STREAM_ID, data, 5, true, promise);
@ -272,7 +272,7 @@ public class DefaultHttp2ConnectionEncoderTest {
} }
@Test @Test
public void frameShouldSplitPadding() throws Http2Exception { public void frameShouldSplitPadding() {
when(frameSizePolicy.maxFrameSize()).thenReturn(5); when(frameSizePolicy.maxFrameSize()).thenReturn(5);
ByteBuf data = dummyData(); ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise); encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
@ -292,18 +292,18 @@ public class DefaultHttp2ConnectionEncoderTest {
} }
@Test @Test
public void emptyFrameShouldSplitPadding() throws Http2Exception { public void emptyFrameShouldSplitPadding() {
ByteBuf data = Unpooled.buffer(0); ByteBuf data = Unpooled.buffer(0);
assertSplitPaddingOnEmptyBuffer(data); assertSplitPaddingOnEmptyBuffer(data);
assertEquals(0, data.refCnt()); assertEquals(0, data.refCnt());
} }
@Test @Test
public void singletonEmptyBufferShouldSplitPadding() throws Http2Exception { public void singletonEmptyBufferShouldSplitPadding() {
assertSplitPaddingOnEmptyBuffer(Unpooled.EMPTY_BUFFER); assertSplitPaddingOnEmptyBuffer(Unpooled.EMPTY_BUFFER);
} }
private void assertSplitPaddingOnEmptyBuffer(ByteBuf data) throws Http2Exception { private void assertSplitPaddingOnEmptyBuffer(ByteBuf data) {
when(frameSizePolicy.maxFrameSize()).thenReturn(5); when(frameSizePolicy.maxFrameSize()).thenReturn(5);
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise); encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
assertEquals(payloadCaptor.getValue().size(), 10); assertEquals(payloadCaptor.getValue().size(), 10);

View File

@ -273,7 +273,7 @@ public class DefaultHttp2LocalFlowControllerTest {
controller.consumeBytes(ctx, stream(streamId), numBytes); controller.consumeBytes(ctx, stream(streamId), numBytes);
} }
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) throws Http2Exception { private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) {
verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise)); verify(frameWriter).writeWindowUpdate(eq(ctx), eq(streamId), eq(windowSizeIncrement), eq(promise));
} }