* Removed unnecessary conditional
* Optimized the use of memory bandwidth in NioWorker.read
This commit is contained in:
parent
03268034c7
commit
43a603cfde
@ -204,7 +204,7 @@ public class ByteBufferBackedChannelBuffer extends AbstractChannelBuffer {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!buffer.isReadOnly() && buffer.hasArray()) {
|
||||
if (buffer.hasArray()) {
|
||||
out.write(
|
||||
buffer.array(),
|
||||
index + buffer.arrayOffset(),
|
||||
|
@ -306,62 +306,49 @@ class NioWorker implements Runnable {
|
||||
}
|
||||
|
||||
private boolean read(SelectionKey k) {
|
||||
SocketChannel ch = (SocketChannel) k.channel();
|
||||
NioSocketChannel channel = (NioSocketChannel) k.attachment();
|
||||
final SocketChannel ch = (SocketChannel) k.channel();
|
||||
final NioSocketChannel channel = (NioSocketChannel) k.attachment();
|
||||
|
||||
ReceiveBufferSizePredictor predictor =
|
||||
final ReceiveBufferSizePredictor predictor =
|
||||
channel.getConfig().getReceiveBufferSizePredictor();
|
||||
ChannelBufferFactory bufferFactory =
|
||||
channel.getConfig().getBufferFactory();
|
||||
|
||||
ChannelBuffer buffer =
|
||||
bufferFactory.getBuffer(predictor.nextReceiveBufferSize());
|
||||
final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
|
||||
|
||||
int ret = 0;
|
||||
int readBytes = 0;
|
||||
boolean failure = true;
|
||||
|
||||
if (buffer.isDirect()) {
|
||||
try {
|
||||
while ((ret = buffer.writeBytes(ch, buffer.writableBytes())) > 0) {
|
||||
readBytes += ret;
|
||||
if (!buffer.writable()) {
|
||||
break;
|
||||
}
|
||||
ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
|
||||
try {
|
||||
while ((ret = ch.read(bb)) > 0) {
|
||||
readBytes += ret;
|
||||
if (!bb.hasRemaining()) {
|
||||
break;
|
||||
}
|
||||
failure = false;
|
||||
} catch (ClosedChannelException e) {
|
||||
// Can happen, and does not need a user attention.
|
||||
} catch (Throwable t) {
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
} else {
|
||||
ByteBuffer bb = recvBufferPool.acquire(buffer.writableBytes());
|
||||
try {
|
||||
while ((ret = ch.read(bb)) > 0) {
|
||||
readBytes += ret;
|
||||
if (!bb.hasRemaining()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
failure = false;
|
||||
} catch (ClosedChannelException e) {
|
||||
// Can happen, and does not need a user attention.
|
||||
} catch (Throwable t) {
|
||||
fireExceptionCaught(channel, t);
|
||||
} finally {
|
||||
bb.flip();
|
||||
buffer.writeBytes(bb);
|
||||
recvBufferPool.release(bb);
|
||||
}
|
||||
failure = false;
|
||||
} catch (ClosedChannelException e) {
|
||||
// Can happen, and does not need a user attention.
|
||||
} catch (Throwable t) {
|
||||
fireExceptionCaught(channel, t);
|
||||
}
|
||||
|
||||
if (readBytes > 0) {
|
||||
bb.flip();
|
||||
|
||||
final ChannelBufferFactory bufferFactory =
|
||||
channel.getConfig().getBufferFactory();
|
||||
final ChannelBuffer buffer = bufferFactory.getBuffer(
|
||||
bb.order(bufferFactory.getDefaultOrder()));
|
||||
|
||||
recvBufferPool.release(bb);
|
||||
|
||||
// Update the predictor.
|
||||
predictor.previousReceiveBufferSize(readBytes);
|
||||
|
||||
// Fire the event.
|
||||
fireMessageReceived(channel, buffer);
|
||||
} else {
|
||||
recvBufferPool.release(bb);
|
||||
}
|
||||
|
||||
if (ret < 0 || failure) {
|
||||
|
@ -25,7 +25,7 @@ import java.nio.ByteBuffer;
|
||||
*/
|
||||
final class SocketReceiveBufferPool {
|
||||
|
||||
private static final int POOL_SIZE = 4;
|
||||
private static final int POOL_SIZE = 8;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private final SoftReference<ByteBuffer>[] pool = new SoftReference[POOL_SIZE];
|
||||
@ -54,13 +54,12 @@ final class SocketReceiveBufferPool {
|
||||
|
||||
pool[i] = null;
|
||||
|
||||
buf.rewind();
|
||||
buf.limit(size);
|
||||
buf.clear();
|
||||
return buf;
|
||||
}
|
||||
|
||||
ByteBuffer buf = ByteBuffer.allocateDirect(normalizeCapacity(size));
|
||||
buf.limit(size);
|
||||
buf.clear();
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user