[#2667] Write until EAGAIN in native transport and only call setEpollOut() in this case
Motivation: In the previous fix for #2667 I did introduce a bit overhead by calling setEpollOut() too often. Modification: Only call setEpollOut() if really needed and remove unused code. Result: Less overhead when saturate network.
This commit is contained in:
parent
530badb239
commit
72173f0d16
@ -49,7 +49,6 @@ import java.util.concurrent.TimeUnit;
|
|||||||
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
|
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
|
||||||
|
|
||||||
private final EpollSocketChannelConfig config;
|
private final EpollSocketChannelConfig config;
|
||||||
private Runnable flushTask;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The future of the current connection attempt. If not null, subsequent
|
* The future of the current connection attempt. If not null, subsequent
|
||||||
@ -110,7 +109,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
in.remove();
|
in.remove();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
boolean setEpollOut = false;
|
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
long writtenBytes = 0;
|
long writtenBytes = 0;
|
||||||
if (buf.nioBufferCount() == 1) {
|
if (buf.nioBufferCount() == 1) {
|
||||||
@ -128,11 +126,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
setEpollOut = true;
|
// Returned EAGAIN need to set EPOLLOUT
|
||||||
|
setEpollOut();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
updateOutboundBuffer(in, writtenBytes, 1, done, setEpollOut);
|
updateOutboundBuffer(in, writtenBytes, 1, done);
|
||||||
return done;
|
return done;
|
||||||
} else {
|
} else {
|
||||||
ByteBuffer[] nioBuffers = buf.nioBuffers();
|
ByteBuffer[] nioBuffers = buf.nioBuffers();
|
||||||
@ -149,7 +148,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers,
|
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers,
|
||||||
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
|
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
boolean setEpollOut = false;
|
|
||||||
long writtenBytes = 0;
|
long writtenBytes = 0;
|
||||||
int offset = 0;
|
int offset = 0;
|
||||||
int end = offset + nioBufferCnt;
|
int end = offset + nioBufferCnt;
|
||||||
@ -159,7 +157,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
|
|
||||||
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt);
|
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt);
|
||||||
if (localWrittenBytes == 0) {
|
if (localWrittenBytes == 0) {
|
||||||
setEpollOut = true;
|
// Returned EAGAIN need to set EPOLLOUT
|
||||||
|
setEpollOut();
|
||||||
break loop;
|
break loop;
|
||||||
}
|
}
|
||||||
expectedWrittenBytes -= localWrittenBytes;
|
expectedWrittenBytes -= localWrittenBytes;
|
||||||
@ -172,9 +171,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
if (bytes > localWrittenBytes) {
|
if (bytes > localWrittenBytes) {
|
||||||
buffer.position(pos + (int) localWrittenBytes);
|
buffer.position(pos + (int) localWrittenBytes);
|
||||||
// incomplete write
|
// incomplete write
|
||||||
|
|
||||||
// As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again
|
|
||||||
setEpollOut();
|
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
offset++;
|
offset++;
|
||||||
@ -189,12 +185,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut);
|
updateOutboundBuffer(in, writtenBytes, msgCount, done);
|
||||||
return done;
|
return done;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount,
|
private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount,
|
||||||
boolean done, boolean setEpollOut) {
|
boolean done) {
|
||||||
if (done) {
|
if (done) {
|
||||||
// Release all buffers
|
// Release all buffers
|
||||||
for (int i = msgCount; i > 0; i --) {
|
for (int i = msgCount; i > 0; i --) {
|
||||||
@ -224,26 +220,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
incompleteWrite(setEpollOut);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void incompleteWrite(boolean setEpollOut) {
|
|
||||||
// Did not write completely.
|
|
||||||
if (setEpollOut) {
|
|
||||||
setEpollOut();
|
|
||||||
} else {
|
|
||||||
// Schedule flush again later so other tasks can be picked up in the meantime
|
|
||||||
Runnable flushTask = this.flushTask;
|
|
||||||
if (flushTask == null) {
|
|
||||||
flushTask = this.flushTask = new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
unsafe().flush();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
eventLoop().execute(flushTask);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -254,7 +230,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
* @return amount the amount of written bytes
|
* @return amount the amount of written bytes
|
||||||
*/
|
*/
|
||||||
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
|
||||||
boolean setOpWrite = false;
|
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
long flushedAmount = 0;
|
long flushedAmount = 0;
|
||||||
|
|
||||||
@ -262,7 +237,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
long expected = region.count() - region.position();
|
long expected = region.count() - region.position();
|
||||||
long localFlushedAmount = Native.sendfile(fd, region, region.transfered(), expected);
|
long localFlushedAmount = Native.sendfile(fd, region, region.transfered(), expected);
|
||||||
if (localFlushedAmount == 0) {
|
if (localFlushedAmount == 0) {
|
||||||
setOpWrite = true;
|
// Returned EAGAIN need to set EPOLLOUT
|
||||||
|
setEpollOut();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -270,9 +246,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
if (region.transfered() >= region.count()) {
|
if (region.transfered() >= region.count()) {
|
||||||
done = true;
|
done = true;
|
||||||
break;
|
break;
|
||||||
} else {
|
|
||||||
// As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again
|
|
||||||
setEpollOut();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -280,8 +253,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
|
|
||||||
if (done) {
|
if (done) {
|
||||||
in.remove();
|
in.remove();
|
||||||
} else {
|
|
||||||
incompleteWrite(setOpWrite);
|
|
||||||
}
|
}
|
||||||
return done;
|
return done;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user