HTTP/2 Thread Context Interface Clarifications

Motivation:
It is currently assumed that all usages of the HTTP/2 codec will be from the same event loop context. If the methods are used outside of the assumed thread context then unexpected behavior is observed. This assumption should be more clearly communicated and enforced in key areas.

Modifications:
- The flow controller interfaces have assert statements and updated javadocs indicating the assumptions.

Result:
Interfaces more clearly indicate thread context limitations.
This commit is contained in:
Scott Mitchell 2015-06-01 17:24:49 -07:00
parent f1c5f0e0c7
commit 74dd7f85ca
5 changed files with 41 additions and 5 deletions

View File

@ -34,6 +34,9 @@ import io.netty.util.internal.PlatformDependent;
/**
* Basic implementation of {@link Http2LocalFlowController}.
* <p>
* This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
* Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
*/
public class DefaultHttp2LocalFlowController implements Http2LocalFlowController {
/**
@ -46,8 +49,8 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
private final Http2FrameWriter frameWriter;
private final Http2Connection.PropertyKey stateKey;
private ChannelHandlerContext ctx;
private volatile float windowUpdateRatio;
private volatile int initialWindowSize = DEFAULT_WINDOW_SIZE;
private float windowUpdateRatio;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
public DefaultHttp2LocalFlowController(Http2Connection connection, Http2FrameWriter frameWriter) {
this(connection, frameWriter, DEFAULT_WINDOW_UPDATE_RATIO);
@ -110,6 +113,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override
public void initialWindowSize(int newWindowSize) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop();
int delta = newWindowSize - initialWindowSize;
initialWindowSize = newWindowSize;
@ -135,6 +139,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
assert ctx != null && ctx.executor().inEventLoop();
FlowState state = state(stream);
// Just add the delta to the stream-specific initial window size so that the next time the window
// expands it will grow to the new initial size.
@ -144,6 +149,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override
public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
assert ctx != null && ctx.executor().inEventLoop();
if (numBytes < 0) {
throw new IllegalArgumentException("numBytes must not be negative");
}
@ -184,6 +190,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
* @throws IllegalArgumentException If the ratio is out of bounds (0, 1).
*/
public void windowUpdateRatio(float ratio) {
assert ctx == null || ctx.executor().inEventLoop();
checkValidRatio(ratio);
windowUpdateRatio = ratio;
}
@ -211,6 +218,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
* @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
*/
public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception {
assert ctx != null && ctx.executor().inEventLoop();
checkValidRatio(ratio);
FlowState state = state(stream);
state.windowUpdateRatio(ratio);
@ -230,6 +238,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override
public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
boolean endOfStream) throws Http2Exception {
assert ctx != null && ctx.executor().inEventLoop();
int dataLength = data.readableBytes() + padding;
// Apply the connection-level flow control
@ -283,14 +292,14 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
* This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}.
* Each stream has their own initial window size.
*/
private volatile int initialStreamWindowSize;
private int initialStreamWindowSize;
/**
* This is used to determine when {@link #processedWindow} is sufficiently far away from
* {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent.
* Each stream has their own window update ratio.
*/
private volatile float streamWindowUpdateRatio;
private float streamWindowUpdateRatio;
private int lowerBound;
private boolean endOfStream;
@ -303,6 +312,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override
public void window(int initialWindowSize) {
assert ctx == null || ctx.executor().inEventLoop();
window = processedWindow = initialStreamWindowSize = initialWindowSize;
}
@ -328,6 +338,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
@Override
public void windowUpdateRatio(float ratio) {
assert ctx == null || ctx.executor().inEventLoop();
streamWindowUpdateRatio = ratio;
}

View File

@ -32,6 +32,9 @@ import java.util.Deque;
/**
* Basic implementation of {@link Http2RemoteFlowController}.
* <p>
* This class is <strong>NOT</strong> thread safe. The assumption is all methods must be invoked from a single thread.
* Typically this thread is the event loop thread for the {@link ChannelHandlerContext} managed by this class.
*/
public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowController {
private static final int MIN_WRITABLE_CHUNK = 32 * 1024;
@ -164,6 +167,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override
public void initialWindowSize(int newWindowSize) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop();
if (newWindowSize < 0) {
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
}
@ -202,6 +206,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override
public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
assert ctx == null || ctx.executor().inEventLoop();
if (stream.id() == CONNECTION_STREAM_ID) {
// Update the connection window
connectionState().incrementStreamWindow(delta);
@ -224,6 +229,8 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
@Override
public void addFlowControlled(Http2Stream stream, FlowControlled frame) {
// The context can be null assuming the frame will be queued and send later when the context is set.
assert ctx == null || ctx.executor().inEventLoop();
checkNotNull(frame, "frame");
final AbstractState state;
try {

View File

@ -31,8 +31,9 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@ -55,6 +56,9 @@ public class DefaultHttp2LocalFlowControllerTest {
@Mock
private ChannelHandlerContext ctx;
@Mock
private EventExecutor executor;
@Mock
private ChannelPromise promise;
@ -75,6 +79,8 @@ public class DefaultHttp2LocalFlowControllerTest {
connection.local().createStream(STREAM_ID, false);
controller.channelHandlerContext(ctx);
when(ctx.executor()).thenReturn(executor);
when(executor.inEventLoop()).thenReturn(true);
}
@Test

View File

@ -43,6 +43,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.Http2FrameWriter.Configuration;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.EventExecutor;
import java.util.Arrays;
import java.util.List;
@ -88,6 +89,9 @@ public class DefaultHttp2RemoteFlowControllerTest {
@Mock
private ChannelConfig config;
@Mock
private EventExecutor executor;
@Mock
private ChannelPromise promise;
@ -104,6 +108,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
setChannelWritability(true);
when(channel.config()).thenReturn(config);
when(executor.inEventLoop()).thenReturn(true);
initConnectionAndController();
@ -1444,6 +1449,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
private void resetCtx() {
reset(ctx);
when(ctx.channel()).thenReturn(channel);
when(ctx.executor()).thenReturn(executor);
}
private void setChannelWritability(boolean isWritable) {

View File

@ -44,6 +44,7 @@ import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2GoAwayException;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.junit.After;
@ -77,6 +78,9 @@ public class StreamBufferingEncoderTest {
@Mock
private ChannelConfig config;
@Mock
private EventExecutor executor;
@Mock
private ChannelPromise promise;
@ -114,7 +118,9 @@ public class StreamBufferingEncoderTest {
when(ctx.channel()).thenReturn(channel);
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(channel.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(executor.inEventLoop()).thenReturn(true);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.executor()).thenReturn(executor);
when(promise.channel()).thenReturn(channel);
when(channel.isActive()).thenReturn(false);
when(channel.config()).thenReturn(config);