[#1812] Allow for inline for most common cases when use NioByteUnsafe.read()

This commit is contained in:
Norman Maurer 2013-10-23 11:10:12 +02:00
parent 9e882c793b
commit db765e5dd4

View File

@ -56,88 +56,80 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private final class NioByteUnsafe extends AbstractNioUnsafe { private final class NioByteUnsafe extends AbstractNioUnsafe {
private RecvByteBufAllocator.Handle allocHandle; private RecvByteBufAllocator.Handle allocHandle;
@Override private void removeReadOp() {
public void read() { SelectionKey key = selectionKey();
assert eventLoop().inEventLoop(); int interestOps = key.interestOps();
final SelectionKey key = selectionKey(); if ((interestOps & readInterestOp) != 0) {
final ChannelConfig config = config(); // only remove readInterestOp if needed
if (!config.isAutoRead()) { key.interestOps(interestOps & ~readInterestOp);
int interestOps = key.interestOps(); }
if ((interestOps & readInterestOp) != 0) { }
// only remove readInterestOp if needed
key.interestOps(interestOps & ~readInterestOp); private void closeOnRead(ChannelPipeline pipeline) {
SelectionKey key = selectionKey();
setInputShutdown();
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
key.interestOps(key.interestOps() & ~readInterestOp);
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
} }
} }
}
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
pipeline.fireChannelReadComplete();
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
@Override
public void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle; RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) { if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
} }
if (!config.isAutoRead()) {
removeReadOp();
}
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
boolean closed = false;
Throwable exception = null;
ByteBuf byteBuf = null; ByteBuf byteBuf = null;
int messages = 0; int messages = 0;
boolean close = false;
try { try {
for (;;) { do {
byteBuf = allocHandle.allocate(allocator); byteBuf = allocHandle.allocate(allocator);
int localReadAmount = doReadBytes(byteBuf); int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount == 0) { if (localReadAmount <= 0) {
byteBuf.release(); close = localReadAmount < 0;
byteBuf = null;
break; break;
} }
if (localReadAmount < 0) {
closed = true;
byteBuf.release();
byteBuf = null;
break;
}
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
allocHandle.record(localReadAmount);
byteBuf = null; byteBuf = null;
if (++ messages == maxMessagesPerRead) { allocHandle.record(localReadAmount);
break; } while (++ messages < maxMessagesPerRead);
}
}
} catch (Throwable t) {
exception = t;
} finally {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
if (exception != null) { if (close) {
if (exception instanceof IOException) { closeOnRead(pipeline);
closed = true; close = false;
}
pipeline().fireExceptionCaught(exception);
}
if (closed) {
setInputShutdown();
if (isOpen()) {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
key.interestOps(key.interestOps() & ~readInterestOp);
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
}
} }
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
} }
} }
} }