HTTP/2 Thread Context Interface Clarifications

Motivation:
It is currently assumed that all usages of the HTTP/2 codec will be from the same event loop context. If the methods are used outside of the assumed thread context then unexpected behavior is observed. This assumption should be more clearly communicated and enforced in key areas.

Modifications:
- The flow controller interfaces have assert statements and updated javadocs indicating the assumptions.

Result:
Interfaces more clearly indicate thread context limitations.
This commit is contained in:
Scott Mitchell 2015-06-01 17:24:49 -07:00
parent 1be2173596
commit 556d19aaa7
5 changed files with 41 additions and 5 deletions

View File

@ -34,6 +34,9 @@ import io.netty.util.internal.PlatformDependent;
/** /**
* Basic implementation of {@link Http2LocalFlowController}. * Basic implementation of {@link Http2LocalFlowController}.
* <p>
* This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
* Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
*/ */
public class DefaultHttp2LocalFlowController implements Http2LocalFlowController { public class DefaultHttp2LocalFlowController implements Http2LocalFlowController {
/** /**
@ -46,8 +49,8 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
private final Http2FrameWriter frameWriter; private final Http2FrameWriter frameWriter;
private final Http2Connection.PropertyKey stateKey; private final Http2Connection.PropertyKey stateKey;
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private volatile float windowUpdateRatio; private float windowUpdateRatio;
private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE; private int initialWindowSize = DEFAULT_WINDOW_SIZE;
public DefaultHttp2LocalFlowController(Http2Connection connection, Http2FrameWriter frameWriter) { public DefaultHttp2LocalFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO); this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO);
@ -110,6 +113,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override @Override
public void initialWindowSize(int newWindowSize) throws Http2Exception { public void initialWindowSize(int newWindowSize) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop();
int delta = newWindowSize - initialWindowSize; int delta = newWindowSize - initialWindowSize;
initialWindowSize = newWindowSize; initialWindowSize = newWindowSize;
@ -135,6 +139,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override @Override
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception { public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
assert ctx != null && ctx.executor().inEventLoop();
FlowState state = state(stream); FlowState state = state(stream);
// Just add the delta to the stream-specific initial window size so that the next time the window // Just add the delta to the stream-specific initial window size so that the next time the window
// expands it will grow to the new initial size. // expands it will grow to the new initial size.
@ -144,6 +149,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override @Override
public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception { public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
assert ctx != null && ctx.executor().inEventLoop();
if (numBytes < 0) { if (numBytes < 0) {
throw new IllegalArgumentException("numBytes must not be negative"); throw new IllegalArgumentException("numBytes must not be negative");
} }
@ -184,6 +190,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
* @throws IllegalArgumentException If the ratio is out of bounds (0, 1). * @throws IllegalArgumentException If the ratio is out of bounds (0, 1).
*/ */
public void windowUpdateRatio(float ratio) { public void windowUpdateRatio(float ratio) {
assert ctx == null || ctx.executor().inEventLoop();
checkValidRatio(ratio); checkValidRatio(ratio);
windowUpdateRatio = ratio; windowUpdateRatio = ratio;
} }
@ -211,6 +218,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
* @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames * @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
*/ */
public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception { public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception {
assert ctx != null && ctx.executor().inEventLoop();
checkValidRatio(ratio); checkValidRatio(ratio);
FlowState state = state(stream); FlowState state = state(stream);
state.windowUpdateRatio(ratio); state.windowUpdateRatio(ratio);
@ -230,6 +238,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override @Override
public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding, public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception { boolean endOfStream) throws Http2Exception {
assert ctx != null && ctx.executor().inEventLoop();
int dataLength = data.readableBytes() + padding; int dataLength = data.readableBytes() + padding;
// Apply the connection-level flow control // Apply the connection-level flow control
@ -283,14 +292,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
* This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}. * This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}.
* Each stream has their own initial window size. * Each stream has their own initial window size.
*/ */
private volatile int initialStreamWindowSize; private int initialStreamWindowSize;
/** /**
* This is used to determine when {@link #processedWindow} is sufficiently far away from * This is used to determine when {@link #processedWindow} is sufficiently far away from
* {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent. * {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent.
* Each stream has their own window update ratio. * Each stream has their own window update ratio.
*/ */
private volatile float streamWindowUpdateRatio; private float streamWindowUpdateRatio;
private int lowerBound; private int lowerBound;
private boolean endOfStream; private boolean endOfStream;
@ -303,6 +312,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override @Override
public void window(int initialWindowSize) { public void window(int initialWindowSize) {
assert ctx == null || ctx.executor().inEventLoop();
window = processedWindow = initialStreamWindowSize = initialWindowSize; window = processedWindow = initialStreamWindowSize = initialWindowSize;
} }
@ -328,6 +338,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override @Override
public void windowUpdateRatio(float ratio) { public void windowUpdateRatio(float ratio) {
assert ctx == null || ctx.executor().inEventLoop();
streamWindowUpdateRatio = ratio; streamWindowUpdateRatio = ratio;
} }

View File

@ -32,6 +32,9 @@ import java.util.Deque;
/** /**
* Basic implementation of {@link Http2RemoteFlowController}. * Basic implementation of {@link Http2RemoteFlowController}.
* <p>
* This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
* Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
*/ */
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController { public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
private static final int MIN_WRITABLE_CHUNK = 32 * 1024; private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
@ -164,6 +167,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override @Override
public void initialWindowSize(int newWindowSize) throws Http2Exception { public void initialWindowSize(int newWindowSize) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop();
if (newWindowSize < 0) { if (newWindowSize < 0) {
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
} }
@ -202,6 +206,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override @Override
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception { public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop();
if (stream.id() == CONNECTION_STREAM_ID) { if (stream.id() == CONNECTION_STREAM_ID) {
// Update the connection window // Update the connection window
connectionState().incrementStreamWindow(delta); connectionState().incrementStreamWindow(delta);
@ -224,6 +229,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override @Override
public void addFlowControlled(Http2Stream stream, FlowControlled frame) { public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
// The context can be null assuming the frame will be queued and send later when the context is set.
assert ctx == null || ctx.executor().inEventLoop();
checkNotNull(frame, "frame"); checkNotNull(frame, "frame");
final AbstractState state; final AbstractState state;
try { try {

View File

@ -31,8 +31,9 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import junit.framework.AssertionFailedError; import junit.framework.AssertionFailedError;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mock; import org.mockito.Mock;
@ -55,6 +56,9 @@ public class DefaultHttp2LocalFlowControllerTest {
@Mock @Mock
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
@Mock
private EventExecutor executor;
@Mock @Mock
private ChannelPromise promise; private ChannelPromise promise;
@ -75,6 +79,8 @@ public class DefaultHttp2LocalFlowControllerTest {
connection.local().createStream(STREAM_ID, false); connection.local().createStream(STREAM_ID, false);
controller.channelHandlerContext(ctx); controller.channelHandlerContext(ctx);
when(ctx.executor()).thenReturn(executor);
when(executor.inEventLoop()).thenReturn(true);
} }
@Test @Test

View File

@ -43,6 +43,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2FrameWriter.Configuration; import io.netty.handler.codec.http2.Http2FrameWriter.Configuration;
import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.EventExecutor;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -88,6 +89,9 @@ public class DefaultHttp2RemoteFlowControllerTest {
@Mock @Mock
private ChannelConfig config; private ChannelConfig config;
@Mock
private EventExecutor executor;
@Mock @Mock
private ChannelPromise promise; private ChannelPromise promise;
@ -104,6 +108,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
setChannelWritability(true); setChannelWritability(true);
when(channel.config()).thenReturn(config); when(channel.config()).thenReturn(config);
when(executor.inEventLoop()).thenReturn(true);
initConnectionAndController(); initConnectionAndController();
@ -1444,6 +1449,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
private void resetCtx() { private void resetCtx() {
reset(ctx); reset(ctx);
when(ctx.channel()).thenReturn(channel); when(ctx.channel()).thenReturn(channel);
when(ctx.executor()).thenReturn(executor);
} }
private void setChannelWritability(boolean isWritable) { private void setChannelWritability(boolean isWritable) {

View File

@ -44,6 +44,7 @@ import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException; import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException; import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.ImmediateEventExecutor;
import org.junit.After; import org.junit.After;
@ -77,6 +78,9 @@ public class StreamBufferingEncoderTest {
@Mock @Mock
private ChannelConfig config; private ChannelConfig config;
@Mock
private EventExecutor executor;
@Mock @Mock
private ChannelPromise promise; private ChannelPromise promise;
@ -114,7 +118,9 @@ public class StreamBufferingEncoderTest {
when(ctx.channel()).thenReturn(channel); when(ctx.channel()).thenReturn(channel);
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(channel.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); when(channel.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(executor.inEventLoop()).thenReturn(true);
when(ctx.newPromise()).thenReturn(promise); when(ctx.newPromise()).thenReturn(promise);
when(ctx.executor()).thenReturn(executor);
when(promise.channel()).thenReturn(channel); when(promise.channel()).thenReturn(channel);
when(channel.isActive()).thenReturn(false); when(channel.isActive()).thenReturn(false);
when(channel.config()).thenReturn(config); when(channel.config()).thenReturn(config);