Use CoalescingBufferQueue to merge data writes on a stream in HTTP2 instead of CompositeByteBuf

Motivation:

Slicing a mutable CompositeByteBuf is not the appropriate mechanism to use to track and release buffers that have been written to a channel.
In particular buffers passed over an Embedded or LocalChannel are retained after the ChannelPromise is completed and listening to the
promise to consolidate a CompositeBuffer breaks slices taken from the composite as the offset indices have changed.

In addition CoalescingBufferQueue handles taking arbitrarily sized slices of a sequence of buffers more efficiently.

Modifications:

Convert FlowControlledData to use a CoalescingBufferQueue to handle merging data writes.

Result:

HTTP2 works over LocalChannel and code is considerably simpler.
This commit is contained in:
Louis Ryan 2015-06-30 12:31:08 -07:00 committed by nmittler
parent e949dcd94f
commit 60c59f39af
3 changed files with 30 additions and 98 deletions

View File

@ -19,15 +19,12 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.SlicedByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.CoalescingBufferQueue;
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayDeque;
@ -318,85 +315,48 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
* </p>
*/
private final class FlowControlledData extends FlowControlledBase {
private ByteBuf data;
private int size;
private final CoalescingBufferQueue queue;
private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf buf, int padding,
boolean endOfStream, ChannelPromise promise) {
super(ctx, stream, padding, endOfStream, promise);
this.data = buf;
size = data.readableBytes() + padding;
queue = new CoalescingBufferQueue(ctx.channel());
queue.add(buf, promise);
}
@Override
public int size() {
return size;
return queue.readableBytes() + padding;
}
@Override
public void error(Throwable cause) {
ReferenceCountUtil.safeRelease(data);
queue.releaseAndFailAll(cause);
lifecycleManager.onException(ctx, cause);
data = null;
size = 0;
promise.tryFailure(cause);
}
@Override
public void write(int allowedBytes) {
int bytesWritten = 0;
if (data == null || (allowedBytes == 0 && size != 0)) {
// No point writing an empty DATA frame, wait for a bigger allowance.
if (!endOfStream && (queue.readableBytes() == 0 || allowedBytes == 0)) {
// Nothing to write and we don't have to force a write because of EOS.
return;
}
try {
int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize();
do {
int allowedFrameSize = Math.min(maxFrameSize, allowedBytes - bytesWritten);
ByteBuf toWrite;
// Let data consume the frame before padding.
int writeableData = data.readableBytes();
if (writeableData > allowedFrameSize) {
writeableData = allowedFrameSize;
toWrite = data.readSlice(writeableData).retain();
} else {
// We're going to write the full buffer which will cause it to be released, for subsequent
// writes just use empty buffer to avoid over-releasing. Have to use an empty buffer
// as we may continue to write padding in subsequent frames.
toWrite = data;
data = Unpooled.EMPTY_BUFFER;
}
int writeablePadding = Math.min(allowedFrameSize - writeableData, padding);
padding -= writeablePadding;
bytesWritten += writeableData + writeablePadding;
ChannelPromise writePromise;
if (size == bytesWritten && !promise.isVoid()) {
// Can use the original promise if it's the last write
writePromise = promise;
} else {
// Create a new promise and listen to it for failure
writePromise = ctx.newPromise();
writePromise.addListener(this);
}
if (toWrite instanceof SlicedByteBuf && data instanceof CompositeByteBuf) {
// If we're writing a subset of a composite buffer then we want to release
// any underlying buffers that have been consumed. CompositeByteBuf only releases
// underlying buffers on write if all of its data has been consumed and its refCnt becomes
// 0.
final CompositeByteBuf toFree = (CompositeByteBuf) data;
writePromise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
toFree.discardReadComponents();
}
});
}
frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
size == bytesWritten && endOfStream, writePromise);
} while (size != bytesWritten && allowedBytes > bytesWritten);
} finally {
size -= bytesWritten;
}
int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize();
do {
int allowedFrameSize = Math.min(maxFrameSize, allowedBytes);
int writeableData = Math.min(queue.readableBytes(), allowedFrameSize);
ChannelPromise writePromise = ctx.newPromise();
writePromise.addListener(this);
ByteBuf toWrite = queue.remove(writeableData, writePromise);
int writeablePadding = Math.min(allowedFrameSize - writeableData, padding);
padding -= writeablePadding;
allowedBytes -= writeableData + writeablePadding;
frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
endOfStream && size() == 0, writePromise);
} while (size() > 0 && allowedBytes > 0);
}
@Override
@ -404,39 +364,11 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
if (FlowControlledData.class != next.getClass()) {
return false;
}
final FlowControlledData nextData = (FlowControlledData) next;
FlowControlledData nextData = (FlowControlledData) next;
nextData.queue.copyTo(queue);
// Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
padding = Math.max(nextData.padding, padding);
padding = Math.max(padding, nextData.padding);
endOfStream = nextData.endOfStream;
final CompositeByteBuf compositeByteBuf;
if (data instanceof CompositeByteBuf) {
compositeByteBuf = (CompositeByteBuf) data;
} else {
compositeByteBuf = ctx.alloc().compositeBuffer(Integer.MAX_VALUE);
compositeByteBuf.addComponent(data);
compositeByteBuf.writerIndex(data.readableBytes());
data = compositeByteBuf;
}
compositeByteBuf.addComponent(nextData.data);
compositeByteBuf.writerIndex(compositeByteBuf.writerIndex() + nextData.data.readableBytes());
size = data.readableBytes() + padding;
if (!nextData.promise.isVoid()) {
// Replace current promise if void otherwise chain them.
if (promise.isVoid()) {
promise = nextData.promise;
} else {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
nextData.promise.trySuccess();
} else {
nextData.promise.tryFailure(future.cause());
}
}
});
}
}
return true;
}
}
@ -478,8 +410,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
public void write(int allowedBytes) {
if (promise.isVoid()) {
promise = ctx.newPromise();
promise.addListener(this);
}
promise.addListener(this);
frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
padding, endOfStream, promise);
}
@ -511,9 +443,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
this.endOfStream = endOfStream;
this.stream = stream;
this.promise = promise;
if (!promise.isVoid()) {
promise.addListener(this);
}
}
@Override

View File

@ -45,6 +45,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
@ -219,6 +220,7 @@ public class DefaultHttp2ConnectionEncoderTest {
when(ctx.newPromise()).thenReturn(promise);
when(ctx.write(any())).thenReturn(future);
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
when(channel.alloc()).thenReturn(PooledByteBufAllocator.DEFAULT);
encoder = new DefaultHttp2ConnectionEncoder(connection, writer);
encoder.lifecycleManager(lifecycleManager);

View File

@ -109,6 +109,7 @@ public class StreamBufferingEncoderTest {
// Set LifeCycleManager on encoder and decoder
when(ctx.channel()).thenReturn(channel);
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(channel.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
when(ctx.newPromise()).thenReturn(promise);
when(channel.isActive()).thenReturn(false);
handler.handlerAdded(ctx);