HTTP/2 PriorityStreamByteDistributor exceptions and reentry

Motivation:
PriorityStreamByteDistributor saves exception state and attempts to reset state. This could be simplified by just throwing a connection error and closing the connection. PriorityStreamByteDistributor also does not handle or detect re-entry in the distribute method.

Motivation:
- PriorityStreamByteDistributor propagate an INTERNAL_ERROR if an exception occurs during writing
- PriorityStreamByteDistributor to handle re-entry on the write method

Result:
PriorityStreamByteDistributor exception code state simplified, and re-entry is detected.
This commit is contained in:
Scott Mitchell 2015-11-03 10:05:38 -08:00
parent 1b2e43e70c
commit 91b8ef3d10
5 changed files with 49 additions and 47 deletions

View File

@ -738,8 +738,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
} }
} }
protected final boolean initialWindowSize(int newWindowSize, Writer writer) protected final boolean initialWindowSize(int newWindowSize, Writer writer) throws Http2Exception {
throws Http2Exception {
if (newWindowSize < 0) { if (newWindowSize < 0) {
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
} }

View File

@ -15,12 +15,14 @@
package io.netty.handler.codec.http2; package io.netty.handler.codec.http2;
import java.util.Arrays;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull; import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max; import static java.lang.Math.max;
import static java.lang.Math.min; import static java.lang.Math.min;
import java.util.Arrays;
/** /**
* A {@link StreamByteDistributor} that implements the HTTP/2 priority tree algorithm for allocating * A {@link StreamByteDistributor} that implements the HTTP/2 priority tree algorithm for allocating
* bytes for all streams in the connection. * bytes for all streams in the connection.
@ -81,7 +83,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo
} }
@Override @Override
public boolean distribute(int maxBytes, Writer writer) { public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
checkNotNull(writer, "writer"); checkNotNull(writer, "writer");
if (maxBytes > 0) { if (maxBytes > 0) {
allocateBytesForTree(connection.connectionStream(), maxBytes); allocateBytesForTree(connection.connectionStream(), maxBytes);
@ -379,46 +381,38 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo
/** /**
* A connection stream visitor that delegates to the user provided visitor. * A connection stream visitor that delegates to the user provided visitor.
*/ */
private class WriteVisitor implements Http2StreamVisitor { private final class WriteVisitor implements Http2StreamVisitor {
Writer writer; private boolean iterating;
RuntimeException error; private Writer writer;
void writeAllocatedBytes(Writer writer) { void writeAllocatedBytes(Writer writer) throws Http2Exception {
if (iterating) {
throw connectionError(INTERNAL_ERROR, "byte distribution re-entry error");
}
this.writer = writer;
try { try {
this.writer = writer; iterating = true;
try { connection.forEachActiveStream(this);
connection.forEachActiveStream(this);
} catch (Http2Exception e) {
// Should never happen since the visitor doesn't throw.
throw new IllegalStateException(e);
}
// If an error was caught when calling back the visitor, throw it now.
if (error != null) {
throw error;
}
} finally { } finally {
error = null; iterating = false;
} }
} }
@Override @Override
public boolean visit(Http2Stream stream) { public boolean visit(Http2Stream stream) throws Http2Exception {
PriorityState state = state(stream); PriorityState state = state(stream);
int allocated = state.allocated;
// Unallocate all bytes for this stream.
state.resetAllocated();
try { try {
int allocated = state.allocated;
// Unallocate all bytes for this stream.
state.resetAllocated();
// Write the allocated bytes. // Write the allocated bytes.
if (error == null) { writer.write(stream, allocated);
writer.write(stream, allocated); } catch (Throwable t) { // catch Throwable in case any unchecked re-throw tricks are used.
} // Stop calling the visitor and close the connection as exceptions from the writer are not supported.
} catch (RuntimeException e) { // If we don't close the connection there is risk that our internal state may be corrupted.
// Stop calling the visitor, but continue in the loop to reset the allocated for throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
// all remaining states.
error = e;
} }
// We have to iterate across all streams to ensure that we reset the allocated bytes. // We have to iterate across all streams to ensure that we reset the allocated bytes.

View File

@ -52,6 +52,9 @@ public interface StreamByteDistributor {
interface Writer { interface Writer {
/** /**
* Writes the allocated bytes for this stream. * Writes the allocated bytes for this stream.
* <p>
* Any {@link Throwable} thrown from this method is considered a programming error.
* A {@code GOAWAY} frame will be sent and the will be connection closed.
* @param stream the stream for which to perform the write. * @param stream the stream for which to perform the write.
* @param numBytes the number of bytes to write. * @param numBytes the number of bytes to write.
*/ */
@ -78,6 +81,8 @@ public interface StreamByteDistributor {
* @param maxBytes the maximum number of bytes to write. * @param maxBytes the maximum number of bytes to write.
* @return {@code true} if there are still streamable bytes that have not yet been written, * @return {@code true} if there are still streamable bytes that have not yet been written,
* otherwise {@code false}. * otherwise {@code false}.
* @throws Http2Exception If an internal exception occurs and internal connection state would otherwise be
* corrupted.
*/ */
boolean distribute(int maxBytes, Writer writer); boolean distribute(int maxBytes, Writer writer) throws Http2Exception;
} }

View File

@ -21,6 +21,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
@ -721,23 +722,24 @@ public class DefaultHttp2RemoteFlowControllerTest {
public void flowControlledWriteAndErrorThrowAnException() throws Exception { public void flowControlledWriteAndErrorThrowAnException() throws Exception {
final Http2RemoteFlowController.FlowControlled flowControlled = mockedFlowControlledThatThrowsOnWrite(); final Http2RemoteFlowController.FlowControlled flowControlled = mockedFlowControlledThatThrowsOnWrite();
final Http2Stream stream = stream(STREAM_A); final Http2Stream stream = stream(STREAM_A);
final RuntimeException fakeException = new RuntimeException("error failed");
doAnswer(new Answer<Void>() { doAnswer(new Answer<Void>() {
@Override @Override
public Void answer(InvocationOnMock invocationOnMock) { public Void answer(InvocationOnMock invocationOnMock) {
throw new RuntimeException("error failed"); throw fakeException;
} }
}).when(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class)); }).when(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class));
int windowBefore = window(STREAM_A); int windowBefore = window(STREAM_A);
boolean exceptionThrown = false;
try { try {
controller.addFlowControlled(stream, flowControlled); controller.addFlowControlled(stream, flowControlled);
controller.writePendingBytes(); controller.writePendingBytes();
} catch (RuntimeException e) { fail();
exceptionThrown = true; } catch (Http2Exception e) {
} finally { assertSame(fakeException, e.getCause());
assertTrue(exceptionThrown); } catch (Throwable t) {
fail();
} }
verify(flowControlled, times(3)).write(any(ChannelHandlerContext.class), anyInt()); verify(flowControlled, times(3)).write(any(ChannelHandlerContext.class), anyInt());

View File

@ -91,7 +91,7 @@ public class PriorityStreamByteDistributorTest {
} }
@Test @Test
public void bytesUnassignedAfterProcessing() { public void bytesUnassignedAfterProcessing() throws Http2Exception {
updateStream(STREAM_A, 1, true); updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true); updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true); updateStream(STREAM_C, 3, true);
@ -111,7 +111,7 @@ public class PriorityStreamByteDistributorTest {
} }
@Test @Test
public void bytesUnassignedAfterProcessingWithException() { public void connectionErrorForWriterException() throws Http2Exception {
updateStream(STREAM_A, 1, true); updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true); updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true); updateStream(STREAM_C, 3, true);
@ -123,8 +123,10 @@ public class PriorityStreamByteDistributorTest {
try { try {
write(10); write(10);
fail("Expected an exception"); fail("Expected an exception");
} catch (RuntimeException e) { } catch (Http2Exception e) {
assertSame(fakeException, e); assertFalse(Http2Exception.isStreamError(e));
assertEquals(Http2Error.INTERNAL_ERROR, e.error());
assertSame(fakeException, e.getCause());
} }
verifyWrite(atMost(1), STREAM_A, 1); verifyWrite(atMost(1), STREAM_A, 1);
@ -665,7 +667,7 @@ public class PriorityStreamByteDistributorTest {
return distributor.unallocatedStreamableBytesForTree(stream); return distributor.unallocatedStreamableBytesForTree(stream);
} }
private boolean write(int numBytes) { private boolean write(int numBytes) throws Http2Exception {
return distributor.distribute(numBytes, writer); return distributor.distribute(numBytes, writer);
} }