Have Http2LocalFlowController.consumeBytes indicate whether a WINDOW_UPDATE was written

This commit is contained in:
Louis Ryan 2015-04-23 14:23:23 -07:00 committed by nmittler
parent 37dc6a41a6
commit f6e1f4947d
5 changed files with 47 additions and 27 deletions

View File

@ -140,7 +140,7 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
} }
@Override @Override
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
throws Http2Exception { throws Http2Exception {
// Streams automatically consume all remaining bytes when they are closed, so just ignore // Streams automatically consume all remaining bytes when they are closed, so just ignore
// if already closed. // if already closed.
@ -152,9 +152,11 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
throw new IllegalArgumentException("numBytes must be positive"); throw new IllegalArgumentException("numBytes must be positive");
} }
connectionState().consumeBytes(ctx, numBytes); boolean windowUpdateSent = connectionState().consumeBytes(ctx, numBytes);
state(stream).consumeBytes(ctx, numBytes); windowUpdateSent |= state(stream).consumeBytes(ctx, numBytes);
return windowUpdateSent;
} }
return false;
} }
@Override @Override
@ -374,10 +376,10 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
} }
@Override @Override
public void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { public boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
// Return the bytes processed and update the window. // Return the bytes processed and update the window.
returnProcessedBytes(numBytes); returnProcessedBytes(numBytes);
writeWindowUpdateIfNeeded(ctx); return writeWindowUpdateIfNeeded(ctx);
} }
@Override @Override
@ -386,15 +388,17 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
} }
@Override @Override
public void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { public boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
if (endOfStream || initialStreamWindowSize <= 0) { if (endOfStream || initialStreamWindowSize <= 0) {
return; return false;
} }
int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio); int threshold = (int) (initialStreamWindowSize * streamWindowUpdateRatio);
if (processedWindow <= threshold) { if (processedWindow <= threshold) {
writeWindowUpdate(ctx); writeWindowUpdate(ctx);
return true;
} }
return false;
} }
/** /**
@ -444,12 +448,13 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
} }
@Override @Override
public void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception { public boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception { public boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception {
return false;
} }
@Override @Override
@ -503,10 +508,21 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
/** /**
* Updates the flow control window for this stream if it is appropriate. * Updates the flow control window for this stream if it is appropriate.
*
* @return true if {@code WINDOW_UPDATE} was written, false otherwise.
*/ */
void writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception; boolean writeWindowUpdateIfNeeded(ChannelHandlerContext ctx) throws Http2Exception;
void consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception; /**
* Indicates that the application has consumed {@code numBytes} from the connection or stream and is
* ready to receive more data.
*
* @param ctx the channel handler context to use when sending a {@code WINDOW_UPDATE} if appropriate
* @param numBytes the number of bytes to be returned to the flow control window.
* @return true if {@code WINDOW_UPDATE} was written, false otherwise.
* @throws Http2Exception
*/
boolean consumeBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception;
int unconsumedBytes(); int unconsumedBytes();

View File

@ -328,7 +328,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
} }
@Override @Override
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes)
throws Http2Exception { throws Http2Exception {
Http2Decompressor decompressor = decompressor(stream); Http2Decompressor decompressor = decompressor(stream);
Http2Decompressor copy = null; Http2Decompressor copy = null;
@ -339,7 +339,7 @@ public class DelegatingDecompressorFrameListener extends Http2FrameListenerDecor
// Convert the uncompressed consumed bytes to compressed (on the wire) bytes. // Convert the uncompressed consumed bytes to compressed (on the wire) bytes.
numBytes = decompressor.consumeProcessedBytes(numBytes); numBytes = decompressor.consumeProcessedBytes(numBytes);
} }
flowController.consumeBytes(ctx, stream, numBytes); return flowController.consumeBytes(ctx, stream, numBytes);
} catch (Http2Exception e) { } catch (Http2Exception e) {
if (copy != null) { if (copy != null) {
stream.setProperty(propertyKey, copy); stream.setProperty(propertyKey, copy);

View File

@ -57,10 +57,11 @@ public interface Http2LocalFlowController extends Http2FlowController {
* If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link * If {@code stream} is {@code null} or closed (i.e. {@link Http2Stream#state()} method returns {@link
* Http2Stream.State#CLOSED}), calling this method has no effect. * Http2Stream.State#CLOSED}), calling this method has no effect.
* @param numBytes the number of bytes to be returned to the flow control window. * @param numBytes the number of bytes to be returned to the flow control window.
* @return true if a {@code WINDOW_UPDATE} was sent, false otherwise.
* @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the * @throws Http2Exception if the number of bytes returned exceeds the {@link #unconsumedBytes(Http2Stream)} for the
* stream. * stream.
*/ */
void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception; boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception;
/** /**
* The number of bytes for the given stream that have been received but not yet consumed by the * The number of bytes for the given stream that have been received but not yet consumed by the

View File

@ -18,6 +18,8 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID; import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE; import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -86,11 +88,11 @@ public class DefaultHttp2LocalFlowControllerTest {
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false); receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false);
// Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent. // Return only a few bytes and verify that the WINDOW_UPDATE hasn't been sent.
consumeBytes(STREAM_ID, 10); assertFalse(consumeBytes(STREAM_ID, 10));
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
// Return the rest and verify the WINDOW_UPDATE is sent. // Return the rest and verify the WINDOW_UPDATE is sent.
consumeBytes(STREAM_ID, dataSize - 10); assertTrue(consumeBytes(STREAM_ID, dataSize - 10));
verifyWindowUpdateSent(STREAM_ID, dataSize); verifyWindowUpdateSent(STREAM_ID, dataSize);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
} }
@ -110,7 +112,7 @@ public class DefaultHttp2LocalFlowControllerTest {
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(STREAM_ID);
consumeBytes(STREAM_ID, dataSize); assertTrue(consumeBytes(STREAM_ID, dataSize));
verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize); verifyWindowUpdateSent(CONNECTION_STREAM_ID, dataSize);
verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(STREAM_ID);
} }
@ -123,7 +125,7 @@ public class DefaultHttp2LocalFlowControllerTest {
// Don't set end-of-stream so we'll get a window update for the stream as well. // Don't set end-of-stream so we'll get a window update for the stream as well.
receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false); receiveFlowControlledFrame(STREAM_ID, dataSize, 0, false);
consumeBytes(STREAM_ID, dataSize); assertTrue(consumeBytes(STREAM_ID, dataSize));
verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta); verifyWindowUpdateSent(CONNECTION_STREAM_ID, windowDelta);
verifyWindowUpdateSent(STREAM_ID, windowDelta); verifyWindowUpdateSent(STREAM_ID, windowDelta);
} }
@ -150,7 +152,7 @@ public class DefaultHttp2LocalFlowControllerTest {
// Send the next frame and verify that the expected window updates were sent. // Send the next frame and verify that the expected window updates were sent.
receiveFlowControlledFrame(STREAM_ID, initialWindowSize, 0, false); receiveFlowControlledFrame(STREAM_ID, initialWindowSize, 0, false);
consumeBytes(STREAM_ID, initialWindowSize); assertTrue(consumeBytes(STREAM_ID, initialWindowSize));
int delta = newInitialWindowSize - initialWindowSize; int delta = newInitialWindowSize - initialWindowSize;
verifyWindowUpdateSent(STREAM_ID, delta); verifyWindowUpdateSent(STREAM_ID, delta);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta); verifyWindowUpdateSent(CONNECTION_STREAM_ID, delta);
@ -172,7 +174,7 @@ public class DefaultHttp2LocalFlowControllerTest {
verifyWindowUpdateNotSent(CONNECTION_STREAM_ID); verifyWindowUpdateNotSent(CONNECTION_STREAM_ID);
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(CONNECTION_STREAM_ID));
consumeBytes(STREAM_ID, data1); assertTrue(consumeBytes(STREAM_ID, data1));
verifyWindowUpdateSent(STREAM_ID, data1); verifyWindowUpdateSent(STREAM_ID, data1);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1); verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1);
@ -191,8 +193,8 @@ public class DefaultHttp2LocalFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(STREAM_ID));
assertEquals(DEFAULT_WINDOW_SIZE - data1, window(newStreamId)); assertEquals(DEFAULT_WINDOW_SIZE - data1, window(newStreamId));
assertEquals(DEFAULT_WINDOW_SIZE - (data1 << 1), window(CONNECTION_STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - (data1 << 1), window(CONNECTION_STREAM_ID));
consumeBytes(STREAM_ID, data1); assertFalse(consumeBytes(STREAM_ID, data1));
consumeBytes(newStreamId, data2); assertTrue(consumeBytes(newStreamId, data2));
verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateNotSent(newStreamId); verifyWindowUpdateNotSent(newStreamId);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2); verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2);
@ -266,8 +268,8 @@ public class DefaultHttp2LocalFlowControllerTest {
assertEquals(DEFAULT_WINDOW_SIZE - data2, window(STREAM_ID)); assertEquals(DEFAULT_WINDOW_SIZE - data2, window(STREAM_ID));
assertEquals(newDefaultWindowSize - data1, window(newStreamId)); assertEquals(newDefaultWindowSize - data1, window(newStreamId));
assertEquals(newDefaultWindowSize - data2 - data1, window(CONNECTION_STREAM_ID)); assertEquals(newDefaultWindowSize - data2 - data1, window(CONNECTION_STREAM_ID));
consumeBytes(STREAM_ID, data2); assertFalse(consumeBytes(STREAM_ID, data2));
consumeBytes(newStreamId, data1); assertTrue(consumeBytes(newStreamId, data1));
verifyWindowUpdateNotSent(STREAM_ID); verifyWindowUpdateNotSent(STREAM_ID);
verifyWindowUpdateSent(newStreamId, data1); verifyWindowUpdateSent(newStreamId, data1);
verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2); verifyWindowUpdateSent(CONNECTION_STREAM_ID, data1 + data2);
@ -305,8 +307,8 @@ public class DefaultHttp2LocalFlowControllerTest {
return buffer; return buffer;
} }
private void consumeBytes(int streamId, int numBytes) throws Http2Exception { private boolean consumeBytes(int streamId, int numBytes) throws Http2Exception {
controller.consumeBytes(ctx, stream(streamId), numBytes); return controller.consumeBytes(ctx, stream(streamId), numBytes);
} }
private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) { private void verifyWindowUpdateSent(int streamId, int windowSizeIncrement) {

View File

@ -57,7 +57,8 @@ public final class NoopHttp2LocalFlowController implements Http2LocalFlowControl
} }
@Override @Override
public void consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception { public boolean consumeBytes(ChannelHandlerContext ctx, Http2Stream stream, int numBytes) throws Http2Exception {
return false;
} }
@Override @Override