Correctly update Channel writability when queueing data in SslHandler.
Motivation:
A regression was introduced in 86e653e
which had the effect that the writability was not updated for a Channel while queueing data in the SslHandler.
Modifications:
- Factor out code that will increment / decrement pending bytes and use it in AbstractCoalescingBufferQueue and PendingWriteQueue
- Add test-case
Result:
Channel writability changes are triggered again.
This commit is contained in:
parent
d8e187ff2c
commit
55b501d0d4
@ -19,10 +19,13 @@ import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.DefaultChannelPromise;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import io.netty.handler.codec.http2.Http2RemoteFlowController.FlowControlled;
|
||||
@ -86,6 +89,9 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
@Mock
|
||||
private Channel channel;
|
||||
|
||||
@Mock
|
||||
private Channel.Unsafe unsafe;
|
||||
|
||||
@Mock
|
||||
private ChannelPipeline pipeline;
|
||||
|
||||
@ -112,8 +118,13 @@ public class DefaultHttp2ConnectionEncoderTest {
|
||||
public void setup() throws Exception {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
|
||||
ChannelMetadata metadata = new ChannelMetadata(false, 16);
|
||||
when(channel.isActive()).thenReturn(true);
|
||||
when(channel.pipeline()).thenReturn(pipeline);
|
||||
when(channel.metadata()).thenReturn(metadata);
|
||||
when(channel.unsafe()).thenReturn(unsafe);
|
||||
ChannelConfig config = new DefaultChannelConfig(channel);
|
||||
when(channel.config()).thenReturn(config);
|
||||
when(writer.configuration()).thenReturn(writerConfig);
|
||||
when(writerConfig.frameSizePolicy()).thenReturn(frameSizePolicy);
|
||||
when(frameSizePolicy.maxFrameSize()).thenReturn(64);
|
||||
|
@ -45,8 +45,10 @@ import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultChannelPromise;
|
||||
import io.netty.channel.DefaultMessageSizeEstimator;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
@ -81,6 +83,9 @@ public class StreamBufferingEncoderTest {
|
||||
@Mock
|
||||
private Channel channel;
|
||||
|
||||
@Mock
|
||||
private Channel.Unsafe unsafe;
|
||||
|
||||
@Mock
|
||||
private ChannelConfig config;
|
||||
|
||||
@ -137,6 +142,10 @@ public class StreamBufferingEncoderTest {
|
||||
when(channel.isWritable()).thenReturn(true);
|
||||
when(channel.bytesBeforeUnwritable()).thenReturn(Long.MAX_VALUE);
|
||||
when(config.getWriteBufferHighWaterMark()).thenReturn(Integer.MAX_VALUE);
|
||||
when(config.getMessageSizeEstimator()).thenReturn(DefaultMessageSizeEstimator.DEFAULT);
|
||||
ChannelMetadata metadata = new ChannelMetadata(false, 16);
|
||||
when(channel.metadata()).thenReturn(metadata);
|
||||
when(channel.unsafe()).thenReturn(unsafe);
|
||||
handler.handlerAdded(ctx);
|
||||
}
|
||||
|
||||
|
@ -352,7 +352,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
private boolean sentFirstMessage;
|
||||
private boolean flushedBeforeHandshake;
|
||||
private boolean readDuringHandshake;
|
||||
private final SslHandlerCoalescingBufferQueue pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(16);
|
||||
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
|
||||
private Promise<Channel> handshakePromise = new LazyChannelPromise();
|
||||
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
|
||||
|
||||
@ -375,6 +375,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
private volatile long handshakeTimeoutMillis = 10000;
|
||||
private volatile long closeNotifyFlushTimeoutMillis = 3000;
|
||||
private volatile long closeNotifyReadTimeoutMillis;
|
||||
volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
@ -474,7 +475,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
*/
|
||||
@UnstableApi
|
||||
public final void setWrapDataSize(int wrapDataSize) {
|
||||
pendingUnencryptedWrites.wrapDataSize = wrapDataSize;
|
||||
this.wrapDataSize = wrapDataSize;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -742,7 +743,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
ByteBufAllocator alloc = ctx.alloc();
|
||||
boolean needUnwrap = false;
|
||||
try {
|
||||
final int wrapDataSize = pendingUnencryptedWrites.wrapDataSize;
|
||||
final int wrapDataSize = this.wrapDataSize;
|
||||
// Only continue to loop if the handler was not removed in the meantime.
|
||||
// See https://github.com/netty/netty/issues/5860
|
||||
while (!ctx.isRemoved()) {
|
||||
@ -1536,6 +1537,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
|
||||
this.ctx = ctx;
|
||||
|
||||
pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(ctx.channel(), 16);
|
||||
if (ctx.channel().isActive()) {
|
||||
if (engine.getUseClientMode()) {
|
||||
// Begin the initial handshake.
|
||||
@ -1545,9 +1547,6 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
} else {
|
||||
applyHandshakeTimeout(null);
|
||||
}
|
||||
} else {
|
||||
// channelActive() event has not been fired yet. this.channelOpen() will be invoked
|
||||
// and initialization will occur there.
|
||||
}
|
||||
}
|
||||
|
||||
@ -1806,16 +1805,15 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
* goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written
|
||||
* this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations.
|
||||
*/
|
||||
private static final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
|
||||
volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
|
||||
private final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
|
||||
|
||||
SslHandlerCoalescingBufferQueue(int initSize) {
|
||||
super(initSize);
|
||||
SslHandlerCoalescingBufferQueue(Channel channel, int initSize) {
|
||||
super(channel, initSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
|
||||
final int wrapDataSize = this.wrapDataSize;
|
||||
final int wrapDataSize = SslHandler.this.wrapDataSize;
|
||||
if (cumulation instanceof CompositeByteBuf) {
|
||||
CompositeByteBuf composite = (CompositeByteBuf) cumulation;
|
||||
int numComponents = composite.numComponents();
|
||||
@ -1849,23 +1847,23 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
protected ByteBuf removeEmptyValue() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
|
||||
final int inReadableBytes = next.readableBytes();
|
||||
final int cumulationCapacity = cumulation.capacity();
|
||||
if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
|
||||
// Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
|
||||
// Only copy if there is enough space available and the capacity is large enough, and attempt to
|
||||
// resize if the capacity is small.
|
||||
(cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
|
||||
cumulationCapacity < wrapDataSize &&
|
||||
ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
|
||||
cumulation.writeBytes(next);
|
||||
next.release();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
|
||||
final int inReadableBytes = next.readableBytes();
|
||||
final int cumulationCapacity = cumulation.capacity();
|
||||
if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
|
||||
// Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
|
||||
// Only copy if there is enough space available and the capacity is large enough, and attempt to
|
||||
// resize if the capacity is small.
|
||||
(cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
|
||||
cumulationCapacity < wrapDataSize &&
|
||||
ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
|
||||
cumulation.writeBytes(next);
|
||||
next.release();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private final class LazyChannelPromise extends DefaultPromise<Channel> {
|
||||
|
@ -30,21 +30,31 @@ import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
|
||||
public abstract class AbstractCoalescingBufferQueue {
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractCoalescingBufferQueue.class);
|
||||
private final ArrayDeque<Object> bufAndListenerPairs;
|
||||
private final PendingBytesTracker tracker;
|
||||
private int readableBytes;
|
||||
|
||||
public AbstractCoalescingBufferQueue(int initSize) {
|
||||
/**
|
||||
* Create a new instance.
|
||||
*
|
||||
* @param channel the {@link Channel} which will have the {@link Channel#isWritable()} reflect the amount of queued
|
||||
* buffers or {@code null} if there is no writability state updated.
|
||||
* @param initSize theinitial size of the underlying queue.
|
||||
*/
|
||||
protected AbstractCoalescingBufferQueue(Channel channel, int initSize) {
|
||||
bufAndListenerPairs = new ArrayDeque<Object>(initSize);
|
||||
tracker = channel == null ? null : PendingBytesTracker.newTracker(channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a buffer to the front of the queue.
|
||||
*/
|
||||
public final void addFirst(ByteBuf buf) {
|
||||
incrementReadableBytes(buf.readableBytes());
|
||||
// Listener would be added here, but since it is null there is no need. The assumption is there is already a
|
||||
// listener at the front of the queue, or there is a buffer at the front of the queue, which was spliced from
|
||||
// buf via remove().
|
||||
bufAndListenerPairs.addFirst(buf);
|
||||
|
||||
incrementReadableBytes(buf.readableBytes());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -75,11 +85,11 @@ public abstract class AbstractCoalescingBufferQueue {
|
||||
public final void add(ByteBuf buf, ChannelFutureListener listener) {
|
||||
// buffers are added before promises so that we naturally 'consume' the entire buffer during removal
|
||||
// before we complete it's promise.
|
||||
incrementReadableBytes(buf.readableBytes());
|
||||
bufAndListenerPairs.add(buf);
|
||||
if (listener != null) {
|
||||
bufAndListenerPairs.add(listener);
|
||||
}
|
||||
incrementReadableBytes(buf.readableBytes());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -95,8 +105,7 @@ public abstract class AbstractCoalescingBufferQueue {
|
||||
assert entry instanceof ByteBuf;
|
||||
ByteBuf result = (ByteBuf) entry;
|
||||
|
||||
readableBytes -= result.readableBytes();
|
||||
assert readableBytes >= 0;
|
||||
decrementReadableBytes(result.readableBytes());
|
||||
|
||||
entry = bufAndListenerPairs.peek();
|
||||
if (entry instanceof ChannelFutureListener) {
|
||||
@ -154,8 +163,7 @@ public abstract class AbstractCoalescingBufferQueue {
|
||||
toReturn = toReturn == null ? composeFirst(alloc, entryBuffer) : compose(alloc, toReturn, entryBuffer);
|
||||
}
|
||||
}
|
||||
readableBytes -= originalBytes - bytes;
|
||||
assert readableBytes >= 0;
|
||||
decrementReadableBytes(originalBytes - bytes);
|
||||
return toReturn;
|
||||
}
|
||||
|
||||
@ -186,7 +194,7 @@ public abstract class AbstractCoalescingBufferQueue {
|
||||
*/
|
||||
public final void copyTo(AbstractCoalescingBufferQueue dest) {
|
||||
dest.bufAndListenerPairs.addAll(bufAndListenerPairs);
|
||||
dest.readableBytes += readableBytes;
|
||||
dest.incrementReadableBytes(readableBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -194,7 +202,7 @@ public abstract class AbstractCoalescingBufferQueue {
|
||||
* @param ctx The context to write all elements to.
|
||||
*/
|
||||
public final void writeAndRemoveAll(ChannelHandlerContext ctx) {
|
||||
readableBytes = 0;
|
||||
decrementReadableBytes(readableBytes);
|
||||
Throwable pending = null;
|
||||
ByteBuf previousBuf = null;
|
||||
for (;;) {
|
||||
@ -260,7 +268,7 @@ public abstract class AbstractCoalescingBufferQueue {
|
||||
}
|
||||
|
||||
private void releaseAndCompleteAll(ChannelFuture future) {
|
||||
readableBytes = 0;
|
||||
decrementReadableBytes(readableBytes);
|
||||
Throwable pending = null;
|
||||
for (;;) {
|
||||
Object entry = bufAndListenerPairs.poll();
|
||||
@ -292,5 +300,16 @@ public abstract class AbstractCoalescingBufferQueue {
|
||||
throw new IllegalStateException("buffer queue length overflow: " + readableBytes + " + " + increment);
|
||||
}
|
||||
readableBytes = nextReadableBytes;
|
||||
if (tracker != null) {
|
||||
tracker.incrementPendingOutboundBytes(increment);
|
||||
}
|
||||
}
|
||||
|
||||
private void decrementReadableBytes(int decrement) {
|
||||
readableBytes -= decrement;
|
||||
assert readableBytes >= 0;
|
||||
if (tracker != null) {
|
||||
tracker.decrementPendingOutboundBytes(decrement);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -40,7 +40,11 @@ public final class CoalescingBufferQueue extends AbstractCoalescingBufferQueue {
|
||||
}
|
||||
|
||||
public CoalescingBufferQueue(Channel channel, int initSize) {
|
||||
super(initSize);
|
||||
this(channel, initSize, false);
|
||||
}
|
||||
|
||||
public CoalescingBufferQueue(Channel channel, int initSize, boolean updateWritability) {
|
||||
super(updateWritability ? channel : null, initSize);
|
||||
this.channel = ObjectUtil.checkNotNull(channel, "channel");
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,104 @@
|
||||
/*
|
||||
* Copyright 2017 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
|
||||
abstract class PendingBytesTracker implements MessageSizeEstimator.Handle {
|
||||
private final MessageSizeEstimator.Handle estimatorHandle;
|
||||
|
||||
private PendingBytesTracker(MessageSizeEstimator.Handle estimatorHandle) {
|
||||
this.estimatorHandle = ObjectUtil.checkNotNull(estimatorHandle, "estimatorHandle");
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int size(Object msg) {
|
||||
return estimatorHandle.size(msg);
|
||||
}
|
||||
|
||||
public abstract void incrementPendingOutboundBytes(long bytes);
|
||||
public abstract void decrementPendingOutboundBytes(long bytes);
|
||||
|
||||
static PendingBytesTracker newTracker(Channel channel) {
|
||||
if (channel.pipeline() instanceof DefaultChannelPipeline) {
|
||||
return new DefaultChannelPipelinePendingBytesTracker((DefaultChannelPipeline) channel.pipeline());
|
||||
} else {
|
||||
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
|
||||
MessageSizeEstimator.Handle handle = channel.config().getMessageSizeEstimator().newHandle();
|
||||
// We need to guard against null as channel.unsafe().outboundBuffer() may returned null
|
||||
// if the channel was already closed when constructing the PendingBytesTracker.
|
||||
// See https://github.com/netty/netty/issues/3967
|
||||
return buffer == null ?
|
||||
new NoopPendingBytesTracker(handle) : new ChannelOutboundBufferPendingBytesTracker(buffer, handle);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class DefaultChannelPipelinePendingBytesTracker extends PendingBytesTracker {
|
||||
private final DefaultChannelPipeline pipeline;
|
||||
|
||||
DefaultChannelPipelinePendingBytesTracker(DefaultChannelPipeline pipeline) {
|
||||
super(pipeline.estimatorHandle());
|
||||
this.pipeline = pipeline;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementPendingOutboundBytes(long bytes) {
|
||||
pipeline.incrementPendingOutboundBytes(bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decrementPendingOutboundBytes(long bytes) {
|
||||
pipeline.decrementPendingOutboundBytes(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class ChannelOutboundBufferPendingBytesTracker extends PendingBytesTracker {
|
||||
private final ChannelOutboundBuffer buffer;
|
||||
|
||||
ChannelOutboundBufferPendingBytesTracker(
|
||||
ChannelOutboundBuffer buffer, MessageSizeEstimator.Handle estimatorHandle) {
|
||||
super(estimatorHandle);
|
||||
this.buffer = buffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementPendingOutboundBytes(long bytes) {
|
||||
buffer.incrementPendingOutboundBytes(bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decrementPendingOutboundBytes(long bytes) {
|
||||
buffer.decrementPendingOutboundBytes(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class NoopPendingBytesTracker extends PendingBytesTracker {
|
||||
|
||||
NoopPendingBytesTracker(MessageSizeEstimator.Handle estimatorHandle) {
|
||||
super(estimatorHandle);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementPendingOutboundBytes(long bytes) {
|
||||
// Noop
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decrementPendingOutboundBytes(long bytes) {
|
||||
// Noop
|
||||
}
|
||||
}
|
||||
}
|
@ -38,7 +38,7 @@ public final class PendingWriteQueue {
|
||||
SystemPropertyUtil.getInt("io.netty.transport.pendingWriteSizeOverhead", 64);
|
||||
|
||||
private final ChannelHandlerContext ctx;
|
||||
private final PendingTracker tracker;
|
||||
private final PendingBytesTracker tracker;
|
||||
|
||||
// head and tail pointers for the linked-list structure. If empty head and tail are null.
|
||||
private PendingWrite head;
|
||||
@ -46,80 +46,9 @@ public final class PendingWriteQueue {
|
||||
private int size;
|
||||
private long bytes;
|
||||
|
||||
private interface PendingTracker extends MessageSizeEstimator.Handle {
|
||||
void incrementPendingOutboundBytes(long bytes);
|
||||
void decrementPendingOutboundBytes(long bytes);
|
||||
}
|
||||
|
||||
public PendingWriteQueue(ChannelHandlerContext ctx) {
|
||||
this.ctx = ObjectUtil.checkNotNull(ctx, "ctx");
|
||||
if (ctx.pipeline() instanceof DefaultChannelPipeline) {
|
||||
final DefaultChannelPipeline pipeline = (DefaultChannelPipeline) ctx.pipeline();
|
||||
tracker = new PendingTracker() {
|
||||
@Override
|
||||
public void incrementPendingOutboundBytes(long bytes) {
|
||||
pipeline.incrementPendingOutboundBytes(bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decrementPendingOutboundBytes(long bytes) {
|
||||
pipeline.decrementPendingOutboundBytes(bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size(Object msg) {
|
||||
return pipeline.estimatorHandle().size(msg);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
final MessageSizeEstimator.Handle estimator = ctx.channel().config().getMessageSizeEstimator().newHandle();
|
||||
final ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
|
||||
if (buffer == null) {
|
||||
tracker = new PendingTracker() {
|
||||
@Override
|
||||
public void incrementPendingOutboundBytes(long bytes) {
|
||||
// noop
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decrementPendingOutboundBytes(long bytes) {
|
||||
// noop
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size(Object msg) {
|
||||
return estimator.size(msg);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
tracker = new PendingTracker() {
|
||||
@Override
|
||||
public void incrementPendingOutboundBytes(long bytes) {
|
||||
// We need to guard against null as channel.unsafe().outboundBuffer() may returned null
|
||||
// if the channel was already closed when constructing the PendingWriteQueue.
|
||||
// See https://github.com/netty/netty/issues/3967
|
||||
if (buffer != null) {
|
||||
buffer.incrementPendingOutboundBytes(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void decrementPendingOutboundBytes(long bytes) {
|
||||
// We need to guard against null as channel.unsafe().outboundBuffer() may returned null
|
||||
// if the channel was already closed when constructing the PendingWriteQueue.
|
||||
// See https://github.com/netty/netty/issues/3967
|
||||
if (buffer != null) {
|
||||
buffer.decrementPendingOutboundBytes(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size(Object msg) {
|
||||
return estimator.size(msg);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
tracker = PendingBytesTracker.newTracker(ctx.channel());
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -19,10 +19,9 @@ import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
@ -44,13 +43,15 @@ public class CoalescingBufferQueueTest {
|
||||
private boolean mouseDone;
|
||||
private boolean mouseSuccess;
|
||||
|
||||
private Channel channel = new EmbeddedChannel();
|
||||
|
||||
private CoalescingBufferQueue writeQueue = new CoalescingBufferQueue(channel);
|
||||
private EmbeddedChannel channel;
|
||||
private CoalescingBufferQueue writeQueue;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
mouseDone = false;
|
||||
mouseSuccess = false;
|
||||
channel = new EmbeddedChannel();
|
||||
writeQueue = new CoalescingBufferQueue(channel, 16, true);
|
||||
catPromise = newPromise();
|
||||
mouseListener = new ChannelFutureListener() {
|
||||
@Override
|
||||
@ -66,13 +67,18 @@ public class CoalescingBufferQueueTest {
|
||||
mouse = Unpooled.wrappedBuffer("mouse".getBytes(CharsetUtil.US_ASCII));
|
||||
}
|
||||
|
||||
@After
|
||||
public void finish() {
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAggregateWithFullRead() {
|
||||
writeQueue.add(cat, catPromise);
|
||||
assertQueueSize(3, false);
|
||||
writeQueue.add(mouse, mouseListener);
|
||||
assertQueueSize(8, false);
|
||||
DefaultChannelPromise aggregatePromise = newPromise();
|
||||
ChannelPromise aggregatePromise = newPromise();
|
||||
assertEquals("catmouse", dequeue(8, aggregatePromise));
|
||||
assertQueueSize(0, true);
|
||||
assertFalse(catPromise.isSuccess());
|
||||
@ -101,7 +107,7 @@ public class CoalescingBufferQueueTest {
|
||||
public void testAggregateWithPartialRead() {
|
||||
writeQueue.add(cat, catPromise);
|
||||
writeQueue.add(mouse, mouseListener);
|
||||
DefaultChannelPromise aggregatePromise = newPromise();
|
||||
ChannelPromise aggregatePromise = newPromise();
|
||||
assertEquals("catm", dequeue(4, aggregatePromise));
|
||||
assertQueueSize(4, false);
|
||||
assertFalse(catPromise.isSuccess());
|
||||
@ -125,7 +131,7 @@ public class CoalescingBufferQueueTest {
|
||||
writeQueue.add(cat, catPromise);
|
||||
writeQueue.add(mouse, mouseListener);
|
||||
|
||||
DefaultChannelPromise aggregatePromise = newPromise();
|
||||
ChannelPromise aggregatePromise = newPromise();
|
||||
assertSame(cat, writeQueue.remove(3, aggregatePromise));
|
||||
assertFalse(catPromise.isSuccess());
|
||||
aggregatePromise.setSuccess();
|
||||
@ -149,7 +155,7 @@ public class CoalescingBufferQueueTest {
|
||||
mouse.release();
|
||||
|
||||
assertQueueSize(0, true);
|
||||
DefaultChannelPromise aggregatePromise = newPromise();
|
||||
ChannelPromise aggregatePromise = newPromise();
|
||||
assertEquals("", dequeue(Integer.MAX_VALUE, aggregatePromise));
|
||||
assertQueueSize(0, true);
|
||||
}
|
||||
@ -160,7 +166,7 @@ public class CoalescingBufferQueueTest {
|
||||
writeQueue.add(mouse, mouseListener);
|
||||
RuntimeException cause = new RuntimeException("ooops");
|
||||
writeQueue.releaseAndFailAll(cause);
|
||||
DefaultChannelPromise aggregatePromise = newPromise();
|
||||
ChannelPromise aggregatePromise = newPromise();
|
||||
assertQueueSize(0, true);
|
||||
assertEquals(0, cat.refCnt());
|
||||
assertEquals(0, mouse.refCnt());
|
||||
@ -176,7 +182,7 @@ public class CoalescingBufferQueueTest {
|
||||
writeQueue.add(cat, catPromise);
|
||||
writeQueue.add(empty, emptyPromise);
|
||||
assertQueueSize(3, false);
|
||||
DefaultChannelPromise aggregatePromise = newPromise();
|
||||
ChannelPromise aggregatePromise = newPromise();
|
||||
assertEquals("cat", dequeue(3, aggregatePromise));
|
||||
assertQueueSize(0, true);
|
||||
assertFalse(catPromise.isSuccess());
|
||||
@ -195,7 +201,7 @@ public class CoalescingBufferQueueTest {
|
||||
otherQueue.add(mouse, mouseListener);
|
||||
otherQueue.copyTo(writeQueue);
|
||||
assertQueueSize(8, false);
|
||||
DefaultChannelPromise aggregatePromise = newPromise();
|
||||
ChannelPromise aggregatePromise = newPromise();
|
||||
assertEquals("catmouse", dequeue(8, aggregatePromise));
|
||||
assertQueueSize(0, true);
|
||||
assertFalse(catPromise.isSuccess());
|
||||
@ -207,8 +213,49 @@ public class CoalescingBufferQueueTest {
|
||||
assertEquals(0, mouse.refCnt());
|
||||
}
|
||||
|
||||
private DefaultChannelPromise newPromise() {
|
||||
return new DefaultChannelPromise(channel, ImmediateEventExecutor.INSTANCE);
|
||||
@Test
|
||||
public void testWritabilityChanged() {
|
||||
testWritabilityChanged0(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWritabilityChangedFailAll() {
|
||||
testWritabilityChanged0(true);
|
||||
}
|
||||
|
||||
private void testWritabilityChanged0(boolean fail) {
|
||||
channel.config().setWriteBufferWaterMark(new WriteBufferWaterMark(3, 4));
|
||||
assertTrue(channel.isWritable());
|
||||
writeQueue.add(Unpooled.wrappedBuffer(new byte[] {1 , 2, 3}));
|
||||
assertTrue(channel.isWritable());
|
||||
writeQueue.add(Unpooled.wrappedBuffer(new byte[] {4, 5}));
|
||||
assertFalse(channel.isWritable());
|
||||
assertEquals(5, writeQueue.readableBytes());
|
||||
|
||||
if (fail) {
|
||||
writeQueue.releaseAndFailAll(new IllegalStateException());
|
||||
} else {
|
||||
ByteBuf buffer = writeQueue.removeFirst(voidPromise);
|
||||
assertEquals(1, buffer.readByte());
|
||||
assertEquals(2, buffer.readByte());
|
||||
assertEquals(3, buffer.readByte());
|
||||
assertFalse(buffer.isReadable());
|
||||
buffer.release();
|
||||
assertTrue(channel.isWritable());
|
||||
|
||||
buffer = writeQueue.removeFirst(voidPromise);
|
||||
assertEquals(4, buffer.readByte());
|
||||
assertEquals(5, buffer.readByte());
|
||||
assertFalse(buffer.isReadable());
|
||||
buffer.release();
|
||||
}
|
||||
|
||||
assertTrue(channel.isWritable());
|
||||
assertTrue(writeQueue.isEmpty());
|
||||
}
|
||||
|
||||
private ChannelPromise newPromise() {
|
||||
return channel.newPromise();
|
||||
}
|
||||
|
||||
private void assertQueueSize(int size, boolean isEmpty) {
|
||||
|
Loading…
Reference in New Issue
Block a user