Use CoalescingBufferQueue to merge data writes on a stream in HTTP/2 instead of CompositeByteBuf
Motivation: Slicing a mutable CompositeByteBuf is not the appropriate mechanism to use to track and release buffers that have been written to a channel. In particular, buffers passed over an EmbeddedChannel or LocalChannel are retained after the ChannelPromise is completed, and listening to the promise to consolidate a CompositeByteBuf breaks slices taken from the composite, as the offset indices have changed. In addition, CoalescingBufferQueue handles taking arbitrarily sized slices of a sequence of buffers more efficiently. Modifications: Convert FlowControlledData to use a CoalescingBufferQueue to handle merging data writes. Result: HTTP/2 works over LocalChannel and the code is considerably simpler.
This commit is contained in:
parent
508d6e3b31
commit
0b12592acb
@ -19,15 +19,12 @@ import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.SlicedByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.CoalescingBufferQueue;
|
||||
import io.netty.handler.codec.http2.Http2Exception.ClosedStreamCreationException;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
|
||||
@ -318,85 +315,48 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
* </p>
|
||||
*/
|
||||
private final class FlowControlledData extends FlowControlledBase {
|
||||
private ByteBuf data;
|
||||
private int size;
|
||||
|
||||
private final CoalescingBufferQueue queue;
|
||||
|
||||
private FlowControlledData(ChannelHandlerContext ctx, Http2Stream stream, ByteBuf buf, int padding,
|
||||
boolean endOfStream, ChannelPromise promise) {
|
||||
super(ctx, stream, padding, endOfStream, promise);
|
||||
this.data = buf;
|
||||
size = data.readableBytes() + padding;
|
||||
queue = new CoalescingBufferQueue(ctx.channel());
|
||||
queue.add(buf, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return size;
|
||||
return queue.readableBytes() + padding;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(Throwable cause) {
|
||||
ReferenceCountUtil.safeRelease(data);
|
||||
queue.releaseAndFailAll(cause);
|
||||
lifecycleManager.onException(ctx, cause);
|
||||
data = null;
|
||||
size = 0;
|
||||
promise.tryFailure(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int allowedBytes) {
|
||||
int bytesWritten = 0;
|
||||
if (data == null || (allowedBytes == 0 && size != 0)) {
|
||||
// No point writing an empty DATA frame, wait for a bigger allowance.
|
||||
if (!endOfStream && (queue.readableBytes() == 0 || allowedBytes == 0)) {
|
||||
// Nothing to write and we don't have to force a write because of EOS.
|
||||
return;
|
||||
}
|
||||
try {
|
||||
int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize();
|
||||
do {
|
||||
int allowedFrameSize = Math.min(maxFrameSize, allowedBytes - bytesWritten);
|
||||
ByteBuf toWrite;
|
||||
// Let data consume the frame before padding.
|
||||
int writeableData = data.readableBytes();
|
||||
if (writeableData > allowedFrameSize) {
|
||||
writeableData = allowedFrameSize;
|
||||
toWrite = data.readSlice(writeableData).retain();
|
||||
} else {
|
||||
// We're going to write the full buffer which will cause it to be released, for subsequent
|
||||
// writes just use empty buffer to avoid over-releasing. Have to use an empty buffer
|
||||
// as we may continue to write padding in subsequent frames.
|
||||
toWrite = data;
|
||||
data = Unpooled.EMPTY_BUFFER;
|
||||
}
|
||||
int writeablePadding = Math.min(allowedFrameSize - writeableData, padding);
|
||||
padding -= writeablePadding;
|
||||
bytesWritten += writeableData + writeablePadding;
|
||||
ChannelPromise writePromise;
|
||||
if (size == bytesWritten && !promise.isVoid()) {
|
||||
// Can use the original promise if it's the last write
|
||||
writePromise = promise;
|
||||
} else {
|
||||
// Create a new promise and listen to it for failure
|
||||
writePromise = ctx.newPromise();
|
||||
writePromise.addListener(this);
|
||||
}
|
||||
if (toWrite instanceof SlicedByteBuf && data instanceof CompositeByteBuf) {
|
||||
// If we're writing a subset of a composite buffer then we want to release
|
||||
// any underlying buffers that have been consumed. CompositeByteBuf only releases
|
||||
// underlying buffers on write if all of its data has been consumed and its refCnt becomes
|
||||
// 0.
|
||||
final CompositeByteBuf toFree = (CompositeByteBuf) data;
|
||||
writePromise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
toFree.discardReadComponents();
|
||||
}
|
||||
});
|
||||
}
|
||||
frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
|
||||
size == bytesWritten && endOfStream, writePromise);
|
||||
} while (size != bytesWritten && allowedBytes > bytesWritten);
|
||||
} finally {
|
||||
size -= bytesWritten;
|
||||
}
|
||||
int maxFrameSize = frameWriter().configuration().frameSizePolicy().maxFrameSize();
|
||||
do {
|
||||
int allowedFrameSize = Math.min(maxFrameSize, allowedBytes);
|
||||
int writeableData = Math.min(queue.readableBytes(), allowedFrameSize);
|
||||
ChannelPromise writePromise = ctx.newPromise();
|
||||
writePromise.addListener(this);
|
||||
ByteBuf toWrite = queue.remove(writeableData, writePromise);
|
||||
|
||||
int writeablePadding = Math.min(allowedFrameSize - writeableData, padding);
|
||||
padding -= writeablePadding;
|
||||
allowedBytes -= writeableData + writeablePadding;
|
||||
frameWriter().writeData(ctx, stream.id(), toWrite, writeablePadding,
|
||||
endOfStream && size() == 0, writePromise);
|
||||
} while (size() > 0 && allowedBytes > 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -404,39 +364,11 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
if (FlowControlledData.class != next.getClass()) {
|
||||
return false;
|
||||
}
|
||||
final FlowControlledData nextData = (FlowControlledData) next;
|
||||
FlowControlledData nextData = (FlowControlledData) next;
|
||||
nextData.queue.copyTo(queue);
|
||||
// Given that we're merging data into a frame it doesn't really make sense to accumulate padding.
|
||||
padding = Math.max(nextData.padding, padding);
|
||||
padding = Math.max(padding, nextData.padding);
|
||||
endOfStream = nextData.endOfStream;
|
||||
final CompositeByteBuf compositeByteBuf;
|
||||
if (data instanceof CompositeByteBuf) {
|
||||
compositeByteBuf = (CompositeByteBuf) data;
|
||||
} else {
|
||||
compositeByteBuf = ctx.alloc().compositeBuffer(Integer.MAX_VALUE);
|
||||
compositeByteBuf.addComponent(data);
|
||||
compositeByteBuf.writerIndex(data.readableBytes());
|
||||
data = compositeByteBuf;
|
||||
}
|
||||
compositeByteBuf.addComponent(nextData.data);
|
||||
compositeByteBuf.writerIndex(compositeByteBuf.writerIndex() + nextData.data.readableBytes());
|
||||
size = data.readableBytes() + padding;
|
||||
if (!nextData.promise.isVoid()) {
|
||||
// Replace current promise if void otherwise chain them.
|
||||
if (promise.isVoid()) {
|
||||
promise = nextData.promise;
|
||||
} else {
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
nextData.promise.trySuccess();
|
||||
} else {
|
||||
nextData.promise.tryFailure(future.cause());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -478,8 +410,8 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
public void write(int allowedBytes) {
|
||||
if (promise.isVoid()) {
|
||||
promise = ctx.newPromise();
|
||||
promise.addListener(this);
|
||||
}
|
||||
promise.addListener(this);
|
||||
frameWriter().writeHeaders(ctx, stream.id(), headers, streamDependency, weight, exclusive,
|
||||
padding, endOfStream, promise);
|
||||
}
|
||||
@ -511,9 +443,6 @@ public class DefaultHttp2ConnectionEncoder implements Http2ConnectionEncoder {
|
||||
this.endOfStream = endOfStream;
|
||||
this.stream = stream;
|
||||
this.promise = promise;
|
||||
if (!promise.isVoid()) {
|
||||
promise.addListener(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -45,6 +45,7 @@ import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
@ -219,6 +220,7 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
when(ctx.newPromise()).thenReturn(promise);
|
||||
when(ctx.write(any())).thenReturn(future);
|
||||
when(ctx.flush()).thenThrow(new AssertionFailedError("forbidden"));
|
||||
when(channel.alloc()).thenReturn(PooledByteBufAllocator.DEFAULT);
|
||||
|
||||
encoder = new DefaultHttp2ConnectionEncoder(connection, writer);
|
||||
encoder.lifecycleManager(lifecycleManager);
|
||||
|
@ -109,6 +109,7 @@ public class StreamBufferingEncoderTest {
|
||||
// Set LifeCycleManager on encoder and decoder
|
||||
when(ctx.channel()).thenReturn(channel);
|
||||
when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
|
||||
when(channel.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
|
||||
when(ctx.newPromise()).thenReturn(promise);
|
||||
when(channel.isActive()).thenReturn(false);
|
||||
handler.handlerAdded(ctx);
|
||||
|
Loading…
Reference in New Issue
Block a user