From 74dd7f85caa2d70a89049091dc8c59b233ce0248 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Mon, 1 Jun 2015 17:24:49 -0700 Subject: [PATCH] HTTP/2 Thread Context Interface Clarifications Motivation: It is currently assumed that all usages of the HTTP/2 codec will be from the same event loop context. If the methods are used outside of the assumed thread context then unexpected behavior is observed. This assumption should be more clearly communicated and enforced in key areas. Modifications: - The flow controller interfaces have assert statements and updated javadocs indicating the assumptions. Result: Interfaces more clearly indicate thread context limitations. --- .../DefaultHttp2LocalFlowController.java | 19 +++++++++++++++---- .../DefaultHttp2RemoteFlowController.java | 7 +++++++ .../DefaultHttp2LocalFlowControllerTest.java | 8 +++++++- .../DefaultHttp2RemoteFlowControllerTest.java | 6 ++++++ .../http2/StreamBufferingEncoderTest.java | 6 ++++++ 5 files changed, 41 insertions(+), 5 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java index 1e726ede21..11fada19aa 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowController.java @@ -34,6 +34,9 @@ import io.netty.util.internal.PlatformDependent; /** * Basic implementation of {@link Http2LocalFlowController}. + *

+ * This class is NOT thread safe. The assumption is all methods must be invoked from a single thread. + * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class. */ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController { /** @@ -46,8 +49,8 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController private final Http2FrameWriter frameWriter; private final Http2Connection.PropertyKey stateKey; private ChannelHandlerContext ctx; - private volatile float windowUpdateRatio; - private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE; + private float windowUpdateRatio; + private int initialWindowSize = DEFAULT_WINDOW_SIZE; public DefaultHttp2LocalFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO); @@ -110,6 +113,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController @Override public void initialWindowSize(int newWindowSize) throws Http2Exception { + assert ctx == null || ctx.executor().inEventLoop(); int delta = newWindowSize - initialWindowSize; initialWindowSize = newWindowSize; @@ -135,6 +139,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController @Override public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception { + assert ctx != null && ctx.executor().inEventLoop(); FlowState state = state(stream); // Just add the delta to the stream-specific initial window size so that the next time the window // expands it will grow to the new initial size. @@ -144,6 +149,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController @Override public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception { + assert ctx != null && ctx.executor().inEventLoop(); if (numBytes < 0) { throw new IllegalArgumentException("numBytes must not be negative"); } @@ -184,6 +190,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController * @throws IllegalArgumentException If the ratio is out of bounds (0, 1). */ public void windowUpdateRatio(float ratio) { + assert ctx == null || ctx.executor().inEventLoop(); checkValidRatio(ratio); windowUpdateRatio = ratio; } @@ -211,6 +218,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController * @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames */ public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception { + assert ctx != null && ctx.executor().inEventLoop(); checkValidRatio(ratio); FlowState state = state(stream); state.windowUpdateRatio(ratio); @@ -230,6 +238,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController @Override public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception { + assert ctx != null && ctx.executor().inEventLoop(); int dataLength = data.readableBytes() + padding; // Apply the connection-level flow control @@ -283,14 +292,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController * This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}. * Each stream has their own initial window size. */ - private volatile int initialStreamWindowSize; + private int initialStreamWindowSize; /** * This is used to determine when {@link #processedWindow} is sufficiently far away from * {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent. * Each stream has their own window update ratio. */ - private volatile float streamWindowUpdateRatio; + private float streamWindowUpdateRatio; private int lowerBound; private boolean endOfStream; @@ -303,6 +312,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController @Override public void window(int initialWindowSize) { + assert ctx == null || ctx.executor().inEventLoop(); window = processedWindow = initialStreamWindowSize = initialWindowSize; } @@ -328,6 +338,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController @Override public void windowUpdateRatio(float ratio) { + assert ctx == null || ctx.executor().inEventLoop(); streamWindowUpdateRatio = ratio; } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index 02ea21be94..33b4262b00 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -32,6 +32,9 @@ import java.util.Deque; /** * Basic implementation of {@link Http2RemoteFlowController}. + *

+ * This class is NOT thread safe. The assumption is all methods must be invoked from a single thread. + * Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class. */ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { private static final int MIN_WRITABLE_CHUNK = 32 * 1024; @@ -164,6 +167,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll @Override public void initialWindowSize(int newWindowSize) throws Http2Exception { + assert ctx == null || ctx.executor().inEventLoop(); if (newWindowSize < 0) { throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); } @@ -202,6 +206,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll @Override public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception { + assert ctx == null || ctx.executor().inEventLoop(); if (stream.id() == CONNECTION_STREAM_ID) { // Update the connection window connectionState().incrementStreamWindow(delta); @@ -224,6 +229,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll @Override public void addFlowControlled(Http2Stream stream, FlowControlled frame) { + // The context can be null assuming the frame will be queued and send later when the context is set. + assert ctx == null || ctx.executor().inEventLoop(); checkNotNull(frame, "frame"); final AbstractState state; try { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java index 772f1886fa..553f1e769b 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2LocalFlowControllerTest.java @@ -31,8 +31,9 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; - +import io.netty.util.concurrent.EventExecutor; import junit.framework.AssertionFailedError; + import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -55,6 +56,9 @@ public class DefaultHttp2LocalFlowControllerTest { @Mock private ChannelHandlerContext ctx; + @Mock + private EventExecutor executor; + @Mock private ChannelPromise promise; @@ -75,6 +79,8 @@ public class DefaultHttp2LocalFlowControllerTest { connection.local().createStream(STREAM_ID, false); controller.channelHandlerContext(ctx); + when(ctx.executor()).thenReturn(executor); + when(executor.inEventLoop()).thenReturn(true); } @Test diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java index d909e4344e..327c7c820c 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java @@ -43,6 +43,7 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http2.Http2FrameWriter.Configuration; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; +import io.netty.util.concurrent.EventExecutor; import java.util.Arrays; import java.util.List; @@ -88,6 +89,9 @@ public class DefaultHttp2RemoteFlowControllerTest { @Mock private ChannelConfig config; + @Mock + private EventExecutor executor; + @Mock private ChannelPromise promise; @@ -104,6 +108,7 @@ public class DefaultHttp2RemoteFlowControllerTest { when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); setChannelWritability(true); when(channel.config()).thenReturn(config); + when(executor.inEventLoop()).thenReturn(true); initConnectionAndController(); @@ -1444,6 +1449,7 @@ public class DefaultHttp2RemoteFlowControllerTest { private void resetCtx() { reset(ctx); when(ctx.channel()).thenReturn(channel); + when(ctx.executor()).thenReturn(executor); } private void setChannelWritability(boolean isWritable) { diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java index 1010559c29..ed3c884395 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java @@ -44,6 +44,7 @@ import io.netty.channel.DefaultChannelPromise; import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException; import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor; import org.junit.After; @@ -77,6 +78,9 @@ public class StreamBufferingEncoderTest { @Mock private ChannelConfig config; + @Mock + private EventExecutor executor; + @Mock private ChannelPromise promise; @@ -114,7 +118,9 @@ public class StreamBufferingEncoderTest { when(ctx.channel()).thenReturn(channel); when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); when(channel.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); + when(executor.inEventLoop()).thenReturn(true); when(ctx.newPromise()).thenReturn(promise); + when(ctx.executor()).thenReturn(executor); when(promise.channel()).thenReturn(channel); when(channel.isActive()).thenReturn(false); when(channel.config()).thenReturn(config);