Fixing HTTP/2 processed byte accounting during exception

Motivation:
Currently when an exception occurs during a listener.onDataRead
callback, we return all bytes as processed. However, the listener may
choose to return bytes via the InboundFlowState object rather than
returning the integer. If the listener returns a few bytes and then
throws, we will attempt to return too many bytes.

Modifications:
Added InboundFlowState.unProcessedBytes() to indicate how many
unprocessed bytes are outstanding.

Updated DefaultHttp2ConnectionDecoder to compare the unprocessed bytes
before and after the listener.onDataRead callback when an exception was
encountered.  If there is a difference, it is subtracted off the total
processed bytes to be returned to the flow controller.

Result:
HTTP/2 data frame delivery properly accounts for processed bytes through
an exception.
This commit is contained in:
nmittler 2014-11-18 11:16:26 -08:00
parent 3868645d7d
commit e7efe2b929
4 changed files with 93 additions and 9 deletions

View File

@ -192,6 +192,10 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
frameReader.close();
}
private static int unprocessedBytes(Http2Stream stream) {
return stream.inboundFlow().unProcessedBytes();
}
/**
* Handles all inbound frames from the network.
*/
@ -212,7 +216,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// We should ignore this frame if RST_STREAM was sent or if GO_AWAY was sent with a
// lower stream ID.
boolean shouldApplyFlowControl = false;
int processedBytes = data.readableBytes() + padding;
boolean shouldIgnore = shouldIgnoreFrame(stream);
Http2Exception error = null;
switch (stream.state()) {
@ -240,15 +243,19 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
break;
}
int bytesToReturn = data.readableBytes() + padding;
int unprocessedBytes = unprocessedBytes(stream);
try {
// If we should apply flow control, do so now.
if (shouldApplyFlowControl) {
inboundFlow.applyFlowControl(ctx, streamId, data, padding, endOfStream);
// Update the unprocessed bytes after flow control is applied.
unprocessedBytes = unprocessedBytes(stream);
}
// If we should ignore this frame, do so now.
if (shouldIgnore) {
return processedBytes;
return bytesToReturn;
}
// If the stream was in an invalid state to receive the frame, throw the error.
@ -258,12 +265,26 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// Call back the application and retrieve the number of bytes that have been
// immediately processed.
processedBytes = listener.onDataRead(ctx, streamId, data, padding, endOfStream);
return processedBytes;
bytesToReturn = listener.onDataRead(ctx, streamId, data, padding, endOfStream);
return bytesToReturn;
} catch (Http2Exception e) {
// If an exception happened during delivery, the listener may have returned part
// of the bytes before the error occurred. If that's the case, subtract that from
// the total processed bytes so that we don't return too many bytes.
int delta = unprocessedBytes - unprocessedBytes(stream);
bytesToReturn -= delta;
throw e;
} catch (RuntimeException e) {
// If an exception happened during delivery, the listener may have returned part
// of the bytes before the error occurred. If that's the case, subtract that from
// the total processed bytes so that we don't return too many bytes.
int delta = unprocessedBytes - unprocessedBytes(stream);
bytesToReturn -= delta;
throw e;
} finally {
// If appropriate, returned the processed bytes to the flow controller.
if (shouldApplyFlowControl && processedBytes > 0) {
stream.inboundFlow().returnProcessedBytes(ctx, processedBytes);
if (shouldApplyFlowControl && bytesToReturn > 0) {
stream.inboundFlow().returnProcessedBytes(ctx, bytesToReturn);
}
if (endOfStream) {

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.flowControlError;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
@ -208,6 +209,11 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
updateWindowIfAppropriate(ctx);
}
@Override
public int unProcessedBytes() {
return processedWindow - window;
}
/**
* Updates the flow control window for this stream if it is appropriate.
*/
@ -225,10 +231,15 @@ public class DefaultHttp2InboundFlowController implements Http2InboundFlowContro
/**
* Returns the processed bytes for this stream.
*/
void returnProcessedBytes(int delta) {
void returnProcessedBytes(int delta) throws Http2Exception {
if (processedWindow - delta < window) {
throw new IllegalArgumentException(
"Attempting to return too many bytes for stream " + streamId);
if (streamId == CONNECTION_STREAM_ID) {
throw new Http2Exception(INTERNAL_ERROR,
"Attempting to return too many bytes for connection");
} else {
throw new Http2StreamException(streamId, INTERNAL_ERROR,
"Attempting to return too many bytes for stream " + streamId);
}
}
processedWindow -= delta;
}

View File

@ -33,4 +33,9 @@ public interface Http2InboundFlowState extends Http2FlowState {
* @param numBytes the number of bytes to be returned to the flow control window.
*/
void returnProcessedBytes(ChannelHandlerContext ctx, int numBytes) throws Http2Exception;
/**
* The number of bytes that are outstanding and have not yet been returned to the flow controller.
*/
int unProcessedBytes();
}

View File

@ -23,7 +23,9 @@ import static io.netty.handler.codec.http2.Http2Exception.protocolError;
import static io.netty.handler.codec.http2.Http2Stream.State.OPEN;
import static io.netty.handler.codec.http2.Http2Stream.State.RESERVED_REMOTE;
import static io.netty.util.CharsetUtil.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
@ -45,6 +47,7 @@ import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
@ -235,6 +238,50 @@ public class DefaultHttp2ConnectionDecoderTest {
}
}
@Test
public void errorDuringDeliveryShouldReturnCorrectNumberOfBytes() throws Exception {
final ByteBuf data = dummyData();
final int padding = 10;
final AtomicInteger unprocessed = new AtomicInteger(data.readableBytes() + padding);
doAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock in) throws Throwable {
return unprocessed.get();
}
}).when(inFlowState).unProcessedBytes();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock in) throws Throwable {
int delta = (Integer) in.getArguments()[1];
int newValue = unprocessed.addAndGet(-delta);
if (newValue < 0) {
throw new RuntimeException("Returned too many bytes");
}
return null;
}
}).when(inFlowState).returnProcessedBytes(eq(ctx), anyInt());
// When the listener callback is called, process a few bytes and then throw.
doAnswer(new Answer<Integer>() {
@Override
public Integer answer(InvocationOnMock in) throws Throwable {
inFlowState.returnProcessedBytes(ctx, 4);
throw new RuntimeException("Fake Exception");
}
}).when(listener).onDataRead(eq(ctx), eq(STREAM_ID), any(ByteBuf.class), eq(10), eq(true));
try {
decode().onDataRead(ctx, STREAM_ID, data, padding, true);
fail("Expected exception");
} catch (RuntimeException cause) {
verify(inboundFlow)
.applyFlowControl(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true));
verify(lifecycleManager).closeRemoteSide(eq(stream), eq(future));
verify(listener).onDataRead(eq(ctx), eq(STREAM_ID), eq(data), eq(padding), eq(true));
assertEquals(0, inFlowState.unProcessedBytes());
} finally {
data.release();
}
}
@Test
public void headersReadAfterGoAwayShouldBeIgnored() throws Exception {
when(connection.goAwaySent()).thenReturn(true);