Correctly update Channel.isWritable() when the write happens from outside the EventLoop in a fast fashion. Fixes [#1697]

Introduce a new interface called MessageSizeEstimator. This can be specific per Channel (via ChannelConfig). The MessageSizeEstimator will be used to estimate for a message that should be written. The default implementation handles ByteBuf, ByteBufHolder and FileRegion. A user is free to plug-in his/her own implementation for different behaviour.
This commit is contained in:
Norman Maurer 2013-08-05 14:58:16 +02:00
parent 60b889375c
commit 48a7a21541
28 changed files with 435 additions and 44 deletions

View File

@ -18,6 +18,7 @@ package io.netty.channel.rxtx;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import java.util.Map; import java.util.Map;
@ -244,4 +245,10 @@ final class DefaultRxtxChannelConfig extends DefaultChannelConfig implements Rxt
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this; return this;
} }
@Override
public RxtxChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel.rxtx;
import gnu.io.SerialPort; import gnu.io.SerialPort;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
/** /**
@ -286,4 +287,13 @@ public interface RxtxChannelConfig extends ChannelConfig {
@Override @Override
RxtxChannelConfig setAutoRead(boolean autoRead); RxtxChannelConfig setAutoRead(boolean autoRead);
@Override
RxtxChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
RxtxChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override
RxtxChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
} }

View File

@ -22,6 +22,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -173,7 +174,8 @@ public class DefaultSctpChannelConfig extends DefaultChannelConfig implements Sc
@Override @Override
public SctpChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { public SctpChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return (SctpChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis); super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
} }
@Override @Override
@ -184,12 +186,14 @@ public class DefaultSctpChannelConfig extends DefaultChannelConfig implements Sc
@Override @Override
public SctpChannelConfig setWriteSpinCount(int writeSpinCount) { public SctpChannelConfig setWriteSpinCount(int writeSpinCount) {
return (SctpChannelConfig) super.setWriteSpinCount(writeSpinCount); super.setWriteSpinCount(writeSpinCount);
return this;
} }
@Override @Override
public SctpChannelConfig setAllocator(ByteBufAllocator allocator) { public SctpChannelConfig setAllocator(ByteBufAllocator allocator) {
return (SctpChannelConfig) super.setAllocator(allocator); super.setAllocator(allocator);
return this;
} }
@Override @Override
@ -200,16 +204,25 @@ public class DefaultSctpChannelConfig extends DefaultChannelConfig implements Sc
@Override @Override
public SctpChannelConfig setAutoRead(boolean autoRead) { public SctpChannelConfig setAutoRead(boolean autoRead) {
return (SctpChannelConfig) super.setAutoRead(autoRead); super.setAutoRead(autoRead);
return this;
} }
@Override @Override
public SctpChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { public SctpChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
return (SctpChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
} }
@Override @Override
public SctpChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { public SctpChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
return (SctpChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public SctpChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
} }
} }

View File

@ -20,6 +20,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
@ -202,4 +203,10 @@ public class DefaultSctpServerChannelConfig extends DefaultChannelConfig impleme
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this; return this;
} }
@Override
public SctpServerChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel.sctp;
import com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams; import com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
/** /**
@ -113,4 +114,13 @@ public interface SctpChannelConfig extends ChannelConfig {
@Override @Override
SctpChannelConfig setAutoRead(boolean autoRead); SctpChannelConfig setAutoRead(boolean autoRead);
@Override
SctpChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
SctpChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override
SctpChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel.sctp;
import com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams; import com.sun.nio.sctp.SctpStandardSocketOptions.InitMaxStreams;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
/** /**
@ -108,4 +109,13 @@ public interface SctpServerChannelConfig extends ChannelConfig {
@Override @Override
SctpServerChannelConfig setAutoRead(boolean autoRead); SctpServerChannelConfig setAutoRead(boolean autoRead);
@Override
SctpServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
SctpServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override
SctpServerChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
} }

View File

@ -21,6 +21,7 @@ import com.barchart.udt.nio.ChannelUDT;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import java.io.IOException; import java.io.IOException;
@ -270,11 +271,19 @@ public class DefaultUdtChannelConfig extends DefaultChannelConfig implements
@Override @Override
public UdtChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { public UdtChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
return (UdtChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
} }
@Override @Override
public UdtChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { public UdtChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
return (UdtChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public UdtChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
} }
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel.udt;
import com.barchart.udt.nio.ChannelUDT; import com.barchart.udt.nio.ChannelUDT;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import java.io.IOException; import java.io.IOException;
@ -170,4 +171,22 @@ public class DefaultUdtServerChannelConfig extends DefaultUdtChannelConfig
super.setAutoRead(autoRead); super.setAutoRead(autoRead);
return this; return this;
} }
@Override
public UdtServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public UdtServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public UdtServerChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
} }

View File

@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
/** /**
@ -129,6 +130,15 @@ public interface UdtChannelConfig extends ChannelConfig {
@Override @Override
UdtChannelConfig setAutoRead(boolean autoRead); UdtChannelConfig setAutoRead(boolean autoRead);
@Override
UdtChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
UdtChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override
UdtChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
/** /**
* Sets {@link OptionUDT#Protocol_Receive_Buffer_Size} * Sets {@link OptionUDT#Protocol_Receive_Buffer_Size}
*/ */

View File

@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
/** /**
@ -86,4 +87,13 @@ public interface UdtServerChannelConfig extends UdtChannelConfig {
@Override @Override
UdtServerChannelConfig setSystemSendBufferSize(int size); UdtServerChannelConfig setSystemSendBufferSize(int size);
@Override
UdtServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
UdtServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override
UdtServerChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
} }

View File

@ -15,9 +15,7 @@
*/ */
package io.netty.channel; package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.DefaultAttributeMap; import io.netty.util.DefaultAttributeMap;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
@ -48,6 +46,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
} }
private MessageSizeEstimator.Handle estimatorHandle;
private final Channel parent; private final Channel parent;
private final long hashCode = ThreadLocalRandom.current().nextLong(); private final long hashCode = ThreadLocalRandom.current().nextLong();
private final Unsafe unsafe; private final Unsafe unsafe;
@ -360,6 +360,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return voidPromise; return voidPromise;
} }
final MessageSizeEstimator.Handle estimatorHandle() {
if (estimatorHandle == null) {
estimatorHandle = config().getMessageSizeEstimator().newHandle();
}
return estimatorHandle;
}
/** /**
* {@link Unsafe} implementation which sub-classes must extend and use. * {@link Unsafe} implementation which sub-classes must extend and use.
*/ */
@ -784,20 +791,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
} }
/**
* Calculate the number of bytes a message takes up in memory. Sub-classes may override this if they use different
* messages then {@link ByteBuf} or {@link ByteBufHolder}. If the size can not be calculated 0 should be returned.
*/
protected int calculateMessageSize(Object message) {
if (message instanceof ByteBuf) {
return ((ByteBuf) message).readableBytes();
}
if (message instanceof ByteBufHolder) {
return ((ByteBufHolder) message).content().readableBytes();
}
return 0;
}
final class CloseFuture extends DefaultChannelPromise { final class CloseFuture extends DefaultChannelPromise {
CloseFuture(AbstractChannel ch) { CloseFuture(AbstractChannel ch) {

View File

@ -219,4 +219,16 @@ public interface ChannelConfig {
* {@code true} again. * {@code true} again.
*/ */
ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark); ChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
/**
* Returns {@link MessageSizeEstimator} which is used for the channel
* to detect the size of a message.
*/
MessageSizeEstimator getMessageSizeEstimator();
/**
* Set the {@link ByteBufAllocator} which is used for the channel
* to detect the size of a message.
*/
ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
} }

View File

@ -31,6 +31,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/** /**
* (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending * (Transport implementors only) an internal data structure used by {@link AbstractChannel} to store its pending
@ -79,7 +80,12 @@ public final class ChannelOutboundBuffer {
private int unflushedCount; private int unflushedCount;
private boolean inFail; private boolean inFail;
private long totalPendingSize;
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile long totalPendingSize;
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> WRITABLE_UPDATER = private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> WRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "writable"); AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "writable");
@ -136,7 +142,10 @@ public final class ChannelOutboundBuffer {
unflushed = this.unflushed; unflushed = this.unflushed;
} }
final int size = channel.calculateMessageSize(msg); int size = channel.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
unflushed[unflushedCount] = msg; unflushed[unflushedCount] = msg;
unflushedPendingSizes[unflushedCount] = size; unflushedPendingSizes[unflushedCount] = size;
unflushedPromises[unflushedCount] = promise; unflushedPromises[unflushedCount] = promise;
@ -269,12 +278,22 @@ public final class ChannelOutboundBuffer {
tail = n; tail = n;
} }
private void incrementPendingOutboundBytes(int size) { /**
* Increment the pending bytes which will be written at some point.
* This method is thread-safe!
*/
void incrementPendingOutboundBytes(int size) {
if (size == 0) { if (size == 0) {
return; return;
} }
long newWriteBufferSize = totalPendingSize += size; long oldValue = totalPendingSize;
long newWriteBufferSize = oldValue + size;
while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
oldValue = totalPendingSize;
newWriteBufferSize = oldValue + size;
}
int highWaterMark = channel.config().getWriteBufferHighWaterMark(); int highWaterMark = channel.config().getWriteBufferHighWaterMark();
if (newWriteBufferSize > highWaterMark) { if (newWriteBufferSize > highWaterMark) {
@ -284,12 +303,22 @@ public final class ChannelOutboundBuffer {
} }
} }
private void decrementPendingOutboundBytes(int size) { /**
* Decrement the pending bytes which will be written at some point.
* This method is thread-safe!
*/
void decrementPendingOutboundBytes(int size) {
if (size == 0) { if (size == 0) {
return; return;
} }
long newWriteBufferSize = totalPendingSize -= size; long oldValue = totalPendingSize;
long newWriteBufferSize = oldValue - size;
while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
oldValue = totalPendingSize;
newWriteBufferSize = oldValue - size;
}
int lowWaterMark = channel.config().getWriteBufferLowWaterMark(); int lowWaterMark = channel.config().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) { if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark) {
@ -459,7 +488,7 @@ public final class ChannelOutboundBuffer {
} }
boolean getWritable() { boolean getWritable() {
return WRITABLE_UPDATER.get(this) != 0; return writable != 0;
} }
public int size() { public int size() {
@ -524,15 +553,22 @@ public final class ChannelOutboundBuffer {
unflushed[i] = null; unflushed[i] = null;
safeFail(unflushedPromises[i], cause); safeFail(unflushedPromises[i], cause);
unflushedPromises[i] = null; unflushedPromises[i] = null;
// Just decrease; do not trigger any events via decrementPendingOutboundBytes() // Just decrease; do not trigger any events via decrementPendingOutboundBytes()
totalPendingSize -= unflushedPendingSizes[i]; int size = unflushedPendingSizes[i];
long oldValue = totalPendingSize;
long newWriteBufferSize = oldValue - size;
while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {
oldValue = totalPendingSize;
newWriteBufferSize = oldValue - size;
}
unflushedPendingSizes[i] = 0; unflushedPendingSizes[i] = 0;
} }
} finally { } finally {
this.unflushedCount = 0; this.unflushedCount = 0;
inFail = false; inFail = false;
} }
RECYCLER.recycle(this, handle); RECYCLER.recycle(this, handle);
} }

View File

@ -32,12 +32,16 @@ public class DefaultChannelConfig implements ChannelConfig {
private static final ByteBufAllocator DEFAULT_ALLOCATOR = UnpooledByteBufAllocator.DEFAULT; private static final ByteBufAllocator DEFAULT_ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = AdaptiveRecvByteBufAllocator.DEFAULT; private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = AdaptiveRecvByteBufAllocator.DEFAULT;
private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
private static final int DEFAULT_CONNECT_TIMEOUT = 30000; private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
protected final Channel channel; protected final Channel channel;
private volatile ByteBufAllocator allocator = DEFAULT_ALLOCATOR; private volatile ByteBufAllocator allocator = DEFAULT_ALLOCATOR;
private volatile RecvByteBufAllocator rcvBufAllocator = DEFAULT_RCVBUF_ALLOCATOR; private volatile RecvByteBufAllocator rcvBufAllocator = DEFAULT_RCVBUF_ALLOCATOR;
private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT; private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
private volatile int maxMessagesPerRead; private volatile int maxMessagesPerRead;
private volatile int writeSpinCount = 16; private volatile int writeSpinCount = 16;
@ -282,4 +286,18 @@ public class DefaultChannelConfig implements ChannelConfig {
this.writeBufferLowWaterMark = writeBufferLowWaterMark; this.writeBufferLowWaterMark = writeBufferLowWaterMark;
return this; return this;
} }
@Override
public MessageSizeEstimator getMessageSizeEstimator() {
return msgSizeEstimator;
}
@Override
public ChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
if (estimator == null) {
throw new NullPointerException("estimator");
}
msgSizeEstimator = estimator;
return this;
}
} }

View File

@ -31,7 +31,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
volatile DefaultChannelHandlerContext next; volatile DefaultChannelHandlerContext next;
volatile DefaultChannelHandlerContext prev; volatile DefaultChannelHandlerContext prev;
private final Channel channel; private final AbstractChannel channel;
private final DefaultChannelPipeline pipeline; private final DefaultChannelPipeline pipeline;
private final String name; private final String name;
private final ChannelHandler handler; private final ChannelHandler handler;
@ -632,7 +632,15 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
next.invokeWrite(msg, promise); next.invokeWrite(msg, promise);
} else { } else {
executor.execute(WriteTask.newInstance(next, msg, promise)); final int size = channel.estimatorHandle().size(msg);
if (size > 0) {
ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.incrementPendingOutboundBytes(size);
}
}
executor.execute(WriteTask.newInstance(next, msg, size, promise));
} }
return promise; return promise;
@ -822,6 +830,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
private DefaultChannelHandlerContext ctx; private DefaultChannelHandlerContext ctx;
private Object msg; private Object msg;
private ChannelPromise promise; private ChannelPromise promise;
private int size;
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() { private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
@Override @Override
@ -830,11 +839,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
} }
}; };
private static WriteTask newInstance(DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) { private static WriteTask newInstance(
DefaultChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
WriteTask task = RECYCLER.get(); WriteTask task = RECYCLER.get();
task.ctx = ctx; task.ctx = ctx;
task.msg = msg; task.msg = msg;
task.promise = promise; task.promise = promise;
task.size = size;
return task; return task;
} }
@ -847,6 +858,13 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
@Override @Override
public void run() { public void run() {
try { try {
if (size > 0) {
ChannelOutboundBuffer buffer = ctx.channel.unsafe().outboundBuffer();
// Check for null as it may be set to null if the channel is closed already
if (buffer != null) {
buffer.decrementPendingOutboundBytes(size);
}
}
ctx.invokeWrite(msg, promise); ctx.invokeWrite(msg, promise);
} finally { } finally {
// Set to null so the GC can collect them directly // Set to null so the GC can collect them directly

View File

@ -55,7 +55,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
} }
} }
final Channel channel; final AbstractChannel channel;
final DefaultChannelHandlerContext head; final DefaultChannelHandlerContext head;
final DefaultChannelHandlerContext tail; final DefaultChannelHandlerContext tail;
@ -66,7 +66,7 @@ final class DefaultChannelPipeline implements ChannelPipeline {
final Map<EventExecutorGroup, EventExecutor> childExecutors = final Map<EventExecutorGroup, EventExecutor> childExecutors =
new IdentityHashMap<EventExecutorGroup, EventExecutor>(); new IdentityHashMap<EventExecutorGroup, EventExecutor>();
public DefaultChannelPipeline(Channel channel) { public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) { if (channel == null) {
throw new NullPointerException("channel"); throw new NullPointerException("channel");
} }

View File

@ -0,0 +1,72 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
/**
* Default {@link MessageSizeEstimator} implementation which supports the estimation of the size of
* {@link ByteBuf}, {@link ByteBufHolder} and {@link FileRegion}.
*/
public final class DefaultMessageSizeEstimator implements MessageSizeEstimator {
private static final class HandleImpl implements Handle {
private final int unknownSize;
private HandleImpl(int unknownSize) {
this.unknownSize = unknownSize;
}
@Override
public int size(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
if (msg instanceof FileRegion) {
return 0;
}
return unknownSize;
}
}
/**
* Return the default implementation which returns {@code -1} for unknown messages.
*/
public static final MessageSizeEstimator DEFAULT = new DefaultMessageSizeEstimator(0);
private final Handle handle;
/**
* Create a new instance
*
* @param unknownSize The size which is returned for unknown messages.
*/
public DefaultMessageSizeEstimator(int unknownSize) {
if (unknownSize < 0) {
throw new IllegalArgumentException("unknownSize: " + unknownSize + " (expected: >= 0)");
}
handle = new HandleImpl(unknownSize);
}
@Override
public Handle newHandle() {
return handle;
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
/**
* Responsible to estimate size of a message. The size represent how much memory the message will ca. reserve in
* memory.
*/
public interface MessageSizeEstimator {
/**
* Creates a new handle. The handle provides the actual operations.
*/
Handle newHandle();
interface Handle {
/**
* Calculate the size of the given message.
*
* @param msg The message for which the size should be calculated
* @return size The size in bytes. The returned size must be >= 0
*/
int size(Object msg);
}
}

View File

@ -18,6 +18,7 @@ package io.netty.channel.socket;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import java.net.InetAddress; import java.net.InetAddress;
@ -172,4 +173,7 @@ public interface DatagramChannelConfig extends ChannelConfig {
@Override @Override
DatagramChannelConfig setAutoRead(boolean autoRead); DatagramChannelConfig setAutoRead(boolean autoRead);
@Override
DatagramChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
} }

View File

@ -20,6 +20,7 @@ import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -394,4 +395,10 @@ public class DefaultDatagramChannelConfig extends DefaultChannelConfig implement
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this; return this;
} }
@Override
public DatagramChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
} }

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.NetUtil; import io.netty.util.NetUtil;
@ -191,4 +192,10 @@ public class DefaultServerSocketChannelConfig extends DefaultChannelConfig
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this; return this;
} }
@Override
public ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
} }

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -280,7 +281,8 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { public SocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
return (SocketChannelConfig) super.setConnectTimeoutMillis(connectTimeoutMillis); super.setConnectTimeoutMillis(connectTimeoutMillis);
return this;
} }
@Override @Override
@ -291,12 +293,14 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public SocketChannelConfig setWriteSpinCount(int writeSpinCount) { public SocketChannelConfig setWriteSpinCount(int writeSpinCount) {
return (SocketChannelConfig) super.setWriteSpinCount(writeSpinCount); super.setWriteSpinCount(writeSpinCount);
return this;
} }
@Override @Override
public SocketChannelConfig setAllocator(ByteBufAllocator allocator) { public SocketChannelConfig setAllocator(ByteBufAllocator allocator) {
return (SocketChannelConfig) super.setAllocator(allocator); super.setAllocator(allocator);
return this;
} }
@Override @Override
@ -307,16 +311,25 @@ public class DefaultSocketChannelConfig extends DefaultChannelConfig
@Override @Override
public SocketChannelConfig setAutoRead(boolean autoRead) { public SocketChannelConfig setAutoRead(boolean autoRead) {
return (SocketChannelConfig) super.setAutoRead(autoRead); super.setAutoRead(autoRead);
return this;
} }
@Override @Override
public SocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { public SocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
return (SocketChannelConfig) super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
} }
@Override @Override
public SocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { public SocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
return (SocketChannelConfig) super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public SocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
} }
} }

View File

@ -17,6 +17,7 @@ package io.netty.channel.socket;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import java.net.ServerSocket; import java.net.ServerSocket;
@ -100,4 +101,7 @@ public interface ServerSocketChannelConfig extends ChannelConfig {
@Override @Override
ServerSocketChannelConfig setAutoRead(boolean autoRead); ServerSocketChannelConfig setAutoRead(boolean autoRead);
@Override
ServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
} }

View File

@ -20,6 +20,7 @@ import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import java.net.Socket; import java.net.Socket;
@ -173,4 +174,7 @@ public interface SocketChannelConfig extends ChannelConfig {
@Override @Override
SocketChannelConfig setAutoRead(boolean autoRead); SocketChannelConfig setAutoRead(boolean autoRead);
@Override
SocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DefaultServerSocketChannelConfig; import io.netty.channel.socket.DefaultServerSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
@ -143,4 +144,22 @@ public class DefaultOioServerSocketChannelConfig extends DefaultServerSocketChan
super.setAutoRead(autoRead); super.setAutoRead(autoRead);
return this; return this;
} }
@Override
public OioServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public OioServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public OioServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
} }

View File

@ -18,9 +18,11 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import java.io.IOException; import java.io.IOException;
import java.net.Socket; import java.net.Socket;
@ -171,4 +173,22 @@ public class DefaultOioSocketChannelConfig extends DefaultSocketChannelConfig im
super.setAutoRead(autoRead); super.setAutoRead(autoRead);
return this; return this;
} }
@Override
public OioSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
return this;
}
@Override
public OioSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
return this;
}
@Override
public OioSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
super.setMessageSizeEstimator(estimator);
return this;
}
} }

View File

@ -17,6 +17,7 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ServerSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannelConfig;
@ -79,4 +80,13 @@ public interface OioServerSocketChannelConfig extends ServerSocketChannelConfig
@Override @Override
OioServerSocketChannelConfig setAutoRead(boolean autoRead); OioServerSocketChannelConfig setAutoRead(boolean autoRead);
@Override
OioServerSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
OioServerSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override
OioServerSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel.socket.oio;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
@ -94,4 +95,13 @@ public interface OioSocketChannelConfig extends SocketChannelConfig {
@Override @Override
OioSocketChannelConfig setAutoRead(boolean autoRead); OioSocketChannelConfig setAutoRead(boolean autoRead);
@Override
OioSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark);
@Override
OioSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark);
@Override
OioSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator);
} }