* Un-deprecated NioSocketChannelConfig.receiveBufferSizePredictor to resolve the FIXME related with buffer creation
* Removed the usage of pre-allocated buffers (reverted back to old behavior)
This commit is contained in:
parent
80237f3074
commit
01bfefa6a4
@ -44,11 +44,10 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
|
|||||||
private static final InternalLogger logger =
|
private static final InternalLogger logger =
|
||||||
InternalLoggerFactory.getInstance(DefaultNioSocketChannelConfig.class);
|
InternalLoggerFactory.getInstance(DefaultNioSocketChannelConfig.class);
|
||||||
|
|
||||||
private static final ReceiveBufferSizePredictor UNUSED_PREDICTOR =
|
|
||||||
new DefaultReceiveBufferSizePredictor();
|
|
||||||
|
|
||||||
private volatile int writeBufferHighWaterMark = 64 * 1024;
|
private volatile int writeBufferHighWaterMark = 64 * 1024;
|
||||||
private volatile int writeBufferLowWaterMark = 32 * 1024;
|
private volatile int writeBufferLowWaterMark = 32 * 1024;
|
||||||
|
private volatile ReceiveBufferSizePredictor predictor =
|
||||||
|
new DefaultReceiveBufferSizePredictor();
|
||||||
private volatile int writeSpinCount = 16;
|
private volatile int writeSpinCount = 16;
|
||||||
|
|
||||||
DefaultNioSocketChannelConfig(Socket socket) {
|
DefaultNioSocketChannelConfig(Socket socket) {
|
||||||
@ -116,15 +115,15 @@ class DefaultNioSocketChannelConfig extends DefaultSocketChannelConfig
|
|||||||
}
|
}
|
||||||
|
|
||||||
public ReceiveBufferSizePredictor getReceiveBufferSizePredictor() {
|
public ReceiveBufferSizePredictor getReceiveBufferSizePredictor() {
|
||||||
logger.warn(
|
return predictor;
|
||||||
"Detected an access to a deprecated configuration parameter: " +
|
|
||||||
"receiveBufferSizePredictor");
|
|
||||||
return UNUSED_PREDICTOR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setReceiveBufferSizePredictor(
|
public void setReceiveBufferSizePredictor(
|
||||||
ReceiveBufferSizePredictor predictor) {
|
ReceiveBufferSizePredictor predictor) {
|
||||||
getReceiveBufferSizePredictor();
|
if (predictor == null) {
|
||||||
|
throw new NullPointerException("predictor");
|
||||||
|
}
|
||||||
|
this.predictor = predictor;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isReadWriteFair() {
|
public boolean isReadWriteFair() {
|
||||||
|
@ -87,23 +87,17 @@ public interface NioSocketChannelConfig extends SocketChannelConfig {
|
|||||||
void setWriteSpinCount(int writeSpinCount);
|
void setWriteSpinCount(int writeSpinCount);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated Works fine without prediction since 3.1.
|
|
||||||
*
|
|
||||||
* Returns the {@link ReceiveBufferSizePredictor} which predicts the
|
* Returns the {@link ReceiveBufferSizePredictor} which predicts the
|
||||||
* number of readable bytes in the socket receive buffer. The default
|
* number of readable bytes in the socket receive buffer. The default
|
||||||
* predictor is {@link DefaultReceiveBufferSizePredictor}.
|
* predictor is {@link DefaultReceiveBufferSizePredictor}.
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
|
||||||
ReceiveBufferSizePredictor getReceiveBufferSizePredictor();
|
ReceiveBufferSizePredictor getReceiveBufferSizePredictor();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated Works fine without prediction since 3.1.
|
|
||||||
*
|
|
||||||
* Sets the {@link ReceiveBufferSizePredictor} which predicts the
|
* Sets the {@link ReceiveBufferSizePredictor} which predicts the
|
||||||
* number of readable bytes in the socket receive buffer. The default
|
* number of readable bytes in the socket receive buffer. The default
|
||||||
* predictor is {@link DefaultReceiveBufferSizePredictor}.
|
* predictor is {@link DefaultReceiveBufferSizePredictor}.
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
|
||||||
void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor);
|
void setReceiveBufferSizePredictor(ReceiveBufferSizePredictor predictor);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -41,6 +41,7 @@ import java.util.concurrent.locks.ReadWriteLock;
|
|||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.jboss.netty.buffer.ChannelBuffer;
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
|
import org.jboss.netty.buffer.ChannelBufferFactory;
|
||||||
import org.jboss.netty.channel.Channel;
|
import org.jboss.netty.channel.Channel;
|
||||||
import org.jboss.netty.channel.ChannelException;
|
import org.jboss.netty.channel.ChannelException;
|
||||||
import org.jboss.netty.channel.ChannelFuture;
|
import org.jboss.netty.channel.ChannelFuture;
|
||||||
@ -255,29 +256,25 @@ class NioWorker implements Runnable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ChannelBuffer preallocatedBuffer;
|
|
||||||
|
|
||||||
private static void read(SelectionKey k) {
|
private static void read(SelectionKey k) {
|
||||||
ScatteringByteChannel ch = (ScatteringByteChannel) k.channel();
|
ScatteringByteChannel ch = (ScatteringByteChannel) k.channel();
|
||||||
NioSocketChannel channel = (NioSocketChannel) k.attachment();
|
NioSocketChannel channel = (NioSocketChannel) k.attachment();
|
||||||
|
|
||||||
ChannelBuffer preallocatedBuffer = channel.getWorker().preallocatedBuffer;
|
ReceiveBufferSizePredictor predictor =
|
||||||
NioWorker worker = channel.getWorker();
|
channel.getConfig().getReceiveBufferSizePredictor();
|
||||||
worker.preallocatedBuffer = null;
|
ChannelBufferFactory bufferFactory =
|
||||||
|
channel.getConfig().getBufferFactory();
|
||||||
|
|
||||||
if (preallocatedBuffer == null) {
|
ChannelBuffer buffer =
|
||||||
// TODO Magic number
|
bufferFactory.getBuffer(predictor.nextReceiveBufferSize());
|
||||||
// FIXME: OOPS - the new buffer should not be shared by more than one connection
|
|
||||||
preallocatedBuffer = channel.getConfig().getBufferFactory().getBuffer(1048576);
|
|
||||||
}
|
|
||||||
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
int readBytes = 0;
|
int readBytes = 0;
|
||||||
boolean failure = true;
|
boolean failure = true;
|
||||||
try {
|
try {
|
||||||
while ((ret = preallocatedBuffer.writeBytes(ch, preallocatedBuffer.writableBytes())) > 0) {
|
while ((ret = buffer.writeBytes(ch, buffer.writableBytes())) > 0) {
|
||||||
readBytes += ret;
|
readBytes += ret;
|
||||||
if (!preallocatedBuffer.writable()) {
|
if (!buffer.writable()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -289,17 +286,11 @@ class NioWorker implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (readBytes > 0) {
|
if (readBytes > 0) {
|
||||||
|
// Update the predictor.
|
||||||
|
predictor.previousReceiveBufferSize(readBytes);
|
||||||
|
|
||||||
// Fire the event.
|
// Fire the event.
|
||||||
ChannelBuffer slice = preallocatedBuffer.slice(
|
fireMessageReceived(channel, buffer);
|
||||||
preallocatedBuffer.readerIndex(),
|
|
||||||
preallocatedBuffer.readableBytes());
|
|
||||||
preallocatedBuffer.readerIndex(preallocatedBuffer.writerIndex());
|
|
||||||
if (preallocatedBuffer.writable()) {
|
|
||||||
worker.preallocatedBuffer = preallocatedBuffer;
|
|
||||||
}
|
|
||||||
fireMessageReceived(channel, slice);
|
|
||||||
} else if (readBytes == 0) {
|
|
||||||
worker.preallocatedBuffer = preallocatedBuffer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ret < 0 || failure) {
|
if (ret < 0 || failure) {
|
||||||
|
Loading…
Reference in New Issue
Block a user