DefaultHttp2FrameWriter reduce object allocation

Motivation:
DefaultHttp2FrameWriter#writeData allocates a DataFrameHeader for each write operation. DataFrameHeader maintains internal state and allocates multiple slices of a buffer which is a maximum of 30 bytes. This 30 byte buffer may not always be necessary and the additional slice operations can utilize retainedSlice to take advantage of pooled objects. We can also save computation and object allocations if there is no padding which is a common case in practice.

Modifications:
- Remove DataFrameHeader
- Add a fast path for padding == 0

Result:
Less object allocation in DefaultHttp2FrameWriter
This commit is contained in:
Scott Mitchell 2017-11-17 23:22:29 -08:00
parent 844d804aba
commit e6126215e0
8 changed files with 351 additions and 678 deletions

View File

@ -135,53 +135,116 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
int padding, boolean endStream, ChannelPromise promise) {
final SimpleChannelPromiseAggregator promiseAggregator =
new SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor());
final DataFrameHeader header = new DataFrameHeader(ctx, streamId);
boolean needToReleaseHeaders = true;
boolean needToReleaseData = true;
ByteBuf frameHeader = null;
try {
verifyStreamId(streamId, STREAM_ID);
verifyPadding(padding);
boolean lastFrame;
int remainingData = data.readableBytes();
do {
// Determine how much data and padding to write in this frame. Put all padding at the end.
int frameDataBytes = min(remainingData, maxFrameSize);
int framePaddingBytes = min(padding, max(0, (maxFrameSize - 1) - frameDataBytes));
Http2Flags flags = new Http2Flags();
flags.endOfStream(false);
flags.paddingPresent(false);
// Fast path to write frames of payload size maxFrameSize first.
if (remainingData > maxFrameSize) {
frameHeader = ctx.alloc().buffer(FRAME_HEADER_LENGTH);
writeFrameHeaderInternal(frameHeader, maxFrameSize, DATA, flags, streamId);
do {
// Write the header.
ctx.write(frameHeader.retainedSlice(), promiseAggregator.newPromise());
// Decrement the remaining counters.
padding -= framePaddingBytes;
remainingData -= frameDataBytes;
// Write the payload.
ctx.write(data.readRetainedSlice(maxFrameSize), promiseAggregator.newPromise());
// Determine whether or not this is the last frame to be sent.
lastFrame = remainingData == 0 && padding == 0;
remainingData -= maxFrameSize;
// Stop iterating if remainingData == maxFrameSize so we can take care of reference counts below.
} while (remainingData > maxFrameSize);
}
// Only the last frame is not retained. Until then, the outer finally must release.
ByteBuf frameHeader = header.slice(frameDataBytes, framePaddingBytes, lastFrame && endStream);
needToReleaseHeaders = !lastFrame;
ctx.write(lastFrame ? frameHeader : frameHeader.retain(), promiseAggregator.newPromise());
// Write the frame data.
ByteBuf frameData = data.readSlice(frameDataBytes);
// Only the last frame is not retained. Until then, the outer finally must release.
needToReleaseData = !lastFrame;
ctx.write(lastFrame ? frameData : frameData.retain(), promiseAggregator.newPromise());
// Write the frame padding.
if (paddingBytes(framePaddingBytes) > 0) {
ctx.write(ZERO_BUFFER.slice(0, paddingBytes(framePaddingBytes)), promiseAggregator.newPromise());
if (padding == 0) {
// Write the header.
if (frameHeader != null) {
frameHeader.release();
frameHeader = null;
}
} while (!lastFrame);
} catch (Throwable t) {
ByteBuf frameHeader2 = ctx.alloc().buffer(FRAME_HEADER_LENGTH);
flags.endOfStream(endStream);
writeFrameHeaderInternal(frameHeader2, remainingData, DATA, flags, streamId);
ctx.write(frameHeader2, promiseAggregator.newPromise());
// Write the payload.
ByteBuf lastFrame = data.readSlice(remainingData);
data = null;
ctx.write(lastFrame, promiseAggregator.newPromise());
} else {
if (remainingData != maxFrameSize) {
if (frameHeader != null) {
frameHeader.release();
frameHeader = null;
}
} else {
remainingData -= maxFrameSize;
// Write the header.
ByteBuf lastFrame;
if (frameHeader == null) {
lastFrame = ctx.alloc().buffer(FRAME_HEADER_LENGTH);
writeFrameHeaderInternal(lastFrame, maxFrameSize, DATA, flags, streamId);
} else {
lastFrame = frameHeader.slice();
frameHeader = null;
}
ctx.write(lastFrame, promiseAggregator.newPromise());
// Write the payload.
lastFrame = data.readSlice(maxFrameSize);
data = null;
ctx.write(lastFrame, promiseAggregator.newPromise());
}
do {
int frameDataBytes = min(remainingData, maxFrameSize);
int framePaddingBytes = min(padding, max(0, (maxFrameSize - 1) - frameDataBytes));
// Decrement the remaining counters.
padding -= framePaddingBytes;
remainingData -= frameDataBytes;
// Write the header.
ByteBuf frameHeader2 = ctx.alloc().buffer(DATA_FRAME_HEADER_LENGTH);
flags.endOfStream(endStream && remainingData == 0 && padding == 0);
flags.paddingPresent(framePaddingBytes > 0);
writeFrameHeaderInternal(frameHeader2, framePaddingBytes + frameDataBytes, DATA, flags, streamId);
writePaddingLength(frameHeader2, framePaddingBytes);
ctx.write(frameHeader2, promiseAggregator.newPromise());
// Write the payload.
if (frameDataBytes != 0) {
if (remainingData == 0) {
ByteBuf lastFrame = data.readSlice(frameDataBytes);
data = null;
ctx.write(lastFrame, promiseAggregator.newPromise());
} else {
ctx.write(data.readRetainedSlice(frameDataBytes), promiseAggregator.newPromise());
}
}
// Write the frame padding.
if (paddingBytes(framePaddingBytes) > 0) {
ctx.write(ZERO_BUFFER.slice(0, paddingBytes(framePaddingBytes)),
promiseAggregator.newPromise());
}
} while (remainingData != 0 || padding != 0);
}
} catch (Throwable cause) {
if (frameHeader != null) {
frameHeader.release();
}
// Use a try/finally here in case the data has been released before calling this method. This is not
// necessary above because we internally allocate frameHeader.
try {
if (needToReleaseHeaders) {
header.release();
}
if (needToReleaseData) {
if (data != null) {
data.release();
}
} finally {
promiseAggregator.setFailure(t);
promiseAggregator.setFailure(cause);
promiseAggregator.doneAllocatingPromises();
}
return promiseAggregator;
@ -607,53 +670,4 @@ public class DefaultHttp2FrameWriter implements Http2FrameWriter, Http2FrameSize
throw new IllegalArgumentException("Opaque data must be " + PING_FRAME_PAYLOAD_LENGTH + " bytes");
}
}
/**
* Utility class that manages the creation of frame header buffers for {@code DATA} frames. Attempts
* to reuse the same buffer repeatedly when splitting data into multiple frames.
*/
private static final class DataFrameHeader {
private final int streamId;
private final ByteBuf buffer;
private final Http2Flags flags = new Http2Flags();
private int prevData;
private int prevPadding;
private ByteBuf frameHeader;
DataFrameHeader(ChannelHandlerContext ctx, int streamId) {
// All padding will be put at the end, so in the worst case we need 3 headers:
// a repeated no-padding frame of maxFrameSize, a frame that has part data and part
// padding, and a frame that has the remainder of the padding.
buffer = ctx.alloc().buffer(3 * DATA_FRAME_HEADER_LENGTH);
this.streamId = streamId;
}
/**
* Gets the frame header buffer configured for the current frame.
*/
ByteBuf slice(int data, int padding, boolean endOfStream) {
// Since we're reusing the current frame header whenever possible, check if anything changed
// that requires a new header.
if (data != prevData || padding != prevPadding
|| endOfStream != flags.endOfStream() || frameHeader == null) {
// Update the header state.
prevData = data;
prevPadding = padding;
flags.paddingPresent(padding > 0);
flags.endOfStream(endOfStream);
frameHeader = buffer.slice(buffer.readerIndex(), DATA_FRAME_HEADER_LENGTH).writerIndex(0);
buffer.setIndex(buffer.readerIndex() + DATA_FRAME_HEADER_LENGTH,
buffer.writerIndex() + DATA_FRAME_HEADER_LENGTH);
int payloadLength = data + padding;
writeFrameHeaderInternal(frameHeader, payloadLength, DATA, flags, streamId);
writePaddingLength(frameHeader, padding);
}
return frameHeader.slice();
}
void release() {
buffer.release();
}
}
}

View File

@ -31,7 +31,6 @@
<properties>
<!-- Skip tests by default; run only if -DskipTests=false is specified -->
<skipTests>true</skipTests>
<epoll.arch>x86_64</epoll.arch>
<jmh.version>1.19</jmh.version>
</properties>
@ -43,17 +42,6 @@
<family>linux</family>
</os>
</activation>
<properties>
<epoll.arch>${os.detected.arch}</epoll.arch>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${project.version}</version>
<classifier>${jni.classifier}</classifier>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
@ -167,13 +155,6 @@
</executions>
</plugin>
</plugins>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
</build>
</project>

View File

@ -38,8 +38,8 @@ import java.util.Map;
/**
* Utility methods for hpack tests.
*/
public final class HpackUtil {
private HpackUtil() {
public final class HpackBenchmarkUtil {
private HpackBenchmarkUtil() {
}
/**

View File

@ -32,7 +32,6 @@
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.microbench.util.AbstractMicrobenchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
@ -43,6 +42,8 @@ import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static io.netty.handler.codec.http2.HpackBenchmarkUtil.http2Headers;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_HEADER_LIST_SIZE;
public class HpackDecoderBenchmark extends AbstractMicrobenchmark {
@ -60,7 +61,7 @@ public class HpackDecoderBenchmark extends AbstractMicrobenchmark {
@Setup(Level.Trial)
public void setup() throws Http2Exception {
input = Unpooled.wrappedBuffer(getSerializedHeaders(HpackUtil.http2Headers(size, limitToAscii), sensitive));
input = wrappedBuffer(getSerializedHeaders(http2Headers(size, limitToAscii), sensitive));
}
@TearDown(Level.Trial)

View File

@ -79,7 +79,7 @@ public class HpackEncoderBenchmark extends AbstractMicrobenchmark {
@Setup(Level.Trial)
public void setup() {
http2Headers = HpackUtil.http2Headers(size, limitToAscii);
http2Headers = HpackBenchmarkUtil.http2Headers(size, limitToAscii);
if (duplicates) {
int size = http2Headers.size();
if (size > 0) {

View File

@ -45,7 +45,7 @@ public class HpackUtilBenchmark extends AbstractMicrobenchmark {
@Setup(Level.Trial)
public void setup() {
hpackHeaders = HpackUtil.headers(size, false);
hpackHeaders = HpackBenchmarkUtil.headers(size, false);
}
@Benchmark

View File

@ -0,0 +1,247 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.microbench.channel.EmbeddedChannelWriteReleaseHandlerContext;
import io.netty.microbench.util.AbstractMicrobenchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import java.util.concurrent.TimeUnit;
import static io.netty.buffer.Unpooled.directBuffer;
import static io.netty.buffer.Unpooled.unreleasableBuffer;
import static io.netty.handler.codec.http2.Http2CodecUtil.DATA_FRAME_HEADER_LENGTH;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_UNSIGNED_BYTE;
import static io.netty.handler.codec.http2.Http2CodecUtil.verifyPadding;
import static io.netty.handler.codec.http2.Http2CodecUtil.writeFrameHeaderInternal;
import static io.netty.handler.codec.http2.Http2FrameTypes.DATA;
import static java.lang.Math.max;
import static java.lang.Math.min;
@Fork(1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class Http2FrameWriterDataBenchmark extends AbstractMicrobenchmark {
@Param({ "64", "1024", "4096", "16384", "1048576", "4194304" })
public int payloadSize;
@Param({ "0", "100", "255" })
public int padding;
@Param({ "true", "false" })
public boolean pooled;
private ByteBuf payload;
private ChannelHandlerContext ctx;
private Http2DataWriter writer;
private Http2DataWriter oldWriter;
@Setup(Level.Trial)
public void setup() {
writer = new DefaultHttp2FrameWriter();
oldWriter = new OldDefaultHttp2FrameWriter();
payload = pooled ? PooledByteBufAllocator.DEFAULT.buffer(payloadSize) : Unpooled.buffer(payloadSize);
payload.writeZero(payloadSize);
ctx = new EmbeddedChannelWriteReleaseHandlerContext(
pooled ? PooledByteBufAllocator.DEFAULT : UnpooledByteBufAllocator.DEFAULT,
new ChannelInboundHandlerAdapter()) {
@Override
protected void handleException(Throwable t) {
handleUnexpectedException(t);
}
};
}
@TearDown(Level.Trial)
public void teardown() throws Exception {
if (payload != null) {
payload.release();
}
if (ctx != null) {
ctx.close();
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
public void newWriter() {
writer.writeData(ctx, 3, payload.retain(), padding, true, ctx.voidPromise());
ctx.flush();
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
public void oldWriter() {
oldWriter.writeData(ctx, 3, payload.retain(), padding, true, ctx.voidPromise());
ctx.flush();
}
private static final class OldDefaultHttp2FrameWriter implements Http2DataWriter {
private static final ByteBuf ZERO_BUFFER =
unreleasableBuffer(directBuffer(MAX_UNSIGNED_BYTE).writeZero(MAX_UNSIGNED_BYTE)).asReadOnly();
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
@Override
public ChannelFuture writeData(ChannelHandlerContext ctx, int streamId, ByteBuf data,
int padding, boolean endStream, ChannelPromise promise) {
final Http2CodecUtil.SimpleChannelPromiseAggregator promiseAggregator =
new Http2CodecUtil.SimpleChannelPromiseAggregator(promise, ctx.channel(), ctx.executor());
final DataFrameHeader header = new DataFrameHeader(ctx, streamId);
boolean needToReleaseHeaders = true;
boolean needToReleaseData = true;
try {
verifyStreamId(streamId, "Stream ID");
verifyPadding(padding);
boolean lastFrame;
int remainingData = data.readableBytes();
do {
// Determine how much data and padding to write in this frame. Put all padding at the end.
int frameDataBytes = min(remainingData, maxFrameSize);
int framePaddingBytes = min(padding, max(0, (maxFrameSize - 1) - frameDataBytes));
// Decrement the remaining counters.
padding -= framePaddingBytes;
remainingData -= frameDataBytes;
// Determine whether or not this is the last frame to be sent.
lastFrame = remainingData == 0 && padding == 0;
// Only the last frame is not retained. Until then, the outer finally must release.
ByteBuf frameHeader = header.slice(frameDataBytes, framePaddingBytes, lastFrame && endStream);
needToReleaseHeaders = !lastFrame;
ctx.write(lastFrame ? frameHeader : frameHeader.retain(), promiseAggregator.newPromise());
// Write the frame data.
ByteBuf frameData = data.readSlice(frameDataBytes);
// Only the last frame is not retained. Until then, the outer finally must release.
needToReleaseData = !lastFrame;
ctx.write(lastFrame ? frameData : frameData.retain(), promiseAggregator.newPromise());
// Write the frame padding.
if (paddingBytes(framePaddingBytes) > 0) {
ctx.write(ZERO_BUFFER.slice(0, paddingBytes(framePaddingBytes)),
promiseAggregator.newPromise());
}
} while (!lastFrame);
} catch (Throwable t) {
try {
if (needToReleaseHeaders) {
header.release();
}
if (needToReleaseData) {
data.release();
}
} finally {
promiseAggregator.setFailure(t);
promiseAggregator.doneAllocatingPromises();
}
return promiseAggregator;
}
return promiseAggregator.doneAllocatingPromises();
}
private static void verifyStreamId(int streamId, String argumentName) {
if (streamId <= 0) {
throw new IllegalArgumentException(argumentName + " must be > 0");
}
}
private static int paddingBytes(int padding) {
// The padding parameter contains the 1 byte pad length field as well as the trailing padding bytes.
// Subtract 1, so to only get the number of padding bytes that need to be appended to the end of a frame.
return padding - 1;
}
private static void writePaddingLength(ByteBuf buf, int padding) {
if (padding > 0) {
// It is assumed that the padding length has been bounds checked before this
// Minus 1, as the pad length field is included in the padding parameter and is 1 byte wide.
buf.writeByte(padding - 1);
}
}
/**
* Utility class that manages the creation of frame header buffers for {@code DATA} frames. Attempts
* to reuse the same buffer repeatedly when splitting data into multiple frames.
*/
private static final class DataFrameHeader {
private final int streamId;
private final ByteBuf buffer;
private final Http2Flags flags = new Http2Flags();
private int prevData;
private int prevPadding;
private ByteBuf frameHeader;
DataFrameHeader(ChannelHandlerContext ctx, int streamId) {
// All padding will be put at the end, so in the worst case we need 3 headers:
// a repeated no-padding frame of maxFrameSize, a frame that has part data and part
// padding, and a frame that has the remainder of the padding.
buffer = ctx.alloc().buffer(3 * DATA_FRAME_HEADER_LENGTH);
this.streamId = streamId;
}
/**
* Gets the frame header buffer configured for the current frame.
*/
ByteBuf slice(int data, int padding, boolean endOfStream) {
// Since we're reusing the current frame header whenever possible, check if anything changed
// that requires a new header.
if (data != prevData || padding != prevPadding
|| endOfStream != flags.endOfStream() || frameHeader == null) {
// Update the header state.
prevData = data;
prevPadding = padding;
flags.paddingPresent(padding > 0);
flags.endOfStream(endOfStream);
frameHeader = buffer.slice(buffer.readerIndex(), DATA_FRAME_HEADER_LENGTH).writerIndex(0);
buffer.setIndex(buffer.readerIndex() + DATA_FRAME_HEADER_LENGTH,
buffer.writerIndex() + DATA_FRAME_HEADER_LENGTH);
int payloadLength = data + padding;
writeFrameHeaderInternal(frameHeader, payloadLength, DATA, flags, streamId);
writePaddingLength(frameHeader, padding);
}
return frameHeader.slice();
}
void release() {
buffer.release();
}
}
}
}

View File

@ -1,570 +0,0 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.microbench.http2;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.channel.socket.oio.OioSocketChannel;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder;
import io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder;
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.Http2FrameAdapter;
import io.netty.handler.codec.http2.Http2FrameWriter;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2RemoteFlowController;
import io.netty.microbench.channel.EmbeddedChannelWriteReleaseHandlerContext;
import io.netty.microbench.util.AbstractSharedExecutorMicrobenchmark;
import io.netty.util.AsciiString;
import io.netty.util.concurrent.Future;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_MAX_FRAME_SIZE;
import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_FRAME_SIZE_UPPER_BOUND;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
@State(Scope.Benchmark)
public class Http2FrameWriterBenchmark extends AbstractSharedExecutorMicrobenchmark {
private static final EnvironmentParameters NIO_UNPOOLED_PARAMS =
new NioEnvironmentParametersBase(UnpooledByteBufAllocator.DEFAULT);
private static final EnvironmentParameters NIO_POOLED_PARAMS =
new NioEnvironmentParametersBase(PooledByteBufAllocator.DEFAULT);
private static final EnvironmentParameters EPOLL_UNPOOLED_PARAMS =
new EpollEnvironmentParametersBase(UnpooledByteBufAllocator.DEFAULT);
private static final EnvironmentParameters EPOLL_POOLED_PARAMS =
new EpollEnvironmentParametersBase(PooledByteBufAllocator.DEFAULT);
private static final EnvironmentParameters OIO_UNPOOLED_PARAMS =
new OioEnvironmentParametersBase(UnpooledByteBufAllocator.DEFAULT);
private static final EnvironmentParameters OIO_POOLED_PARAMS =
new OioEnvironmentParametersBase(PooledByteBufAllocator.DEFAULT);
public static enum EnvironmentType {
EMBEDDED_POOLED(NIO_POOLED_PARAMS), EMBEDDED_UNPOOLED(NIO_UNPOOLED_PARAMS),
NIO_POOLED(NIO_POOLED_PARAMS), NIO_UNPOOLED(NIO_UNPOOLED_PARAMS),
EPOLL_POOLED(EPOLL_POOLED_PARAMS), EPOLL_UNPOOLED(EPOLL_UNPOOLED_PARAMS),
OIO_POOLED(OIO_POOLED_PARAMS), OIO_UNPOOLED(OIO_UNPOOLED_PARAMS);
private final EnvironmentParameters params;
private EnvironmentType(EnvironmentParameters params) {
this.params = params;
}
public EnvironmentParameters params() {
return params;
}
}
public static enum DataPayloadType {
SMALL, MEDIUM, LARGE, JUMBO;
}
@Param
public EnvironmentType environmentType;
@Param
public DataPayloadType dataType;
@Param({ "0", "255" })
public int padding;
private Environment environment;
private BenchmarkTestPayload payload;
@Setup(Level.Trial)
public void setup() {
switch (environmentType) {
case EMBEDDED_POOLED:
case EMBEDDED_UNPOOLED:
environment = boostrapEmbeddedEnv(environmentType);
break;
default:
environment = boostrapEnvWithTransport(environmentType);
break;
}
if (environment == null) {
throw new IllegalStateException("Environment type [" + environmentType + "] is not supported.");
}
AbstractSharedExecutorMicrobenchmark.executor(environment.eventLoop());
payload = createPayload(dataType);
}
@TearDown(Level.Trial)
public void teardown() throws Exception {
try {
environment.teardown();
} finally {
payload.release();
}
}
@Benchmark
public void writeData() {
ChannelHandlerContext context = environment.context();
environment.writer().writeData(context, 3, payload.data().retain(), padding, true, context.voidPromise());
context.flush();
}
@Benchmark
public void writeHeaders() {
ChannelHandlerContext context = environment.context();
environment.writer().writeHeaders(context, 3, payload.headers(), padding, true, context.voidPromise());
context.flush();
}
private static Http2Headers createHeaders(int numValues, int nameLength, int valueLength) {
Http2Headers headers = new DefaultHttp2Headers();
Random r = new Random();
for (int i = 0; i < numValues; ++i) {
byte[] tmp = new byte[nameLength];
r.nextBytes(tmp);
AsciiString name = new AsciiString(tmp);
tmp = new byte[valueLength];
r.nextBytes(tmp);
headers.add(name, new AsciiString(tmp));
}
return headers;
}
private static ByteBuf createData(int length) {
byte[] result = new byte[length];
new Random().nextBytes(result);
return Unpooled.wrappedBuffer(result);
}
private static BenchmarkTestPayload createPayload(DataPayloadType type) {
switch (type) {
case SMALL:
return new BenchmarkTestPayload(createData(256), createHeaders(5, 20, 20));
case MEDIUM:
return new BenchmarkTestPayload(createData(DEFAULT_MAX_FRAME_SIZE), createHeaders(20, 40, 40));
case LARGE:
return new BenchmarkTestPayload(createData(MAX_FRAME_SIZE_UPPER_BOUND), createHeaders(100, 100, 100));
case JUMBO:
return new BenchmarkTestPayload(createData(10 * MAX_FRAME_SIZE_UPPER_BOUND), createHeaders(300, 300, 300));
default:
throw new Error();
}
}
private static final class BenchmarkTestPayload {
private final ByteBuf data;
private final Http2Headers headers;
public BenchmarkTestPayload(ByteBuf data, Http2Headers headers) {
this.data = data;
this.headers = headers;
}
public ByteBuf data() {
return data;
}
public Http2Headers headers() {
return headers;
}
public void release() {
data.release();
}
}
private static Environment boostrapEnvWithTransport(final EnvironmentType environmentType) {
final EnvironmentParameters params = environmentType.params();
ServerBootstrap sb = new ServerBootstrap();
Bootstrap cb = new Bootstrap();
final TransportEnvironment environment = new TransportEnvironment(cb, sb);
EventLoopGroup serverEventLoopGroup = params.newEventLoopGroup();
sb.group(serverEventLoopGroup, serverEventLoopGroup);
sb.channel(params.serverChannelClass());
sb.option(ChannelOption.ALLOCATOR, params.serverAllocator());
sb.childOption(ChannelOption.ALLOCATOR, params.serverAllocator());
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
}
});
cb.group(params.newEventLoopGroup());
cb.channel(params.clientChannelClass());
cb.option(ChannelOption.ALLOCATOR, params.clientAllocator());
final CountDownLatch latch = new CountDownLatch(1);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
Http2Connection connection = new DefaultHttp2Connection(false);
Http2RemoteFlowController remoteFlowController = params.remoteFlowController();
if (remoteFlowController != null) {
connection.remote().flowController(params.remoteFlowController());
}
Http2LocalFlowController localFlowController = params.localFlowController();
if (localFlowController != null) {
connection.local().flowController(localFlowController);
}
environment.writer(new DefaultHttp2FrameWriter());
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, environment.writer());
Http2ConnectionDecoder decoder =
new DefaultHttp2ConnectionDecoder(connection, encoder, new DefaultHttp2FrameReader());
Http2ConnectionHandler connectionHandler = new Http2ConnectionHandlerBuilder()
.encoderEnforceMaxConcurrentStreams(false)
.frameListener(new Http2FrameAdapter())
.codec(decoder, encoder).build();
p.addLast(connectionHandler);
environment.context(p.lastContext());
// Must wait for context to be set.
latch.countDown();
}
});
environment.serverChannel(sb.bind(params.address()));
params.address(environment.serverChannel().localAddress());
environment.clientChannel(cb.connect(params.address()));
try {
if (!latch.await(5, SECONDS)) {
throw new RuntimeException("Channel did not initialize in time");
}
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
return environment;
}
private static Environment boostrapEmbeddedEnv(final EnvironmentType environmentType) {
final ByteBufAllocator alloc = environmentType.params().clientAllocator();
final EmbeddedEnvironment env = new EmbeddedEnvironment(new DefaultHttp2FrameWriter());
final Http2Connection connection = new DefaultHttp2Connection(false);
Http2ConnectionEncoder encoder = new DefaultHttp2ConnectionEncoder(connection, env.writer());
Http2ConnectionDecoder decoder =
new DefaultHttp2ConnectionDecoder(connection, encoder, new DefaultHttp2FrameReader());
Http2ConnectionHandler connectionHandler = new Http2ConnectionHandlerBuilder()
.encoderEnforceMaxConcurrentStreams(false)
.frameListener(new Http2FrameAdapter())
.codec(decoder, encoder).build();
env.context(new EmbeddedChannelWriteReleaseHandlerContext(alloc, connectionHandler) {
@Override
protected void handleException(Throwable t) {
handleUnexpectedException(t);
}
});
return env;
}
private interface Environment {
/**
* Get the event loop that should be shared with JMH to execute the benchmark.
*/
EventLoop eventLoop();
/**
* The context to use during the benchmark.
*/
ChannelHandlerContext context();
/**
* The writer which will be subject to benchmarking.
*/
Http2FrameWriter writer();
/**
* Do any cleanup after environment is no longer needed.
*/
void teardown() throws Exception;
}
private interface EnvironmentParameters {
EventLoopGroup newEventLoopGroup();
Class<? extends ServerChannel> serverChannelClass();
Class<? extends Channel> clientChannelClass();
ByteBufAllocator clientAllocator();
ByteBufAllocator serverAllocator();
SocketAddress address();
void address(SocketAddress address);
Http2RemoteFlowController remoteFlowController();
Http2LocalFlowController localFlowController();
}
private abstract static class EnvironmentParametersBase implements EnvironmentParameters {
private final ByteBufAllocator clientAlloc;
private final ByteBufAllocator serverAlloc;
private final Class<? extends Channel> clientChannelClass;
private final Class<? extends ServerChannel> serverChannelClass;
private final Http2RemoteFlowController remoteFlowController;
private final Http2LocalFlowController localFlowController;
private SocketAddress address;
EnvironmentParametersBase(ByteBufAllocator serverAlloc, ByteBufAllocator clientAlloc,
Class<? extends ServerChannel> serverChannelClass, Class<? extends Channel> clientChannelClass) {
this(serverAlloc, clientAlloc, serverChannelClass, clientChannelClass,
NoopHttp2RemoteFlowController.INSTANCE, NoopHttp2LocalFlowController.INSTANCE);
}
EnvironmentParametersBase(ByteBufAllocator serverAlloc, ByteBufAllocator clientAlloc,
Class<? extends ServerChannel> serverChannelClass, Class<? extends Channel> clientChannelClass,
Http2RemoteFlowController remoteFlowController, Http2LocalFlowController localFlowController) {
this.serverAlloc = checkNotNull(serverAlloc, "serverAlloc");
this.clientAlloc = checkNotNull(clientAlloc, "clientAlloc");
this.clientChannelClass = checkNotNull(clientChannelClass, "clientChannelClass");
this.serverChannelClass = checkNotNull(serverChannelClass, "serverChannelClass");
this.remoteFlowController = remoteFlowController; // OK to be null
this.localFlowController = localFlowController; // OK to be null
}
@Override
public SocketAddress address() {
if (address == null) {
return new InetSocketAddress(0);
}
return address;
}
@Override
public void address(SocketAddress address) {
this.address = address;
}
@Override
public Class<? extends ServerChannel> serverChannelClass() {
return serverChannelClass;
}
@Override
public Class<? extends Channel> clientChannelClass() {
return clientChannelClass;
}
@Override
public ByteBufAllocator clientAllocator() {
return clientAlloc;
}
@Override
public ByteBufAllocator serverAllocator() {
return serverAlloc;
}
@Override
public Http2RemoteFlowController remoteFlowController() {
return remoteFlowController;
}
@Override
public Http2LocalFlowController localFlowController() {
return localFlowController;
}
};
private static class NioEnvironmentParametersBase extends EnvironmentParametersBase {
NioEnvironmentParametersBase(ByteBufAllocator clientAlloc) {
super(UnpooledByteBufAllocator.DEFAULT, clientAlloc, NioServerSocketChannel.class, NioSocketChannel.class);
}
@Override
public EventLoopGroup newEventLoopGroup() {
return new NioEventLoopGroup(1);
}
}
private static class EpollEnvironmentParametersBase extends EnvironmentParametersBase {
EpollEnvironmentParametersBase(ByteBufAllocator clientAlloc) {
super(UnpooledByteBufAllocator.DEFAULT, clientAlloc,
EpollServerSocketChannel.class, EpollSocketChannel.class);
}
@Override
public EventLoopGroup newEventLoopGroup() {
return new EpollEventLoopGroup(1);
}
}
private static class OioEnvironmentParametersBase extends EnvironmentParametersBase {
OioEnvironmentParametersBase(ByteBufAllocator clientAlloc) {
super(UnpooledByteBufAllocator.DEFAULT, clientAlloc, OioServerSocketChannel.class, OioSocketChannel.class);
}
@Override
public EventLoopGroup newEventLoopGroup() {
return new OioEventLoopGroup(1);
}
}
private static final class TransportEnvironment implements Environment {
private final ServerBootstrap sb;
private final Bootstrap cb;
private Channel serverChannel;
private Channel clientChannel;
private ChannelHandlerContext clientContext;
private Http2FrameWriter clientWriter;
public TransportEnvironment(Bootstrap cb, ServerBootstrap sb) {
this.sb = checkNotNull(sb, "sb");
this.cb = checkNotNull(cb, "cb");
}
@Override
public EventLoop eventLoop() {
// It is assumed the channel is registered to the event loop by the time this is called
return clientChannel.eventLoop();
}
public Channel serverChannel() {
return serverChannel;
}
public void serverChannel(ChannelFuture bindFuture) {
// No need to sync or wait by default...local channel immediate executor
serverChannel = checkNotNull(bindFuture, "bindFuture").channel();
}
public void clientChannel(ChannelFuture connectFuture) {
// No need to sync or wait by default...local channel immediate executor
clientChannel = checkNotNull(connectFuture, "connectFuture").channel();
}
public void context(ChannelHandlerContext context) {
clientContext = checkNotNull(context, "context");
}
@Override
public ChannelHandlerContext context() {
return clientContext;
}
@Override
public void teardown() throws InterruptedException {
if (clientChannel != null) {
clientChannel.close();
}
if (serverChannel != null) {
serverChannel.close();
}
Future<?> serverGroup = null;
Future<?> serverChildGroup = null;
Future<?> clientGroup = null;
if (sb != null) {
serverGroup = sb.group().shutdownGracefully(0, 0, MILLISECONDS);
serverChildGroup = sb.childGroup().shutdownGracefully(0, 0, MILLISECONDS);
}
if (cb != null) {
clientGroup = cb.group().shutdownGracefully(0, 0, MILLISECONDS);
}
if (sb != null) {
serverGroup.sync();
serverChildGroup.sync();
}
if (cb != null) {
clientGroup.sync();
}
}
public void writer(Http2FrameWriter writer) {
clientWriter = checkNotNull(writer, "writer");
}
@Override
public Http2FrameWriter writer() {
return clientWriter;
}
}
private static final class EmbeddedEnvironment implements Environment {
private final Http2FrameWriter writer;
private ChannelHandlerContext context;
private EventLoop eventLoop;
public EmbeddedEnvironment(Http2FrameWriter writer) {
this.writer = checkNotNull(writer, "writer");
}
@Override
public EventLoop eventLoop() {
return eventLoop;
}
public void context(EmbeddedChannelWriteReleaseHandlerContext context) {
this.context = checkNotNull(context, "context");
Channel channel = checkNotNull(context.channel(), "context.channel()");
this.eventLoop = checkNotNull(channel.eventLoop(), "channel.eventLoop()");
}
@Override
public ChannelHandlerContext context() {
return context;
}
@Override
public Http2FrameWriter writer() {
return writer;
}
@Override
public void teardown() throws Exception {
}
}
}