diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 28aa53f97c..b55c0a0d1a 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -73,7 +73,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha private static final int WRITE_SCHEDULED = 1 << 4; private static final int READ_SCHEDULED = 1 << 5; private static final int CONNECT_SCHEDULED = 1 << 6; - private int ioState; + // A byte is enough for now. + private byte ioState; + + // It's possible that multiple read / writes are issued. We need to keep track of these. + // Let's limit the amount of pending writes and reads by Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good + // enough but let's be a bit more flexible for now. 
+ private short numOutstandingWrites; + private short numOutstandingReads; private ChannelPromise delayedClose; private boolean inputClosedSeenErrorOnRead; @@ -263,34 +270,37 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha if ((ioState & WRITE_SCHEDULED) != 0) { return; } - scheduleWrite(in); + if (scheduleWrite(in) > 0) { + ioState |= WRITE_SCHEDULED; + } } - private void scheduleWrite(ChannelOutboundBuffer in) { - if (delayedClose != null) { - return; + private int scheduleWrite(ChannelOutboundBuffer in) { + if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) { + return 0; } if (in == null) { - return; + return 0; } int msgCount = in.size(); if (msgCount == 0) { - return; + return 0; } Object msg = in.current(); - assert (ioState & WRITE_SCHEDULED) == 0; if (msgCount > 1) { - ioUringUnsafe().scheduleWriteMultiple(in); + numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in); } else if ((msg instanceof ByteBuf) && ((ByteBuf) msg).nioBufferCount() > 1 || ((msg instanceof ByteBufHolder) && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) { // We also need some special handling for CompositeByteBuf - ioUringUnsafe().scheduleWriteMultiple(in); + numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in); } else { - ioUringUnsafe().scheduleWriteSingle(msg); + numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg); } - ioState |= WRITE_SCHEDULED; + // Ensure we never overflow + assert numOutstandingWrites > 0; + return numOutstandingWrites; } private void schedulePollOut() { @@ -316,14 +326,16 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha private IOUringRecvByteAllocatorHandle allocHandle; /** - * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer}. 
+ * Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and return the number of + * {@link #writeComplete(int, int)} calls that are expected because of the scheduled write. */ - protected abstract void scheduleWriteMultiple(ChannelOutboundBuffer in); + protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in); /** - * Schedule the write of a singe message. + * Schedule the write of a single message and return the number of {@link #writeComplete(int, int)} calls + * that are expected because of the scheduled write. */ - protected abstract void scheduleWriteSingle(Object msg); + protected abstract int scheduleWriteSingle(Object msg); @Override public void close(ChannelPromise promise) { @@ -463,16 +475,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } - final void readComplete(int res) { - ioState &= ~READ_SCHEDULED; + final void readComplete(int res, int data) { + assert numOutstandingReads > 0; + if (--numOutstandingReads == 0) { + ioState &= ~READ_SCHEDULED; + } - readComplete0(res); + readComplete0(res, data, numOutstandingReads); } /** * Called once a read was completed. */ - protected abstract void readComplete0(int res); + protected abstract void readComplete0(int res, int data, int outstandingCompletes); /** * Called once POLLRDHUP event is ready to be processed @@ -486,9 +501,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha recvBufAllocHandle().rdHupReceived(); if (isActive()) { - if ((ioState & READ_SCHEDULED) == 0) { - scheduleFirstRead(); - } + scheduleFirstReadIfNeeded(); } else { // Just to be safe make sure the input marked as closed. 
shutdownInput(true); @@ -505,6 +518,10 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha return; } + scheduleFirstReadIfNeeded(); + } + + private void scheduleFirstReadIfNeeded() { if ((ioState & READ_SCHEDULED) == 0) { scheduleFirstRead(); } @@ -520,16 +537,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha protected final void scheduleRead() { // Only schedule another read if the fd is still open. - if (delayedClose == null && fd().isOpen()) { - ioState |= READ_SCHEDULED; - scheduleRead0(); + if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) { + numOutstandingReads = (short) scheduleRead0(); + if (numOutstandingReads > 0) { + ioState |= READ_SCHEDULED; + } } } /** - * A read should be scheduled. + * Schedule a read and returns the number of {@link #readComplete(int, int)} calls that are expected because of + * the scheduled read. */ - protected abstract void scheduleRead0(); + protected abstract int scheduleRead0(); /** * Called once POLLOUT event is ready to be processed @@ -578,33 +598,32 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha /** * Called once a write was completed. */ - final void writeComplete(int res) { - ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); - if (res >= 0) { - removeFromOutboundBuffer(channelOutboundBuffer, res); - // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write - // while still removing messages internally in removeBytes(...) which then may corrupt state. 
+ final void writeComplete(int res, int data) { + assert numOutstandingWrites > 0; + --numOutstandingWrites; + + boolean writtenAll = writeComplete0(res, data, numOutstandingWrites); + if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) { + // We were not able to write everything, let's register for POLLOUT + schedulePollOut(); + } + + // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write + // while still removing messages internally in removeBytes(...) which then may corrupt state. + if (numOutstandingWrites == 0) { ioState &= ~WRITE_SCHEDULED; - doWrite(channelOutboundBuffer); - } else { - ioState &= ~WRITE_SCHEDULED; - try { - if (ioResult("io_uring write", res) == 0) { - // We were not able to write everything, let's register for POLLOUT - schedulePollOut(); - } - } catch (Throwable cause) { - handleWriteError(cause); + + // If we could write all and we did not schedule a pollout yet let us try to write again + if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) { + doWrite(unsafe().outboundBuffer()); } } } /** - * Called once a write completed and we should remove message(s) from the {@link ChannelOutboundBuffer} + * Called once a write was completed. */ - protected void removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int bytes) { - outboundBuffer.removeBytes(bytes); - } + abstract boolean writeComplete0(int res, int data, int outstanding); /** * Connect was completed. 
diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index 4382de08ef..c21b7a2ced 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -75,26 +75,33 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { @Override - protected void scheduleWriteMultiple(ChannelOutboundBuffer in) { - // Do nothing + protected int scheduleWriteMultiple(ChannelOutboundBuffer in) { + throw new UnsupportedOperationException(); } @Override - protected void scheduleWriteSingle(Object msg) { - // Do nothing + protected int scheduleWriteSingle(Object msg) { + throw new UnsupportedOperationException(); } @Override - protected void scheduleRead0() { + boolean writeComplete0(int res, int data, int outstanding) { + throw new UnsupportedOperationException(); + } + + @Override + protected int scheduleRead0() { final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); allocHandle.attemptedBytesRead(1); IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addAccept(fd().intValue(), acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, (short) 0); + return 1; } - protected void readComplete0(int res) { + @Override + protected void readComplete0(int res, int data, int outstanding) { final IOUringRecvByteAllocatorHandle allocHandle = (IOUringRecvByteAllocatorHandle) unsafe() .recvBufAllocHandle(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java index 
12159acc4f..2d62ee554b 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java @@ -211,7 +211,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple private ByteBuf readBuffer; @Override - protected void scheduleWriteMultiple(ChannelOutboundBuffer in) { + protected int scheduleWriteMultiple(ChannelOutboundBuffer in) { final IovArray iovecArray = ((IOUringEventLoop) eventLoop()).iovArray(); try { int offset = iovecArray.count(); @@ -222,18 +222,20 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple // This should never happen, anyway fallback to single write. scheduleWriteSingle(in.current()); } + return 1; } @Override - protected void scheduleWriteSingle(Object msg) { + protected int scheduleWriteSingle(Object msg) { ByteBuf buf = (ByteBuf) msg; IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), buf.writerIndex(), (short) 0); + return 1; } @Override - protected void scheduleRead0() { + protected int scheduleRead0() { final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); ByteBuf byteBuf = allocHandle.allocate(alloc()); IOUringSubmissionQueue submissionQueue = submissionQueue(); @@ -244,10 +246,11 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(), byteBuf.writerIndex(), byteBuf.capacity(), (short) 0); + return 1; } @Override - protected void readComplete0(int res) { + protected void readComplete0(int res, int data, int outstanding) { boolean close = false; final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); @@ -315,5 +318,21 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple 
shutdownInput(false); } } + + @Override + boolean writeComplete0(int res, int data, int outstanding) { + if (res >= 0) { + unsafe().outboundBuffer().removeBytes(res); + } else { + try { + if (ioResult("io_uring write", res) == 0) { + return false; + } + } catch (Throwable cause) { + handleWriteError(cause); + } + } + return true; + } } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelOption.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelOption.java index 5ee6620c84..1acb0580b0 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelOption.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelOption.java @@ -41,4 +41,6 @@ public class IOUringChannelOption extends UnixChannelOption { ChannelOption.valueOf(IOUringChannelOption.class, "TCP_DEFER_ACCEPT"); public static final ChannelOption TCP_QUICKACK = valueOf(IOUringChannelOption.class, "TCP_QUICKACK"); public static final ChannelOption> TCP_MD5SIG = valueOf("TCP_MD5SIG"); + + public static final ChannelOption MAX_DATAGRAM_PAYLOAD_SIZE = valueOf("MAX_DATAGRAM_PAYLOAD_SIZE"); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannel.java index 85528deee5..22fc3dca74 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannel.java @@ -26,12 +26,10 @@ import io.netty.channel.DefaultAddressedEnvelope; import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.InternetProtocolFamily; -import io.netty.channel.unix.Buffer; import io.netty.channel.unix.Errors; import io.netty.channel.unix.Errors.NativeIoException; import 
io.netty.channel.unix.Socket; import io.netty.util.internal.ObjectUtil; -import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import java.io.IOException; @@ -41,7 +39,6 @@ import java.net.InetSocketAddress; import java.net.NetworkInterface; import java.net.PortUnreachableException; import java.net.SocketAddress; -import java.nio.ByteBuffer; import static io.netty.channel.unix.Errors.ioResult; @@ -326,75 +323,78 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme } final class IOUringDatagramChannelUnsafe extends AbstractUringUnsafe { - private ByteBuf readBuffer; - private boolean recvMsg; - // These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg // // TODO: Alternative we could also allocate these everytime from the ByteBufAllocator or we could use // some sort of other pool. Let's keep it simple for now. - private ByteBuffer recvmsgBuffer; - private long recvmsgBufferAddr = -1; - private ByteBuffer sendmsgBuffer; - private long sendmsgBufferAddr = -1; + // + // Consider exposing some configuration for that. 
+ private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray(256); + private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray(256); + private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()]; + private final WriteProcessor writeProcessor = new WriteProcessor(); - private long sendmsgBufferAddr() { - long address = this.sendmsgBufferAddr; - if (address == -1) { - assert sendmsgBuffer == null; - int length = Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE + Native.SIZEOF_IOVEC; - sendmsgBuffer = Buffer.allocateDirectWithNativeOrder(length); - sendmsgBufferAddr = address = Buffer.memoryAddress(sendmsgBuffer); + private ByteBuf readBuffer; - // memset once - PlatformDependent.setMemory(address, length, (byte) 0); + private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor { + private int written; + + @Override + public boolean processMessage(Object msg) { + if (scheduleWrite(msg, true)) { + written++; + return true; + } + return false; } - return address; - } - private long recvmsgBufferAddr() { - long address = this.recvmsgBufferAddr; - if (address == -1) { - assert recvmsgBuffer == null; - int length = Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE + Native.SIZEOF_IOVEC; - recvmsgBuffer = Buffer.allocateDirectWithNativeOrder(length); - recvmsgBufferAddr = address = Buffer.memoryAddress(recvmsgBuffer); - - // memset once - PlatformDependent.setMemory(address, length, (byte) 0); + int write(ChannelOutboundBuffer in) { + written = 0; + try { + in.forEachFlushedMessage(this); + } catch (Exception e) { + // This should never happen as our processMessage(...) never throws. 
+ throw new IllegalStateException(e); + } + return written; } - return address; } void releaseBuffers() { - if (sendmsgBuffer != null) { - Buffer.free(sendmsgBuffer); - sendmsgBuffer = null; - sendmsgBufferAddr = -1; - } - - if (recvmsgBuffer != null) { - Buffer.free(recvmsgBuffer); - recvmsgBuffer = null; - recvmsgBufferAddr = -1; - } + sendmsgHdrs.release(); + recvmsgHdrs.release(); } @Override - protected void readComplete0(int res) { + protected void readComplete0(int res, int data, int outstanding) { final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); final ChannelPipeline pipeline = pipeline(); ByteBuf byteBuf = this.readBuffer; - this.readBuffer = null; assert byteBuf != null; - boolean recvmsg = this.recvMsg; - this.recvMsg = false; - try { + if (data == -1) { + assert outstanding == 0; + // data == -1 means that we did a read(...) and not a recvmmsg(...) + readComplete(pipeline, allocHandle, byteBuf, res); + } else { + recvmsgComplete(pipeline, allocHandle, byteBuf, res, data, outstanding); + } + } catch (Throwable t) { + if (connected && t instanceof NativeIoException) { + t = translateForConnected((NativeIoException) t); + } + pipeline.fireExceptionCaught(t); + } + } + + private void readComplete(ChannelPipeline pipeline, IOUringRecvByteAllocatorHandle allocHandle, + ByteBuf byteBuf, int res) throws IOException { + try { + this.readBuffer = null; if (res < 0) { // If res is negative we should pass it to ioResult(...) which will either throw // or convert it to 0 if we could not read because the socket was not readable. 
- allocHandle.lastBytesRead(ioResult("io_uring read / recvmsg", res)); + allocHandle.lastBytesRead(ioResult("io_uring read", res)); } else if (res > 0) { byteBuf.writerIndex(byteBuf.writerIndex() + res); allocHandle.lastBytesRead(res); @@ -410,26 +410,12 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme pipeline.fireChannelReadComplete(); return; } - DatagramPacket packet; - if (!recvmsg) { - packet = new DatagramPacket(byteBuf, IOUringDatagramChannel.this.localAddress(), - IOUringDatagramChannel.this.remoteAddress()); - } else { - long sockaddrAddress = recvmsgBufferAddr() + Native.SIZEOF_MSGHDR; - final InetSocketAddress remote; - if (socket.isIpv6()) { - byte[] bytes = ((IOUringEventLoop) eventLoop()).inet6AddressArray(); - remote = SockaddrIn.readIPv6(sockaddrAddress, bytes); - } else { - byte[] bytes = ((IOUringEventLoop) eventLoop()).inet4AddressArray(); - remote = SockaddrIn.readIPv4(sockaddrAddress, bytes); - } - packet = new DatagramPacket(byteBuf, - IOUringDatagramChannel.this.localAddress(), remote); - } + allocHandle.incMessagesRead(1); - pipeline.fireChannelRead(packet); + pipeline.fireChannelRead(new DatagramPacket(byteBuf, IOUringDatagramChannel.this.localAddress(), + IOUringDatagramChannel.this.remoteAddress())); byteBuf = null; + if (allocHandle.continueReading()) { // Let's schedule another read. 
scheduleRead(); @@ -438,11 +424,6 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme allocHandle.readComplete(); pipeline.fireChannelReadComplete(); } - } catch (Throwable t) { - if (connected && t instanceof NativeIoException) { - t = translateForConnected((NativeIoException) t); - } - pipeline.fireExceptionCaught(t); } finally { if (byteBuf != null) { byteBuf.release(); @@ -450,44 +431,134 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme } } - @Override - protected void scheduleRead0() { - final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); - ByteBuf byteBuf = allocHandle.allocate(alloc()); - IOUringSubmissionQueue submissionQueue = submissionQueue(); + private void recvmsgComplete(ChannelPipeline pipeline, IOUringRecvByteAllocatorHandle allocHandle, + ByteBuf byteBuf, int res, int idx, int outstanding) throws IOException { + MsgHdrMemory hdr = recvmsgHdrs.hdr(idx); - assert readBuffer == null; - readBuffer = byteBuf; - - recvMsg = !isConnected(); - long bufferAddress = byteBuf.memoryAddress(); - allocHandle.attemptedBytesRead(byteBuf.writableBytes()); - - if (!recvMsg) { - submissionQueue.addRead(socket.intValue(), bufferAddress, - byteBuf.writerIndex(), byteBuf.capacity(), (short) 0); + if (res < 0) { + // If res is negative we should pass it to ioResult(...) which will either throw + // or convert it to 0 if we could not read because the socket was not readable. 
+ allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res)); + } else if (res > 0) { + allocHandle.lastBytesRead(res); + allocHandle.incMessagesRead(1); + DatagramPacket packet = hdr.read(IOUringDatagramChannel.this, byteBuf, res); + pipeline.fireChannelRead(packet); } else { - int addrLen = addrLen(); - long recvmsgBufferAddr = recvmsgBufferAddr(); - long sockaddrAddress = recvmsgBufferAddr + Native.SIZEOF_MSGHDR; - long iovecAddress = sockaddrAddress + addrLen; + allocHandle.lastBytesRead(0); + } - Iov.write(iovecAddress, bufferAddress + byteBuf.writerIndex(), byteBuf.writableBytes()); - MsgHdr.write(recvmsgBufferAddr, sockaddrAddress, addrLen, iovecAddress, 1); - submissionQueue.addRecvmsg(socket.intValue(), recvmsgBufferAddr, (short) 0); + if (outstanding == 0) { + // There are no outstanding completion events, release the readBuffer and see if we need to schedule + // another one or if the user will do it. + this.readBuffer.release(); + this.readBuffer = null; + recvmsgHdrs.clear(); + if (allocHandle.continueReading()) { + // Let's schedule another read. + scheduleRead(); + } else { + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + } } } - private int addrLen() { - return socket.isIpv6() ? Native.SIZEOF_SOCKADDR_IN6 : - Native.SIZEOF_SOCKADDR_IN; + @Override + protected int scheduleRead0() { + final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + ByteBuf byteBuf = allocHandle.allocate(alloc()); + assert readBuffer == null; + readBuffer = byteBuf; + + int writable = byteBuf.writableBytes(); + allocHandle.attemptedBytesRead(writable); + int datagramSize = config().getMaxDatagramPayloadSize(); + + int numDatagram = datagramSize == 0 ? 
1 : Math.max(1, byteBuf.writableBytes() / datagramSize); + + if (isConnected() && numDatagram <= 1) { + submissionQueue().addRead(socket.intValue(), byteBuf.memoryAddress(), + byteBuf.writerIndex(), byteBuf.capacity(), (short) -1); + return 1; + } else { + int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize); + if (scheduled == 0) { + // We could not schedule any recvmmsg so we need to release the buffer as there will be no + // completion event. + readBuffer = null; + byteBuf.release(); + } + return scheduled; + } + } + + private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) { + int writable = byteBuf.writableBytes(); + IOUringSubmissionQueue submissionQueue = submissionQueue(); + long bufferAddress = byteBuf.memoryAddress() + byteBuf.writerIndex(); + if (numDatagram <= 1) { + return scheduleRecvmsg0(submissionQueue, bufferAddress, writable) ? 1 : 0; + } + int i = 0; + // Add multiple IORING_OP_RECVMSG to the submission queue. This basically emulates recvmmsg(...) + for (; i < numDatagram && writable >= datagramSize; i++) { + if (!scheduleRecvmsg0(submissionQueue, bufferAddress, datagramSize)) { + break; + } + bufferAddress += datagramSize; + writable -= datagramSize; + } + return i; + } + + private boolean scheduleRecvmsg0(IOUringSubmissionQueue submissionQueue, long bufferAddress, int bufferLength) { + MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr(); + if (msgHdrMemory == null) { + // We can not continue reading before we did not submit the recvmsg(s) and received the results. + return false; + } + msgHdrMemory.write(socket, null, bufferAddress, bufferLength); + // We always use idx here so we can detect if no idx was used by checking if data < 0 in + // readComplete0(...) 
+ submissionQueue.addRecvmsg(socket.intValue(), msgHdrMemory.address(), (short) msgHdrMemory.idx()); + return true; } @Override - protected void removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int bytes) { - // When using Datagram we should consider the message written as long as there were any bytes written. - boolean removed = outboundBuffer.remove(); - assert removed; + boolean writeComplete0(int res, int data, int outstanding) { + ChannelOutboundBuffer outboundBuffer = outboundBuffer(); + if (data == -1) { + assert outstanding == 0; + // data == -1 means that we did a write(...) and not a sendmsg(...) operation + return removeFromOutboundBuffer(outboundBuffer, res, "io_uring write"); + } + // Store the result so we can handle it as soon as we have no outstanding writes anymore. + sendmsgResArray[data] = res; + if (outstanding == 0) { + // All writes are done as part of a batch. Let's remove these from the ChannelOutboundBuffer + boolean writtenSomething = false; + int numWritten = sendmsgHdrs.length(); + sendmsgHdrs.clear(); + for (int i = 0; i < numWritten; i++) { + writtenSomething |= removeFromOutboundBuffer( + outboundBuffer, sendmsgResArray[i], "io_uring sendmsg"); + } + return writtenSomething; + } + return true; + } + + private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) { + if (res >= 0) { + // When using Datagram we should consider the message written as long as res is not negative. + return outboundBuffer.remove(); + } + try { + return ioResult(errormsg, res) != 0; + } catch (Throwable cause) { + return outboundBuffer.remove(cause); + } } @Override @@ -499,15 +570,18 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme } @Override - protected void scheduleWriteMultiple(ChannelOutboundBuffer in) { - // We always just use scheduleWriteSingle for now. 
- scheduleWriteSingle(in.current()); + protected int scheduleWriteMultiple(ChannelOutboundBuffer in) { + return writeProcessor.write(in); } @Override - protected void scheduleWriteSingle(Object msg) { + protected int scheduleWriteSingle(Object msg) { + return scheduleWrite(msg, false) ? 1 : 0; + } + + private boolean scheduleWrite(Object msg, boolean forceSendmsg) { final ByteBuf data; - InetSocketAddress remoteAddress; + final InetSocketAddress remoteAddress; if (msg instanceof AddressedEnvelope) { @SuppressWarnings("unchecked") AddressedEnvelope envelope = @@ -522,23 +596,31 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme long bufferAddress = data.memoryAddress(); IOUringSubmissionQueue submissionQueue = submissionQueue(); if (remoteAddress == null) { + if (forceSendmsg) { + return scheduleSendmsg( + IOUringDatagramChannel.this.remoteAddress(), bufferAddress, data.readableBytes()); + } submissionQueue.addWrite(socket.intValue(), bufferAddress, data.readerIndex(), - data.writerIndex(), (short) 0); - } else { - int addrLen = addrLen(); - long sendmsgBufferAddr = sendmsgBufferAddr(); - long sockaddrAddress = sendmsgBufferAddr + Native.SIZEOF_MSGHDR; - long iovecAddress = sockaddrAddress + Native.SIZEOF_SOCKADDR_STORAGE; - - SockaddrIn.write(socket.isIpv6(), sockaddrAddress, remoteAddress); - Iov.write(iovecAddress, bufferAddress + data.readerIndex(), data.readableBytes()); - MsgHdr.write(sendmsgBufferAddr, sockaddrAddress, addrLen, iovecAddress, 1); - submissionQueue.addSendmsg(socket.intValue(), sendmsgBufferAddr, (short) 0); + data.writerIndex(), (short) -1); + return true; } + return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes()); + } + + private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress, int bufferLength) { + MsgHdrMemory hdr = sendmsgHdrs.nextHdr(); + if (hdr == null) { + // There is no MsgHdrMemory left to use. 
We need to submit and wait for the writes to complete + // before we can write again. + return false; + } + hdr.write(socket, remoteAddress, bufferAddress, bufferLength); + submissionQueue().addSendmsg(socket.intValue(), hdr.address(), (short) hdr.idx()); + return true; } } - private IOException translateForConnected(NativeIoException e) { + private static IOException translateForConnected(NativeIoException e) { // We need to correctly translate connect errors to match NIO behaviour. if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) { PortUnreachableException error = new PortUnreachableException(e.getMessage()); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannelConfig.java index ca62dd4762..41bd0be417 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannelConfig.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringDatagramChannelConfig.java @@ -15,6 +15,7 @@ */ package io.netty.channel.uring; +import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelException; import io.netty.channel.ChannelOption; @@ -24,6 +25,7 @@ import io.netty.channel.MessageSizeEstimator; import io.netty.channel.RecvByteBufAllocator; import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.DatagramChannelConfig; +import io.netty.util.internal.ObjectUtil; import java.io.IOException; import java.net.InetAddress; @@ -33,6 +35,7 @@ import java.util.Map; public final class IOUringDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig { private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048); private boolean activeOnOpen; + private volatile int maxDatagramSize; IOUringDatagramChannelConfig(AbstractIOUringChannel channel) { 
super(channel); @@ -49,7 +52,7 @@ public final class IOUringDatagramChannelConfig extends DefaultChannelConfig imp ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL, ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, IOUringChannelOption.SO_REUSEPORT, IOUringChannelOption.IP_FREEBIND, - IOUringChannelOption.IP_TRANSPARENT); + IOUringChannelOption.IP_TRANSPARENT, IOUringChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE); } @SuppressWarnings({ "unchecked", "deprecation" }) @@ -94,6 +97,9 @@ public final class IOUringDatagramChannelConfig extends DefaultChannelConfig imp if (option == IOUringChannelOption.IP_FREEBIND) { return (T) Boolean.valueOf(isFreeBind()); } + if (option == IOUringChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) { + return (T) Integer.valueOf(getMaxDatagramPayloadSize()); + } return super.getOption(option); } @@ -128,6 +134,8 @@ public final class IOUringDatagramChannelConfig extends DefaultChannelConfig imp setFreeBind((Boolean) value); } else if (option == IOUringChannelOption.IP_TRANSPARENT) { setIpTransparent((Boolean) value); + } else if (option == IOUringChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) { + setMaxDatagramPayloadSize((Integer) value); } else { return super.setOption(option, value); } @@ -463,4 +471,25 @@ public final class IOUringDatagramChannelConfig extends DefaultChannelConfig imp throw new ChannelException(e); } } + + /** + * Set the maximum {@link io.netty.channel.socket.DatagramPacket} size. This will be used to determine if + * a batch of {@code IORING_OP_RECVMSG} should be used when reading from the underlying socket. + * When batched {@code recvmmsg} is used + * we may be able to read multiple {@link io.netty.channel.socket.DatagramPacket}s with one syscall and so + * greatly improve the performance. This number will be used to slice {@link ByteBuf}s returned by the used + * {@link RecvByteBufAllocator}. 
You can use {@code 0} to disable the usage of batching, any other bigger value + * will enable it. + */ + public IOUringDatagramChannelConfig setMaxDatagramPayloadSize(int maxDatagramSize) { + this.maxDatagramSize = ObjectUtil.checkPositiveOrZero(maxDatagramSize, "maxDatagramSize"); + return this; + } + + /** + * Get the maximum {@link io.netty.channel.socket.DatagramPacket} size. + */ + public int getMaxDatagramPayloadSize() { + return maxDatagramSize; + } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 1319742ebc..a17537a4b7 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -235,10 +235,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCom return; } if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT || op == Native.IORING_OP_RECVMSG) { - handleRead(channel, res); + handleRead(channel, res, data); } else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_WRITE || op == Native.IORING_OP_SENDMSG) { - handleWrite(channel, res); + handleWrite(channel, res, data); } else if (op == Native.IORING_OP_POLL_ADD) { handlePollAdd(channel, res, data); } else if (op == Native.IORING_OP_POLL_REMOVE) { @@ -259,12 +259,12 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCom } } - private void handleRead(AbstractIOUringChannel channel, int res) { - channel.ioUringUnsafe().readComplete(res); + private void handleRead(AbstractIOUringChannel channel, int res, int data) { + channel.ioUringUnsafe().readComplete(res, data); } - private void handleWrite(AbstractIOUringChannel channel, int res) { - channel.ioUringUnsafe().writeComplete(res); + private void handleWrite(AbstractIOUringChannel channel, int res, int data) { + 
channel.ioUringUnsafe().writeComplete(res, data); } private void handlePollAdd(AbstractIOUringChannel channel, int res, int pollMask) { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Iov.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Iov.java index 45f0d5b319..a086f0c78d 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Iov.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Iov.java @@ -37,4 +37,20 @@ final class Iov { PlatformDependent.putLong(iovAddress + Native.IOVEC_OFFSETOF_IOV_LEN, length); } } + + static long readBufferAddress(long iovAddress) { + if (Native.SIZEOF_SIZE_T == 4) { + return PlatformDependent.getInt(iovAddress + Native.IOVEC_OFFSETOF_IOV_BASE); + } + assert Native.SIZEOF_SIZE_T == 8; + return PlatformDependent.getLong(iovAddress + Native.IOVEC_OFFSETOF_IOV_BASE); + } + + static int readBufferLength(long iovAddress) { + if (Native.SIZEOF_SIZE_T == 4) { + return PlatformDependent.getInt(iovAddress + Native.IOVEC_OFFSETOF_IOV_LEN); + } + assert Native.SIZEOF_SIZE_T == 8; + return (int) PlatformDependent.getLong(iovAddress + Native.IOVEC_OFFSETOF_IOV_LEN); + } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/MsgHdrMemory.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/MsgHdrMemory.java new file mode 100644 index 0000000000..57e4d32e90 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/MsgHdrMemory.java @@ -0,0 +1,83 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.uring;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.socket.DatagramPacket;
+import io.netty.util.internal.PlatformDependent;
+
+import java.net.InetSocketAddress;
+
+final class MsgHdrMemory { // One native region laid out as: msghdr | sockaddr_storage | iovec.
+    private final long memory; // base address of the native allocation (start of the msghdr)
+    private final int idx; // index supplied by the creator; exposed unchanged through idx()
+
+    MsgHdrMemory(int idx) {
+        this.idx = idx;
+        int size = Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE + Native.SIZEOF_IOVEC;
+        memory = PlatformDependent.allocateMemory(size);
+        PlatformDependent.setMemory(memory, size, (byte) 0); // zero-fill the fresh allocation
+    }
+
+    void write(LinuxSocket socket, InetSocketAddress address, long bufferAddress , int length) {
+        long sockAddress = memory + Native.SIZEOF_MSGHDR; // sockaddr storage follows the msghdr
+        long iovAddress = sockAddress + Native.SIZEOF_SOCKADDR_STORAGE; // iovec follows the sockaddr storage
+        int addressLength;
+        if (address == null) { // no address given: use the family's sockaddr size and zero the storage
+            addressLength = socket.isIpv6() ? Native.SIZEOF_SOCKADDR_IN6 : Native.SIZEOF_SOCKADDR_IN;
+            PlatformDependent.setMemory(sockAddress, Native.SIZEOF_SOCKADDR_STORAGE, (byte) 0);
+        } else {
+            addressLength = SockaddrIn.write(socket.isIpv6(), sockAddress, address); // returns the written length
+        }
+        Iov.write(iovAddress, bufferAddress, length); // the single iovec describes the data buffer
+        MsgHdr.write(memory, sockAddress, addressLength, iovAddress, 1); // last arg presumably the iovec count
+    }
+
+    DatagramPacket read(IOUringDatagramChannel channel, ByteBuf buffer, int bytesRead) {
+        long sockAddress = memory + Native.SIZEOF_MSGHDR;
+        IOUringEventLoop eventLoop = (IOUringEventLoop) channel.eventLoop();
+        InetSocketAddress sender;
+        if (channel.socket.isIpv6()) { // decode the sender using the event loop's scratch byte[]
+            byte[] bytes = eventLoop.inet6AddressArray();
+            sender = SockaddrIn.readIPv6(sockAddress, bytes);
+        } else {
+            byte[] bytes = eventLoop.inet4AddressArray();
+            sender = SockaddrIn.readIPv4(sockAddress, bytes);
+        }
+        long iovAddress = memory + Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE;
+        long bufferAddress = Iov.readBufferAddress(iovAddress); // values stored earlier by write(...)
+        int bufferLength = Iov.readBufferLength(iovAddress);
+        // reconstruct the reader index based on the memoryAddress of the buffer and the bufferAddress that was used
+        // in the iovec.
+        int readerIndex = (int) (bufferAddress - buffer.memoryAddress());
+
+        ByteBuf slice = buffer.slice(readerIndex, bufferLength)
+                .writerIndex(bytesRead); // only the bytesRead received bytes are readable
+        return new DatagramPacket(slice.retain(), channel.localAddress(), sender);
+    }
+
+    int idx() {
+        return idx;
+    }
+
+    long address() {
+        return memory;
+    }
+
+    void release() {
+        PlatformDependent.freeMemory(memory); // frees the region allocated in the constructor
+    }
+}
diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/MsgHdrMemoryArray.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/MsgHdrMemoryArray.java
new file mode 100644
index 0000000000..7a924b8dff
--- /dev/null
+++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/MsgHdrMemoryArray.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2020 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.channel.uring;
+
+final class MsgHdrMemoryArray { // Fixed pool of MsgHdrMemory, handed out in index order until clear().
+    private int idx; // number of headers handed out since the last clear()
+    private final MsgHdrMemory[] hdrs;
+    private final int capacity;
+
+    MsgHdrMemoryArray(int capacity) {
+        this.capacity = capacity;
+        hdrs = new MsgHdrMemory[capacity];
+        for (int i = 0; i < hdrs.length; i++) {
+            hdrs[i] = new MsgHdrMemory(i); // each header records its own slot index
+        }
+    }
+
+    MsgHdrMemory nextHdr() { // next unused header, or null once all 'capacity' headers are in use
+        if (idx == hdrs.length) { // was 'length - 1', which left the last slot permanently unusable
+            return null;
+        }
+        return hdrs[idx++];
+    }
+
+    MsgHdrMemory hdr(int idx) { // direct lookup by slot index; no bounds check beyond the array's own
+        return hdrs[idx];
+    }
+
+    void clear() { // makes every header available again; does not touch their native memory
+        idx = 0;
+    }
+
+    int length() { // how many headers are currently handed out
+        return idx;
+    }
+
+    void release() { // releases every header, whether handed out or not
+        for (MsgHdrMemory hdr: hdrs) {
+            hdr.release();
+        }
+    }
+
+    int capacity() {
+        return capacity;
+    }
+}
diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/SockaddrIn.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/SockaddrIn.java
index 00f8014457..cdf2332fa8 100644
--- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/SockaddrIn.java
+++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/SockaddrIn.java
@@ -29,11 +29,11 @@ final class SockaddrIn {
         0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff };
     private SockaddrIn() { }
 
-    static void write(boolean ipv6, long memory, InetSocketAddress address) {
+    static int write(boolean ipv6, long memory, InetSocketAddress address) {
         if (ipv6) {
-            SockaddrIn.writeIPv6(memory, address.getAddress(), address.getPort());
+            return SockaddrIn.writeIPv6(memory, address.getAddress(), address.getPort());
         } else {
-            SockaddrIn.writeIPv4(memory, address.getAddress(), address.getPort());
+            return SockaddrIn.writeIPv4(memory, address.getAddress(), address.getPort());
         }
     }