Remove explicit flushes from HTTP2 encoders, decoders & flow-controllers

Motivation:

Allow users of HTTP2 to control when flushes occur so they can optimize network writes.

Modifications:

Removed explicit calls to flush in encoder, decoder & flow-controller
Connection handler now calls flush on read-complete to enable batching writes in response to reads

Result:

Much less flushing occurs for normal HTTP2 request and response patterns.
This commit is contained in:
Louis Ryan 2015-04-23 14:23:23 -07:00 committed by Scott Mitchell
parent ce8c916f1a
commit 8271c8afcc
12 changed files with 79 additions and 92 deletions

View File

@ -443,7 +443,6 @@ public class DefaultHttp2ConnectionDecoder implements Http2ConnectionDecoder {
// Send an ack back to the remote client.
// Need to retain the buffer here since it will be released after the write completes.
encoder.writePing(ctx, true, data.retain(), ctx.newPromise());
ctx.flush();
listener.onPingRead(ctx, data);
}

View File

@ -199,7 +199,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
ChannelFuture future = frameWriter.writePriority(ctx, streamId, streamDependency, weight, exclusive, promise);
ctx.flush();
return future;
}
@ -224,21 +223,18 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
ChannelFuture future = frameWriter.writeSettings(ctx, settings, promise);
ctx.flush();
return future;
}
@Override
public ChannelFuture writeSettingsAck(ChannelHandlerContext ctx, ChannelPromise promise) {
ChannelFuture future = frameWriter.writeSettingsAck(ctx, promise);
ctx.flush();
return future;
}
@Override
public ChannelFuture writePing(ChannelHandlerContext ctx, boolean ack, ByteBuf data, ChannelPromise promise) {
ChannelFuture future = frameWriter.writePing(ctx, ack, data, promise);
ctx.flush();
return future;
}
@ -258,7 +254,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
ChannelFuture future = frameWriter.writePushPromise(ctx, streamId, promisedStreamId, headers, padding, promise);
ctx.flush();
return future;
}
@ -345,16 +340,13 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
@Override
public boolean write(int allowedBytes) {
public void write(int allowedBytes) {
int bytesWritten = 0;
if (data == null || (allowedBytes == 0 && size != 0)) {
// No point writing an empty DATA frame, wait for a bigger allowance.
return;
}
try {
if (data == null) {
return false;
}
if (allowedBytes == 0 && size != 0) {
// No point writing an empty DATA frame, wait for a bigger allowance.
return false;
}
int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize();
do {
int allowedFrameSize = Math.min(maxFrameSize, allowedBytes - bytesWritten);
@ -386,7 +378,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
size == bytesWritten && endOfStream, writePromise);
} while (size != bytesWritten && allowedBytes > bytesWritten);
return true;
} finally {
size -= bytesWritten;
}
@ -427,10 +418,9 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
}
@Override
public boolean write(int allowedBytes) {
public void write(int allowedBytes) {
frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
padding, endOfStream, promise);
return true;
}
}

View File

@ -413,7 +413,6 @@ public class DefaultHttp2LocalFlowController implements Http2LocalFlowController
// Send a window update for the stream/connection.
frameWriter.writeWindowUpdate(ctx, stream.id(), deltaWindowSize, ctx.newPromise());
ctx.flush();
}
}

View File

@ -46,7 +46,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
private final Http2Connection.PropertyKey stateKey;
private int initialWindowSize = DEFAULT_WINDOW_SIZE;
private ChannelHandlerContext ctx;
private boolean needFlush;
public DefaultHttp2RemoteFlowController(Http2Connection connection) {
this.connection = checkNotNull(connection, "connection");
@ -185,7 +184,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
AbstractState state = state(stream);
state.incrementStreamWindow(delta);
state.writeBytes(state.writableWindow());
flush();
}
}
@ -207,11 +205,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return;
}
state.writeBytes(state.writableWindow());
try {
flush();
} catch (Throwable t) {
frame.error(t);
}
}
/**
@ -237,16 +230,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
return connectionState().windowSize();
}
/**
* Flushes the {@link ChannelHandlerContext} if we've received any data frames.
*/
private void flush() {
if (needFlush) {
ctx.flush();
needFlush = false;
}
}
/**
* Writes as many pending bytes as possible, according to stream priority.
*/
@ -260,7 +243,6 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Now write all of the allocated bytes.
connection.forEachActiveStream(WRITE_ALLOCATED_BYTES);
flush();
}
}
@ -604,13 +586,10 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
/**
* Writes the frame and decrements the stream and connection window sizes. If the frame is in the pending
* queue, the written bytes are removed from this branch of the priority tree.
* <p>
* Note: this does not flush the {@link ChannelHandlerContext}.
* </p>
*/
private int write(FlowControlled frame, int allowedBytes) {
int before = frame.size();
int writtenBytes = 0;
int writtenBytes;
// In case an exception is thrown we want to remember it and pass it to cancel(Throwable).
Throwable cause = null;
try {
@ -618,7 +597,7 @@ public class DefaultHttp2RemoteFlowController implements Http2RemoteFlowControll
// Write the portion of the frame.
writing = true;
needFlush |= frame.write(max(0, allowedBytes));
frame.write(max(0, allowedBytes));
if (!cancelled && frame.size() == 0) {
// This frame has been fully written, remove this frame and notify it. Since we remove this frame
// first, we're guaranteed that its error method will not be called when we call cancel.

View File

@ -359,6 +359,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
}
ChannelFuture future = goAway(ctx, null);
ctx.flush();
// If there are no active streams, close immediately after the send is complete.
// Otherwise wait until all streams are inactive.
@ -389,6 +390,13 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
ctx.flush();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// Trigger flush after read on the assumption that flush is cheap if there is nothing to write and that
// for flow-control the read may release window that causes data to be written that can now be flushed.
ctx.flush();
}
/**
* Handles {@link Http2Exception} objects that were thrown from other handlers. Ignores all other exceptions.
*/
@ -478,6 +486,7 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
} else {
onConnectionError(ctx, cause, embedded);
}
ctx.flush();
}
/**
@ -522,7 +531,6 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
}
ChannelFuture future = frameWriter().writeRstStream(ctx, streamId, errorCode, promise);
ctx.flush();
// Synchronously set the resetSent flag to prevent any subsequent calls
// from resulting in multiple reset frames being sent.
@ -557,7 +565,6 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
connection.goAwaySent(lastStreamId, errorCode, debugData);
ChannelFuture future = frameWriter().writeGoAway(ctx, lastStreamId, errorCode, debugData, promise);
ctx.flush();
future.addListener(new GenericFutureListener<ChannelFuture>() {
@Override
@ -585,7 +592,8 @@ public class Http2ConnectionHandler extends ByteToMessageDecoder implements Http
}
/**
* Close the remote endpoint with with a {@code GO_AWAY} frame.
* Close the remote endpoint with with a {@code GO_AWAY} frame. Does <strong>not</strong> flush
* immediately, this is the responsibility of the caller.
*/
private ChannelFuture goAway(ChannelHandlerContext ctx, Http2Exception cause) {
long errorCode = cause != null ? cause.error().code() : NO_ERROR.code();

View File

@ -27,8 +27,8 @@ public interface Http2RemoteFlowController extends Http2FlowController {
* guarantee when the data will be written or whether it will be split into multiple frames
* before sending.
* <p>
* Manually flushing the {@link ChannelHandlerContext} is not required, since the flow
* controller will flush as appropriate.
* Manually flushing the {@link ChannelHandlerContext} is required for writes as the flow controller will
* <strong>not</strong> flush by itself.
*
* @param ctx the context from the handler.
* @param stream the subject stream. Must not be the connection stream object.
@ -75,15 +75,14 @@ public interface Http2RemoteFlowController extends Http2FlowController {
* Writes up to {@code allowedBytes} of the encapsulated payload to the stream. Note that
* a value of 0 may be passed which will allow payloads with flow-control size == 0 to be
* written. The flow-controller may call this method multiple times with different values until
* the payload is fully written.
* the payload is fully written, i.e it's size after the write is 0.
* <p>
* When an exception is thrown the {@link Http2RemoteFlowController} will make a call to
* {@link #error(Throwable)}.
* </p>
*
* @param allowedBytes an upper bound on the number of bytes the payload can write at this time.
* @return {@code true} if a flush is required, {@code false} otherwise.
*/
boolean write(int allowedBytes);
void write(int allowedBytes);
}
}

View File

@ -35,6 +35,7 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -48,6 +49,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@ -161,6 +163,9 @@ public class DefaultHttp2ConnectionDecoderTest {
// Simulate receiving the SETTINGS ACK for the initial settings.
decode().onSettingsAckRead(ctx);
// Disallow any further flushes now that settings ACK has been sent
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
}
@Test
@ -605,7 +610,6 @@ public class DefaultHttp2ConnectionDecoderTest {
@Test
public void settingsReadShouldSetValues() throws Exception {
when(connection.isServer()).thenReturn(true);
Http2Settings settings = new Http2Settings();
settings.pushEnabled(true);
settings.initialWindowSize(123);

View File

@ -60,6 +60,7 @@ import io.netty.util.concurrent.ImmediateEventExecutor;
import java.util.ArrayList;
import java.util.List;
import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@ -207,6 +208,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(ctx.newSucceededFuture()).thenReturn(future);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
encoder = new DefaultHttp2ConnectionEncoder(connection, writer);
encoder.lifecycleManager(lifecycleManager);
@ -217,7 +219,7 @@ public class DefaultHttp2ConnectionEncoderTest {
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
assertEquals(payloadCaptor.getValue().size(), 8);
assertTrue(payloadCaptor.getValue().write(8));
payloadCaptor.getValue().write(8);
assertEquals(0, payloadCaptor.getValue().size());
assertEquals("abcdefgh", writtenData.get(0));
assertEquals(0, data.refCnt());
@ -229,7 +231,7 @@ public class DefaultHttp2ConnectionEncoderTest {
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 0, true, promise);
assertEquals(payloadCaptor.getValue().size(), 8);
assertTrue(payloadCaptor.getValue().write(8));
payloadCaptor.getValue().write(8);
// writer was called 3 times
assertEquals(3, writtenData.size());
assertEquals("abc", writtenData.get(0));
@ -244,7 +246,7 @@ public class DefaultHttp2ConnectionEncoderTest {
final ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 5, true, promise);
assertEquals(payloadCaptor.getValue().size(), 13);
assertTrue(payloadCaptor.getValue().write(13));
payloadCaptor.getValue().write(13);
// writer was called 3 times
assertEquals(3, writtenData.size());
assertEquals("abcde", writtenData.get(0));
@ -262,7 +264,7 @@ public class DefaultHttp2ConnectionEncoderTest {
ByteBuf data = dummyData();
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
assertEquals(payloadCaptor.getValue().size(), 18);
assertTrue(payloadCaptor.getValue().write(18));
payloadCaptor.getValue().write(18);
// writer was called 4 times
assertEquals(4, writtenData.size());
assertEquals("abcde", writtenData.get(0));
@ -292,7 +294,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(frameSizePolicy.maxFrameSize()).thenReturn(5);
encoder.writeData(ctx, STREAM_ID, data, 10, true, promise);
assertEquals(payloadCaptor.getValue().size(), 10);
assertTrue(payloadCaptor.getValue().write(10));
payloadCaptor.getValue().write(10);
// writer was called 2 times
assertEquals(2, writtenData.size());
assertEquals("", writtenData.get(0));

View File

@ -30,6 +30,7 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@ -64,6 +65,7 @@ public class DefaultHttp2LocalFlowControllerTest {
MockitoAnnotations.initMocks(this);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2LocalFlowController(connection, frameWriter, updateRatio);

View File

@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
@ -40,7 +41,9 @@ import io.netty.util.collection.IntObjectMap;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.AssertionFailedError;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@ -83,6 +86,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
MockitoAnnotations.initMocks(this);
when(ctx.newPromise()).thenReturn(promise);
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
connection = new DefaultHttp2Connection(false);
controller = new DefaultHttp2RemoteFlowController(connection);
@ -127,19 +131,17 @@ public class DefaultHttp2RemoteFlowControllerTest {
}
@Test
public void payloadSmallerThanWindowShouldBeSentImmediately() throws Http2Exception {
public void payloadSmallerThanWindowShouldBeWrittenImmediately() throws Http2Exception {
FakeFlowControlled data = new FakeFlowControlled(5);
sendData(STREAM_A, data);
data.assertFullyWritten();
verify(ctx, times(1)).flush();
}
@Test
public void emptyPayloadShouldBeSentImmediately() throws Http2Exception {
public void emptyPayloadShouldBeWrittenImmediately() throws Http2Exception {
FakeFlowControlled data = new FakeFlowControlled(0);
sendData(STREAM_A, data);
data.assertFullyWritten();
verify(ctx, times(1)).flush();
}
@Test
@ -152,7 +154,6 @@ public class DefaultHttp2RemoteFlowControllerTest {
data.assertNotWritten();
sendData(STREAM_A, moreData);
moreData.assertNotWritten();
verify(ctx, never()).flush();
}
@Test
@ -165,7 +166,6 @@ public class DefaultHttp2RemoteFlowControllerTest {
data.assertNotWritten();
sendData(STREAM_A, moreData);
moreData.assertNotWritten();
verify(ctx, never()).flush();
connection.stream(STREAM_A).close();
data.assertError();
@ -180,7 +180,6 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_A, data);
// Verify that a partial frame of 5 remains to be sent
data.assertPartiallyWritten(5);
verify(ctx, times(1)).flush();
}
@Test
@ -193,14 +192,12 @@ public class DefaultHttp2RemoteFlowControllerTest {
sendData(STREAM_A, moreData);
data.assertPartiallyWritten(10);
moreData.assertNotWritten();
verify(ctx, times(1)).flush();
reset(ctx);
// Update the window and verify that the rest of data and some of moreData are written
incrementWindowSize(STREAM_A, 15);
data.assertFullyWritten();
moreData.assertPartiallyWritten(5);
verify(ctx, times(1)).flush();
assertEquals(DEFAULT_WINDOW_SIZE - 25, window(CONNECTION_STREAM_ID));
assertEquals(0, window(STREAM_A));
@ -1109,26 +1106,21 @@ public class DefaultHttp2RemoteFlowControllerTest {
public void flowControlledWriteCompleteThrowsAnException() throws Exception {
final Http2RemoteFlowController.FlowControlled flowControlled =
Mockito.mock(Http2RemoteFlowController.FlowControlled.class);
when(flowControlled.size()).thenReturn(100);
when(flowControlled.write(anyInt())).thenAnswer(new Answer<Boolean>() {
private int invocationCount;
final AtomicInteger size = new AtomicInteger(150);
doAnswer(new Answer<Integer>() {
@Override
public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
switch(invocationCount) {
case 0:
when(flowControlled.size()).thenReturn(50);
invocationCount = 1;
return true;
case 1:
when(flowControlled.size()).thenReturn(20);
invocationCount = 2;
return true;
default:
when(flowControlled.size()).thenReturn(0);
return false;
}
public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
return size.get();
}
});
}).when(flowControlled).size();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
size.addAndGet(-50);
return null;
}
}).when(flowControlled).write(anyInt());
final Http2Stream stream = stream(STREAM_A);
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocationOnMock) {
@ -1148,7 +1140,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
verify(flowControlled, never()).error(any(Throwable.class));
verify(flowControlled).writeComplete();
assertEquals(100, windowBefore - window(STREAM_A));
assertEquals(150, windowBefore - window(STREAM_A));
}
@Test
@ -1157,7 +1149,7 @@ public class DefaultHttp2RemoteFlowControllerTest {
Mockito.mock(Http2RemoteFlowController.FlowControlled.class);
final Http2Stream stream = stream(STREAM_A);
when(flowControlled.size()).thenReturn(100);
when(flowControlled.write(anyInt())).thenThrow(new RuntimeException("write failed"));
doThrow(new RuntimeException("write failed")).when(flowControlled).write(anyInt());
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocationOnMock) {
stream.close();
@ -1176,25 +1168,25 @@ public class DefaultHttp2RemoteFlowControllerTest {
final Http2RemoteFlowController.FlowControlled flowControlled =
Mockito.mock(Http2RemoteFlowController.FlowControlled.class);
when(flowControlled.size()).thenReturn(100);
when(flowControlled.write(anyInt())).thenAnswer(new Answer<Boolean>() {
doAnswer(new Answer<Void>() {
private int invocationCount;
@Override
public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
switch(invocationCount) {
case 0:
when(flowControlled.size()).thenReturn(50);
invocationCount = 1;
return true;
return null;
case 1:
when(flowControlled.size()).thenReturn(20);
invocationCount = 2;
return true;
return null;
default:
when(flowControlled.size()).thenReturn(10);
throw new RuntimeException("Write failed");
}
}
});
}).when(flowControlled).write(anyInt());
return flowControlled;
}
@ -1265,15 +1257,14 @@ public class DefaultHttp2RemoteFlowControllerTest {
}
@Override
public boolean write(int allowedBytes) {
public void write(int allowedBytes) {
if (allowedBytes <= 0 && currentSize != 0) {
// Write has been called but no data can be written
return false;
return;
}
writeCalled = true;
int written = Math.min(currentSize, allowedBytes);
currentSize -= written;
return true;
}
public int written() {

View File

@ -358,4 +358,11 @@ public class Http2ConnectionHandlerTest {
verify(data).release();
verifyNoMoreInteractions(frameWriter);
}
@Test
public void channelReadCompleteTriggersFlush() throws Exception {
handler = newHandler();
handler.channelReadComplete(ctx);
verify(ctx, times(1)).flush();
}
}

View File

@ -130,6 +130,7 @@ public class Http2ConnectionRoundtripTest {
public void run() {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
ctx().flush();
}
});
@ -174,6 +175,7 @@ public class Http2ConnectionRoundtripTest {
public void run() {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
ctx().flush();
}
});
@ -206,6 +208,7 @@ public class Http2ConnectionRoundtripTest {
public void run() {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0, false,
newPromise());
ctx().flush();
}
});
@ -237,6 +240,7 @@ public class Http2ConnectionRoundtripTest {
public void run() {
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
true, newPromise());
ctx().flush();
}
});
@ -247,6 +251,7 @@ public class Http2ConnectionRoundtripTest {
public void run() {
http2Client.encoder().writeHeaders(ctx(), Integer.MAX_VALUE + 1, headers, 0, (short) 16, false, 0,
true, newPromise());
ctx().flush();
}
});
@ -292,6 +297,7 @@ public class Http2ConnectionRoundtripTest {
// Write trailers.
http2Client.encoder().writeHeaders(ctx(), 3, headers, 0, (short) 16, false, 0,
true, newPromise());
ctx().flush();
}
});
@ -376,6 +382,7 @@ public class Http2ConnectionRoundtripTest {
// Write trailers.
http2Client.encoder().writeHeaders(ctx(), streamId, headers, 0, (short) 16,
false, 0, true, newPromise());
ctx().flush();
}
}
});