Have Http2LocalFlowController.consumeBytes indicate whether a WINDOW_UPDATE was written

This commit is contained in:
Louis Ryan 2015-04-23 14:23:23 -07:00 committed by nmittler
parent 91e94c956f
commit a3cea186ce
5 changed files with 47 additions and 27 deletions

View File

@ -140,7 +140,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
@Override
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
throws Http2Exception {
// Streams automatically consume all remaining bytes when they are closed, so just ignore
// if already closed.
@ -152,9 +152,11 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
throw new IllegalArgumentException("numBytes must be positive");
}
connectionState().consumeBytes(ctx, numBytes);
state(stream).consumeBytes(ctx, numBytes);
boolean windowUpdateSent = connectionState().consumeBytes(ctx, numBytes);
windowUpdateSent |= state(stream).consumeBytes(ctx, numBytes);
return windowUpdateSent;
}
return false;
}
@Override
@ -374,10 +376,10 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
@Override
public void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
public boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
// Return the bytes processed and update the window.
returnProcessedBytes(numBytes);
writeWindowUpdateIfNeeded(ctx);
return writeWindowUpdateIfNeeded(ctx);
}
@Override
@ -386,15 +388,17 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
@Override
public void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
public boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
if (endOfStream || initialStreamWindowSize <= 0) {
return;
return false;
}
int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio);
if (processedWindow <= threshold) {
writeWindowUpdate(ctx);
return true;
}
return false;
}
/**
@ -444,12 +448,13 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
}
@Override
public void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
public boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
throw new UnsupportedOperationException();
}
@Override
public void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
public boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
return false;
}
@Override
@ -503,10 +508,21 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
/**
* Updates the flow control window for this stream if it is appropriate.
*
* @return true if {@code WINDOW_UPDATE} was written, false otherwise.
*/
void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception;
boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception;
void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception;
/**
* Indicates that the application has consumed {@code numBytes} from the connection or stream and is
* ready to receive more data.
*
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate
* @param numBytes the number of bytes to be returned to the flow control window.
* @return true if {@code WINDOW_UPDATE} was written, false otherwise.
* @throws Http2Exception
*/
boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception;
int unconsumedBytes();

View File

@ -328,7 +328,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
}
@Override
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
throws Http2Exception {
Http2Decompressor decompressor = decompressor(stream);
Http2Decompressor copy = null;
@ -339,7 +339,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
// Convert the uncompressed consumed bytes to compressed (on the wire) bytes.
numBytes = decompressor.consumeProcessedBytes(numBytes);
}
flowController.consumeBytes(ctx, stream, numBytes);
return flowController.consumeBytes(ctx, stream, numBytes);
} catch (Http2Exception e) {
if (copy != null) {
stream.setProperty(propertyKey, copy);

View File

@ -57,10 +57,11 @@ public interface Http2LocalFlowController extends Http2FlowController {
* If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link
* Http2Stream.State#CLOSED}), calling this method has no effect.
* @param numBytes the number of bytes to be returned to the flow control window.
* @return true if a {@code WINDOW_UPDATE} was sent, false otherwise.
* @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the
* stream.
*/
void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception;
boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception;
/**
* The number of bytes for the given stream that have been received but not yet consumed by the

View File

@ -18,6 +18,8 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
@ -86,11 +88,11 @@ public class DefaultHttp2LocalFlowControllerTest {
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false);
// Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent.
consumeBytes(STREAM_ID, 10);
assertFalse(consumeBytes(STREAM_ID, 10));
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
// Return the rest and verify the WINDOW_UPDATE is sent.
consumeBytes(STREAM_ID, dataSize - 10);
assertTrue(consumeBytes(STREAM_ID, dataSize - 10));
verifyWindowUpdateSent(STREAM_ID, dataSize);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
}
@ -110,7 +112,7 @@ public class DefaultHttp2LocalFlowControllerTest {
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
verifyWindowUpdateNotSent(STREAM_ID);
consumeBytes(STREAM_ID, dataSize);
assertTrue(consumeBytes(STREAM_ID, dataSize));
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
verifyWindowUpdateNotSent(STREAM_ID);
}
@ -123,7 +125,7 @@ public class DefaultHttp2LocalFlowControllerTest {
// Don't set end-of-stream so we'll get a window update for the stream as well.
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false);
consumeBytes(STREAM_ID, dataSize);
assertTrue(consumeBytes(STREAM_ID, dataSize));
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
verifyWindowUpdateSent(STREAM_ID, windowDelta);
}
@ -150,7 +152,7 @@ public class DefaultHttp2LocalFlowControllerTest {
// Send the next frame and verify that the expected window updates were sent.
receiveFlowControlledFrame(STREAM_ID, initialWindowSize, 0, false);
consumeBytes(STREAM_ID, initialWindowSize);
assertTrue(consumeBytes(STREAM_ID, initialWindowSize));
int delta = newInitialWindowSize - initialWindowSize;
verifyWindowUpdateSent(STREAM_ID, delta);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta);
@ -172,7 +174,7 @@ public class DefaultHttp2LocalFlowControllerTest {
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(CONNECTION_STREAM_ID));
consumeBytes(STREAM_ID, data1);
assertTrue(consumeBytes(STREAM_ID, data1));
verifyWindowUpdateSent(STREAM_ID, data1);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1);
@ -191,8 +193,8 @@ public class DefaultHttp2LocalFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(newStreamId));
assertEquals(DEFAULT_WINDOW_SIZE - (data1 << 1), window(CONNECTION_STREAM_ID));
consumeBytes(STREAM_ID, data1);
consumeBytes(newStreamId, data2);
assertFalse(consumeBytes(STREAM_ID, data1));
assertTrue(consumeBytes(newStreamId, data2));
verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateNotSent(newStreamId);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2);
@ -266,8 +268,8 @@ public class DefaultHttp2LocalFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE - data2, window(STREAM_ID));
assertEquals(newDefaultWindowSize - data1, window(newStreamId));
assertEquals(newDefaultWindowSize - data2 - data1, window(CONNECTION_STREAM_ID));
consumeBytes(STREAM_ID, data2);
consumeBytes(newStreamId, data1);
assertFalse(consumeBytes(STREAM_ID, data2));
assertTrue(consumeBytes(newStreamId, data1));
verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateSent(newStreamId, data1);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2);
@ -305,8 +307,8 @@ public class DefaultHttp2LocalFlowControllerTest {
return buffer;
}
private void consumeBytes(int streamId, int numBytes) throws Http2Exception {
controller.consumeBytes(ctx, stream(streamId), numBytes);
private boolean consumeBytes(int streamId, int numBytes) throws Http2Exception {
return controller.consumeBytes(ctx, stream(streamId), numBytes);
}
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) {

View File

@ -57,7 +57,8 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl
}
@Override
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception {
public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception {
return false;
}
@Override