From 96f9b0b91ba63c1f6232c6acca4358afe01fbb7f Mon Sep 17 00:00:00 2001 From: nmittler Date: Tue, 3 Nov 2015 15:05:32 -0800 Subject: [PATCH] Remote flow controller incorrectly updates stream state Motivation: The `DefaultHttp2RemoteFlowController` does not correctly determine `hasFrame` when updating the stream state for the distributor. Adding a check to enforce `hasFrame` when `streamableBytes > 0` causes several test failures. Modifications: Modified `DefaultHttp2RemoteFlowController` to simplify the writing logic and to correct the bookkeeping for `hasFrame`. Result: The distributors are always called with valid arguments. --- .../DefaultHttp2RemoteFlowController.java | 148 ++++++++---------- .../http2/PriorityStreamByteDistributor.java | 1 + .../DefaultHttp2RemoteFlowControllerTest.java | 31 ++-- .../PriorityStreamByteDistributorTest.java | 6 +- 4 files changed, 84 insertions(+), 102 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java index 366adc7006..51d9542c34 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowController.java @@ -315,12 +315,66 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll @Override int writeAllocatedBytes(int allocated) { + final int initialAllocated = allocated; + int writtenBytes; + // In case an exception is thrown we want to remember it and pass it to cancel(Throwable). + Throwable cause = null; + FlowControlled frame; try { - // Perform the write. - return writeBytes(allocated); + assert !writing; + writing = true; + + // Write the remainder of frames that we are allowed to + boolean writeOccurred = false; + while (!cancelled && (frame = peek()) != null) { + int maxBytes = min(allocated, writableWindow()); + if (maxBytes <= 0 && frame.size() > 0) { + // The frame still has data, but the amount of allocated bytes has been exhausted. + // Don't write needless empty frames. + break; + } + writeOccurred = true; + int initialFrameSize = frame.size(); + try { + frame.write(ctx, max(0, maxBytes)); + if (frame.size() == 0) { + // This frame has been fully written, remove this frame and notify it. + // Since we remove this frame first, we're guaranteed that its error + // method will not be called when we call cancel. + pendingWriteQueue.remove(); + frame.writeComplete(); + } + } finally { + // Decrement allocated by how much was actually written. + allocated -= initialFrameSize - frame.size(); + } + } + + if (!writeOccurred) { + // Either there was no frame, or the amount of allocated bytes has been exhausted. + return -1; + } + + } catch (Throwable t) { + // Mark the state as cancelled, we'll clear the pending queue via cancel() below. + cancelled = true; + cause = t; } finally { - streamByteDistributor.updateStreamableBytes(this); + writing = false; + // Make sure we always decrement the flow control windows + // by the bytes written. + writtenBytes = initialAllocated - allocated; + + decrementPendingBytes(writtenBytes, false); + decrementFlowControlWindow(writtenBytes); + + // If a cancellation occurred while writing, call cancel again to + // clear and error all of the pending writes. + if (cancelled) { + cancel(cause); + } } + return writtenBytes; } @Override @@ -354,11 +408,13 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll @Override void enqueueFrame(FlowControlled frame) { - incrementPendingBytes(frame.size()); FlowControlled last = pendingWriteQueue.peekLast(); if (last == null || !last.merge(ctx, frame)) { pendingWriteQueue.offer(frame); } + // This must be called after adding to the queue in order so that hasFrame() is + // updated before updating the stream state. + incrementPendingBytes(frame.size(), true); } @Override @@ -400,89 +456,23 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll streamByteDistributor.updateStreamableBytes(this); } - int writeBytes(int bytes) { - if (!hasFrame()) { - return -1; - } - // Check if the first frame is a "writable" frame to get the "-1" return status out of the way - FlowControlled frame = peek(); - int maxBytes = min(bytes, writableWindow()); - if (maxBytes <= 0 && frame.size() != 0) { - // The frame still has data, but the amount of allocated bytes has been exhausted. - return -1; - } - int originalBytes = bytes; - bytes -= write(frame, maxBytes); - - // Write the remainder of frames that we are allowed to - while (hasFrame()) { - frame = peek(); - maxBytes = min(bytes, writableWindow()); - if (maxBytes <= 0 && frame.size() != 0) { - // The frame still has data, but the amount of allocated bytes has been exhausted. - break; - } - bytes -= write(frame, maxBytes); - } - return originalBytes - bytes; - } - /** - * Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending - * queue, the written bytes are removed from this branch of the priority tree. + * Increments the number of pending bytes for this node and optionally updates the + * {@link StreamByteDistributor}. */ - private int write(FlowControlled frame, int allowedBytes) { - int before = frame.size(); - int writtenBytes; - // In case an exception is thrown we want to remember it and pass it to cancel(Throwable). - Throwable cause = null; - try { - assert !writing; - - // Write the portion of the frame. - writing = true; - frame.write(ctx, max(0, allowedBytes)); - if (!cancelled && frame.size() == 0) { - // This frame has been fully written, remove this frame and notify it. Since we remove this frame - // first, we're guaranteed that its error method will not be called when we call cancel. - pendingWriteQueue.remove(); - frame.writeComplete(); - } - } catch (Throwable t) { - // Mark the state as cancelled, we'll clear the pending queue via cancel() below. - cancelled = true; - cause = t; - } finally { - writing = false; - // Make sure we always decrement the flow control windows - // by the bytes written. - writtenBytes = before - frame.size(); - decrementFlowControlWindow(writtenBytes); - decrementPendingBytes(writtenBytes); - // If a cancellation occurred while writing, call cancel again to - // clear and error all of the pending writes. - if (cancelled) { - cancel(cause); - } - } - return writtenBytes; - } - - /** - * Increments the number of pending bytes for this node and updates the {@link StreamByteDistributor}. - */ - private void incrementPendingBytes(int numBytes) { + private void incrementPendingBytes(int numBytes, boolean updateStreamableBytes) { pendingBytes += numBytes; - - streamByteDistributor.updateStreamableBytes(this); monitor.incrementPendingBytes(numBytes); + if (updateStreamableBytes) { + streamByteDistributor.updateStreamableBytes(this); + } } /** * If this frame is in the pending queue, decrements the number of pending bytes for the stream. */ - private void decrementPendingBytes(int bytes) { - incrementPendingBytes(-bytes); + private void decrementPendingBytes(int bytes, boolean updateStreamableBytes) { + incrementPendingBytes(-bytes, updateStreamableBytes); } /** @@ -505,7 +495,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll */ private void writeError(FlowControlled frame, Http2Exception cause) { assert ctx != null; - decrementPendingBytes(frame.size()); + decrementPendingBytes(frame.size(), true); frame.error(ctx, cause); } } diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java index 202affcbc5..04ab3e73dd 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/PriorityStreamByteDistributor.java @@ -346,6 +346,7 @@ public final class PriorityStreamByteDistributor implements StreamByteDistributo } void updateStreamableBytes(int newStreamableBytes, boolean hasFrame) { + assert hasFrame || newStreamableBytes == 0; this.hasFrame = hasFrame; int delta = newStreamableBytes - streamableBytes; diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java index dc54155ef2..2d93c63ccb 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2RemoteFlowControllerTest.java @@ -26,8 +26,10 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; @@ -47,7 +49,6 @@ import junit.framework.AssertionFailedError; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -709,7 +710,7 @@ public class DefaultHttp2RemoteFlowControllerTest { controller.addFlowControlled(stream, flowControlled); controller.writePendingBytes(); - verify(flowControlled, times(3)).write(any(ChannelHandlerContext.class), anyInt()); + verify(flowControlled, atLeastOnce()).write(any(ChannelHandlerContext.class), anyInt()); verify(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class)); verify(flowControlled, never()).writeComplete(); @@ -742,7 +743,7 @@ public class DefaultHttp2RemoteFlowControllerTest { fail(); } - verify(flowControlled, times(3)).write(any(ChannelHandlerContext.class), anyInt()); + verify(flowControlled, atLeastOnce()).write(any(ChannelHandlerContext.class), anyInt()); verify(flowControlled).error(any(ChannelHandlerContext.class), any(Throwable.class)); verify(flowControlled, never()).writeComplete(); @@ -753,7 +754,7 @@ public class DefaultHttp2RemoteFlowControllerTest { @Test public void flowControlledWriteCompleteThrowsAnException() throws Exception { final Http2RemoteFlowController.FlowControlled flowControlled = - Mockito.mock(Http2RemoteFlowController.FlowControlled.class); + mock(Http2RemoteFlowController.FlowControlled.class); final AtomicInteger size = new AtomicInteger(150); doAnswer(new Answer() { @Override @@ -798,7 +799,7 @@ public class DefaultHttp2RemoteFlowControllerTest { @Test public void closeStreamInFlowControlledError() throws Exception { final Http2RemoteFlowController.FlowControlled flowControlled = - Mockito.mock(Http2RemoteFlowController.FlowControlled.class); + mock(Http2RemoteFlowController.FlowControlled.class); final Http2Stream stream = stream(STREAM_A); when(flowControlled.size()).thenReturn(100); doThrow(new RuntimeException("write failed")) @@ -922,25 +923,15 @@ public class DefaultHttp2RemoteFlowControllerTest { private static Http2RemoteFlowController.FlowControlled mockedFlowControlledThatThrowsOnWrite() throws Exception { final Http2RemoteFlowController.FlowControlled flowControlled = - Mockito.mock(Http2RemoteFlowController.FlowControlled.class); + mock(Http2RemoteFlowController.FlowControlled.class); when(flowControlled.size()).thenReturn(100); doAnswer(new Answer() { private int invocationCount; @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - switch(invocationCount) { - case 0: - when(flowControlled.size()).thenReturn(50); - invocationCount = 1; - return null; - case 1: - when(flowControlled.size()).thenReturn(20); - invocationCount = 2; - return null; - default: - when(flowControlled.size()).thenReturn(10); - throw new RuntimeException("Write failed"); - } + public Void answer(InvocationOnMock in) throws Throwable { + // Write most of the bytes and then fail + when(flowControlled.size()).thenReturn(10); + throw new RuntimeException("Write failed"); } }).when(flowControlled).write(any(ChannelHandlerContext.class), anyInt()); return flowControlled; diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java index d996e3eaa4..849c52e377 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/PriorityStreamByteDistributorTest.java @@ -136,10 +136,10 @@ public class PriorityStreamByteDistributorTest { doNothing().when(writer).write(same(stream(STREAM_C)), eq(3)); write(10); - verifyWrite(atMost(1), STREAM_A, 1); - verifyWrite(atMost(1), STREAM_B, 2); + verifyWrite(STREAM_A, 1); + verifyWrite(STREAM_B, 2); verifyWrite(times(2), STREAM_C, 3); - verifyWrite(atMost(1), STREAM_D, 4); + verifyWrite(STREAM_D, 4); } /**