Adding UniformStreamByteDistributor

Motivation:

The current priority algorithm can yield poor per-stream goodput when either the number of streams is high or the connection window is small. When all priorities are the same (i.e. priority is disabled), we should be able to do better.

Modifications:

Added a new UniformStreamByteDistributor that ignores priority entirely and manages a queue of streams.  Each stream is allocated a minimum of 1KiB on each iteration.

Result:

Improved goodput when priority is not used.
This commit is contained in:
nmittler 2015-11-03 15:05:32 -08:00
parent e7bfec3b3b
commit ebfe3d2ba7
3 changed files with 1040 additions and 0 deletions

View File

@ -0,0 +1,222 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.util.ArrayDeque;
import java.util.Deque;
/**
* A {@link StreamByteDistributor} that ignores stream priority and uniformly allocates bytes to all
* streams. This class uses a minimum chunk size that will be allocated to each stream. While
* fewer streams may be written to in each call to {@link #distribute(int, Writer)}, doing this
* should improve the goodput on each written stream.
*/
public final class UniformStreamByteDistributor implements StreamByteDistributor {
static final int DEFAULT_MIN_ALLOCATION_CHUNK = 1024;
private final Http2Connection.PropertyKey stateKey;
private final Deque<State> queue = new ArrayDeque<State>(4);
private Deque<State> emptyFrameQueue;
/**
* The minimum number of bytes that we will attempt to allocate to a stream. This is to
* help improve goodput on a per-stream basis.
*/
private int minAllocationChunk = DEFAULT_MIN_ALLOCATION_CHUNK;
private long totalStreamableBytes;
public UniformStreamByteDistributor(Http2Connection connection) {
checkNotNull(connection, "connection");
// Add a state for the connection.
stateKey = connection.newKey();
Http2Stream connectionStream = connection.connectionStream();
connectionStream.setProperty(stateKey, new State(connectionStream));
// Register for notification of new streams.
connection.addListener(new Http2ConnectionAdapter() {
@Override
public void onStreamAdded(Http2Stream stream) {
stream.setProperty(stateKey, new State(stream));
}
@Override
public void onStreamClosed(Http2Stream stream) {
state(stream).close();
}
});
}
/**
* Sets the minimum allocation chunk that will be allocated to each stream. Defaults to 1KiB.
*
* @param minAllocationChunk the minimum number of bytes that will be allocated to each stream.
* Must be > 0.
*/
public void minAllocationChunk(int minAllocationChunk) {
if (minAllocationChunk <= 0) {
throw new IllegalArgumentException("minAllocationChunk must be > 0");
}
this.minAllocationChunk = minAllocationChunk;
}
@Override
public void updateStreamableBytes(StreamState streamState) {
State state = state(streamState.stream());
state.updateStreamableBytes(streamState.streamableBytes(), streamState.hasFrame());
}
@Override
public boolean distribute(int maxBytes, Writer writer) throws Http2Exception {
checkNotNull(writer, "writer");
// First, write out any empty frames.
if (emptyFrameQueue != null) {
while (!emptyFrameQueue.isEmpty()) {
State state = emptyFrameQueue.remove();
state.enqueued = false;
if (state.streamableBytes > 0) {
// Bytes have been added since it was queued. Add it to the regular queue.
state.addToQueue();
} else {
// Still an empty frame, just write it.
state.write(0, writer);
}
}
}
final int size = queue.size();
if (size == 0 || maxBytes <= 0) {
return totalStreamableBytes > 0;
}
final int chunkSize = max(minAllocationChunk, maxBytes / size);
// Write until the queue is empty or we've exhausted maxBytes. We need to check queue.isEmpty()
// here since the Writer could call updateStreamableBytes, which may alter the queue.
do {
// Remove the head of the queue.
State state = queue.remove();
state.enqueued = false;
// Allocate as much data as we can for this stream.
int chunk = min(chunkSize, min(maxBytes, state.streamableBytes));
maxBytes -= chunk;
// Write the allocated bytes and enqueue as necessary.
state.write(chunk, writer);
} while (maxBytes > 0 && !queue.isEmpty());
return totalStreamableBytes > 0;
}
private State state(Http2Stream stream) {
return checkNotNull(stream, "stream").getProperty(stateKey);
}
/**
* The remote flow control state for a single stream.
*/
private final class State {
final Http2Stream stream;
int streamableBytes;
boolean enqueued;
boolean previouslyOnMainQueue;
State(Http2Stream stream) {
this.stream = stream;
}
void updateStreamableBytes(int newStreamableBytes, boolean hasFrame) {
assert hasFrame || newStreamableBytes == 0;
int delta = newStreamableBytes - streamableBytes;
if (delta != 0) {
streamableBytes = newStreamableBytes;
totalStreamableBytes += delta;
}
if (hasFrame) {
// It's not in the queue but has data to send, add it.
addToQueue();
}
}
/**
* Write any allocated bytes for the given stream and updates the streamable bytes,
* assuming all of the bytes will be written.
*/
void write(int numBytes, Writer writer) throws Http2Exception {
// Update the streamable bytes, assuming that all the bytes will be written.
int newStreamableBytes = streamableBytes - numBytes;
updateStreamableBytes(newStreamableBytes, newStreamableBytes > 0);
try {
// Write the allocated bytes.
writer.write(stream, numBytes);
} catch (Throwable t) {
throw connectionError(INTERNAL_ERROR, t, "byte distribution write error");
}
}
void addToQueue() {
if (!enqueued) {
enqueued = true;
if (streamableBytes == 0) {
// Add empty frames to the empty frame queue.
getOrCreateEmptyFrameQueue().addLast(this);
return;
}
if (!previouslyOnMainQueue) {
// Add to the head the first time it's on the main queue.
previouslyOnMainQueue = true;
queue.addFirst(this);
} else {
queue.addLast(this);
}
}
}
void removeFromQueue() {
if (enqueued) {
enqueued = false;
if (emptyFrameQueue != null && !emptyFrameQueue.remove(this)) {
queue.remove(this);
}
}
}
void close() {
// Remove this state from the queue.
removeFromQueue();
// Clear the streamable bytes.
updateStreamableBytes(0, false);
}
private Deque<State> getOrCreateEmptyFrameQueue() {
if (emptyFrameQueue == null) {
emptyFrameQueue = new ArrayDeque<State>(2);
}
return emptyFrameQueue;
}
}
}

View File

@ -0,0 +1,235 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.handler.codec.http2;
import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT;
import static io.netty.handler.codec.http2.UniformStreamByteDistributor.DEFAULT_MIN_ALLOCATION_CHUNK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;
/**
* Tests for {@link UniformStreamByteDistributor}.
*/
public class UniformStreamByteDistributorTest {
private static int CHUNK_SIZE = DEFAULT_MIN_ALLOCATION_CHUNK;
private static final int STREAM_A = 1;
private static final int STREAM_B = 3;
private static final int STREAM_C = 5;
private static final int STREAM_D = 7;
private Http2Connection connection;
private UniformStreamByteDistributor distributor;
@Mock
private StreamByteDistributor.Writer writer;
@Before
public void setup() throws Http2Exception {
MockitoAnnotations.initMocks(this);
connection = new DefaultHttp2Connection(false);
distributor = new UniformStreamByteDistributor(connection);
connection.local().createStream(STREAM_A, false);
connection.local().createStream(STREAM_B, false);
Http2Stream streamC = connection.local().createStream(STREAM_C, false);
Http2Stream streamD = connection.local().createStream(STREAM_D, false);
streamC.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
streamD.setPriority(STREAM_A, DEFAULT_PRIORITY_WEIGHT, false);
}
@Test
public void bytesUnassignedAfterProcessing() throws Http2Exception {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
assertFalse(write(10));
verifyWrite(STREAM_A, 1);
verifyWrite(STREAM_B, 2);
verifyWrite(STREAM_C, 3);
verifyWrite(STREAM_D, 4);
verifyNoMoreInteractions(writer);
assertFalse(write(10));
verifyNoMoreInteractions(writer);
}
@Test
public void connectionErrorForWriterException() throws Http2Exception {
updateStream(STREAM_A, 1, true);
updateStream(STREAM_B, 2, true);
updateStream(STREAM_C, 3, true);
updateStream(STREAM_D, 4, true);
Exception fakeException = new RuntimeException("Fake exception");
doThrow(fakeException).when(writer).write(same(stream(STREAM_C)), eq(3));
try {
write(10);
fail("Expected an exception");
} catch (Http2Exception e) {
assertFalse(Http2Exception.isStreamError(e));
assertEquals(Http2Error.INTERNAL_ERROR, e.error());
assertSame(fakeException, e.getCause());
}
verifyWrite(atMost(1), STREAM_A, 1);
verifyWrite(atMost(1), STREAM_B, 2);
verifyWrite(STREAM_C, 3);
verifyWrite(atMost(1), STREAM_D, 4);
doNothing().when(writer).write(same(stream(STREAM_C)), eq(3));
write(10);
verifyWrite(STREAM_A, 1);
verifyWrite(STREAM_B, 2);
verifyWrite(STREAM_C, 3);
verifyWrite(STREAM_D, 4);
}
/**
* In this test, we verify that each stream is allocated a minimum chunk size. When bytes
* run out, the remaining streams will be next in line for the next iteration.
*/
@Test
public void minChunkShouldBeAllocatedPerStream() throws Http2Exception {
// Re-assign weights.
setPriority(STREAM_A, 0, (short) 50, false);
setPriority(STREAM_B, 0, (short) 200, false);
setPriority(STREAM_C, STREAM_A, (short) 100, false);
setPriority(STREAM_D, STREAM_A, (short) 100, false);
// Update in reverse order. This will yield a queue in the order A, B, C, D. This
// is due to the fact that when a stream is enqueued the first time, it is added to the
// head of the queue.
updateStream(STREAM_D, CHUNK_SIZE, true);
updateStream(STREAM_C, CHUNK_SIZE, true);
updateStream(STREAM_B, CHUNK_SIZE, true);
updateStream(STREAM_A, CHUNK_SIZE, true);
// Only write 3 * chunkSize, so that we'll only write to the first 3 streams.
int written = 3 * CHUNK_SIZE;
assertTrue(write(written));
assertEquals(CHUNK_SIZE, captureWrite(STREAM_A));
assertEquals(CHUNK_SIZE, captureWrite(STREAM_B));
assertEquals(CHUNK_SIZE, captureWrite(STREAM_C));
verifyNoMoreInteractions(writer);
reset(writer);
// Now write again and verify that the last stream is written to.
assertFalse(write(CHUNK_SIZE));
assertEquals(CHUNK_SIZE, captureWrite(STREAM_D));
verifyNoMoreInteractions(writer);
}
@Test
public void streamWithMoreDataShouldBeEnqueuedAfterWrite() throws Http2Exception {
// Give the stream a bunch of data.
updateStream(STREAM_A, 2 * CHUNK_SIZE, true);
// Write only part of the data.
assertTrue(write(CHUNK_SIZE));
assertEquals(CHUNK_SIZE, captureWrite(STREAM_A));
verifyNoMoreInteractions(writer);
reset(writer);
// Now write the rest of the data.
assertFalse(write(CHUNK_SIZE));
assertEquals(CHUNK_SIZE, captureWrite(STREAM_A));
verifyNoMoreInteractions(writer);
}
@Test
public void emptyFrameIsWritten() throws Http2Exception {
updateStream(STREAM_B, 0, true);
updateStream(STREAM_A, 10, true);
assertFalse(write(10));
verifyWrite(STREAM_A, 10);
verifyWrite(STREAM_B, 0);
verifyNoMoreInteractions(writer);
write(10);
verifyNoMoreInteractions(writer);
}
private Http2Stream stream(int streamId) {
return connection.stream(streamId);
}
private void updateStream(final int streamId, final int streamableBytes, final boolean hasFrame) {
final Http2Stream stream = stream(streamId);
distributor.updateStreamableBytes(new StreamByteDistributor.StreamState() {
@Override
public Http2Stream stream() {
return stream;
}
@Override
public int streamableBytes() {
return streamableBytes;
}
@Override
public boolean hasFrame() {
return hasFrame;
}
});
}
private void setPriority(int streamId, int parent, int weight, boolean exclusive) throws Http2Exception {
stream(streamId).setPriority(parent, (short) weight, exclusive);
}
private boolean write(int numBytes) throws Http2Exception {
return distributor.distribute(numBytes, writer);
}
private void verifyWrite(int streamId, int numBytes) {
verify(writer).write(same(stream(streamId)), eq(numBytes));
}
private void verifyWrite(VerificationMode mode, int streamId, int numBytes) {
verify(writer, mode).write(same(stream(streamId)), eq(numBytes));
}
private int captureWrite(int streamId) {
ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
verify(writer).write(same(stream(streamId)), captor.capture());
return captor.getValue();
}
}

View File

@ -0,0 +1,583 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.microbench.http2;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelHandlerInvoker;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2RemoteFlowController;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2RemoteFlowController;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamVisitor;
import io.netty.handler.codec.http2.PriorityStreamByteDistributor;
import io.netty.handler.codec.http2.StreamByteDistributor;
import io.netty.handler.codec.http2.UniformStreamByteDistributor;
import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.EventExecutor;
import org.openjdk.jmh.annotations.AuxCounters;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import java.net.SocketAddress;
/**
* Benchmark to compare stream byte distribution algorithms when priorities are identical for
* all streams.
*/
@Fork(1)
@State(Scope.Benchmark)
public class NoPriorityByteDistributionBenchmark extends AbstractMicrobenchmark {
public enum Algorithm {
PRIORITY,
UNIFORM
}
@Param({ "100", "10000" })
private int numStreams;
@Param({ "1024", "65536", "1048576" })
private int windowSize;
@Param
private Algorithm algorithm;
private Http2Connection connection;
private Http2Connection.PropertyKey dataRefresherKey;
private Http2RemoteFlowController controller;
private StreamByteDistributor distributor;
private AdditionalCounters counters;
/**
* Additional counters for a single iteration.
*/
@AuxCounters
@State(Scope.Thread)
public static class AdditionalCounters {
private int minWriteSize;
private int maxWriteSize;
private long totalBytes;
private long numWrites;
private int invocations;
public int minWriteSize() {
return minWriteSize;
}
public int avgWriteSize() {
return (int) (totalBytes / numWrites);
}
public int maxWriteSize() {
return maxWriteSize;
}
}
private Http2StreamVisitor invocationVisitor = new Http2StreamVisitor() {
@Override
public boolean visit(Http2Stream stream) throws Http2Exception {
// Restore the connection window.
resetWindow(stream);
// Restore the data to each stream.
dataRefresher(stream).refreshData();
return true;
}
};
@Setup(Level.Trial)
public void setupTrial() throws Http2Exception {
connection = new DefaultHttp2Connection(false);
dataRefresherKey = connection.newKey();
// Create the flow controller
switch (algorithm) {
case PRIORITY:
distributor = new PriorityStreamByteDistributor(connection);
break;
case UNIFORM:
distributor = new UniformStreamByteDistributor(connection);
break;
}
controller = new DefaultHttp2RemoteFlowController(connection, new ByteCounter(distributor));
controller.channelHandlerContext(new TestContext());
// Create the streams, each initialized with MAX_INT bytes.
for (int i = 0; i < numStreams; ++i) {
Http2Stream stream = connection.local().createStream(toStreamId(i), false);
addData(stream, Integer.MAX_VALUE);
stream.setProperty(dataRefresherKey, new DataRefresher(stream));
}
}
@Setup(Level.Invocation)
public void setupInvocation() throws Http2Exception {
resetWindow(connection.connectionStream());
connection.forEachActiveStream(invocationVisitor);
}
@Benchmark
public void write(AdditionalCounters counters) throws Http2Exception {
// Set up for this invocation. Doing this in the benchmark method since this
// seems to throw off the counters when run as a setup step for the invocation.
this.counters = counters;
counters.invocations++;
// Now run the benchmark method.
controller.writePendingBytes();
}
private void resetWindow(Http2Stream stream) throws Http2Exception {
controller.incrementWindowSize(stream, windowSize - controller.windowSize(stream));
}
private DataRefresher dataRefresher(Http2Stream stream) {
return (DataRefresher) stream.getProperty(dataRefresherKey);
}
private void addData(Http2Stream stream, final int dataSize) {
controller.addFlowControlled(stream, new Http2RemoteFlowController.FlowControlled() {
int size = dataSize;
@Override
public int size() {
return size;
}
@Override
public void error(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
}
@Override
public void writeComplete() {
// Don't care.
}
@Override
public void write(ChannelHandlerContext ctx, int allowedBytes) {
size -= allowedBytes;
}
@Override
public boolean merge(ChannelHandlerContext ctx,
Http2RemoteFlowController.FlowControlled next) {
int nextSize = next.size();
if (Integer.MAX_VALUE - nextSize < size) {
// Disallow merge to avoid integer overflow.
return false;
}
// Merge.
size += nextSize;
return true;
}
});
}
private static int toStreamId(int i) {
return 2 * i + 1;
}
private final class DataRefresher {
private final Http2Stream stream;
private int data;
private DataRefresher(Http2Stream stream) {
this.stream = stream;
}
void add(int data) {
this.data += data;
}
void refreshData() {
if (data > 0) {
addData(stream, data);
data = 0;
}
}
}
private final class ByteCounter implements StreamByteDistributor {
private final StreamByteDistributor delegate;
private ByteCounter(StreamByteDistributor delegate) {
this.delegate = delegate;
}
@Override
public void updateStreamableBytes(StreamState state) {
delegate.updateStreamableBytes(state);
}
@Override
public boolean distribute(int maxBytes, Writer writer)
throws Http2Exception {
return delegate.distribute(maxBytes, new CountingWriter(writer));
}
private final class CountingWriter implements Writer {
private final Writer delegate;
private CountingWriter(Writer delegate) {
this.delegate = delegate;
}
@Override
public void write(Http2Stream stream, int numBytes) {
if (numBytes > 0) {
// Add the data to the refresher so that it can be given back to the
// stream at the end of the iteration.
DataRefresher refresher = dataRefresher(stream);
refresher.add(numBytes);
counters.numWrites++;
counters.totalBytes += numBytes;
if (counters.minWriteSize == 0 || numBytes < counters.minWriteSize) {
counters.minWriteSize = numBytes;
}
if (numBytes > counters.maxWriteSize) {
counters.maxWriteSize = numBytes;
}
}
delegate.write(stream, numBytes);
}
}
}
private static class TestContext implements ChannelHandlerContext {
private Channel channel = new TestChannel();
@Override
public Channel channel() {
return channel;
}
@Override
public EventExecutor executor() {
return channel.eventLoop();
}
@Override
public ChannelHandlerInvoker invoker() {
throw new UnsupportedOperationException();
}
@Override
public String name() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandler handler() {
throw new UnsupportedOperationException();
}
@Override
public boolean isRemoved() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelRegistered() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelUnregistered() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelActive() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelInactive() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireUserEventTriggered(Object event) {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelRead(Object msg) {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelReadComplete() {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext fireChannelWritabilityChanged() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture bind(SocketAddress localAddress) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture disconnect() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture close() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture deregister() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture disconnect(ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture close(ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture deregister(ChannelPromise promise) {
throw new UnsupportedOperationException();
}
@Override
public ChannelHandlerContext read() {
throw new UnsupportedOperationException();
}
@Override
public ChannelFuture write(Object msg) {
return channel.newSucceededFuture();
}
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return promise;
}
@Override
public ChannelHandlerContext flush() {
return this;
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return promise;
}
@Override
public ChannelFuture writeAndFlush(Object msg) {
return null;
}
@Override
public ChannelPipeline pipeline() {
return channel.pipeline();
}
@Override
public ByteBufAllocator alloc() {
return channel.alloc();
}
@Override
public ChannelPromise newPromise() {
return channel.newPromise();
}
@Override
public ChannelProgressivePromise newProgressivePromise() {
return channel.newProgressivePromise();
}
@Override
public ChannelFuture newSucceededFuture() {
return channel.newSucceededFuture();
}
@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return channel.newFailedFuture(cause);
}
@Override
public ChannelPromise voidPromise() {
return channel.voidPromise();
}
@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
return channel.attr(key);
}
@Override
public <T> boolean hasAttr(AttributeKey<T> key) {
return channel.hasAttr(key);
}
}
private static class TestChannel extends AbstractChannel {
private DefaultChannelConfig config = new DefaultChannelConfig(this);
private class TestUnsafe extends AbstractUnsafe {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
}
}
public TestChannel() {
super(null);
config.setWriteBufferHighWaterMark(Integer.MAX_VALUE);
config.setWriteBufferLowWaterMark(Integer.MAX_VALUE);
}
@Override
public long bytesBeforeUnwritable() {
return Long.MAX_VALUE;
}
@Override
public boolean isWritable() {
return true;
}
@Override
public ChannelConfig config() {
return new DefaultChannelConfig(this);
}
@Override
public boolean isOpen() {
return true;
}
@Override
public boolean isActive() {
return true;
}
@Override
public ChannelMetadata metadata() {
return null;
}
@Override
protected AbstractUnsafe newUnsafe() {
return new TestUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}
@Override
protected SocketAddress localAddress0() {
return null;
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception { }
@Override
protected void doDisconnect() throws Exception { }
@Override
protected void doClose() throws Exception { }
@Override
protected void doBeginRead() throws Exception { }
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { }
}
}