Simplify the logic for updating OP_WRITE in the NIO transport

- Removed code duplication
This commit is contained in:
Trustin Lee 2013-06-10 18:19:58 +09:00
parent 3be25694d0
commit fa205defa1
4 changed files with 49 additions and 82 deletions

View File

@ -31,7 +31,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import static java.nio.channels.SelectionKey.*; import static java.nio.channels.SelectionKey.*;
@ -143,25 +142,10 @@ public class NioUdtByteConnectorChannel extends AbstractNioByteChannel
} }
@Override @Override
protected int doWriteBytes(final ByteBuf byteBuf, final boolean lastSpin) protected int doWriteBytes(final ByteBuf byteBuf, final boolean lastSpin) throws Exception {
throws Exception { final int expectedWrittenBytes = byteBuf.readableBytes();
final int pendingBytes = byteBuf.readableBytes(); final int writtenBytes = byteBuf.readBytes(javaChannel(), expectedWrittenBytes);
final int writtenBytes = byteBuf.readBytes(javaChannel(), pendingBytes); updateOpWrite(expectedWrittenBytes, writtenBytes, lastSpin);
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
if (writtenBytes >= pendingBytes) {
// wrote the buffer completely - clear OP_WRITE.
if ((interestOps & OP_WRITE) != 0) {
key.interestOps(interestOps & ~OP_WRITE);
}
} else {
// wrote partial or nothing - ensure OP_WRITE
if (writtenBytes > 0 || lastSpin) {
if ((interestOps & OP_WRITE) == 0) {
key.interestOps(interestOps | OP_WRITE);
}
}
}
return writtenBytes; return writtenBytes;
} }

View File

@ -24,6 +24,7 @@ import io.netty.channel.FileRegion;
import io.netty.channel.MessageList; import io.netty.channel.MessageList;
import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.util.internal.StringUtil;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
@ -124,36 +125,43 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf, i == 0); int localFlushedAmount = doWriteBytes(buf, i == 0);
if (localFlushedAmount > 0 || !buf.isReadable()) { if (localFlushedAmount == 0) {
break;
}
if (!buf.isReadable()) {
done = true;
break; break;
} }
} }
// We may could optimize this to write multiple buffers at once (scattering) // We may could optimize this to write multiple buffers at once (scattering)
if (!buf.isReadable()) { if (done) {
buf.release(); buf.release();
return 1; return 1;
} }
} else if (msg instanceof FileRegion) { } else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg; FileRegion region = (FileRegion) msg;
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
long localFlushedAmount = doWriteFileRegion(region, i == 0); long localFlushedAmount = doWriteFileRegion(region, i == 0);
if (localFlushedAmount == -1) { if (localFlushedAmount == 0) {
checkEOF(region); break;
return 1;
} }
if (localFlushedAmount > 0) { if (region.transfered() >= region.count()) {
done = true;
break; break;
} }
} }
if (region.transfered() >= region.count()) {
if (done) {
region.release(); region.release();
return 1; return 1;
} }
} else { } else {
throw new UnsupportedOperationException("Not support writing of message " + msg); throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
} }
return 0; return 0;
@ -183,4 +191,26 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
*/ */
protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception; protected abstract int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception;
protected void updateOpWrite(long expectedWrittenBytes, long writtenBytes, boolean lastSpin) {
if (writtenBytes >= expectedWrittenBytes) {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
// Wrote the outbound buffer completely - clear OP_WRITE.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
} else {
// 1) Wrote nothing: buffer is full obviously - set OP_WRITE
// 2) Wrote partial data:
// a) lastSpin is false: no need to set OP_WRITE because the caller will try again immediately.
// b) lastSpin is true: set OP_WRITE because the caller will not try again.
if (writtenBytes == 0 || lastSpin) {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
}
}
} }

View File

@ -35,7 +35,6 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
/** /**
* {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
@ -231,62 +230,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
protected int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception { protected int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception {
final int expectedWrittenBytes = buf.readableBytes(); final int expectedWrittenBytes = buf.readableBytes();
final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes); final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);
updateOpWrite(expectedWrittenBytes, writtenBytes, lastSpin);
if (writtenBytes >= expectedWrittenBytes) {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
// Wrote the outbound buffer completely - clear OP_WRITE.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
} else {
// Wrote something or nothing.
// a) If wrote something, the caller will not retry.
// - Set OP_WRITE so that the event loop calls flushForcibly() later.
// b) If wrote nothing:
// 1) If 'lastSpin' is false, the caller will call this method again real soon.
// - Do not update OP_WRITE.
// 2) If 'lastSpin' is true, the caller will not retry.
// - Set OP_WRITE so that the event loop calls flushForcibly() later.
if (writtenBytes > 0 || lastSpin) {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
}
return writtenBytes; return writtenBytes;
} }
@Override @Override
protected long doWriteFileRegion(FileRegion region, boolean lastSpin) throws Exception { protected long doWriteFileRegion(FileRegion region, boolean lastSpin) throws Exception {
if (javaChannel() instanceof WritableByteChannel) { final long position = region.transfered();
WritableByteChannel wch = javaChannel(); final long expectedWrittenBytes = region.count() - position;
long localWrittenBytes = region.transferTo(wch, region.transfered()); final long writtenBytes = region.transferTo(javaChannel(), position);
if (localWrittenBytes > 0 || lastSpin) { updateOpWrite(expectedWrittenBytes, writtenBytes, lastSpin);
// check if the region was written complete. If not set OP_WRITE so the eventloop return writtenBytes;
// will write the rest once writable again
if (region.transfered() < region.count()) {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
} else {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
// Wrote the region completely - clear OP_WRITE.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
}
return localWrittenBytes;
} else {
throw new UnsupportedOperationException("Underlying Channel is not of instance "
+ WritableByteChannel.class);
}
} }
} }