Optimize AbstractChannel and related classes

- AbstractChannel.doRead() is split into two versions so that the
  implementation doesn't have to validate the buffer type.
- Optimized ChannelBufferHolder a little bit
- Reduced GC related with flush future notification
  - Added FlushCheckpoint and DefaultChannelFuture implements it
    opportunistically
-
This commit is contained in:
Trustin Lee 2012-05-25 06:16:25 -07:00
parent 02cb7adf03
commit 59f11ed64f
8 changed files with 140 additions and 74 deletions

View File

@ -24,6 +24,8 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -92,8 +94,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
private ConnectException connectTimeoutException;
private long flushedAmount;
private FlushFutureEntry flushFuture;
private FlushFutureEntry lastFlushFuture;
private final Deque<FlushCheckpoint> flushCheckpoints = new ArrayDeque<FlushCheckpoint>();
private ClosedChannelException closedChannelException;
/** Cache for the string representation of this channel */
@ -643,18 +644,32 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
boolean closed = false;
boolean read = false;
try {
for (;;) {
int localReadAmount = doRead(buf);
if (localReadAmount > 0) {
expandReadBuffer(buf);
read = true;
} else if (localReadAmount == 0) {
if (!expandReadBuffer(buf)) {
if (buf.hasMessageBuffer()) {
Queue<Object> msgBuf = buf.messageBuffer();
for (;;) {
int localReadAmount = doRead(msgBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount == 0) {
break;
} else if (localReadAmount < 0) {
closed = true;
break;
}
}
} else {
ChannelBuffer byteBuf = buf.byteBuffer();
for (;;) {
int localReadAmount = doRead(byteBuf);
if (localReadAmount > 0) {
read = true;
} else if (localReadAmount < 0) {
closed = true;
break;
}
if (!expandReadBuffer(byteBuf)) {
break;
}
} else if (localReadAmount < 0) {
closed = true;
break;
}
}
} catch (Throwable t) {
@ -681,12 +696,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (eventLoop().inEventLoop()) {
// Append flush future to the notification list.
if (future != voidFuture) {
FlushFutureEntry newEntry = new FlushFutureEntry(future, flushedAmount + out().size(), null);
if (flushFuture == null) {
flushFuture = lastFlushFuture = newEntry;
long checkpoint = flushedAmount + out().size();
if (future instanceof FlushCheckpoint) {
FlushCheckpoint cp = (FlushCheckpoint) future;
cp.flushCheckpoint(checkpoint);
flushCheckpoints.add(cp);
} else {
lastFlushFuture.next = newEntry;
lastFlushFuture = newEntry;
flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, future));
}
}
@ -770,49 +786,44 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
private void notifyFlushFutures() {
FlushFutureEntry e = flushFuture;
if (e == null) {
if (flushCheckpoints.isEmpty()) {
return;
}
final long flushedAmount = AbstractChannel.this.flushedAmount;
do {
if (e.expectedFlushedAmount > flushedAmount) {
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
e.future.setSuccess();
e = e.next;
} while (e != null);
flushFuture = e;
if (cp.flushCheckpoint() > flushedAmount) {
break;
}
cp.future().setSuccess();
}
// Avoid overflow
if (e == null) {
if (flushCheckpoints.isEmpty()) {
// Reset the counter if there's nothing in the notification list.
AbstractChannel.this.flushedAmount = 0;
} else if (flushedAmount >= 0x1000000000000000L) {
// Otherwise, reset the counter only when the counter grew pretty large
// so that we can reduce the cost of updating all entries in the notification list.
AbstractChannel.this.flushedAmount = 0;
do {
e.expectedFlushedAmount -= flushedAmount;
e = e.next;
} while (e != null);
for (FlushCheckpoint cp: flushCheckpoints) {
cp.flushCheckpoint(cp.flushCheckpoint() - flushedAmount);
}
}
}
private void notifyFlushFutures(Throwable cause) {
FlushFutureEntry e = flushFuture;
if (e == null) {
return;
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
break;
}
cp.future().setFailure(cause);
}
do {
e.future.setFailure(cause);
e = e.next;
} while (e != null);
flushFuture = null;
}
private boolean ensureOpen(ChannelFuture future) {
@ -834,15 +845,34 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
private static class FlushFutureEntry {
private final ChannelFuture future;
private long expectedFlushedAmount;
private FlushFutureEntry next;
static abstract class FlushCheckpoint {
abstract long flushCheckpoint();
abstract void flushCheckpoint(long checkpoint);
abstract ChannelFuture future();
}
FlushFutureEntry(ChannelFuture future, long expectedWrittenAmount, FlushFutureEntry next) {
private static class DefaultFlushCheckpoint extends FlushCheckpoint {
private long checkpoint;
private final ChannelFuture future;
DefaultFlushCheckpoint(long checkpoint, ChannelFuture future) {
this.checkpoint = checkpoint;
this.future = future;
expectedFlushedAmount = expectedWrittenAmount;
this.next = next;
}
@Override
long flushCheckpoint() {
return checkpoint;
}
@Override
void flushCheckpoint(long checkpoint) {
this.checkpoint = checkpoint;
}
@Override
ChannelFuture future() {
return future;
}
}
@ -885,16 +915,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract void doClose() throws Exception;
protected abstract void doDeregister() throws Exception;
protected abstract int doRead(ChannelBufferHolder<Object> buf) throws Exception;
protected abstract int doRead(Queue<Object> buf) throws Exception;
protected abstract int doRead(ChannelBuffer buf) throws Exception;
protected abstract int doFlush(boolean lastSpin) throws Exception;
protected abstract boolean inEventLoopDrivenFlush();
private static boolean expandReadBuffer(ChannelBufferHolder<Object> buf) {
if (!buf.hasByteBuffer()) {
return false;
}
ChannelBuffer byteBuf = buf.byteBuffer();
private static boolean expandReadBuffer(ChannelBuffer byteBuf) {
if (!byteBuf.writable()) {
// FIXME: Use a sensible value.
byteBuf.ensureWritableBytes(4096);

View File

@ -15,6 +15,8 @@
*/
package io.netty.channel;
import io.netty.buffer.ChannelBuffer;
import java.net.SocketAddress;
import java.util.AbstractQueue;
import java.util.Collections;
@ -76,6 +78,11 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
throw new UnsupportedOperationException();
}
@Override
protected int doRead(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
throw new UnsupportedOperationException();

View File

@ -84,7 +84,7 @@ public final class ChannelBufferHolder<E> {
public Queue<E> messageBuffer() {
switch (bypassDirection) {
case 0:
if (!hasMessageBuffer()) {
if (msgBuf == null) {
throw new IllegalStateException("does not have a message buffer");
}
return msgBuf;
@ -100,7 +100,7 @@ public final class ChannelBufferHolder<E> {
public ChannelBuffer byteBuffer() {
switch (bypassDirection) {
case 0:
if (!hasByteBuffer()) {
if (byteBuf == null) {
throw new IllegalStateException("does not have a byte buffer");
}
return byteBuf;
@ -117,10 +117,10 @@ public final class ChannelBufferHolder<E> {
public String toString() {
switch (bypassDirection) {
case 0:
if (hasMessageBuffer()) {
return messageBuffer().toString();
if (msgBuf != null) {
return msgBuf.toString();
} else {
return byteBuffer().toString();
return byteBuf.toString();
}
case 1:
return ctx.nextIn().toString();
@ -134,10 +134,10 @@ public final class ChannelBufferHolder<E> {
public int size() {
switch (bypassDirection) {
case 0:
if (hasMessageBuffer()) {
return messageBuffer().size();
if (msgBuf != null) {
return msgBuf.size();
} else {
return byteBuffer().readableBytes();
return byteBuf.readableBytes();
}
case 1:
return ctx.nextIn().size();
@ -151,10 +151,10 @@ public final class ChannelBufferHolder<E> {
public boolean isEmpty() {
switch (bypassDirection) {
case 0:
if (hasMessageBuffer()) {
return messageBuffer().isEmpty();
if (msgBuf != null) {
return msgBuf.isEmpty();
} else {
return byteBuffer().readable();
return byteBuf.readable();
}
case 1:
return ctx.nextIn().isEmpty();

View File

@ -16,6 +16,7 @@
package io.netty.channel;
import static java.util.concurrent.TimeUnit.*;
import io.netty.channel.AbstractChannel.FlushCheckpoint;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.DeadLockProofWorker;
@ -32,7 +33,7 @@ import java.util.concurrent.TimeUnit;
* to create a new {@link ChannelFuture} rather than calling the constructor
* explicitly.
*/
public class DefaultChannelFuture implements ChannelFuture {
public class DefaultChannelFuture extends FlushCheckpoint implements ChannelFuture {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(DefaultChannelFuture.class);
@ -76,6 +77,11 @@ public class DefaultChannelFuture implements ChannelFuture {
private Throwable cause;
private int waiters;
/**
* Opportunistically extending FlushCheckpoint to reduce GC.
* Only used for flush() operation. See AbstractChannel.DefaultUnsafe.flush() */
private long flushCheckpoint;
/**
* Creates a new instance.
*
@ -508,4 +514,19 @@ public class DefaultChannelFuture implements ChannelFuture {
}
}
}
@Override
long flushCheckpoint() {
return flushCheckpoint;
}
@Override
void flushCheckpoint(long checkpoint) {
flushCheckpoint = checkpoint;
}
@Override
ChannelFuture future() {
return this;
}
}

View File

@ -42,6 +42,7 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
private final Thread thread;
private final Object stateLock = new Object();
private final Semaphore threadLock = new Semaphore(0);
// TODO: Use PriorityQueue to reduce the locking overhead of DelayQueue.
private final Queue<ScheduledFutureTask<?>> scheduledTasks = new DelayQueue<ScheduledFutureTask<?>>();
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
private volatile int state;

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.socket.nio;
import io.netty.buffer.ChannelBuffer;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
@ -172,7 +173,7 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
}
@Override
protected int doRead(ChannelBufferHolder<Object> buf) throws Exception {
protected int doRead(Queue<Object> buf) throws Exception {
DatagramChannel ch = javaChannel();
ByteBuffer data = ByteBuffer.allocate(config().getReceivePacketSize());
InetSocketAddress remoteAddress = (InetSocketAddress) ch.receive(data);
@ -181,10 +182,15 @@ public final class NioDatagramChannel extends AbstractNioChannel implements io.n
}
data.flip();
buf.messageBuffer().add(new DatagramPacket(ChannelBuffers.wrappedBuffer(data), remoteAddress));
buf.add(new DatagramPacket(ChannelBuffers.wrappedBuffer(data), remoteAddress));
return 1;
}
@Override
protected int doRead(ChannelBuffer buf) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
final Queue<Object> buf = unsafe().out().messageBuffer();

View File

@ -16,7 +16,6 @@
package io.netty.channel.socket.nio;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DefaultServerSocketChannelConfig;
@ -29,6 +28,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.util.Queue;
public class NioServerSocketChannel extends AbstractServerChannel
implements io.netty.channel.socket.ServerSocketChannel {
@ -128,12 +128,12 @@ public class NioServerSocketChannel extends AbstractServerChannel
}
@Override
protected int doRead(ChannelBufferHolder<Object> buf) throws Exception {
protected int doRead(Queue<Object> buf) throws Exception {
java.nio.channels.SocketChannel ch = javaChannel().accept();
if (ch == null) {
return 0;
}
buf.messageBuffer().add(new NioSocketChannel(this, null, ch));
buf.add(new NioSocketChannel(this, null, ch));
return 1;
}
}

View File

@ -29,6 +29,7 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Queue;
public class NioSocketChannel extends AbstractNioChannel implements io.netty.channel.socket.SocketChannel {
@ -159,11 +160,15 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
}
@Override
protected int doRead(ChannelBufferHolder<Object> buf) throws Exception {
ChannelBuffer byteBuf = buf.byteBuffer();
protected int doRead(ChannelBuffer byteBuf) throws Exception {
return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}
@Override
protected int doRead(Queue<Object> buf) throws Exception {
throw new UnsupportedOperationException();
}
@Override
protected int doFlush(boolean lastSpin) throws Exception {
final ChannelBuffer buf = unsafe().out().byteBuffer();