Remove the concept of pending streams. The close future can only be accessed once a stream is active.

This commit is contained in:
buchgr 2016-10-19 12:12:14 +02:00 committed by Norman Maurer
parent 5380c7c3e3
commit 3a2b462a67
5 changed files with 93 additions and 143 deletions

View File

@ -23,8 +23,10 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelPromiseNotifier;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
import io.netty.handler.codec.UnsupportedMessageTypeException;
@ -136,7 +138,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
*
* <p>The HTTP/2 standard allows for an endpoint to limit the maximum number of concurrently active streams via the
* {@code SETTINGS_MAX_CONCURRENT_STREAMS} setting. When this limit is reached, no new streams can be created. However,
* the {@link Http2FrameCodec} can be build with {@link Http2FrameCodecBuilder#bufferOutgoingStreams} enabled, in which
* the {@link Http2FrameCodec} can be build with {@link Http2FrameCodecBuilder#bufferOutboundStreams} enabled, in which
* case a new stream and its associated frames will be buffered until either the limit is increased or an active
* stream is closed. It's, however, possible that a buffered stream will never become active. That is, the channel might
* get closed or a GO_AWAY frame might be received. In the first case, all writes of buffered streams will fail with a
@ -206,9 +208,6 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
/**
* Creates a new outbound/local stream.
*
* <p>The object is added to a list of idle streams, so that in case the stream object is never made active, the
* {@link Http2Stream2#closeFuture()} still completes.
*
* <p>This method may only be called after the handler has been added to a {@link io.netty.channel.ChannelPipeline}.
*
* <p>This method is thread-safe.
@ -244,11 +243,7 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
throw new IllegalStateException("Channel handler not added to a channel pipeline.");
}
Http2Stream2Impl stream = new Http2Stream2Impl(ctx0.channel());
addPendingStream(stream);
return stream;
return new Http2Stream2Impl(ctx0.channel());
}
/**
@ -269,10 +264,8 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
* stream is active without a {@link Http2Stream2} object attached, as it's set in a listener of
* the HEADERS frame write.
*/
stream2 = findPendingStream(stream.id());
if (stream2 == null) {
throw new AssertionError("All active streams must have a stream object attached.");
}
// TODO(buchgr): Remove once Http2Stream2 and Http2Stream are merged.
return true;
}
try {
return streamVisitor.visit(stream2);
@ -306,13 +299,11 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
cleanupPendingStreams();
ctx.pipeline().remove(http2Handler);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
cleanupPendingStreams();
super.channelInactive(ctx);
}
@ -473,6 +464,10 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
// Set the stream id before completing the promise, as any listener added by a user will be executed
// before the below listener, and so the stream identifier is accessible in a user's listener.
stream.id(streamId);
// Ensure that the listener gets executed before any listeners a user might have attached.
// TODO(buchgr): Once Http2Stream2 and Http2Stream are merged this is no longer necessary.
ChannelPromiseNotifier promiseNotifier = new ChannelPromiseNotifier(promise);
promise = ctx.newPromise();
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
@ -481,13 +476,13 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
Http2Stream connectionStream = connection.stream(streamId);
if (future.isSuccess() && connectionStream != null) {
connectionStream.setProperty(streamKey, stream);
stream.legacyStream = connectionStream;
} else {
stream.setClosed();
}
removePendingStream(stream);
}
});
promise.addListener(promiseNotifier);
}
http2Handler.encoder().writeHeaders(http2HandlerCtx, streamId, headersFrame.headers(), headersFrame.padding(),
headersFrame.endStream(), promise);
@ -501,7 +496,9 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
return;
}
stream.setProperty(streamKey, new Http2Stream2Impl(ctx.channel()).id(stream.id()));
Http2Stream2Impl stream2 = new Http2Stream2Impl(ctx.channel()).id(stream.id());
stream2.legacyStream = stream;
stream.setProperty(streamKey, stream2);
}
@Override
@ -530,7 +527,7 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
}
/**
* Exceptions for streams unknown streams, that is streams that have no {@link Http2Stream2} object attached
* Exceptions for unknown streams, that is streams that have no {@link Http2Stream2} object attached
* are simply logged and replied to by sending a RST_STREAM frame. There is not much value in propagating such
* exceptions through the pipeline, as a user will not have any additional information / state about this
* stream and thus can't do any meaningful error handling.
@ -541,26 +538,21 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
int streamId = streamException.streamId();
Http2Stream connectionStream = connection().stream(streamId);
if (connectionStream == null) {
Http2Stream2 stream2 = findPendingStream(streamId);
if (stream2 == null) {
LOG.warn("Stream exception thrown for unkown stream.", cause);
// Write a RST_STREAM
super.onStreamError(ctx, cause, streamException);
return;
}
fireHttp2Stream2Exception(stream2, streamException.error(), cause);
} else {
Http2Stream2 stream2 = connectionStream.getProperty(streamKey);
if (stream2 == null) {
LOG.warn("Stream exception thrown without stream object attached.", cause);
// Write a RST_STREAM
super.onStreamError(ctx, cause, streamException);
return;
}
fireHttp2Stream2Exception(stream2, streamException.error(), cause);
LOG.warn("Stream exception thrown for unkown stream.", cause);
// Write a RST_STREAM
super.onStreamError(ctx, cause, streamException);
return;
}
Http2Stream2 stream2 = connectionStream.getProperty(streamKey);
if (stream2 == null) {
LOG.warn("Stream exception thrown without stream object attached.", cause);
// Write a RST_STREAM
super.onStreamError(ctx, cause, streamException);
return;
}
fireHttp2Stream2Exception(stream2, streamException.error(), cause);
}
@Override
@ -638,13 +630,12 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
/**
* {@link Http2Stream2} implementation.
*/
// TODO(buchgr): Merge Http2Stream2 and Http2Stream.
static final class Http2Stream2Impl extends DefaultChannelPromise implements Http2Stream2 {
private Http2Stream2Impl prev;
private Http2Stream2Impl next;
private volatile int id = -1;
private volatile Object managedState;
private volatile Http2Stream legacyStream;
Http2Stream2Impl(Channel channel) {
super(channel);
@ -676,8 +667,19 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
return managedState;
}
@Override
public State state() {
Http2Stream stream0 = legacyStream;
return stream0 == null
? State.IDLE
: stream0.state();
}
@Override
public ChannelFuture closeFuture() {
if (state() == State.IDLE) {
throw new IllegalStateException("This method may not be called on IDLE streams.");
}
return this;
}
@ -720,63 +722,4 @@ public class Http2FrameCodec extends ChannelDuplexHandler {
return String.valueOf(id);
}
}
private void addPendingStream(Http2Stream2Impl stream) {
synchronized (lock) {
if (pendingOutboundStreamsTail == null) {
pendingOutboundStreamsTail = stream;
return;
}
pendingOutboundStreamsTail.next = stream;
stream.prev = pendingOutboundStreamsTail;
}
}
private void removePendingStream(Http2Stream2Impl stream) {
try {
synchronized (lock) {
if (pendingOutboundStreamsTail == null) {
return;
}
if (pendingOutboundStreamsTail == stream) {
pendingOutboundStreamsTail = null;
}
stream.prev = stream.next;
if (stream.next != null) {
stream.next.prev = stream.prev;
}
}
} finally {
// Avoid GC nepotism
stream.next = null;
stream.prev = null;
}
}
private Http2Stream2 findPendingStream(int streamId) {
if (isOutboundStream(server, streamId)) {
synchronized (lock) {
Http2Stream2Impl idleStream = pendingOutboundStreamsTail;
while (idleStream != null) {
if (idleStream.id() == streamId) {
return idleStream;
}
idleStream = idleStream.prev;
}
}
}
return null;
}
private void cleanupPendingStreams() {
synchronized (lock) {
while (pendingOutboundStreamsTail != null) {
pendingOutboundStreamsTail.setClosed();
pendingOutboundStreamsTail = pendingOutboundStreamsTail.prev;
}
}
}
}

View File

@ -174,6 +174,14 @@ public class Http2MultiplexCodec extends Http2ChannelDuplexHandler {
childChannel = requireChildChannel(stream);
}
stream.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
childChannel.streamClosedWithoutError = true;
childChannel.fireChildRead(AbstractHttp2StreamChannel.CLOSE_MESSAGE);
}
});
assert !childChannel.isWritable();
childChannel.incrementOutboundFlowControlWindow(initialOutboundStreamWindow);
childChannel.pipeline().fireChannelWritabilityChanged();
@ -329,13 +337,6 @@ public class Http2MultiplexCodec extends Http2ChannelDuplexHandler {
Http2StreamChannel(Channel parentChannel, Http2Stream2 stream) {
super(parentChannel, stream);
stream.managedState(this);
stream.closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
streamClosedWithoutError = true;
fireChildRead(CLOSE_MESSAGE);
}
});
}
@Override

View File

@ -17,6 +17,7 @@
package io.netty.handler.codec.http2;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.internal.UnstableApi;
/**
@ -50,6 +51,11 @@ public interface Http2Stream2 {
throw new UnsupportedOperationException();
}
@Override
public State state() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture closeFuture() {
throw new UnsupportedOperationException();
@ -85,10 +91,17 @@ public interface Http2Stream2 {
Object managedState();
/**
* A {@link ChannelFuture} that will complete when a stream or the channel are closed (whatever happens first).
* Returns the state of this stream.
*/
State state();
/**
* A {@link ChannelFuture} that will complete when the stream or the {@link io.netty.channel.Channel} are closed
* (whatever happens first).
*
* <p>The {@link ChannelFuture} is guaranteed to be completed eventually, even if the stream never became active,
* and will always succeed.
* <p><strong>NOTE:</strong> It's not safe to call this method on a stream in {@link State#IDLE} state.
*
* @throws IllegalStateException if this method is called on a stream in {@link State#IDLE} state.
*/
ChannelFuture closeFuture();
}

View File

@ -47,6 +47,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.handler.codec.http2.Http2CodecUtil.isStreamIdValid;
import static io.netty.handler.codec.http2.Http2Stream2.CONNECTION_STREAM;
@ -440,20 +441,6 @@ public class Http2FrameCodecTest {
verify(frameWriter).writeSettings(eq(http2HandlerCtx), same(settings), any(ChannelPromise.class));
}
@Test(timeout = 1000)
public void createAndCloseIdleStreamObject() {
Http2Stream2 stream = frameCodec.newStream();
assertNotNull(stream);
assertFalse(isStreamIdValid(stream.id()));
assertFalse(stream.closeFuture().isDone());
assertFalse(stream.closeFuture().isCancellable());
channel.close().syncUninterruptibly();
assertTrue(stream.closeFuture().isDone());
}
@Test(timeout = 1000)
public void newOutboundStream() {
final Http2Stream2 stream = frameCodec.newStream();
@ -511,25 +498,6 @@ public class Http2FrameCodecTest {
assertTrue(promise2.syncUninterruptibly().isSuccess());
}
@Test
public void closeFutureShouldCompleteIfStreamFailsToBecomeActive() throws Exception {
setUp(Http2FrameCodecBuilder.forServer().bufferOutboundStreams(true),
new Http2Settings().maxConcurrentStreams(0));
Http2Stream2 stream = frameCodec.newStream();
ChannelPromise promise = channel.newPromise();
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream), promise);
assertTrue(isStreamIdValid(stream.id()));
assertFalse(promise.isDone());
assertFalse(stream.closeFuture().isDone());
promise.setFailure(new Exception());
assertTrue(stream.closeFuture().isDone());
}
@Test
public void streamIdentifiersExhausted() throws Http2Exception {
int maxServerStreamId = Integer.MAX_VALUE - 1;
@ -657,6 +625,25 @@ public class Http2FrameCodecTest {
assertEquals(expectedStreams, activeStreams);
}
@Test
public void streamShouldBeOpenInListener() {
final Http2Stream2 stream2 = frameCodec.newStream();
assertEquals(State.IDLE, stream2.state());
final AtomicBoolean listenerExecuted = new AtomicBoolean();
channel.writeAndFlush(new DefaultHttp2HeadersFrame(new DefaultHttp2Headers()).stream(stream2))
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
assertTrue(future.isSuccess());
assertEquals(State.OPEN, stream2.state());
listenerExecuted.set(true);
}
});
assertTrue(listenerExecuted.get());
}
private static ChannelPromise anyChannelPromise() {
return any(ChannelPromise.class);
}

View File

@ -30,6 +30,7 @@ import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http2.Http2Exception.StreamException;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.AsciiString;
import io.netty.util.AttributeKey;
@ -636,6 +637,11 @@ public class Http2MultiplexCodecTest {
return managedState;
}
@Override
public State state() {
return State.OPEN;
}
@Override
public ChannelFuture closeFuture() {
return closeFuture;