Merge pull request #32 from normanmaurer/submit_batching
Submit IO in batches to reduce overhead
This commit is contained in:
commit
55460eea2e
@ -226,7 +226,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
|||||||
ioState &= ~POLL_OUT_SCHEDULED;
|
ioState &= ~POLL_OUT_SCHEDULED;
|
||||||
}
|
}
|
||||||
submissionQueue.addPollRemove(socket.intValue(), Native.POLLRDHUP);
|
submissionQueue.addPollRemove(socket.intValue(), Native.POLLRDHUP);
|
||||||
submissionQueue.submit();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
|
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
|
||||||
@ -313,7 +312,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
|||||||
|
|
||||||
if (iovecArray.count() > 0) {
|
if (iovecArray.count() > 0) {
|
||||||
submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count());
|
submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count());
|
||||||
submissionQueue().submit();
|
|
||||||
ioState |= WRITE_SCHEDULED;
|
ioState |= WRITE_SCHEDULED;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -327,7 +325,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
|||||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||||
submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
|
submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
|
||||||
buf.writerIndex());
|
buf.writerIndex());
|
||||||
submissionQueue.submit();
|
|
||||||
ioState |= WRITE_SCHEDULED;
|
ioState |= WRITE_SCHEDULED;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -337,7 +334,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
|||||||
ioState |= POLL_OUT_SCHEDULED;
|
ioState |= POLL_OUT_SCHEDULED;
|
||||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||||
submissionQueue.addPollOut(socket.intValue());
|
submissionQueue.addPollOut(socket.intValue());
|
||||||
submissionQueue.submit();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class AbstractUringUnsafe extends AbstractUnsafe {
|
abstract class AbstractUringUnsafe extends AbstractUnsafe {
|
||||||
@ -379,7 +375,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
|||||||
// Register POLLRDHUP
|
// Register POLLRDHUP
|
||||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||||
submissionQueue.addPollRdHup(fd().intValue());
|
submissionQueue.addPollRdHup(fd().intValue());
|
||||||
submissionQueue.submit();
|
|
||||||
|
|
||||||
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
|
// Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
|
||||||
// We still need to ensure we call fireChannelActive() in this case.
|
// We still need to ensure we call fireChannelActive() in this case.
|
||||||
@ -450,7 +445,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
|||||||
ioState |= POLL_IN_SCHEDULED;
|
ioState |= POLL_IN_SCHEDULED;
|
||||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||||
submissionQueue.addPollIn(socket.intValue());
|
submissionQueue.addPollIn(socket.intValue());
|
||||||
submissionQueue.submit();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final void readComplete(int res) {
|
final void readComplete(int res) {
|
||||||
@ -610,8 +604,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
|||||||
socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(), remoteAddressMemoryAddress);
|
socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(), remoteAddressMemoryAddress);
|
||||||
final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue();
|
final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue();
|
||||||
ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN);
|
ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN);
|
||||||
ioUringSubmissionQueue.submit();
|
|
||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
closeIfClosed();
|
closeIfClosed();
|
||||||
promise.tryFailure(annotateConnectException(t, remoteAddress));
|
promise.tryFailure(annotateConnectException(t, remoteAddress));
|
||||||
|
@ -58,7 +58,6 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
|
|||||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||||
//Todo get network addresses
|
//Todo get network addresses
|
||||||
submissionQueue.addAccept(fd().intValue());
|
submissionQueue.addAccept(fd().intValue());
|
||||||
submissionQueue.submit();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void readComplete0(int res) {
|
protected void readComplete0(int res) {
|
||||||
|
@ -198,7 +198,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
|||||||
// Register for POLLRDHUP if this channel is already considered active.
|
// Register for POLLRDHUP if this channel is already considered active.
|
||||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||||
submissionQueue.addPollRdHup(fd().intValue());
|
submissionQueue.addPollRdHup(fd().intValue());
|
||||||
submissionQueue.submit();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -224,7 +223,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
|||||||
|
|
||||||
submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
|
submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
|
||||||
byteBuf.writerIndex(), byteBuf.capacity());
|
byteBuf.writerIndex(), byteBuf.capacity());
|
||||||
submissionQueue.submit();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -34,8 +34,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
|||||||
IOUringCompletionQueue.IOUringCompletionQueueCallback {
|
IOUringCompletionQueue.IOUringCompletionQueueCallback {
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class);
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class);
|
||||||
|
|
||||||
//Todo set config ring buffer size
|
|
||||||
private static final int ringSize = 32;
|
|
||||||
private static final long ETIME = -62;
|
private static final long ETIME = -62;
|
||||||
static final long ECANCELED = -125;
|
static final long ECANCELED = -125;
|
||||||
|
|
||||||
@ -59,7 +57,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
|||||||
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
|
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
|
||||||
super(parent, executor, addTaskWakesUp);
|
super(parent, executor, addTaskWakesUp);
|
||||||
|
|
||||||
ringBuffer = Native.createRingBuffer(ringSize);
|
ringBuffer = Native.createRingBuffer();
|
||||||
eventfd = Native.newEventFd();
|
eventfd = Native.newEventFd();
|
||||||
logger.trace("New EventLoop: {}", this.toString());
|
logger.trace("New EventLoop: {}", this.toString());
|
||||||
iovecArrayPool = new IovecArrayPool();
|
iovecArrayPool = new IovecArrayPool();
|
||||||
@ -155,9 +153,11 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
|||||||
// Always call runAllTasks() as it will also fetch the scheduled tasks that are ready.
|
// Always call runAllTasks() as it will also fetch the scheduled tasks that are ready.
|
||||||
runAllTasks();
|
runAllTasks();
|
||||||
|
|
||||||
|
submissionQueue.submit();
|
||||||
try {
|
try {
|
||||||
if (isShuttingDown()) {
|
if (isShuttingDown()) {
|
||||||
closeAll();
|
closeAll();
|
||||||
|
submissionQueue.submit();
|
||||||
if (confirmShutdown()) {
|
if (confirmShutdown()) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -239,8 +239,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
|||||||
Native.eventFdRead(eventfd.intValue());
|
Native.eventFdRead(eventfd.intValue());
|
||||||
|
|
||||||
submissionQueue.addPollIn(eventfd.intValue());
|
submissionQueue.addPollIn(eventfd.intValue());
|
||||||
// Submit so its picked up
|
|
||||||
submissionQueue.submit();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleConnect(int fd, int res) {
|
private void handleConnect(int fd, int res) {
|
||||||
|
@ -31,7 +31,8 @@ import java.util.Locale;
|
|||||||
|
|
||||||
final class Native {
|
final class Native {
|
||||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class);
|
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class);
|
||||||
private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 32);
|
// Todo expose this via the EventLoopGroup constructor as well.
|
||||||
|
private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 4096);
|
||||||
|
|
||||||
static {
|
static {
|
||||||
Selector selector = null;
|
Selector selector = null;
|
||||||
|
@ -19,6 +19,10 @@ import io.netty.bootstrap.Bootstrap;
|
|||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||||
import io.netty.testsuite.transport.socket.SocketHalfClosedTest;
|
import io.netty.testsuite.transport.socket.SocketHalfClosedTest;
|
||||||
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
import org.junit.Assume;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -27,4 +31,12 @@ public class IOUringSocketHalfClosedTest extends SocketHalfClosedTest {
|
|||||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||||
return IOUringSocketTestPermutation.INSTANCE.socket();
|
return IOUringSocketTestPermutation.INSTANCE.socket();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Ignore
|
||||||
|
@Test
|
||||||
|
public void testAutoCloseFalseDoesShutdownOutput() throws Throwable {
|
||||||
|
// This test only works on Linux / BSD / MacOS as we assume some semantics that are not true for Windows.
|
||||||
|
Assume.assumeFalse(PlatformDependent.isWindows());
|
||||||
|
run();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user