Move up write spinning from SelectorEventLoop to AbstractChannel

This commit is contained in:
Trustin Lee 2012-05-13 05:09:05 +09:00
parent 6d14fac99c
commit 3642879d98
6 changed files with 95 additions and 55 deletions

View File

@ -636,7 +636,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
public void flush(final ChannelFuture future) {
if (eventLoop().inEventLoop()) {
// Append flush future to the notification list.
if (future != voidFuture && !future.isDone()) {
if (future != voidFuture) {
FlushFutureEntry newEntry = new FlushFutureEntry(future, flushedAmount + out().size(), null);
if (flushFuture == null) {
flushFuture = lastFlushFuture = newEntry;
@ -646,21 +646,13 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
// Perform outbound I/O.
try {
flushedAmount += doFlush();
} catch (Throwable t) {
notifyFlushFutures(t);
pipeline().fireExceptionCaught(t);
if (t instanceof IOException) {
close(voidFuture());
}
} finally {
// Notify flush futures if necessary.
notifyFlushFutures();
if (!isActive()) {
close(voidFuture());
}
// Attempt/perform outbound I/O if:
// - the channel is inactive - flush0() will fail the futures.
// - the event loop has no plan to call flushForcibly().
if (!isActive() || !inEventLoopDrivenFlush()) {
// Note that we don't call flushForcibly() because otherwise its stack trace
// will be confusing.
flush0();
}
} else {
eventLoop().execute(new Runnable() {
@ -672,6 +664,40 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
@Override
public void flushForcibly() {
flush0();
}
private void flush0() {
// Perform outbound I/O.
try {
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doFlush(i == 0);
if (localFlushedAmount > 0) {
flushedAmount += localFlushedAmount;
notifyFlushFutures();
break;
}
if (out().isEmpty()) {
// Reset reader/writerIndex to 0 if the buffer is empty.
if (out().hasByteBuffer()) {
out().byteBuffer().clear();
}
break;
}
}
} catch (Throwable t) {
notifyFlushFutures(t);
pipeline().fireExceptionCaught(t);
close(voidFuture());
} finally {
if (!isActive()) {
close(voidFuture());
}
}
}
private void notifyFlushFutures() {
FlushFutureEntry e = flushFuture;
if (e == null) {
@ -788,5 +814,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
protected abstract void doDeregister() throws Exception;
protected abstract int doRead() throws Exception;
protected abstract int doFlush() throws Exception;
protected abstract int doFlush(boolean lastSpin) throws Exception;
protected abstract boolean inEventLoopDrivenFlush();
}

View File

@ -189,5 +189,6 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelFu
void read();
void flush(ChannelFuture future);
void flushForcibly();
}
}

View File

@ -140,4 +140,21 @@ public final class ChannelBufferHolder<E> {
throw new Error();
}
}
public boolean isEmpty() {
switch (bypassDirection) {
case 0:
if (hasMessageBuffer()) {
return messageBuffer().isEmpty();
} else {
return byteBuffer().readable();
}
case 1:
return ctx.nextIn().isEmpty();
case 2:
return ctx.out().isEmpty();
default:
throw new Error();
}
}
}

View File

@ -92,4 +92,9 @@ public abstract class AbstractNioChannel extends AbstractChannel {
SelectorEventLoop loop = eventLoop();
selectionKey = javaChannel().register(loop.selector, isActive()? SelectionKey.OP_READ : 0, this);
}
@Override
protected boolean inEventLoopDrivenFlush() {
return (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
}

View File

@ -162,46 +162,36 @@ public class NioSocketChannel extends AbstractNioChannel implements io.netty.cha
}
@Override
protected int doFlush() throws Exception {
protected int doFlush(boolean lastSpin) throws Exception {
final ChannelBuffer buf = unsafe().out().byteBuffer();
final int expectedWrittenBytes = buf.readableBytes();
if (expectedWrittenBytes == 0) {
return 0;
}
final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
return 0;
}
boolean addOpWrite = false;
boolean removeOpWrite = false;
final SocketChannel ch = javaChannel();
final int writeSpinCount = config().getWriteSpinCount();
final ChannelBuffer buf = unsafe().out().byteBuffer();
int bytesLeft = buf.readableBytes();
if (bytesLeft == 0) {
return 0;
}
int localWrittenBytes = 0;
int writtenBytes = 0;
// FIXME: Spinning should be done by AbstractChannel.
for (int i = writeSpinCount; i > 0; i --) {
localWrittenBytes = buf.readBytes(ch, bytesLeft);
if (localWrittenBytes > 0) {
writtenBytes += localWrittenBytes;
bytesLeft -= localWrittenBytes;
if (bytesLeft <= 0) {
removeOpWrite = true;
break;
}
} else {
addOpWrite = true;
break;
if (writtenBytes >= expectedWrittenBytes) {
// Wrote the outbound buffer completely - clear OP_WRITE.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
} else {
// Wrote something or nothing.
// a) If wrote something, the caller will not retry.
// - Set OP_WRITE so that the event loop calls flushForcibly() later.
// b) If wrote nothing:
// 1) If 'lastSpin' is false, the caller will call this method again real soon.
// - Do not update OP_WRITE.
// a) If 'lastSpin' is true, the caller will not retry.
// - Set OP_WRITE so that the event loop calls flushForcibly() later.
if (writtenBytes > 0 || lastSpin) {
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
}
if (addOpWrite) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
} else if (removeOpWrite) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
return writtenBytes;

View File

@ -200,7 +200,7 @@ public class SelectorEventLoop extends SingleThreadEventLoop {
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
unsafe.flush(unsafe.voidFuture());
unsafe.flushForcibly();
}
if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
unsafe.read();