Fixing HTTP/2 data write completion bug

Motivation:

A recent refactoring of the outbound flow controller interface
introduced a bug when writing data. We're no longer properly handling
the completion of the write (i.e. updating stream state/handling error).

Modifications:

Updated AbstractHttp2ConnectionHandler.writeData to properly handle the
completion of the write future.

Result:

DATA writes now perform post-write cleanup.
This commit is contained in:
nmittler 2014-08-28 09:36:24 -07:00
parent 2128753484
commit 4599b7a0ba
2 changed files with 58 additions and 3 deletions

View File

@ -19,6 +19,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGH
import static io.netty.handler.codec.http2.Http2CodecUtil.HTTP_UPGRADE_STREAM_ID;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.toByteBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.toHttp2Exception;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.STREAM_CLOSED;
@ -345,8 +346,8 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
/**
* Writes (and flushes) the given data to the remote endpoint.
*/
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, final ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {
public ChannelFuture writeData(final ChannelHandlerContext ctx, final int streamId, final ByteBuf data,
int padding, final boolean endStream, ChannelPromise promise) {
try {
if (connection.isGoAway()) {
throw protocolError("Sending data after connection going away.");
@ -356,7 +357,22 @@ public abstract class AbstractHttp2ConnectionHandler extends ByteToMessageDecode
stream.verifyState(PROTOCOL_ERROR, OPEN, HALF_CLOSED_REMOTE);
// Hand control of the frame to the flow controller.
return outboundFlow.writeData(ctx, streamId, data, padding, endStream, promise);
ChannelFuture future = outboundFlow.writeData(ctx, streamId, data, padding, endStream, promise);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
// The write failed, handle the error.
onHttp2Exception(ctx, toHttp2Exception(future.cause()));
} else if (endStream) {
// Close the local side of the stream if this is the last frame
Http2Stream stream = connection.stream(streamId);
closeLocalSide(stream, ctx.newPromise());
}
}
});
return future;
} catch (Http2Exception e) {
promise.setFailure(e);
return promise;

View File

@ -22,6 +22,7 @@ import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.Http2CodecUtil.connectionPrefaceBuf;
import static io.netty.handler.codec.http2.Http2CodecUtil.emptyPingBuf;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.NO_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.protocolError;
@ -45,12 +46,15 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.util.CharsetUtil;
import java.util.Collections;
@ -59,6 +63,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/**
@ -135,6 +140,8 @@ public class DelegatingHttp2ConnectionHandlerTest {
future);
when(writer.writeGoAway(eq(ctx), anyInt(), anyInt(), any(ByteBuf.class), eq(promise)))
.thenReturn(future);
when(outboundFlow.writeData(eq(ctx), anyInt(), any(ByteBuf.class), anyInt(),
anyBoolean(), eq(promise))) .thenReturn(future);
mockContext();
handler =
@ -458,6 +465,38 @@ public class DelegatingHttp2ConnectionHandlerTest {
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(0), eq(false), eq(promise));
}
@Test
public void dataWriteShouldHalfCloseStream() throws Exception {
reset(future);
handler.writeData(ctx, STREAM_ID, dummyData(), 0, true, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(0), eq(true), eq(promise));
// Invoke the listener callback indicating that the write completed successfully.
ArgumentCaptor<ChannelFutureListener> captor = ArgumentCaptor.forClass(ChannelFutureListener.class);
verify(future).addListener(captor.capture());
when(future.isSuccess()).thenReturn(true);
captor.getValue().operationComplete(future);
verify(stream).closeLocalSide();
}
@Test
public void dataWriteWithFailureShouldHandleException() throws Exception {
reset(future);
handler.writeData(ctx, STREAM_ID, dummyData(), 0, true, promise);
verify(outboundFlow).writeData(eq(ctx), eq(STREAM_ID), eq(dummyData()), eq(0), eq(true), eq(promise));
// Invoke the listener callback indicating that the write failed.
String msg = "fake exception";
ArgumentCaptor<ChannelFutureListener> captor = ArgumentCaptor.forClass(ChannelFutureListener.class);
verify(future).addListener(captor.capture());
when(future.isSuccess()).thenReturn(false);
when(future.cause()).thenReturn(new RuntimeException(msg));
captor.getValue().operationComplete(future);
verify(writer).writeGoAway(eq(ctx), eq(0), eq((long) INTERNAL_ERROR.code()),
eq(Unpooled.wrappedBuffer(msg.getBytes(UTF_8))), eq(promise));
verify(remote).goAwayReceived(0);
}
@Test
public void headersWriteAfterGoAwayShouldFail() throws Exception {
when(connection.isGoAway()).thenReturn(true);