From c46b197c61649f556193a7c2f37ffec8149412b0 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 13 Aug 2014 21:21:15 +0200 Subject: [PATCH] [#2761] Proper work-around for data-corruption caused by cached ByteBuffers Motivation: The previous fix did disable the caching of ByteBuffers completely which can cause performance regressions. This fix makes sure we use nioBuffers() for all writes in NioSocketChannel and so prevent data-corruptions. This is still kind of a workaround which will be replaced by a more fundamental fix later. Modifications: - Revert 4059c9f3549753119576a287492dd70ae4742988 - Use nioBuffers() for all writes to prevent data-corruption Result: No more data-corruption but still retain the original speed. --- .../channel/socket/nio/NioSocketChannel.java | 74 ++++++++++++------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 906974c4b3..510d8b2bf7 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -240,46 +240,66 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { - // Do non-gathering write for a single buffer case. - final int msgCount = in.size(); - if (msgCount <= 1) { - super.doWrite(in); - return; + int size = in.size(); + if (size == 0) { + // All written so clear OP_WRITE + clearOpWrite(); + break; } + long writtenBytes = 0; + boolean done = false; + boolean setOpWrite = false; // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); - - if (nioBufferCnt <= 1) { - // We have something else beside ByteBuffers to write so fallback to normal writes. - super.doWrite(in); - break; - } - long expectedWrittenBytes = in.nioBufferSize(); + SocketChannel ch = javaChannel(); - final SocketChannel ch = javaChannel(); - long writtenBytes = 0; - boolean done = false; - boolean setOpWrite = false; - for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { - final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); - if (localWrittenBytes == 0) { - setOpWrite = true; + // Always us nioBuffers() to workaround data-corruption. + // See https://github.com/netty/netty/issues/2761 + switch (nioBufferCnt) { + case 0: + // We have something else beside ByteBuffers to write so fallback to normal writes. + super.doWrite(in); + return; + case 1: + // Only one ByteBuf so use non-gathering write + ByteBuffer nioBuffer = nioBuffers[0]; + for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { + final int localWrittenBytes = ch.write(nioBuffer); + if (localWrittenBytes == 0) { + setOpWrite = true; + break; + } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; + if (expectedWrittenBytes == 0) { + done = true; + break; + } + } break; - } - expectedWrittenBytes -= localWrittenBytes; - writtenBytes += localWrittenBytes; - if (expectedWrittenBytes == 0) { - done = true; + default: + for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { + final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); + if (localWrittenBytes == 0) { + setOpWrite = true; + break; + } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; + if (expectedWrittenBytes == 0) { + done = true; + break; + } + } break; - } } if (done) { // Release all buffers - for (int i = msgCount; i > 0; i --) { + for (int i = size; i > 0; i --) { final ByteBuf buf = (ByteBuf) in.current(); in.progress(buf.readableBytes()); in.remove();