diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 906974c4b3..510d8b2bf7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -240,46 +240,66 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { - // Do non-gathering write for a single buffer case. - final int msgCount = in.size(); - if (msgCount <= 1) { - super.doWrite(in); - return; + int size = in.size(); + if (size == 0) { + // All written so clear OP_WRITE + clearOpWrite(); + break; } + long writtenBytes = 0; + boolean done = false; + boolean setOpWrite = false; // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); - - if (nioBufferCnt <= 1) { - // We have something else beside ByteBuffers to write so fallback to normal writes. - super.doWrite(in); - break; - } - long expectedWrittenBytes = in.nioBufferSize(); + SocketChannel ch = javaChannel(); - final SocketChannel ch = javaChannel(); - long writtenBytes = 0; - boolean done = false; - boolean setOpWrite = false; - for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { - final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); - if (localWrittenBytes == 0) { - setOpWrite = true; + // Always us nioBuffers() to workaround data-corruption. + // See https://github.com/netty/netty/issues/2761 + switch (nioBufferCnt) { + case 0: + // We have something else beside ByteBuffers to write so fallback to normal writes. + super.doWrite(in); + return; + case 1: + // Only one ByteBuf so use non-gathering write + ByteBuffer nioBuffer = nioBuffers[0]; + for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { + final int localWrittenBytes = ch.write(nioBuffer); + if (localWrittenBytes == 0) { + setOpWrite = true; + break; + } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; + if (expectedWrittenBytes == 0) { + done = true; + break; + } + } break; - } - expectedWrittenBytes -= localWrittenBytes; - writtenBytes += localWrittenBytes; - if (expectedWrittenBytes == 0) { - done = true; + default: + for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { + final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); + if (localWrittenBytes == 0) { + setOpWrite = true; + break; + } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; + if (expectedWrittenBytes == 0) { + done = true; + break; + } + } break; - } } if (done) { // Release all buffers - for (int i = msgCount; i > 0; i --) { + for (int i = size; i > 0; i --) { final ByteBuf buf = (ByteBuf) in.current(); in.progress(buf.readableBytes()); in.remove();