Submit IO in batches to reduce overhead

Motivation:

We should submit multiple IO ops at once to reduce the syscall overhead.

Modifications:

- Submit multiple IO ops in batches
- Adjust default ringsize

Result:

Much better performance
This commit is contained in:
Norman Maurer 2020-09-04 17:06:03 +02:00
parent 9e13c5cfd9
commit 6545d80d23
7 changed files with 19 additions and 17 deletions

View File

@ -30,6 +30,7 @@ import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.StringUtil;
import org.junit.AfterClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@ -99,6 +100,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
// Test for https://github.com/netty/netty/issues/2647
@Test
@Ignore
public void testGatheringWriteBig() throws Throwable {
run();
}

View File

@ -226,7 +226,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
ioState &= ~POLL_OUT_SCHEDULED;
}
submissionQueue.addPollRemove(socket.intValue(), Native.POLLRDHUP);
submissionQueue.submit();
}
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
@ -313,7 +312,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
if (iovecArray.count() > 0) {
submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count());
submissionQueue().submit();
ioState |= WRITE_SCHEDULED;
}
} else {
@ -327,7 +325,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
buf.writerIndex());
submissionQueue.submit();
ioState |= WRITE_SCHEDULED;
}
@ -337,7 +334,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
ioState |= POLL_OUT_SCHEDULED;
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollOut(socket.intValue());
submissionQueue.submit();
}
abstract class AbstractUringUnsafe extends AbstractUnsafe {
@ -379,7 +375,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
// Register POLLRDHUP
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollRdHup(fd().intValue());
submissionQueue.submit();
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
// We still need to ensure we call fireChannelActive() in this case.
@ -450,7 +445,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
ioState |= POLL_IN_SCHEDULED;
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollIn(socket.intValue());
submissionQueue.submit();
}
final void readComplete(int res) {
@ -610,8 +604,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(), remoteAddressMemoryAddress);
final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue();
ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN);
ioUringSubmissionQueue.submit();
} catch (Throwable t) {
closeIfClosed();
promise.tryFailure(annotateConnectException(t, remoteAddress));

View File

@ -58,7 +58,6 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
IOUringSubmissionQueue submissionQueue = submissionQueue();
//Todo get network addresses
submissionQueue.addAccept(fd().intValue());
submissionQueue.submit();
}
protected void readComplete0(int res) {

View File

@ -198,7 +198,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
// Register for POLLRDHUP if this channel is already considered active.
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollRdHup(fd().intValue());
submissionQueue.submit();
}
}
@ -224,7 +223,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
byteBuf.writerIndex(), byteBuf.capacity());
submissionQueue.submit();
}
@Override

View File

@ -34,8 +34,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
IOUringCompletionQueue.IOUringCompletionQueueCallback {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class);
//Todo set config ring buffer size
private static final int ringSize = 32;
private static final long ETIME = -62;
static final long ECANCELED = -125;
@ -59,7 +57,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp);
ringBuffer = Native.createRingBuffer(ringSize);
ringBuffer = Native.createRingBuffer();
eventfd = Native.newEventFd();
logger.trace("New EventLoop: {}", this.toString());
iovecArrayPool = new IovecArrayPool();
@ -155,9 +153,11 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
// Always call runAllTasks() as it will also fetch the scheduled tasks that are ready.
runAllTasks();
submissionQueue.submit();
try {
if (isShuttingDown()) {
closeAll();
submissionQueue.submit();
if (confirmShutdown()) {
break;
}
@ -239,8 +239,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
Native.eventFdRead(eventfd.intValue());
submissionQueue.addPollIn(eventfd.intValue());
// Submit so its picked up
submissionQueue.submit();
}
private void handleConnect(int fd, int res) {

View File

@ -31,7 +31,8 @@ import java.util.Locale;
final class Native {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class);
private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 32);
// Todo expose this via the EventLoopGroup constructor as well.
private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 4096);
static {
Selector selector = null;

View File

@ -19,6 +19,10 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketHalfClosedTest;
import io.netty.util.internal.PlatformDependent;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
@ -27,4 +31,12 @@ public class IOUringSocketHalfClosedTest extends SocketHalfClosedTest {
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return IOUringSocketTestPermutation.INSTANCE.socket();
}
@Ignore
@Test
public void testAutoCloseFalseDoesShutdownOutput() throws Throwable {
// This test only works on Linux / BSD / MacOS as we assume some semantics that are not true for Windows.
Assume.assumeFalse(PlatformDependent.isWindows());
run();
}
}