From 59f11ed64f641c22506bf4c328640d450dac728d Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 25 May 2012 06:16:25 -0700 Subject: [PATCH] Optimize AbstractChannel and related classes - AbstractChannel.doRead() is split into two versions so that the implementation doesn't have to validate the buffer type. - Optimized ChannelBufferHolder a little bit - Reduced GC related with flush future notification - Added FlushCheckpoint and DefaultChannelFuture implements it opportunistically - --- .../io/netty/channel/AbstractChannel.java | 136 +++++++++++------- .../netty/channel/AbstractServerChannel.java | 7 + .../io/netty/channel/ChannelBufferHolder.java | 22 +-- .../netty/channel/DefaultChannelFuture.java | 23 ++- .../netty/channel/SingleThreadEventLoop.java | 1 + .../socket/nio/NioDatagramChannel.java | 10 +- .../socket/nio/NioServerSocketChannel.java | 6 +- .../channel/socket/nio/NioSocketChannel.java | 9 +- 8 files changed, 140 insertions(+), 74 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 7af5c1fae0..28cc5d84ab 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.net.ConnectException; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -92,8 +94,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private ConnectException connectTimeoutException; private long flushedAmount; - private FlushFutureEntry flushFuture; - private FlushFutureEntry lastFlushFuture; + private final Deque flushCheckpoints = new ArrayDeque(); private ClosedChannelException closedChannelException; /** Cache for the string representation of this channel */ @@ -643,18 +644,32 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha boolean closed = false; boolean read = false; try { - for (;;) { - int localReadAmount = doRead(buf); - if (localReadAmount > 0) { - expandReadBuffer(buf); - read = true; - } else if (localReadAmount == 0) { - if (!expandReadBuffer(buf)) { + if (buf.hasMessageBuffer()) { + Queue msgBuf = buf.messageBuffer(); + for (;;) { + int localReadAmount = doRead(msgBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount == 0) { + break; + } else if (localReadAmount < 0) { + closed = true; + break; + } + } + } else { + ChannelBuffer byteBuf = buf.byteBuffer(); + for (;;) { + int localReadAmount = doRead(byteBuf); + if (localReadAmount > 0) { + read = true; + } else if (localReadAmount < 0) { + closed = true; + break; + } + if (!expandReadBuffer(byteBuf)) { break; } - } else if (localReadAmount < 0) { - closed = true; - break; } } } catch (Throwable t) { @@ -681,12 +696,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (eventLoop().inEventLoop()) { // Append flush future to the notification list. if (future != voidFuture) { - FlushFutureEntry newEntry = new FlushFutureEntry(future, flushedAmount + out().size(), null); - if (flushFuture == null) { - flushFuture = lastFlushFuture = newEntry; + long checkpoint = flushedAmount + out().size(); + if (future instanceof FlushCheckpoint) { + FlushCheckpoint cp = (FlushCheckpoint) future; + cp.flushCheckpoint(checkpoint); + flushCheckpoints.add(cp); } else { - lastFlushFuture.next = newEntry; - lastFlushFuture = newEntry; + flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, future)); } } @@ -770,49 +786,44 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } private void notifyFlushFutures() { - FlushFutureEntry e = flushFuture; - if (e == null) { + if (flushCheckpoints.isEmpty()) { return; } final long flushedAmount = AbstractChannel.this.flushedAmount; - do { - if (e.expectedFlushedAmount > flushedAmount) { + for (;;) { + FlushCheckpoint cp = flushCheckpoints.poll(); + if (cp == null) { break; } - e.future.setSuccess(); - e = e.next; - } while (e != null); - - flushFuture = e; + if (cp.flushCheckpoint() > flushedAmount) { + break; + } + cp.future().setSuccess(); + } // Avoid overflow - if (e == null) { + if (flushCheckpoints.isEmpty()) { // Reset the counter if there's nothing in the notification list. AbstractChannel.this.flushedAmount = 0; } else if (flushedAmount >= 0x1000000000000000L) { // Otherwise, reset the counter only when the counter grew pretty large // so that we can reduce the cost of updating all entries in the notification list. AbstractChannel.this.flushedAmount = 0; - do { - e.expectedFlushedAmount -= flushedAmount; - e = e.next; - } while (e != null); + for (FlushCheckpoint cp: flushCheckpoints) { + cp.flushCheckpoint(cp.flushCheckpoint() - flushedAmount); + } } } private void notifyFlushFutures(Throwable cause) { - FlushFutureEntry e = flushFuture; - if (e == null) { - return; + for (;;) { + FlushCheckpoint cp = flushCheckpoints.poll(); + if (cp == null) { + break; + } + cp.future().setFailure(cause); } - - do { - e.future.setFailure(cause); - e = e.next; - } while (e != null); - - flushFuture = null; } private boolean ensureOpen(ChannelFuture future) { @@ -834,15 +845,34 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - private static class FlushFutureEntry { - private final ChannelFuture future; - private long expectedFlushedAmount; - private FlushFutureEntry next; + static abstract class FlushCheckpoint { + abstract long flushCheckpoint(); + abstract void flushCheckpoint(long checkpoint); + abstract ChannelFuture future(); + } - FlushFutureEntry(ChannelFuture future, long expectedWrittenAmount, FlushFutureEntry next) { + private static class DefaultFlushCheckpoint extends FlushCheckpoint { + private long checkpoint; + private final ChannelFuture future; + + DefaultFlushCheckpoint(long checkpoint, ChannelFuture future) { + this.checkpoint = checkpoint; this.future = future; - expectedFlushedAmount = expectedWrittenAmount; - this.next = next; + } + + @Override + long flushCheckpoint() { + return checkpoint; + } + + @Override + void flushCheckpoint(long checkpoint) { + this.checkpoint = checkpoint; + } + + @Override + ChannelFuture future() { + return future; } } @@ -885,16 +915,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract void doClose() throws Exception; protected abstract void doDeregister() throws Exception; - protected abstract int doRead(ChannelBufferHolder buf) throws Exception; + protected abstract int doRead(Queue buf) throws Exception; + protected abstract int doRead(ChannelBuffer buf) throws Exception; protected abstract int doFlush(boolean lastSpin) throws Exception; protected abstract boolean inEventLoopDrivenFlush(); - private static boolean expandReadBuffer(ChannelBufferHolder buf) { - if (!buf.hasByteBuffer()) { - return false; - } - - ChannelBuffer byteBuf = buf.byteBuffer(); + private static boolean expandReadBuffer(ChannelBuffer byteBuf) { if (!byteBuf.writable()) { // FIXME: Use a sensible value. byteBuf.ensureWritableBytes(4096); diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 2328856703..edac421348 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -15,6 +15,8 @@ */ package io.netty.channel; +import io.netty.buffer.ChannelBuffer; + import java.net.SocketAddress; import java.util.AbstractQueue; import java.util.Collections; @@ -76,6 +78,11 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S throw new UnsupportedOperationException(); } + @Override + protected int doRead(ChannelBuffer buf) throws Exception { + throw new UnsupportedOperationException(); + } + @Override protected int doFlush(boolean lastSpin) throws Exception { throw new UnsupportedOperationException(); diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java index e69d1e19b8..9cd17f6671 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java @@ -84,7 +84,7 @@ public final class ChannelBufferHolder { public Queue messageBuffer() { switch (bypassDirection) { case 0: - if (!hasMessageBuffer()) { + if (msgBuf == null) { throw new IllegalStateException("does not have a message buffer"); } return msgBuf; @@ -100,7 +100,7 @@ public final class ChannelBufferHolder { public ChannelBuffer byteBuffer() { switch (bypassDirection) { case 0: - if (!hasByteBuffer()) { + if (byteBuf == null) { throw new IllegalStateException("does not have a byte buffer"); } return byteBuf; @@ -117,10 +117,10 @@ public final class ChannelBufferHolder { public String toString() { switch (bypassDirection) { case 0: - if (hasMessageBuffer()) { - return messageBuffer().toString(); + if (msgBuf != null) { + return msgBuf.toString(); } else { - return byteBuffer().toString(); + return byteBuf.toString(); } case 1: return ctx.nextIn().toString(); @@ -134,10 +134,10 @@ public final class ChannelBufferHolder { public int size() { switch (bypassDirection) { case 0: - if (hasMessageBuffer()) { - return messageBuffer().size(); + if (msgBuf != null) { + return msgBuf.size(); } else { - return byteBuffer().readableBytes(); + return byteBuf.readableBytes(); } case 1: return ctx.nextIn().size(); @@ -151,10 +151,10 @@ public final class ChannelBufferHolder { public boolean isEmpty() { switch (bypassDirection) { case 0: - if (hasMessageBuffer()) { - return messageBuffer().isEmpty(); + if (msgBuf != null) { + return msgBuf.isEmpty(); } else { - return byteBuffer().readable(); + return byteBuf.readable(); } case 1: return ctx.nextIn().isEmpty(); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelFuture.java b/transport/src/main/java/io/netty/channel/DefaultChannelFuture.java index a2c4e22e4e..9df1d2a780 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelFuture.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelFuture.java @@ -16,6 +16,7 @@ package io.netty.channel; import static java.util.concurrent.TimeUnit.*; +import io.netty.channel.AbstractChannel.FlushCheckpoint; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DeadLockProofWorker; @@ -32,7 +33,7 @@ import java.util.concurrent.TimeUnit; * to create a new {@link ChannelFuture} rather than calling the constructor * explicitly. */ -public class DefaultChannelFuture implements ChannelFuture { +public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFuture { private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelFuture.class); @@ -76,6 +77,11 @@ public class DefaultChannelFuture implements ChannelFuture { private Throwable cause; private int waiters; + /** + * Opportunistically extending FlushCheckpoint to reduce GC. + * Only used for flush() operation. See AbstractChannel.DefaultUnsafe.flush() */ + private long flushCheckpoint; + /** * Creates a new instance. * @@ -508,4 +514,19 @@ public class DefaultChannelFuture implements ChannelFuture { } } } + + @Override + long flushCheckpoint() { + return flushCheckpoint; + } + + @Override + void flushCheckpoint(long checkpoint) { + flushCheckpoint = checkpoint; + } + + @Override + ChannelFuture future() { + return this; + } } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 7a716f0a4d..d17ca6ca20 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -42,6 +42,7 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl private final Thread thread; private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); + // TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue. private final Queue> scheduledTasks = new DelayQueue>(); /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ private volatile int state; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java index 65154f37ba..9d4f2c2415 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramChannel.java @@ -15,6 +15,7 @@ */ package io.netty.channel.socket.nio; +import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffers; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; @@ -172,7 +173,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n } @Override - protected int doRead(ChannelBufferHolder buf) throws Exception { + protected int doRead(Queue buf) throws Exception { DatagramChannel ch = javaChannel(); ByteBuffer data = ByteBuffer.allocate(config().getReceivePacketSize()); InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data); @@ -181,10 +182,15 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n } data.flip(); - buf.messageBuffer().add(new DatagramPacket(ChannelBuffers.wrappedBuffer(data), remoteAddress)); + buf.add(new DatagramPacket(ChannelBuffers.wrappedBuffer(data), remoteAddress)); return 1; } + @Override + protected int doRead(ChannelBuffer buf) throws Exception { + throw new UnsupportedOperationException(); + } + @Override protected int doFlush(boolean lastSpin) throws Exception { final Queue buf = unsafe().out().messageBuffer(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java index 45d82e717c..3ca5f10956 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketChannel.java @@ -16,7 +16,6 @@ package io.netty.channel.socket.nio; import io.netty.channel.AbstractServerChannel; -import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelException; import io.netty.channel.EventLoop; import io.netty.channel.socket.DefaultServerSocketChannelConfig; @@ -29,6 +28,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; +import java.util.Queue; public class NioServerSocketChannel extends AbstractServerChannel implements io.netty.channel.socket.ServerSocketChannel { @@ -128,12 +128,12 @@ public class NioServerSocketChannel extends AbstractServerChannel } @Override - protected int doRead(ChannelBufferHolder buf) throws Exception { + protected int doRead(Queue buf) throws Exception { java.nio.channels.SocketChannel ch = javaChannel().accept(); if (ch == null) { return 0; } - buf.messageBuffer().add(new NioSocketChannel(this, null, ch)); + buf.add(new NioSocketChannel(this, null, ch)); return 1; } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 9e97fe2587..893281dc0d 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.Queue; public class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel { @@ -159,11 +160,15 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha } @Override - protected int doRead(ChannelBufferHolder buf) throws Exception { - ChannelBuffer byteBuf = buf.byteBuffer(); + protected int doRead(ChannelBuffer byteBuf) throws Exception { return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes()); } + @Override + protected int doRead(Queue buf) throws Exception { + throw new UnsupportedOperationException(); + } + @Override protected int doFlush(boolean lastSpin) throws Exception { final ChannelBuffer buf = unsafe().out().byteBuffer();