From 3642879d9804126a4230175991df734a386e0788 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 13 May 2012 05:09:05 +0900 Subject: [PATCH] Move up write spinning from SelectorEventLoop to AbstractChannel --- .../io/netty/channel/AbstractChannel.java | 61 +++++++++++++----- .../main/java/io/netty/channel/Channel.java | 1 + .../io/netty/channel/ChannelBufferHolder.java | 17 +++++ .../socket/nio/AbstractNioChannel.java | 5 ++ .../channel/socket/nio/NioSocketChannel.java | 64 ++++++++----------- .../channel/socket/nio/SelectorEventLoop.java | 2 +- 6 files changed, 95 insertions(+), 55 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index a97a06648f..970639a7a8 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -636,7 +636,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha public void flush(final ChannelFuture future) { if (eventLoop().inEventLoop()) { // Append flush future to the notification list. - if (future != voidFuture && !future.isDone()) { + if (future != voidFuture) { FlushFutureEntry newEntry = new FlushFutureEntry(future, flushedAmount + out().size(), null); if (flushFuture == null) { flushFuture = lastFlushFuture = newEntry; @@ -646,21 +646,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } - // Perform outbound I/O. - try { - flushedAmount += doFlush(); - } catch (Throwable t) { - notifyFlushFutures(t); - pipeline().fireExceptionCaught(t); - if (t instanceof IOException) { - close(voidFuture()); - } - } finally { - // Notify flush futures if necessary. - notifyFlushFutures(); - if (!isActive()) { - close(voidFuture()); - } + // Attempt/perform outbound I/O if: + // - the channel is inactive - flush0() will fail the futures. + // - the event loop has no plan to call flushForcibly(). + if (!isActive() || !inEventLoopDrivenFlush()) { + // Note that we don't call flushForcibly() because otherwise its stack trace + // will be confusing. + flush0(); } } else { eventLoop().execute(new Runnable() { @@ -672,6 +664,40 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } + @Override + public void flushForcibly() { + flush0(); + } + + private void flush0() { + // Perform outbound I/O. + try { + for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { + int localFlushedAmount = doFlush(i == 0); + if (localFlushedAmount > 0) { + flushedAmount += localFlushedAmount; + notifyFlushFutures(); + break; + } + if (out().isEmpty()) { + // Reset reader/writerIndex to 0 if the buffer is empty. + if (out().hasByteBuffer()) { + out().byteBuffer().clear(); + } + break; + } + } + } catch (Throwable t) { + notifyFlushFutures(t); + pipeline().fireExceptionCaught(t); + close(voidFuture()); + } finally { + if (!isActive()) { + close(voidFuture()); + } + } + } + private void notifyFlushFutures() { FlushFutureEntry e = flushFuture; if (e == null) { @@ -788,5 +814,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha protected abstract void doDeregister() throws Exception; protected abstract int doRead() throws Exception; - protected abstract int doFlush() throws Exception; + protected abstract int doFlush(boolean lastSpin) throws Exception; + protected abstract boolean inEventLoopDrivenFlush(); } diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 72bd726033..56ebb4e075 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -189,5 +189,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu void read(); void flush(ChannelFuture future); + void flushForcibly(); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java index 33145f2a3f..d7ec361a08 100644 --- a/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java +++ b/transport/src/main/java/io/netty/channel/ChannelBufferHolder.java @@ -140,4 +140,21 @@ public final class ChannelBufferHolder { throw new Error(); } } + + public boolean isEmpty() { + switch (bypassDirection) { + case 0: + if (hasMessageBuffer()) { + return messageBuffer().isEmpty(); + } else { + return byteBuffer().readable(); + } + case 1: + return ctx.nextIn().isEmpty(); + case 2: + return ctx.out().isEmpty(); + default: + throw new Error(); + } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java index 641f8c1760..c47ff7403f 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannel.java @@ -92,4 +92,9 @@ public abstract class AbstractNioChannel extends AbstractChannel { SelectorEventLoop loop = eventLoop(); selectionKey = javaChannel().register(loop.selector, isActive()? SelectionKey.OP_READ : 0, this); } + + @Override + protected boolean inEventLoopDrivenFlush() { + return (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; + } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index a48b2d163b..d1da449009 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -162,46 +162,36 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha } @Override - protected int doFlush() throws Exception { + protected int doFlush(boolean lastSpin) throws Exception { + final ChannelBuffer buf = unsafe().out().byteBuffer(); + final int expectedWrittenBytes = buf.readableBytes(); + if (expectedWrittenBytes == 0) { + return 0; + } + + final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes); + final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - return 0; - } - - boolean addOpWrite = false; - boolean removeOpWrite = false; - final SocketChannel ch = javaChannel(); - final int writeSpinCount = config().getWriteSpinCount(); - final ChannelBuffer buf = unsafe().out().byteBuffer(); - int bytesLeft = buf.readableBytes(); - if (bytesLeft == 0) { - return 0; - } - - int localWrittenBytes = 0; - int writtenBytes = 0; - - // FIXME: Spinning should be done by AbstractChannel. - for (int i = writeSpinCount; i > 0; i --) { - localWrittenBytes = buf.readBytes(ch, bytesLeft); - if (localWrittenBytes > 0) { - writtenBytes += localWrittenBytes; - bytesLeft -= localWrittenBytes; - if (bytesLeft <= 0) { - removeOpWrite = true; - break; - } - } else { - addOpWrite = true; - break; + if (writtenBytes >= expectedWrittenBytes) { + // Wrote the outbound buffer completely - clear OP_WRITE. + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + key.interestOps(interestOps & ~SelectionKey.OP_WRITE); + } + } else { + // Wrote something or nothing. + // a) If wrote something, the caller will not retry. + // - Set OP_WRITE so that the event loop calls flushForcibly() later. + // b) If wrote nothing: + // 1) If 'lastSpin' is false, the caller will call this method again real soon. + // - Do not update OP_WRITE. + // a) If 'lastSpin' is true, the caller will not retry. + // - Set OP_WRITE so that the event loop calls flushForcibly() later. + if (writtenBytes > 0 || lastSpin) { + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + key.interestOps(interestOps | SelectionKey.OP_WRITE); + } } - } - - if (addOpWrite) { - key.interestOps(interestOps | SelectionKey.OP_WRITE); - } else if (removeOpWrite) { - key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } return writtenBytes; diff --git a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java index e19608fdef..880e45ff4e 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/SelectorEventLoop.java @@ -200,7 +200,7 @@ public class SelectorEventLoop extends SingleThreadEventLoop { } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { - unsafe.flush(unsafe.voidFuture()); + unsafe.flushForcibly(); } if ((readyOps & SelectionKey.OP_ACCEPT) != 0) { unsafe.read();