[#2761] Proper work-around for data-corruption caused by cached ByteBuffers
Motivation:
The previous fix did disable the caching of ByteBuffers completely which can cause performance regressions. This fix makes sure we use nioBuffers() for all writes in NioSocketChannel and so prevent data-corruptions. This is still kind of a workaround which will be replaced by a more fundamental fix later.
Modifications:
- Revert 4059c9f354
- Use nioBuffers() for all writes to prevent data-corruption
Result:
No more data-corruption but still retain the original speed.
This commit is contained in:
parent
f89907dba5
commit
a7d1f983a2
@ -240,46 +240,66 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
@Override
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
for (;;) {
|
||||
// Do non-gathering write for a single buffer case.
|
||||
final int msgCount = in.size();
|
||||
if (msgCount <= 1) {
|
||||
super.doWrite(in);
|
||||
return;
|
||||
int size = in.size();
|
||||
if (size == 0) {
|
||||
// All written so clear OP_WRITE
|
||||
clearOpWrite();
|
||||
break;
|
||||
}
|
||||
long writtenBytes = 0;
|
||||
boolean done = false;
|
||||
boolean setOpWrite = false;
|
||||
|
||||
// Ensure the pending writes are made of ByteBufs only.
|
||||
ByteBuffer[] nioBuffers = in.nioBuffers();
|
||||
int nioBufferCnt = in.nioBufferCount();
|
||||
|
||||
if (nioBufferCnt <= 1) {
|
||||
// We have something else beside ByteBuffers to write so fallback to normal writes.
|
||||
super.doWrite(in);
|
||||
break;
|
||||
}
|
||||
|
||||
long expectedWrittenBytes = in.nioBufferSize();
|
||||
SocketChannel ch = javaChannel();
|
||||
|
||||
final SocketChannel ch = javaChannel();
|
||||
long writtenBytes = 0;
|
||||
boolean done = false;
|
||||
boolean setOpWrite = false;
|
||||
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
||||
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
setOpWrite = true;
|
||||
// Always us nioBuffers() to workaround data-corruption.
|
||||
// See https://github.com/netty/netty/issues/2761
|
||||
switch (nioBufferCnt) {
|
||||
case 0:
|
||||
// We have something else beside ByteBuffers to write so fallback to normal writes.
|
||||
super.doWrite(in);
|
||||
return;
|
||||
case 1:
|
||||
// Only one ByteBuf so use non-gathering write
|
||||
ByteBuffer nioBuffer = nioBuffers[0];
|
||||
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
||||
final int localWrittenBytes = ch.write(nioBuffer);
|
||||
if (localWrittenBytes == 0) {
|
||||
setOpWrite = true;
|
||||
break;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
writtenBytes += localWrittenBytes;
|
||||
if (expectedWrittenBytes == 0) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
writtenBytes += localWrittenBytes;
|
||||
if (expectedWrittenBytes == 0) {
|
||||
done = true;
|
||||
default:
|
||||
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
||||
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
setOpWrite = true;
|
||||
break;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
writtenBytes += localWrittenBytes;
|
||||
if (expectedWrittenBytes == 0) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (done) {
|
||||
// Release all buffers
|
||||
for (int i = msgCount; i > 0; i --) {
|
||||
for (int i = size; i > 0; i --) {
|
||||
final ByteBuf buf = (ByteBuf) in.current();
|
||||
in.progress(buf.readableBytes());
|
||||
in.remove();
|
||||
|
Loading…
Reference in New Issue
Block a user