Adding UniformStreamByteDistributor
Motivation: The current priority algorithm can yield poor per-stream goodput when either the number of streams is high or the connection window is small. When all priorities are the same (i.e. priority is disabled), we should be able to do better. Modifications: Added a new UniformStreamByteDistributor that ignores priority entirely and manages a queue of streams. Each stream is allocated a minimum of 1KiB on each iteration. Result: Improved goodput when priority is not used.
This commit is contained in:
parent
b640de2d94
commit
2a2059d976
@ -0,0 +1,222 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
|
||||
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
|
||||
import static io.netty.util.internal.ObjectUtil.checkNotNull;
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Deque;
|
||||
|
||||
/**
|
||||
* A {@link StreamByteDistributor} that ignores stream priority and uniformly allocates bytes to all
|
||||
* streams. This class uses a minimum chunk size that will be allocated to each stream. While
|
||||
* fewer streams may be written to in each call to {@link #distribute(int, Writer)}, doing this
|
||||
* should improve the goodput on each written stream.
|
||||
*/
|
||||
public final class UniformStreamByteDistributor implements StreamByteDistributor {
|
||||
static final int DEFAULT_MIN_ALLOCATION_CHUNK = 1024;
|
||||
|
||||
private final Http2Connection.PropertyKey stateKey;
|
||||
private final Deque<State> queue = new ArrayDeque<State>(4);
|
||||
private Deque<State> emptyFrameQueue;
|
||||
|
||||
/**
|
||||
* The minimum number of bytes that we will attempt to allocate to a stream. This is to
|
||||
* help improve goodput on a per-stream basis.
|
||||
*/
|
||||
private int minAllocationChunk = DEFAULT_MIN_ALLOCATION_CHUNK;
|
||||
private long totalStreamableBytes;
|
||||
|
||||
public UniformStreamByteDistributor(Http2Connection connection) {
|
||||
checkNotNull(connection, "connection");
|
||||
|
||||
// Add a state for the connection.
|
||||
stateKey = connection.newKey();
|
||||
Http2Stream connectionStream = connection.connectionStream();
|
||||
connectionStream.setProperty(stateKey, new State(connectionStream));
|
||||
|
||||
// Register for notification of new streams.
|
||||
connection.addListener(new Http2ConnectionAdapter() {
|
||||
@Override
|
||||
public void onStreamAdded(Http2Stream stream) {
|
||||
stream.setProperty(stateKey, new State(stream));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onStreamClosed(Http2Stream stream) {
|
||||
state(stream).close();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the minimum allocation chunk that will be allocated to each stream. Defaults to 1KiB.
|
||||
*
|
||||
* @param minAllocationChunk the minimum number of bytes that will be allocated to each stream.
|
||||
* Must be > 0.
|
||||
*/
|
||||
public void minAllocationChunk(int minAllocationChunk) {
|
||||
if (minAllocationChunk <= 0) {
|
||||
throw new IllegalArgumentException("minAllocationChunk must be > 0");
|
||||
}
|
||||
this.minAllocationChunk = minAllocationChunk;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateStreamableBytes(StreamState streamState) {
|
||||
State state = state(streamState.stream());
|
||||
state.updateStreamableBytes(streamState.streamableBytes(), streamState.hasFrame());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
|
||||
checkNotNull(writer, "writer");
|
||||
|
||||
// First, write out any empty frames.
|
||||
if (emptyFrameQueue != null) {
|
||||
while (!emptyFrameQueue.isEmpty()) {
|
||||
State state = emptyFrameQueue.remove();
|
||||
state.enqueued = false;
|
||||
if (state.streamableBytes > 0) {
|
||||
// Bytes have been added since it was queued. Add it to the regular queue.
|
||||
state.addToQueue();
|
||||
} else {
|
||||
// Still an empty frame, just write it.
|
||||
state.write(0, writer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final int size = queue.size();
|
||||
if (size == 0 || maxBytes <= 0) {
|
||||
return totalStreamableBytes > 0;
|
||||
}
|
||||
|
||||
final int chunkSize = max(minAllocationChunk, maxBytes / size);
|
||||
|
||||
// Write until the queue is empty or we've exhausted maxBytes. We need to check queue.isEmpty()
|
||||
// here since the Writer could call updateStreamableBytes, which may alter the queue.
|
||||
do {
|
||||
// Remove the head of the queue.
|
||||
State state = queue.remove();
|
||||
state.enqueued = false;
|
||||
|
||||
// Allocate as much data as we can for this stream.
|
||||
int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
|
||||
maxBytes -= chunk;
|
||||
|
||||
// Write the allocated bytes and enqueue as necessary.
|
||||
state.write(chunk, writer);
|
||||
} while (maxBytes > 0 && !queue.isEmpty());
|
||||
|
||||
return totalStreamableBytes > 0;
|
||||
}
|
||||
|
||||
private State state(Http2Stream stream) {
|
||||
return checkNotNull(stream, "stream").getProperty(stateKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* The remote flow control state for a single stream.
|
||||
*/
|
||||
private final class State {
|
||||
final Http2Stream stream;
|
||||
int streamableBytes;
|
||||
boolean enqueued;
|
||||
boolean previouslyOnMainQueue;
|
||||
|
||||
State(Http2Stream stream) {
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
void updateStreamableBytes(int newStreamableBytes, boolean hasFrame) {
|
||||
assert hasFrame || newStreamableBytes == 0;
|
||||
|
||||
int delta = newStreamableBytes - streamableBytes;
|
||||
if (delta != 0) {
|
||||
streamableBytes = newStreamableBytes;
|
||||
totalStreamableBytes += delta;
|
||||
}
|
||||
if (hasFrame) {
|
||||
// It's not in the queue but has data to send, add it.
|
||||
addToQueue();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write any allocated bytes for the given stream and updates the streamable bytes,
|
||||
* assuming all of the bytes will be written.
|
||||
*/
|
||||
void write(int numBytes, Writer writer) throws Http2Exception {
|
||||
// Update the streamable bytes, assuming that all the bytes will be written.
|
||||
int newStreamableBytes = streamableBytes - numBytes;
|
||||
updateStreamableBytes(newStreamableBytes, newStreamableBytes > 0);
|
||||
|
||||
try {
|
||||
// Write the allocated bytes.
|
||||
writer.write(stream, numBytes);
|
||||
} catch (Throwable t) {
|
||||
throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
|
||||
}
|
||||
}
|
||||
|
||||
void addToQueue() {
|
||||
if (!enqueued) {
|
||||
enqueued = true;
|
||||
if (streamableBytes == 0) {
|
||||
// Add empty frames to the empty frame queue.
|
||||
getOrCreateEmptyFrameQueue().addLast(this);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!previouslyOnMainQueue) {
|
||||
// Add to the head the first time it's on the main queue.
|
||||
previouslyOnMainQueue = true;
|
||||
queue.addFirst(this);
|
||||
} else {
|
||||
queue.addLast(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void removeFromQueue() {
|
||||
if (enqueued) {
|
||||
enqueued = false;
|
||||
if (emptyFrameQueue != null && !emptyFrameQueue.remove(this)) {
|
||||
queue.remove(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void close() {
|
||||
// Remove this state from the queue.
|
||||
removeFromQueue();
|
||||
|
||||
// Clear the streamable bytes.
|
||||
updateStreamableBytes(0, false);
|
||||
}
|
||||
|
||||
private Deque<State> getOrCreateEmptyFrameQueue() {
|
||||
if (emptyFrameQueue == null) {
|
||||
emptyFrameQueue = new ArrayDeque<State>(2);
|
||||
}
|
||||
return emptyFrameQueue;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,235 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.handler.codec.http2;
|
||||
|
||||
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
|
||||
import static io.netty.handler.codec.http2.UniformStreamByteDistributor.DEFAULT_MIN_ALLOCATION_CHUNK;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.same;
|
||||
import static org.mockito.Mockito.atMost;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.reset;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
import org.mockito.verification.VerificationMode;
|
||||
|
||||
/**
|
||||
* Tests for {@link UniformStreamByteDistributor}.
|
||||
*/
|
||||
public class UniformStreamByteDistributorTest {
|
||||
private static int CHUNK_SIZE = DEFAULT_MIN_ALLOCATION_CHUNK;
|
||||
|
||||
private static final int STREAM_A = 1;
|
||||
private static final int STREAM_B = 3;
|
||||
private static final int STREAM_C = 5;
|
||||
private static final int STREAM_D = 7;
|
||||
|
||||
private Http2Connection connection;
|
||||
private UniformStreamByteDistributor distributor;
|
||||
|
||||
@Mock
|
||||
private StreamByteDistributor.Writer writer;
|
||||
|
||||
@Before
|
||||
public void setup() throws Http2Exception {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
|
||||
connection = new DefaultHttp2Connection(false);
|
||||
distributor = new UniformStreamByteDistributor(connection);
|
||||
|
||||
connection.local().createStream(STREAM_A, false);
|
||||
connection.local().createStream(STREAM_B, false);
|
||||
Http2Stream streamC = connection.local().createStream(STREAM_C, false);
|
||||
Http2Stream streamD = connection.local().createStream(STREAM_D, false);
|
||||
streamC.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
|
||||
streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bytesUnassignedAfterProcessing() throws Http2Exception {
|
||||
updateStream(STREAM_A, 1, true);
|
||||
updateStream(STREAM_B, 2, true);
|
||||
updateStream(STREAM_C, 3, true);
|
||||
updateStream(STREAM_D, 4, true);
|
||||
|
||||
assertFalse(write(10));
|
||||
verifyWrite(STREAM_A, 1);
|
||||
verifyWrite(STREAM_B, 2);
|
||||
verifyWrite(STREAM_C, 3);
|
||||
verifyWrite(STREAM_D, 4);
|
||||
verifyNoMoreInteractions(writer);
|
||||
|
||||
assertFalse(write(10));
|
||||
verifyNoMoreInteractions(writer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void connectionErrorForWriterException() throws Http2Exception {
|
||||
updateStream(STREAM_A, 1, true);
|
||||
updateStream(STREAM_B, 2, true);
|
||||
updateStream(STREAM_C, 3, true);
|
||||
updateStream(STREAM_D, 4, true);
|
||||
|
||||
Exception fakeException = new RuntimeException("Fake exception");
|
||||
doThrow(fakeException).when(writer).write(same(stream(STREAM_C)), eq(3));
|
||||
|
||||
try {
|
||||
write(10);
|
||||
fail("Expected an exception");
|
||||
} catch (Http2Exception e) {
|
||||
assertFalse(Http2Exception.isStreamError(e));
|
||||
assertEquals(Http2Error.INTERNAL_ERROR, e.error());
|
||||
assertSame(fakeException, e.getCause());
|
||||
}
|
||||
|
||||
verifyWrite(atMost(1), STREAM_A, 1);
|
||||
verifyWrite(atMost(1), STREAM_B, 2);
|
||||
verifyWrite(STREAM_C, 3);
|
||||
verifyWrite(atMost(1), STREAM_D, 4);
|
||||
|
||||
doNothing().when(writer).write(same(stream(STREAM_C)), eq(3));
|
||||
write(10);
|
||||
verifyWrite(STREAM_A, 1);
|
||||
verifyWrite(STREAM_B, 2);
|
||||
verifyWrite(STREAM_C, 3);
|
||||
verifyWrite(STREAM_D, 4);
|
||||
}
|
||||
|
||||
/**
|
||||
* In this test, we verify that each stream is allocated a minimum chunk size. When bytes
|
||||
* run out, the remaining streams will be next in line for the next iteration.
|
||||
*/
|
||||
@Test
|
||||
public void minChunkShouldBeAllocatedPerStream() throws Http2Exception {
|
||||
// Re-assign weights.
|
||||
setPriority(STREAM_A, 0, (short) 50, false);
|
||||
setPriority(STREAM_B, 0, (short) 200, false);
|
||||
setPriority(STREAM_C, STREAM_A, (short) 100, false);
|
||||
setPriority(STREAM_D, STREAM_A, (short) 100, false);
|
||||
|
||||
// Update in reverse order. This will yield a queue in the order A, B, C, D. This
|
||||
// is due to the fact that when a stream is enqueued the first time, it is added to the
|
||||
// head of the queue.
|
||||
updateStream(STREAM_D, CHUNK_SIZE, true);
|
||||
updateStream(STREAM_C, CHUNK_SIZE, true);
|
||||
updateStream(STREAM_B, CHUNK_SIZE, true);
|
||||
updateStream(STREAM_A, CHUNK_SIZE, true);
|
||||
|
||||
// Only write 3 * chunkSize, so that we'll only write to the first 3 streams.
|
||||
int written = 3 * CHUNK_SIZE;
|
||||
assertTrue(write(written));
|
||||
assertEquals(CHUNK_SIZE, captureWrite(STREAM_A));
|
||||
assertEquals(CHUNK_SIZE, captureWrite(STREAM_B));
|
||||
assertEquals(CHUNK_SIZE, captureWrite(STREAM_C));
|
||||
verifyNoMoreInteractions(writer);
|
||||
|
||||
reset(writer);
|
||||
|
||||
// Now write again and verify that the last stream is written to.
|
||||
assertFalse(write(CHUNK_SIZE));
|
||||
assertEquals(CHUNK_SIZE, captureWrite(STREAM_D));
|
||||
verifyNoMoreInteractions(writer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void streamWithMoreDataShouldBeEnqueuedAfterWrite() throws Http2Exception {
|
||||
// Give the stream a bunch of data.
|
||||
updateStream(STREAM_A, 2 * CHUNK_SIZE, true);
|
||||
|
||||
// Write only part of the data.
|
||||
assertTrue(write(CHUNK_SIZE));
|
||||
assertEquals(CHUNK_SIZE, captureWrite(STREAM_A));
|
||||
verifyNoMoreInteractions(writer);
|
||||
|
||||
reset(writer);
|
||||
|
||||
// Now write the rest of the data.
|
||||
assertFalse(write(CHUNK_SIZE));
|
||||
assertEquals(CHUNK_SIZE, captureWrite(STREAM_A));
|
||||
verifyNoMoreInteractions(writer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void emptyFrameIsWritten() throws Http2Exception {
|
||||
updateStream(STREAM_B, 0, true);
|
||||
updateStream(STREAM_A, 10, true);
|
||||
|
||||
assertFalse(write(10));
|
||||
verifyWrite(STREAM_A, 10);
|
||||
verifyWrite(STREAM_B, 0);
|
||||
verifyNoMoreInteractions(writer);
|
||||
|
||||
write(10);
|
||||
verifyNoMoreInteractions(writer);
|
||||
}
|
||||
|
||||
private Http2Stream stream(int streamId) {
|
||||
return connection.stream(streamId);
|
||||
}
|
||||
|
||||
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) {
|
||||
final Http2Stream stream = stream(streamId);
|
||||
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
|
||||
@Override
|
||||
public Http2Stream stream() {
|
||||
return stream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int streamableBytes() {
|
||||
return streamableBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasFrame() {
|
||||
return hasFrame;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void setPriority(int streamId, int parent, int weight, boolean exclusive) throws Http2Exception {
|
||||
stream(streamId).setPriority(parent, (short) weight, exclusive);
|
||||
}
|
||||
|
||||
private boolean write(int numBytes) throws Http2Exception {
|
||||
return distributor.distribute(numBytes, writer);
|
||||
}
|
||||
|
||||
private void verifyWrite(int streamId, int numBytes) {
|
||||
verify(writer).write(same(stream(streamId)), eq(numBytes));
|
||||
}
|
||||
|
||||
private void verifyWrite(VerificationMode mode, int streamId, int numBytes) {
|
||||
verify(writer, mode).write(same(stream(streamId)), eq(numBytes));
|
||||
}
|
||||
|
||||
private int captureWrite(int streamId) {
|
||||
ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
|
||||
verify(writer).write(same(stream(streamId)), captor.capture());
|
||||
return captor.getValue();
|
||||
}
|
||||
}
|
@ -0,0 +1,583 @@
|
||||
/*
|
||||
* Copyright 2015 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package io.netty.microbench.http2;
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.channel.AbstractChannel;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelHandlerInvoker;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelProgressivePromise;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2Connection;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
|
||||
import io.netty.handler.codec.http2.Http2Connection;
|
||||
import io.netty.handler.codec.http2.Http2Exception;
|
||||
import io.netty.handler.codec.http2.Http2RemoteFlowController;
|
||||
import io.netty.handler.codec.http2.Http2Stream;
|
||||
import io.netty.handler.codec.http2.Http2StreamVisitor;
|
||||
import io.netty.handler.codec.http2.PriorityStreamByteDistributor;
|
||||
import io.netty.handler.codec.http2.StreamByteDistributor;
|
||||
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
|
||||
import io.netty.microbench.util.AbstractMicrobenchmark;
|
||||
import io.netty.util.Attribute;
|
||||
import io.netty.util.AttributeKey;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import org.openjdk.jmh.annotations.AuxCounters;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
import org.openjdk.jmh.annotations.Level;
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
/**
|
||||
* Benchmark to compare stream byte distribution algorithms when priorities are identical for
|
||||
* all streams.
|
||||
*/
|
||||
@Fork(1)
|
||||
@State(Scope.Benchmark)
|
||||
public class NoPriorityByteDistributionBenchmark extends AbstractMicrobenchmark {
|
||||
public enum Algorithm {
|
||||
PRIORITY,
|
||||
UNIFORM
|
||||
}
|
||||
|
||||
@Param({ "100", "10000" })
|
||||
private int numStreams;
|
||||
|
||||
@Param({ "1024", "65536", "1048576" })
|
||||
private int windowSize;
|
||||
|
||||
@Param
|
||||
private Algorithm algorithm;
|
||||
|
||||
private Http2Connection connection;
|
||||
|
||||
private Http2Connection.PropertyKey dataRefresherKey;
|
||||
|
||||
private Http2RemoteFlowController controller;
|
||||
|
||||
private StreamByteDistributor distributor;
|
||||
|
||||
private AdditionalCounters counters;
|
||||
|
||||
/**
|
||||
* Additional counters for a single iteration.
|
||||
*/
|
||||
@AuxCounters
|
||||
@State(Scope.Thread)
|
||||
public static class AdditionalCounters {
|
||||
private int minWriteSize;
|
||||
private int maxWriteSize;
|
||||
private long totalBytes;
|
||||
private long numWrites;
|
||||
private int invocations;
|
||||
|
||||
public int minWriteSize() {
|
||||
return minWriteSize;
|
||||
}
|
||||
|
||||
public int avgWriteSize() {
|
||||
return (int) (totalBytes / numWrites);
|
||||
}
|
||||
|
||||
public int maxWriteSize() {
|
||||
return maxWriteSize;
|
||||
}
|
||||
}
|
||||
|
||||
private Http2StreamVisitor invocationVisitor = new Http2StreamVisitor() {
|
||||
@Override
|
||||
public boolean visit(Http2Stream stream) throws Http2Exception {
|
||||
// Restore the connection window.
|
||||
resetWindow(stream);
|
||||
|
||||
// Restore the data to each stream.
|
||||
dataRefresher(stream).refreshData();
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
@Setup(Level.Trial)
|
||||
public void setupTrial() throws Http2Exception {
|
||||
connection = new DefaultHttp2Connection(false);
|
||||
dataRefresherKey = connection.newKey();
|
||||
|
||||
// Create the flow controller
|
||||
switch (algorithm) {
|
||||
case PRIORITY:
|
||||
distributor = new PriorityStreamByteDistributor(connection);
|
||||
break;
|
||||
case UNIFORM:
|
||||
distributor = new UniformStreamByteDistributor(connection);
|
||||
break;
|
||||
}
|
||||
controller = new DefaultHttp2RemoteFlowController(connection, new ByteCounter(distributor));
|
||||
controller.channelHandlerContext(new TestContext());
|
||||
|
||||
// Create the streams, each initialized with MAX_INT bytes.
|
||||
for (int i = 0; i < numStreams; ++i) {
|
||||
Http2Stream stream = connection.local().createStream(toStreamId(i), false);
|
||||
addData(stream, Integer.MAX_VALUE);
|
||||
stream.setProperty(dataRefresherKey, new DataRefresher(stream));
|
||||
}
|
||||
}
|
||||
|
||||
@Setup(Level.Invocation)
|
||||
public void setupInvocation() throws Http2Exception {
|
||||
resetWindow(connection.connectionStream());
|
||||
connection.forEachActiveStream(invocationVisitor);
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public void write(AdditionalCounters counters) throws Http2Exception {
|
||||
// Set up for this invocation. Doing this in the benchmark method since this
|
||||
// seems to throw off the counters when run as a setup step for the invocation.
|
||||
this.counters = counters;
|
||||
counters.invocations++;
|
||||
|
||||
// Now run the benchmark method.
|
||||
controller.writePendingBytes();
|
||||
}
|
||||
|
||||
private void resetWindow(Http2Stream stream) throws Http2Exception {
|
||||
controller.incrementWindowSize(stream, windowSize - controller.windowSize(stream));
|
||||
}
|
||||
|
||||
private DataRefresher dataRefresher(Http2Stream stream) {
|
||||
return (DataRefresher) stream.getProperty(dataRefresherKey);
|
||||
}
|
||||
|
||||
private void addData(Http2Stream stream, final int dataSize) {
|
||||
controller.addFlowControlled(stream, new Http2RemoteFlowController.FlowControlled() {
|
||||
int size = dataSize;
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(ChannelHandlerContext ctx, Throwable cause) {
|
||||
cause.printStackTrace();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeComplete() {
|
||||
// Don't care.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, int allowedBytes) {
|
||||
size -= allowedBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean merge(ChannelHandlerContext ctx,
|
||||
Http2RemoteFlowController.FlowControlled next) {
|
||||
int nextSize = next.size();
|
||||
if (Integer.MAX_VALUE - nextSize < size) {
|
||||
// Disallow merge to avoid integer overflow.
|
||||
return false;
|
||||
}
|
||||
|
||||
// Merge.
|
||||
size += nextSize;
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static int toStreamId(int i) {
|
||||
return 2 * i + 1;
|
||||
}
|
||||
|
||||
private final class DataRefresher {
|
||||
private final Http2Stream stream;
|
||||
private int data;
|
||||
|
||||
private DataRefresher(Http2Stream stream) {
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
void add(int data) {
|
||||
this.data += data;
|
||||
}
|
||||
|
||||
void refreshData() {
|
||||
if (data > 0) {
|
||||
addData(stream, data);
|
||||
data = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final class ByteCounter implements StreamByteDistributor {
|
||||
private final StreamByteDistributor delegate;
|
||||
|
||||
private ByteCounter(StreamByteDistributor delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateStreamableBytes(StreamState state) {
|
||||
delegate.updateStreamableBytes(state);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean distribute(int maxBytes, Writer writer)
|
||||
throws Http2Exception {
|
||||
return delegate.distribute(maxBytes, new CountingWriter(writer));
|
||||
}
|
||||
|
||||
private final class CountingWriter implements Writer {
|
||||
private final Writer delegate;
|
||||
|
||||
private CountingWriter(Writer delegate) {
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(Http2Stream stream, int numBytes) {
|
||||
if (numBytes > 0) {
|
||||
// Add the data to the refresher so that it can be given back to the
|
||||
// stream at the end of the iteration.
|
||||
DataRefresher refresher = dataRefresher(stream);
|
||||
refresher.add(numBytes);
|
||||
|
||||
counters.numWrites++;
|
||||
counters.totalBytes += numBytes;
|
||||
if (counters.minWriteSize == 0 || numBytes < counters.minWriteSize) {
|
||||
counters.minWriteSize = numBytes;
|
||||
}
|
||||
if (numBytes > counters.maxWriteSize) {
|
||||
counters.maxWriteSize = numBytes;
|
||||
}
|
||||
}
|
||||
|
||||
delegate.write(stream, numBytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestContext implements ChannelHandlerContext {
|
||||
|
||||
private Channel channel = new TestChannel();
|
||||
|
||||
@Override
|
||||
public Channel channel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventExecutor executor() {
|
||||
return channel.eventLoop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerInvoker invoker() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandler handler() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRemoved() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelRegistered() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelUnregistered() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelActive() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelInactive() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireUserEventTriggered(Object event) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelRead(Object msg) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelReadComplete() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext fireChannelWritabilityChanged() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture bind(SocketAddress localAddress) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture connect(SocketAddress remoteAddress) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture disconnect() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture close() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture deregister() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress,
|
||||
ChannelPromise promise) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture disconnect(ChannelPromise promise) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture close(ChannelPromise promise) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture deregister(ChannelPromise promise) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext read() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object msg) {
|
||||
return channel.newSucceededFuture();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object msg, ChannelPromise promise) {
|
||||
return promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext flush() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
||||
return promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture writeAndFlush(Object msg) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline pipeline() {
|
||||
return channel.pipeline();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBufAllocator alloc() {
|
||||
return channel.alloc();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise newPromise() {
|
||||
return channel.newPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelProgressivePromise newProgressivePromise() {
|
||||
return channel.newProgressivePromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newSucceededFuture() {
|
||||
return channel.newSucceededFuture();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture newFailedFuture(Throwable cause) {
|
||||
return channel.newFailedFuture(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPromise voidPromise() {
|
||||
return channel.voidPromise();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Attribute<T> attr(AttributeKey<T> key) {
|
||||
return channel.attr(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> boolean hasAttr(AttributeKey<T> key) {
|
||||
return channel.hasAttr(key);
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestChannel extends AbstractChannel {
|
||||
private DefaultChannelConfig config = new DefaultChannelConfig(this);
|
||||
|
||||
private class TestUnsafe extends AbstractUnsafe {
|
||||
@Override
|
||||
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||
}
|
||||
}
|
||||
|
||||
public TestChannel() {
|
||||
super(null);
|
||||
config.setWriteBufferHighWaterMark(Integer.MAX_VALUE);
|
||||
config.setWriteBufferLowWaterMark(Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long bytesBeforeUnwritable() {
|
||||
return Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isWritable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelConfig config() {
|
||||
return new DefaultChannelConfig(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelMetadata metadata() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractUnsafe newUnsafe() {
|
||||
return new TestUnsafe();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isCompatible(EventLoop loop) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SocketAddress localAddress0() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected SocketAddress remoteAddress0() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doBind(SocketAddress localAddress) throws Exception { }
|
||||
|
||||
@Override
|
||||
protected void doDisconnect() throws Exception { }
|
||||
|
||||
@Override
|
||||
protected void doClose() throws Exception { }
|
||||
|
||||
@Override
|
||||
protected void doBeginRead() throws Exception { }
|
||||
|
||||
@Override
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception { }
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user