diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 5a97f42360..c07ba80541 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -90,6 +90,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha private volatile SocketAddress local; private volatile SocketAddress remote; + //to release it + private long iovecMemoryAddress; + AbstractIOUringChannel(final Channel parent, LinuxSocket socket) { super(parent); this.socket = checkNotNull(socket, "fd"); @@ -278,15 +281,38 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { logger.trace("IOUring doWrite message size: {}", in.size()); - if (!writeScheduled && in.size() >= 1) { + + if (writeScheduled) { + return; + } + int msgCount = in.size(); + if (msgCount > 1 && in.current() instanceof ByteBuf) { + doWriteMultiple(in); + //Object msg = in.current(); + //doWriteSingle((ByteBuf) msg); + } else if(msgCount == 1) { Object msg = in.current(); - if (msg instanceof ByteBuf) { - doWriteBytes((ByteBuf) msg); - } + doWriteSingle((ByteBuf) msg); } } - protected final void doWriteBytes(ByteBuf buf) { + private void doWriteMultiple(ChannelOutboundBuffer in) throws Exception { + final IovecArrayPool iovecArray = ((IOUringEventLoop) eventLoop()).getIovecArrayPool(); + + iovecMemoryAddress = iovecArray.createNewIovecMemoryAddress(); + if (iovecMemoryAddress != -1) { + in.forEachFlushedMessage(iovecArray); + + if (iovecArray.count() > 0) { + submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count()); + submissionQueue().submit(); + } + } + //Todo error handling + } + + + protected final void doWriteSingle(ByteBuf buf) { if (buf.hasMemoryAddress()) { //link pollwrite operation addPollOut(); @@ -311,6 +337,16 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha abstract class AbstractUringUnsafe extends AbstractUnsafe { private IOUringRecvByteAllocatorHandle allocHandle; + @Override + protected final void flush0() { + // Flush immediately only when there's no pending flush. + // If there's a pending flush operation, event loop will call forceFlush() later, + // and thus there's no need to call it now. + if (!writeScheduled) { + super.flush0(); + } + } + private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) { if (promise == null) { // Closed via cancellation and the promise has been notified already. @@ -411,12 +447,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } } - } void writeComplete(int res) { writeScheduled = false; ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); + if (iovecMemoryAddress != 0) { + ((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress); + } if (res > 0) { channelOutboundBuffer.removeBytes(res); try { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index 65ded74568..7917c68884 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -28,12 +28,10 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple AbstractIOUringServerChannel(int fd) { super(null, new LinuxSocket(fd)); - System.out.println("Server Socket fd: " + fd); } AbstractIOUringServerChannel(LinuxSocket fd) { super(null, fd); - System.out.println("Server Socket fd: " + fd); } @Override diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java index 636e5c0e92..ae982a118f 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java @@ -21,6 +21,7 @@ import io.netty.util.internal.SystemPropertyUtil; final class IOUring { private static final Throwable UNAVAILABILITY_CAUSE; + static final int OP_WRITEV = 2; static final int IO_POLL = 6; static final int IO_TIMEOUT = 11; static final int OP_ACCEPT = 13; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 9076fda01b..9e8d0a25ec 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -57,6 +57,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements private long prevDeadlineNanos = NONE; private boolean pendingWakeup; + private IovecArrayPool iovecArrayPool; IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { super(parent, executor, addTaskWakesUp); @@ -64,6 +65,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements ringBuffer = Native.createRingBuffer(ringSize); eventfd = Native.newEventFd(); logger.trace("New EventLoop: {}", this.toString()); + iovecArrayPool = new IovecArrayPool(); } @Override @@ -193,7 +195,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements } ((AbstractIOUringChannel.AbstractUringUnsafe) readChannel.unsafe()).readComplete(res); break; - + case IOUring.OP_WRITEV: case IOUring.OP_WRITE: AbstractIOUringChannel writeChannel = channels.get(fd); if (writeChannel == null) { @@ -209,7 +211,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements ((AbstractIOUringChannel.AbstractUringUnsafe) writeChannel.unsafe()).writeComplete(res); } break; - case IOUring.IO_TIMEOUT: if (res == ETIME) { prevDeadlineNanos = NONE; @@ -285,6 +286,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements e.printStackTrace(); } ringBuffer.close(); + iovecArrayPool.release(); } public RingBuffer getRingBuffer() { @@ -298,4 +300,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements Native.eventFdWrite(eventfd.intValue(), 1L); } } + + public IovecArrayPool getIovecArrayPool() { + return iovecArrayPool; + } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 801eb80330..f6966a4407 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -244,6 +244,16 @@ final class IOUringSubmissionQueue { return true; } + public boolean addWritev(int fd, long iovecArrayAddress, int length) { + long sqe = getSqe(); + if (sqe == 0) { + return false; + } + setData(sqe, (byte) IOUring.OP_WRITEV, 0, fd, iovecArrayAddress, length, 0); + + return true; + } + private int flushSqe() { long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress)); long kHead = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IovecArrayPool.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IovecArrayPool.java new file mode 100644 index 0000000000..1aa0f42d0b --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IovecArrayPool.java @@ -0,0 +1,157 @@ +package io.netty.channel.uring; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelOutboundBuffer.MessageProcessor; +import io.netty.channel.unix.Buffer; +import io.netty.util.internal.PlatformDependent; + +import java.nio.ByteBuffer; +import java.util.Stack; + +import static io.netty.channel.unix.Limits.*; + + +final class IovecArrayPool implements MessageProcessor { + private static final int ADDRESS_SIZE = Buffer.addressSize(); + private static final int IOV_SIZE = 2 * ADDRESS_SIZE; + + //Todo configurable + private static int poolSize = 40; + + //Todo IOVEC entries shoule be lower IOVEMAX + private static final int IOV_ENTRIES = 500; + + private static final int IOVEC_ARRAY_SIZE = IOV_SIZE * IOV_ENTRIES; + private static final int CAPACITY = IOVEC_ARRAY_SIZE * poolSize; + + private final Stack remainingIovec; + private long maxBytes = SSIZE_MAX; + + private int count; + private long size; + private long currentIovecMemoryAddress; + + private final ByteBuffer iovecArrayMemory; + private final long iovecArrayMemoryAddress; + + public IovecArrayPool() { + //setup array + remainingIovec = new Stack(); + + iovecArrayMemory = Buffer.allocateDirectWithNativeOrder(CAPACITY); + iovecArrayMemoryAddress = Buffer.memoryAddress(iovecArrayMemory); + + for (long i = 0; i < poolSize; i++) { + remainingIovec.push(i); + } + } + + //Todo better naming + public long createNewIovecMemoryAddress() { + + //clear + size = 0; + count = 0; + + if (remainingIovec.empty()) { + //Todo allocate new Memory + return -1; + } + long index = remainingIovec.pop(); + + currentIovecMemoryAddress = index * IOVEC_ARRAY_SIZE + iovecArrayMemoryAddress; + return currentIovecMemoryAddress; + } + + //Todo error handling + public void releaseIovec(long iovecAddress) { + long index = (iovecAddress - iovecArrayMemoryAddress) / IOVEC_ARRAY_SIZE; + + remainingIovec.push(index); + } + + + private boolean add(ByteBuf buf, int offset, int len) { + if (count == IOV_ENTRIES) { + // No more room! + return false; + } else if (buf.nioBufferCount() == 1) { + if (len == 0) { + return true; + } + if (buf.hasMemoryAddress()) { + return add(buf.memoryAddress() + offset, len); + } else { + return false; + } + } else { + ByteBuffer[] buffers = buf.nioBuffers(offset, len); + for (ByteBuffer nioBuffer : buffers) { + final int remaining = nioBuffer.remaining(); + if (remaining != 0 && + (!add(Buffer.memoryAddress(nioBuffer) + nioBuffer.position(), remaining) || count == + IOV_ENTRIES)) { + return false; + } + } + return true; + } + } + + private boolean add(long addr, int len) { + assert addr != 0; + + // If there is at least 1 entry then we enforce the maximum bytes. We want to accept at least one entry so we + // will attempt to write some data and make progress. + if (maxBytes - len < size && count > 0) { + // If the size + len will overflow SSIZE_MAX we stop populate the IovArray. This is done as linux + // not allow to write more bytes then SSIZE_MAX with one writev(...) call and so will + // return 'EINVAL', which will raise an IOException. + // + // See also: + // - http://linux.die.net/man/2/writev + return false; + } + final int baseOffset = idx(count); + final int lengthOffset = baseOffset + ADDRESS_SIZE; + + size += len; + ++count; + + if (ADDRESS_SIZE == 8) { + // 64bit + if (PlatformDependent.hasUnsafe()) { + PlatformDependent.putLong(baseOffset + currentIovecMemoryAddress, addr); + PlatformDependent.putLong(lengthOffset + currentIovecMemoryAddress, len); + } + } else { + assert ADDRESS_SIZE == 4; + if (PlatformDependent.hasUnsafe()) { + PlatformDependent.putInt(baseOffset + currentIovecMemoryAddress, (int) addr); + PlatformDependent.putInt(lengthOffset + currentIovecMemoryAddress, len); + } + } + return true; + } + + @Override + public boolean processMessage(Object msg) throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buffer = (ByteBuf) msg; + return add(buffer, buffer.readerIndex(), buffer.readableBytes()); + } + return false; + } + + public int count() { + return count; + } + + private static int idx(int index) { + return IOV_SIZE * index; + } + + public void release() { + Buffer.free(iovecArrayMemory); + } +}