Fix a stalled write in EpollSocketChannel

Motivation:

When a ChannelOutboundBuffer contains a series of entries whose messages
are all empty buffers, EpollSocketChannel sometimes fails to remove
them. Consequently, the outcome of write(EmptyByteBuf) is never
notified, making the user application hang.

Modifications:

- Add ChannelOutboundBuffer.removeBytes(long) method that updates the
  progress of the entries and removes as many of them as are covered by
  the specified number of written bytes.  It also updates the reader
  index of the partially flushed buffer.
  - Make both NioSocketChannel and EpollSocketChannel use it to reduce
    code duplication
  - Replace EpollSocketChannel.updateOutboundBuffer()
- Refactor EpollSocketChannel.doWrite() for simplicity
  - Split doWrite() into doWriteSingle() and doWriteMultiple()
- Do not add a zero-length buffer to IovArray
- Do not perform any real I/O when the size of IovArray is 0

Result:

Another regression is gone.
This commit is contained in:
Trustin Lee 2014-08-01 16:50:15 -07:00
parent d9934e5fb4
commit 16e50765d1
4 changed files with 136 additions and 107 deletions

View File

@ -110,6 +110,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
in.remove();
return true;
}
boolean done = false;
long writtenBytes = 0;
if (buf.hasMemoryAddress()) {
@ -131,7 +132,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break;
}
}
updateOutboundBuffer(in, writtenBytes);
in.removeBytes(writtenBytes);
return done;
} else if (buf.nioBufferCount() == 1) {
int readerIndex = buf.readerIndex();
@ -153,7 +155,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break;
}
}
updateOutboundBuffer(in, writtenBytes);
in.removeBytes(writtenBytes);
return done;
} else {
ByteBuffer[] nioBuffers = buf.nioBuffers();
@ -161,11 +164,15 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
}
private boolean writeBytesMultiple(
ChannelOutboundBuffer in, IovArray array) throws IOException {
boolean done = false;
private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
long expectedWrittenBytes = array.size();
int cnt = array.count();
assert expectedWrittenBytes != 0;
assert cnt != 0;
boolean done = false;
long writtenBytes = 0;
int offset = 0;
int end = offset + cnt;
@ -198,13 +205,16 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} while (offset < end && localWrittenBytes > 0);
}
updateOutboundBuffer(in, writtenBytes);
in.removeBytes(writtenBytes);
return done;
}
private boolean writeBytesMultiple(
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
assert expectedWrittenBytes != 0;
boolean done = false;
long writtenBytes = 0;
int offset = 0;
@ -239,32 +249,11 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
} while (offset < end && localWrittenBytes > 0);
}
updateOutboundBuffer(in, writtenBytes);
in.removeBytes(writtenBytes);
return done;
}
/**
 * Updates the given outbound buffer after {@code writtenBytes} bytes were written to the socket.
 *
 * This delegates to {@link ChannelOutboundBuffer#removeBytes(long)} instead of keeping a private copy of the
 * same loop. The previous local loop stopped as soon as it hit an entry whose readable byte count equalled the
 * remaining {@code writtenBytes}; with {@code writtenBytes == 0} that removed a single empty buffer and left
 * any following empty buffers in place. It also dereferenced {@code in.current()} without a {@code null}
 * check. {@code removeBytes(long)} keeps removing entries that are fully covered (including empty ones) and
 * stops when there is no current entry.
 *
 * @param in           the outbound buffer whose leading {@link ByteBuf} entries were (partially) written
 * @param writtenBytes the total number of bytes that were written
 */
private static void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes) {
    in.removeBytes(writtenBytes);
}
/**
* Write a {@link DefaultFileRegion}
*
@ -272,6 +261,11 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
* @return amount the amount of written bytes
*/
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
if (region.transfered() >= region.count()) {
in.remove();
return true;
}
boolean done = false;
long flushedAmount = 0;
@ -310,66 +304,81 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break;
}
// Do gathering write if:
// * the outbound buffer contains more than one messages and
// * they are all buffers rather than a file region.
if (msgCount >= 1) {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly.
IovArray array = IovArray.get(in);
int cnt = array.count();
if (cnt > 1) {
if (!writeBytesMultiple(in, array)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
} else {
ByteBuffer[] buffers = in.nioBuffers();
int cnt = in.nioBufferCount();
if (cnt > 1) {
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
}
}
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!writeBytes(in, buf)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
// Do gathering write if the outbound buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (!doWriteMultiple(in)) {
break;
}
} else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg;
if (!writeFileRegion(in, region)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} else { // msgCount == 1
if (!doWriteSingle(in)) {
break;
}
} else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
}
}
}
/**
 * Writes the current entry of the outbound buffer on its own (no gathering write).
 *
 * The entry is dispatched by type: a {@link ByteBuf} goes to {@code writeBytes}, a {@link DefaultFileRegion}
 * goes to {@code writeFileRegion}.
 *
 * @param in the outbound buffer whose current entry is written
 * @return {@code true} if the delegated write reported completion; {@code false} if not everything could be
 *         written, in which case the caller stops and waits to be notified once the network stack can
 *         handle more writes
 * @throws UnsupportedOperationException if the current entry is neither a {@link ByteBuf} nor a
 *                                       {@link DefaultFileRegion}
 */
private boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
    final Object current = in.current();

    if (current instanceof ByteBuf) {
        // The result of the write is exactly what the caller needs: false means "could not write everything".
        return writeBytes(in, (ByteBuf) current);
    }

    if (current instanceof DefaultFileRegion) {
        return writeFileRegion(in, (DefaultFileRegion) current);
    }

    throw new UnsupportedOperationException(
            "unsupported message type: " + StringUtil.simpleClassName(current));
}
/**
 * Performs a gathering write of the leading buffers of the outbound buffer.
 *
 * When {@code PlatformDependent.hasUnsafe()} is {@code true} the buffers are collected into an
 * {@link IovArray}; otherwise the NIO buffer view of the outbound buffer is used. If the collected count is
 * zero (the outbound buffer contained empty buffers only), no I/O is attempted and
 * {@code in.removeBytes(0)} is called instead.
 *
 * @param in the outbound buffer to write from
 * @return {@code false} if not everything could be written, so the caller should stop and wait until the
 *         network stack can handle more writes; {@code true} otherwise
 */
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
    if (PlatformDependent.hasUnsafe()) {
        // With unsafe available the IovArray can be written directly.
        final IovArray iov = IovArray.get(in);
        if (iov.count() < 1) {
            // Nothing was collected: the outbound buffer contained empty buffers only.
            in.removeBytes(0);
            return true;
        }
        // TODO: Handle the case where cnt == 1 specially.
        return writeBytesMultiple(in, iov);
    }

    final ByteBuffer[] nioBuffers = in.nioBuffers();
    final int nioBufferCnt = in.nioBufferCount();
    if (nioBufferCnt < 1) {
        // Nothing was collected: the outbound buffer contained empty buffers only.
        in.removeBytes(0);
        return true;
    }
    // TODO: Handle the case where cnt == 1 specially.
    return writeBytesMultiple(in, nioBuffers, nioBufferCnt, in.nioBufferSize());
}
@Override
public EpollSocketChannelConfig config() {
return config;

View File

@ -84,12 +84,21 @@ final class IovArray implements MessageProcessor {
// No more room!
return false;
}
int len = buf.readableBytes();
long addr = buf.memoryAddress();
int offset = buf.readerIndex();
long baseOffset = memoryAddress(count++);
long lengthOffset = baseOffset + ADDRESS_SIZE;
final int len = buf.readableBytes();
if (len == 0) {
// No need to add an empty buffer.
// We return true here because we want ChannelOutboundBuffer.forEachFlushedMessage() to continue
// fetching the next buffers.
return true;
}
final long addr = buf.memoryAddress();
final int offset = buf.readerIndex();
final long baseOffset = memoryAddress(count++);
final long lengthOffset = baseOffset + ADDRESS_SIZE;
if (ADDRESS_SIZE == 8) {
// 64bit
PlatformDependent.putLong(baseOffset, addr + offset);
@ -99,6 +108,7 @@ final class IovArray implements MessageProcessor {
PlatformDependent.putInt(baseOffset, (int) addr + offset);
PlatformDependent.putInt(lengthOffset, len);
}
size += len;
return true;
}

View File

@ -338,6 +338,36 @@ public final class ChannelOutboundBuffer {
}
}
/**
 * Removes the fully written entries and updates the reader index of the partially written entry.
 *
 * Entries are consumed from the head of this buffer for as long as they are {@link ByteBuf}s. The loop stops
 * at the first entry that is not a {@link ByteBuf}: either {@code null} from {@code current()}, or a
 * different message type queued behind the written buffers. Stopping there, rather than casting
 * unconditionally, avoids a {@link ClassCastException} when every leading {@link ByteBuf} was written
 * completely and the next entry is of another type.
 *
 * Entries with no readable bytes are removed even when {@code writtenBytes} is {@code 0}; in that case
 * {@code progress} is not invoked.
 *
 * @param writtenBytes the number of bytes that were written from the leading {@link ByteBuf} entries
 */
public void removeBytes(long writtenBytes) {
    for (;;) {
        final Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            // No current entry, or an entry this method must not touch: the written bytes
            // can only have come from the ByteBufs that preceded it.
            break;
        }

        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;

        if (readableBytes > writtenBytes) {
            // Partially written (or untouched) entry: advance it and stop.
            if (writtenBytes != 0) {
                // The narrowing cast is safe because writtenBytes < readableBytes, which is an int.
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }

        // readableBytes <= writtenBytes: the entry was written completely (or was empty).
        if (writtenBytes != 0) {
            progress(readableBytes);
            writtenBytes -= readableBytes;
        }
        remove();
    }
}
/**
* Returns an array of direct NIO buffers if the currently pending messages are made of {@link ByteBuf} only.
* {@code null} is returned otherwise. If this method returns a non-null array, {@link #nioBufferCount()} and

View File

@ -293,27 +293,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
} else {
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < writtenBytes) {
in.progress(readableBytes);
in.remove();
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
in.progress(writtenBytes);
break;
} else { // readableBytes == writtenBytes
in.progress(readableBytes);
in.remove();
break;
}
}
in.removeBytes(writtenBytes);
incompleteWrite(setOpWrite);
break;
}