[#2761] Proper work-around for data-corruption caused by cached ByteBuffers
Motivation: The previous fix did disable the caching of ByteBuffers completely which can cause performance regressions. This fix makes sure we use nioBuffers() for all writes in NioSocketChannel and so prevent data-corruptions. This is still kind of a workaround which will be replaced by a more fundamental fix later. Modifications: - Revert 4059c9f3549753119576a287492dd70ae4742988 - Use nioBuffers() for all writes to prevent data-corruption Result: No more data-corruption but still retain the original speed.
This commit is contained in:
parent
2c03030950
commit
c46b197c61
@ -240,29 +240,47 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
|||||||
@Override
|
@Override
|
||||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||||
for (;;) {
|
for (;;) {
|
||||||
// Do non-gathering write for a single buffer case.
|
int size = in.size();
|
||||||
final int msgCount = in.size();
|
if (size == 0) {
|
||||||
if (msgCount <= 1) {
|
// All written so clear OP_WRITE
|
||||||
super.doWrite(in);
|
clearOpWrite();
|
||||||
return;
|
break;
|
||||||
}
|
}
|
||||||
|
long writtenBytes = 0;
|
||||||
|
boolean done = false;
|
||||||
|
boolean setOpWrite = false;
|
||||||
|
|
||||||
// Ensure the pending writes are made of ByteBufs only.
|
// Ensure the pending writes are made of ByteBufs only.
|
||||||
ByteBuffer[] nioBuffers = in.nioBuffers();
|
ByteBuffer[] nioBuffers = in.nioBuffers();
|
||||||
int nioBufferCnt = in.nioBufferCount();
|
int nioBufferCnt = in.nioBufferCount();
|
||||||
|
long expectedWrittenBytes = in.nioBufferSize();
|
||||||
|
SocketChannel ch = javaChannel();
|
||||||
|
|
||||||
if (nioBufferCnt <= 1) {
|
// Always us nioBuffers() to workaround data-corruption.
|
||||||
|
// See https://github.com/netty/netty/issues/2761
|
||||||
|
switch (nioBufferCnt) {
|
||||||
|
case 0:
|
||||||
// We have something else beside ByteBuffers to write so fallback to normal writes.
|
// We have something else beside ByteBuffers to write so fallback to normal writes.
|
||||||
super.doWrite(in);
|
super.doWrite(in);
|
||||||
|
return;
|
||||||
|
case 1:
|
||||||
|
// Only one ByteBuf so use non-gathering write
|
||||||
|
ByteBuffer nioBuffer = nioBuffers[0];
|
||||||
|
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
||||||
|
final int localWrittenBytes = ch.write(nioBuffer);
|
||||||
|
if (localWrittenBytes == 0) {
|
||||||
|
setOpWrite = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
expectedWrittenBytes -= localWrittenBytes;
|
||||||
long expectedWrittenBytes = in.nioBufferSize();
|
writtenBytes += localWrittenBytes;
|
||||||
|
if (expectedWrittenBytes == 0) {
|
||||||
final SocketChannel ch = javaChannel();
|
done = true;
|
||||||
long writtenBytes = 0;
|
break;
|
||||||
boolean done = false;
|
}
|
||||||
boolean setOpWrite = false;
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
||||||
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
|
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
|
||||||
if (localWrittenBytes == 0) {
|
if (localWrittenBytes == 0) {
|
||||||
@ -276,10 +294,12 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (done) {
|
if (done) {
|
||||||
// Release all buffers
|
// Release all buffers
|
||||||
for (int i = msgCount; i > 0; i --) {
|
for (int i = size; i > 0; i --) {
|
||||||
final ByteBuf buf = (ByteBuf) in.current();
|
final ByteBuf buf = (ByteBuf) in.current();
|
||||||
in.progress(buf.readableBytes());
|
in.progress(buf.readableBytes());
|
||||||
in.remove();
|
in.remove();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user