Respect ChannelConfig.getWriteSpinCount() when using epoll transport
Motivation: The writeSpinCount was ignored in the epoll transport and it just kept on trying writing. This could cause unnessary cpu spinning if a slow remote peer was reading the data very very slow. Modification: - Correctly take writeSpinCount into account when writing. Result: Less cpu spinning when writing to a slow remote peer.
This commit is contained in:
parent
a1efd1871b
commit
4e6cec0e26
@ -236,14 +236,14 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
return localReadAmount;
|
||||
}
|
||||
|
||||
protected final int doWriteBytes(ByteBuf buf) throws Exception {
|
||||
protected final int doWriteBytes(ByteBuf buf, int writeSpinCount) throws Exception {
|
||||
int readableBytes = buf.readableBytes();
|
||||
int writtenBytes = 0;
|
||||
if (buf.hasMemoryAddress()) {
|
||||
long memoryAddress = buf.memoryAddress();
|
||||
int readerIndex = buf.readerIndex();
|
||||
int writerIndex = buf.writerIndex();
|
||||
for (;;) {
|
||||
for (int i = writeSpinCount - 1; i >= 0; i--) {
|
||||
int localFlushedAmount = Native.writeAddress(
|
||||
fileDescriptor.intValue(), memoryAddress, readerIndex, writerIndex);
|
||||
if (localFlushedAmount > 0) {
|
||||
@ -253,9 +253,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
}
|
||||
readerIndex += localFlushedAmount;
|
||||
} else {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setFlag(Native.EPOLLOUT);
|
||||
return writtenBytes;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -265,7 +263,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
} else {
|
||||
nioBuf = buf.nioBuffer();
|
||||
}
|
||||
for (;;) {
|
||||
for (int i = writeSpinCount - 1; i >= 0; i--) {
|
||||
int pos = nioBuf.position();
|
||||
int limit = nioBuf.limit();
|
||||
int localFlushedAmount = Native.write(fileDescriptor.intValue(), nioBuf, pos, limit);
|
||||
@ -276,13 +274,15 @@ abstract class AbstractEpollChannel extends AbstractChannel {
|
||||
return writtenBytes;
|
||||
}
|
||||
} else {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setFlag(Native.EPOLLOUT);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return writtenBytes;
|
||||
}
|
||||
if (writtenBytes < readableBytes) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setFlag(Native.EPOLLOUT);
|
||||
}
|
||||
return writtenBytes;
|
||||
}
|
||||
|
||||
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
|
||||
|
@ -70,7 +70,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
|
||||
* @param buf the {@link ByteBuf} from which the bytes should be written
|
||||
*/
|
||||
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
|
||||
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception {
|
||||
int readableBytes = buf.readableBytes();
|
||||
if (readableBytes == 0) {
|
||||
in.remove();
|
||||
@ -78,16 +78,17 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
}
|
||||
|
||||
if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
|
||||
int writtenBytes = doWriteBytes(buf);
|
||||
int writtenBytes = doWriteBytes(buf, writeSpinCount);
|
||||
in.removeBytes(writtenBytes);
|
||||
return writtenBytes == readableBytes;
|
||||
} else {
|
||||
ByteBuffer[] nioBuffers = buf.nioBuffers();
|
||||
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes);
|
||||
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, writeSpinCount);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
|
||||
private boolean writeBytesMultiple(
|
||||
ChannelOutboundBuffer in, IovArray array, int writeSpinCount) throws IOException {
|
||||
|
||||
long expectedWrittenBytes = array.size();
|
||||
final long initialExpectedWrittenBytes = expectedWrittenBytes;
|
||||
@ -100,11 +101,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
boolean done = false;
|
||||
int offset = 0;
|
||||
int end = offset + cnt;
|
||||
for (;;) {
|
||||
for (int i = writeSpinCount - 1; i >= 0; i--) {
|
||||
long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setFlag(Native.EPOLLOUT);
|
||||
break;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
@ -127,14 +126,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
}
|
||||
} while (offset < end && localWrittenBytes > 0);
|
||||
}
|
||||
|
||||
if (!done) {
|
||||
setFlag(Native.EPOLLOUT);
|
||||
}
|
||||
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
|
||||
return done;
|
||||
}
|
||||
|
||||
private boolean writeBytesMultiple(
|
||||
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
|
||||
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
|
||||
int nioBufferCnt, long expectedWrittenBytes, int writeSpinCount) throws IOException {
|
||||
|
||||
assert expectedWrittenBytes != 0;
|
||||
final long initialExpectedWrittenBytes = expectedWrittenBytes;
|
||||
@ -142,11 +143,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
boolean done = false;
|
||||
int offset = 0;
|
||||
int end = offset + nioBufferCnt;
|
||||
for (;;) {
|
||||
for (int i = writeSpinCount - 1; i >= 0; i--) {
|
||||
long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt);
|
||||
if (localWrittenBytes == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setFlag(Native.EPOLLOUT);
|
||||
break;
|
||||
}
|
||||
expectedWrittenBytes -= localWrittenBytes;
|
||||
@ -173,6 +172,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
}
|
||||
|
||||
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
|
||||
if (!done) {
|
||||
setFlag(Native.EPOLLOUT);
|
||||
}
|
||||
return done;
|
||||
}
|
||||
|
||||
@ -182,7 +184,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
* @param region the {@link DefaultFileRegion} from which the bytes should be written
|
||||
* @return amount the amount of written bytes
|
||||
*/
|
||||
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
||||
private boolean writeFileRegion(
|
||||
ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception {
|
||||
final long regionCount = region.count();
|
||||
if (region.transfered() >= regionCount) {
|
||||
in.remove();
|
||||
@ -193,13 +196,11 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
boolean done = false;
|
||||
long flushedAmount = 0;
|
||||
|
||||
for (;;) {
|
||||
for (int i = writeSpinCount - 1; i >= 0; i--) {
|
||||
final long offset = region.transfered();
|
||||
final long localFlushedAmount =
|
||||
Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset);
|
||||
if (localFlushedAmount == 0) {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setFlag(Native.EPOLLOUT);
|
||||
break;
|
||||
}
|
||||
|
||||
@ -216,12 +217,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
|
||||
if (done) {
|
||||
in.remove();
|
||||
} else {
|
||||
// Returned EAGAIN need to set EPOLLOUT
|
||||
setFlag(Native.EPOLLOUT);
|
||||
}
|
||||
return done;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
int writeSpinCount = config().getWriteSpinCount();
|
||||
for (;;) {
|
||||
final int msgCount = in.size();
|
||||
|
||||
@ -233,7 +238,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
|
||||
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
|
||||
if (msgCount > 1 && in.current() instanceof ByteBuf) {
|
||||
if (!doWriteMultiple(in)) {
|
||||
if (!doWriteMultiple(in, writeSpinCount)) {
|
||||
break;
|
||||
}
|
||||
|
||||
@ -241,26 +246,26 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
// because a user might have triggered another write and flush when we notify his or her
|
||||
// listeners.
|
||||
} else { // msgCount == 1
|
||||
if (!doWriteSingle(in)) {
|
||||
if (!doWriteSingle(in, writeSpinCount)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
|
||||
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
|
||||
// The outbound buffer contains only one message or it contains a file region.
|
||||
Object msg = in.current();
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
if (!writeBytes(in, buf)) {
|
||||
if (!writeBytes(in, buf, writeSpinCount)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
}
|
||||
} else if (msg instanceof DefaultFileRegion) {
|
||||
DefaultFileRegion region = (DefaultFileRegion) msg;
|
||||
if (!writeFileRegion(in, region)) {
|
||||
if (!writeFileRegion(in, region, writeSpinCount)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
@ -273,14 +278,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
|
||||
private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
|
||||
if (PlatformDependent.hasUnsafe()) {
|
||||
// this means we can cast to IovArray and write the IovArray directly.
|
||||
IovArray array = IovArrayThreadLocal.get(in);
|
||||
int cnt = array.count();
|
||||
if (cnt >= 1) {
|
||||
// TODO: Handle the case where cnt == 1 specially.
|
||||
if (!writeBytesMultiple(in, array)) {
|
||||
if (!writeBytesMultiple(in, array, writeSpinCount)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
@ -293,7 +298,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
||||
int cnt = in.nioBufferCount();
|
||||
if (cnt >= 1) {
|
||||
// TODO: Handle the case where cnt == 1 specially.
|
||||
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) {
|
||||
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), writeSpinCount)) {
|
||||
// was not able to write everything so break here we will get notified later again once
|
||||
// the network stack can handle more writes.
|
||||
return false;
|
||||
|
@ -88,14 +88,14 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
|
||||
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
|
||||
Object msg = in.current();
|
||||
if (msg instanceof FileDescriptor && Native.sendFd(fd().intValue(), ((FileDescriptor) msg).intValue()) > 0) {
|
||||
// File descriptor was written, so remove it.
|
||||
in.remove();
|
||||
return true;
|
||||
}
|
||||
return super.doWriteSingle(in);
|
||||
return super.doWriteSingle(in, writeSpinCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user