[#2667] Write until EAGAIN in native transport

Motivation:

We need to continue write until we hit EAGAIN to make sure we not see an starvation

Modification:

Write until EAGAIN is returned

Result:

No starvation when using native transport with ET.
This commit is contained in:
Norman Maurer 2014-07-18 09:40:15 +02:00
parent 89593a719b
commit 14096d85cd
2 changed files with 104 additions and 104 deletions

View File

@ -581,14 +581,7 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_write(JNIEnv * env, jc
throwRuntimeException(env, "Unable to access address of buffer"); throwRuntimeException(env, "Unable to access address of buffer");
return -1; return -1;
} }
jint res = write0(env, clazz, fd, buffer, pos, limit); return write0(env, clazz, fd, buffer, pos, limit);
if (res > 0) {
// Increment the pos of the ByteBuffer as it may be only partial written to prevent data-corruption later once we
// try to write the remaining data.
// See https://github.com/netty/netty/issues/2371
incrementPosition(env, jbuffer, res);
}
return res;
} }
JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit) { JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_writeAddress(JNIEnv * env, jclass clazz, jint fd, jlong address, jint pos, jint limit) {
@ -675,18 +668,6 @@ JNIEXPORT jobject JNICALL Java_io_netty_channel_epoll_Native_recvFromAddress(JNI
return recvFrom0(env, fd, (void*) address, pos, limit); return recvFrom0(env, fd, (void*) address, pos, limit);
} }
void incrementPosition(JNIEnv * env, jobject bufObj, int written) {
// Get the current position using the (*env)->GetIntField if possible and fallback
// to slower (*env)->CallIntMethod(...) if needed
if (posFieldId == NULL) {
jint pos = (*env)->CallIntMethod(env, bufObj, posId, NULL);
(*env)->CallObjectMethod(env, bufObj, updatePosId, pos + written);
} else {
jint pos = (*env)->GetIntField(env, bufObj, posFieldId);
(*env)->SetIntField(env, bufObj, posFieldId, pos + written);
}
}
jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint length) { jlong writev0(JNIEnv * env, jclass clazz, jint fd, struct iovec iov[], jint length) {
ssize_t res; ssize_t res;
int err; int err;

View File

@ -107,28 +107,43 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written * @param buf the {@link ByteBuf} from which the bytes should be written
*/ */
private int doWriteBytes(ByteBuf buf) throws Exception { private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
int readerIndex = buf.readerIndex(); int readableBytes = buf.readableBytes();
int localFlushedAmount; if (readableBytes == 0) {
in.remove();
return true;
}
boolean setEpollOut = false;
boolean done = false;
long writtenBytes = 0;
if (buf.nioBufferCount() == 1) { if (buf.nioBufferCount() == 1) {
if (buf.hasMemoryAddress()) { int readerIndex = buf.readerIndex();
localFlushedAmount = Native.writeAddress(fd, buf.memoryAddress(), readerIndex, buf.writerIndex());
} else {
ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes()); ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes());
localFlushedAmount = Native.write(fd, nioBuf, nioBuf.position(), nioBuf.limit()); for (;;) {
int pos = nioBuf.position();
int limit = nioBuf.limit();
int localFlushedAmount = Native.write(fd, nioBuf, pos, limit);
if (localFlushedAmount > 0) {
nioBuf.position(pos + localFlushedAmount);
writtenBytes += localFlushedAmount;
if (writtenBytes == readableBytes) {
done = true;
break;
} }
} else { } else {
// backed by more then one buffer, do a gathering write... setEpollOut = true;
ByteBuffer[] nioBufs = buf.nioBuffers(); break;
localFlushedAmount = (int) Native.writev(fd, nioBufs, 0, nioBufs.length);
} }
if (localFlushedAmount > 0) {
buf.readerIndex(readerIndex + localFlushedAmount);
} }
return localFlushedAmount; updateOutboundBuffer(in, writtenBytes, 1, done, setEpollOut);
return done;
} else {
ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple0(in, 1, nioBuffers, nioBuffers.length, readableBytes);
}
} }
private void writeBytesMultiple( private boolean writeBytesMultiple(
EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] addresses) throws IOException { EpollChannelOutboundBuffer in, int msgCount, AddressEntry[] addresses) throws IOException {
int addressCnt = in.addressCount(); int addressCnt = in.addressCount();
@ -138,9 +153,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
long writtenBytes = 0; long writtenBytes = 0;
int offset = 0; int offset = 0;
int end = offset + addressCnt; int end = offset + addressCnt;
int spinCount = config.getWriteSpinCount();
loop: while (addressCnt > 0) { loop: while (addressCnt > 0) {
for (int i = spinCount - 1; i >= 0; i --) { for (;;) {
int cnt = addressCnt > Native.IOV_MAX? Native.IOV_MAX : addressCnt; int cnt = addressCnt > Native.IOV_MAX? Native.IOV_MAX : addressCnt;
long localWrittenBytes = Native.writevAddresses(fd, addresses, offset, cnt); long localWrittenBytes = Native.writevAddresses(fd, addresses, offset, cnt);
@ -165,30 +179,33 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
localWrittenBytes -= bytes; localWrittenBytes -= bytes;
} }
} }
}
if (expectedWrittenBytes == 0) { if (expectedWrittenBytes == 0) {
done = true; done = true;
break; break;
} }
} }
updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut);
} }
private void writeBytesMultiple( updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut);
NioSocketChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException { return done;
}
int nioBufferCnt = in.nioBufferCount(); private boolean writeBytesMultiple(
long expectedWrittenBytes = in.nioBufferSize(); NioSocketChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException {
return writeBytesMultiple0(in, msgCount, nioBuffers, in.nioBufferCount(), in.nioBufferSize());
}
private boolean writeBytesMultiple0(
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
boolean done = false; boolean done = false;
boolean setEpollOut = false; boolean setEpollOut = false;
long writtenBytes = 0; long writtenBytes = 0;
int offset = 0; int offset = 0;
int end = offset + nioBufferCnt; int end = offset + nioBufferCnt;
int spinCount = config.getWriteSpinCount();
loop: while (nioBufferCnt > 0) { loop: while (nioBufferCnt > 0) {
for (int i = spinCount - 1; i >= 0; i --) { for (;;) {
int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt; int cnt = nioBufferCnt > Native.IOV_MAX? Native.IOV_MAX : nioBufferCnt;
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt); long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt);
@ -206,6 +223,9 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
if (bytes > localWrittenBytes) { if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes); buffer.position(pos + (int) localWrittenBytes);
// incomplete write // incomplete write
// As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again
setEpollOut();
break; break;
} else { } else {
offset++; offset++;
@ -213,15 +233,15 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
localWrittenBytes -= bytes; localWrittenBytes -= bytes;
} }
} }
}
if (expectedWrittenBytes == 0) { if (expectedWrittenBytes == 0) {
done = true; done = true;
break; break;
} }
} }
}
updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut); updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut);
return done;
} }
private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount, private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount,
@ -231,11 +251,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
for (int i = msgCount; i > 0; i --) { for (int i = msgCount; i > 0; i --) {
in.remove(); in.remove();
} }
in.progress(writtenBytes);
// Finish the write loop if no new messages were flushed by in.remove().
if (in.isEmpty()) {
clearEpollOut();
}
} else { } else {
// Did not write all buffers completely. // Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer. // Release the fully written buffers and update the indexes of the partially written buffer.
@ -273,7 +289,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
flushTask = this.flushTask = new Runnable() { flushTask = this.flushTask = new Runnable() {
@Override @Override
public void run() { public void run() {
flush(); unsafe().flush();
} }
}; };
} }
@ -287,8 +303,37 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
* @param region the {@link DefaultFileRegion} from which the bytes should be written * @param region the {@link DefaultFileRegion} from which the bytes should be written
* @return amount the amount of written bytes * @return amount the amount of written bytes
*/ */
private long doWriteFileRegion(DefaultFileRegion region, long count) throws Exception { private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
return Native.sendfile(fd, region, region.transfered(), count); boolean setOpWrite = false;
boolean done = false;
long flushedAmount = 0;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
long expected = region.count() - region.position();
long localFlushedAmount = Native.sendfile(fd, region, region.transfered(), expected);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (region.transfered() >= region.count()) {
done = true;
break;
} else {
// As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again
setEpollOut();
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
incompleteWrite(setOpWrite);
}
return done;
} }
@Override @Override
@ -312,7 +357,11 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
// Ensure the pending writes are made of memoryaddresses only. // Ensure the pending writes are made of memoryaddresses only.
AddressEntry[] addresses = epollIn.memoryAddresses(); AddressEntry[] addresses = epollIn.memoryAddresses();
if (addresses != null) { if (addresses != null) {
writeBytesMultiple(epollIn, msgCount, addresses); if (!writeBytesMultiple(epollIn, msgCount, addresses)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely, // We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her // because a user might have triggered another write and flush when we notify his or her
@ -322,9 +371,13 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} else { } else {
NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in; NioSocketChannelOutboundBuffer nioIn = (NioSocketChannelOutboundBuffer) in;
// Ensure the pending writes are made of memoryaddresses only. // Ensure the pending writes are made of memoryaddresses only.
ByteBuffer[] buffers = nioIn.nioBuffers(); ByteBuffer[] nioBuffers = nioIn.nioBuffers();
if (buffers != null) { if (nioBuffers != null) {
writeBytesMultiple(nioIn, msgCount, buffers); if (!writeBytesMultiple(nioIn, msgCount, nioBuffers)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
break;
}
// We do not break the loop here even if the outbound buffer was flushed completely, // We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her // because a user might have triggered another write and flush when we notify his or her
@ -338,52 +391,18 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
Object msg = in.current(); Object msg = in.current();
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes(); if (!writeBytes(in, buf)) {
if (readableBytes == 0) { // was not able to write everything so break here we will get notified later again once
in.remove(); // the network stack can handle more writes.
continue;
}
boolean setEpollOut = false;
boolean done = false;
long flushedAmount = 0;
int writeSpinCount = config().getWriteSpinCount();
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setEpollOut = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
incompleteWrite(setEpollOut);
break; break;
} }
} else if (msg instanceof DefaultFileRegion) { } else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg; DefaultFileRegion region = (DefaultFileRegion) msg;
if (!writeFileRegion(in, region)) {
long expected = region.count() - region.position(); // was not able to write everything so break here we will get notified later again once
long localFlushedAmount = doWriteFileRegion(region, expected); // the network stack can handle more writes.
in.progress(localFlushedAmount);
if (localFlushedAmount < expected) {
setEpollOut();
break; break;
} }
if (region.transfered() >= region.count()) {
in.remove();
}
} else { } else {
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg)); throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg));
} }