diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java index fa71ceebec..a943f776be 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoder.java @@ -19,15 +19,12 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR; import static io.netty.handler.codec.http2.Http2Exception.connectionError; import static io.netty.util.internal.ObjectUtil.checkNotNull; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.SlicedByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import io.netty.channel.CoalescingBufferQueue; import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException; -import io.netty.util.ReferenceCountUtil; import java.util.ArrayDeque; @@ -318,85 +315,48 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { *

*/ private final class FlowControlledData extends FlowControlledBase { - private ByteBuf data; - private int size; + + private final CoalescingBufferQueue queue; private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf buf, int padding, boolean endOfStream, ChannelPromise promise) { super(ctx, stream, padding, endOfStream, promise); - this.data = buf; - size = data.readableBytes() + padding; + queue = new CoalescingBufferQueue(ctx.channel()); + queue.add(buf, promise); } @Override public int size() { - return size; + return queue.readableBytes() + padding; } @Override public void error(Throwable cause) { - ReferenceCountUtil.safeRelease(data); + queue.releaseAndFailAll(cause); lifecycleManager.onException(ctx, cause); - data = null; - size = 0; promise.tryFailure(cause); } @Override public void write(int allowedBytes) { - int bytesWritten = 0; - if (data == null || (allowedBytes == 0 && size != 0)) { - // No point writing an empty DATA frame, wait for a bigger allowance. + if (!endOfStream && (queue.readableBytes() == 0 || allowedBytes == 0)) { + // Nothing to write and we don't have to force a write because of EOS. return; } - try { - int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize(); - do { - int allowedFrameSize = Math.min(maxFrameSize, allowedBytes - bytesWritten); - ByteBuf toWrite; - // Let data consume the frame before padding. - int writeableData = data.readableBytes(); - if (writeableData > allowedFrameSize) { - writeableData = allowedFrameSize; - toWrite = data.readSlice(writeableData).retain(); - } else { - // We're going to write the full buffer which will cause it to be released, for subsequent - // writes just use empty buffer to avoid over-releasing. Have to use an empty buffer - // as we may continue to write padding in subsequent frames. - toWrite = data; - data = Unpooled.EMPTY_BUFFER; - } - int writeablePadding = Math.min(allowedFrameSize - writeableData, padding); - padding -= writeablePadding; - bytesWritten += writeableData + writeablePadding; - ChannelPromise writePromise; - if (size == bytesWritten && !promise.isVoid()) { - // Can use the original promise if it's the last write - writePromise = promise; - } else { - // Create a new promise and listen to it for failure - writePromise = ctx.newPromise(); - writePromise.addListener(this); - } - if (toWrite instanceof SlicedByteBuf && data instanceof CompositeByteBuf) { - // If we're writing a subset of a composite buffer then we want to release - // any underlying buffers that have been consumed. CompositeByteBuf only releases - // underlying buffers on write if all of its data has been consumed and its refCnt becomes - // 0. - final CompositeByteBuf toFree = (CompositeByteBuf) data; - writePromise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - toFree.discardReadComponents(); - } - }); - } - frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding, - size == bytesWritten && endOfStream, writePromise); - } while (size != bytesWritten && allowedBytes > bytesWritten); - } finally { - size -= bytesWritten; - } + int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize(); + do { + int allowedFrameSize = Math.min(maxFrameSize, allowedBytes); + int writeableData = Math.min(queue.readableBytes(), allowedFrameSize); + ChannelPromise writePromise = ctx.newPromise(); + writePromise.addListener(this); + ByteBuf toWrite = queue.remove(writeableData, writePromise); + + int writeablePadding = Math.min(allowedFrameSize - writeableData, padding); + padding -= writeablePadding; + allowedBytes -= writeableData + writeablePadding; + frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding, + endOfStream && size() == 0, writePromise); + } while (size() > 0 && allowedBytes > 0); } @Override @@ -404,39 +364,11 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { if (FlowControlledData.class != next.getClass()) { return false; } - final FlowControlledData nextData = (FlowControlledData) next; + FlowControlledData nextData = (FlowControlledData) next; + nextData.queue.copyTo(queue); // Given that we're merging data into a frame it doesn't really make sense to accumulate padding. - padding = Math.max(nextData.padding, padding); + padding = Math.max(padding, nextData.padding); endOfStream = nextData.endOfStream; - final CompositeByteBuf compositeByteBuf; - if (data instanceof CompositeByteBuf) { - compositeByteBuf = (CompositeByteBuf) data; - } else { - compositeByteBuf = ctx.alloc().compositeBuffer(Integer.MAX_VALUE); - compositeByteBuf.addComponent(data); - compositeByteBuf.writerIndex(data.readableBytes()); - data = compositeByteBuf; - } - compositeByteBuf.addComponent(nextData.data); - compositeByteBuf.writerIndex(compositeByteBuf.writerIndex() + nextData.data.readableBytes()); - size = data.readableBytes() + padding; - if (!nextData.promise.isVoid()) { - // Replace current promise if void otherwise chain them. - if (promise.isVoid()) { - promise = nextData.promise; - } else { - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - nextData.promise.trySuccess(); - } else { - nextData.promise.tryFailure(future.cause()); - } - } - }); - } - } return true; } } @@ -478,8 +410,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { public void write(int allowedBytes) { if (promise.isVoid()) { promise = ctx.newPromise(); - promise.addListener(this); } + promise.addListener(this); frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive, padding, endOfStream, promise); } @@ -511,9 +443,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder { this.endOfStream = endOfStream; this.stream = stream; this.promise = promise; - if (!promise.isVoid()) { - promise.addListener(this); - } } @Override diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java index 9392f8be27..ddd05b9bba 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/DefaultHttp2ConnectionEncoderTest.java @@ -45,6 +45,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; @@ -219,6 +220,7 @@ public class DefaultHttp2ConnectionEncoderTest { when(ctx.newPromise()).thenReturn(promise); when(ctx.write(any())).thenReturn(future); when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden")); + when(channel.alloc()).thenReturn(PooledByteBufAllocator.DEFAULT); encoder = new DefaultHttp2ConnectionEncoder(connection, writer); encoder.lifecycleManager(lifecycleManager); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java index ff21be9287..9b0ddcadf5 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/StreamBufferingEncoderTest.java @@ -109,6 +109,7 @@ public class StreamBufferingEncoderTest { // Set LifeCycleManager on encoder and decoder when(ctx.channel()).thenReturn(channel); when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); + when(channel.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); when(ctx.newPromise()).thenReturn(promise); when(channel.isActive()).thenReturn(false); handler.handlerAdded(ctx);