From db765e5dd43ac1d159a2ab342856a5dd71eab15b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 23 Oct 2013 11:10:12 +0200 Subject: [PATCH] [#1812] Allow for inline for most common cases when use NioByteUnsafe.read() --- .../channel/nio/AbstractNioByteChannel.java | 112 ++++++++---------- 1 file changed, 52 insertions(+), 60 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 712ad2ba26..a9577d82bf 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -56,88 +56,80 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { private final class NioByteUnsafe extends AbstractNioUnsafe { private RecvByteBufAllocator.Handle allocHandle; - @Override - public void read() { - assert eventLoop().inEventLoop(); - final SelectionKey key = selectionKey(); - final ChannelConfig config = config(); - if (!config.isAutoRead()) { - int interestOps = key.interestOps(); - if ((interestOps & readInterestOp) != 0) { - // only remove readInterestOp if needed - key.interestOps(interestOps & ~readInterestOp); + private void removeReadOp() { + SelectionKey key = selectionKey(); + int interestOps = key.interestOps(); + if ((interestOps & readInterestOp) != 0) { + // only remove readInterestOp if needed + key.interestOps(interestOps & ~readInterestOp); + } + } + + private void closeOnRead(ChannelPipeline pipeline) { + SelectionKey key = selectionKey(); + setInputShutdown(); + if (isOpen()) { + if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { + key.interestOps(key.interestOps() & ~readInterestOp); + pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); + } else { + close(voidPromise()); } } + } + private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close) { + if (byteBuf != null) { + if (byteBuf.isReadable()) { + pipeline.fireChannelRead(byteBuf); + } else { + byteBuf.release(); + } + } + pipeline.fireChannelReadComplete(); + if (close || cause instanceof IOException) { + closeOnRead(pipeline); + } + } + + @Override + public void read() { + final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); - + final ByteBufAllocator allocator = config.getAllocator(); + final int maxMessagesPerRead = config.getMaxMessagesPerRead(); RecvByteBufAllocator.Handle allocHandle = this.allocHandle; if (allocHandle == null) { this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle(); } + if (!config.isAutoRead()) { + removeReadOp(); + } - final ByteBufAllocator allocator = config.getAllocator(); - final int maxMessagesPerRead = config.getMaxMessagesPerRead(); - - boolean closed = false; - Throwable exception = null; ByteBuf byteBuf = null; int messages = 0; + boolean close = false; try { - for (;;) { + do { byteBuf = allocHandle.allocate(allocator); int localReadAmount = doReadBytes(byteBuf); - if (localReadAmount == 0) { - byteBuf.release(); - byteBuf = null; + if (localReadAmount <= 0) { + close = localReadAmount < 0; break; } - if (localReadAmount < 0) { - closed = true; - byteBuf.release(); - byteBuf = null; - break; - } - pipeline.fireChannelRead(byteBuf); - allocHandle.record(localReadAmount); byteBuf = null; - if (++ messages == maxMessagesPerRead) { - break; - } - } - } catch (Throwable t) { - exception = t; - } finally { - if (byteBuf != null) { - if (byteBuf.isReadable()) { - pipeline.fireChannelRead(byteBuf); - } else { - byteBuf.release(); - } - } + allocHandle.record(localReadAmount); + } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); - if (exception != null) { - if (exception instanceof IOException) { - closed = true; - } - - pipeline().fireExceptionCaught(exception); - } - - if (closed) { - setInputShutdown(); - if (isOpen()) { - if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { - key.interestOps(key.interestOps() & ~readInterestOp); - pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); - } else { - close(voidPromise()); - } - } + if (close) { + closeOnRead(pipeline); + close = false; } + } catch (Throwable t) { + handleReadException(pipeline, byteBuf, t, close); } } }