Implement batching of reading and writing when using datagram channels with io_uring. (#10606)

Motivation:

io_uring does not support recvmmsg / sendmmsg directly, so we need to
"emulate" them by submitting multiple IORING_OP_RECVMSG /
IORING_OP_SENDMSG operations.

Modifications:

- Allow issuing multiple write / read calls at once, no matter which
  concrete AbstractIOUringChannel subclass is used
- Add support for batching recvmsg / sendmsg when using
  IOUringDatagramChannel

Result:

Better performance

Norman Maurer 2020-09-29 16:58:46 +02:00 committed by GitHub
parent d266af2778
commit 70b7621963
11 changed files with 502 additions and 186 deletions
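
The change mostly boils down to completion bookkeeping: every schedule*() call now reports how many
completion events it expects, and the READ_SCHEDULED / WRITE_SCHEDULED flags are only cleared once the
last of them has arrived. A minimal, self-contained sketch of that pattern (illustrative only; the class
and method names below are not the actual Netty io_uring API):

// Illustrative sketch of the completion bookkeeping used for batched submissions.
final class BatchBookkeepingSketch {
    private static final int READ_SCHEDULED = 1 << 5;

    private byte ioState;
    private short numOutstandingReads;

    // Pretend to submit one IORING_OP_RECVMSG per datagram slice and report how many
    // completion events are now expected.
    int scheduleRead(int numDatagrams) {
        int submitted = Math.max(0, numDatagrams);
        if (submitted > 0) {
            numOutstandingReads = (short) submitted;
            ioState |= READ_SCHEDULED;
        }
        return submitted;
    }

    // Called once per completion event; only the last completion clears the flag.
    void readComplete() {
        assert numOutstandingReads > 0;
        if (--numOutstandingReads == 0) {
            ioState &= ~READ_SCHEDULED;
        }
    }

    boolean readScheduled() {
        return (ioState & READ_SCHEDULED) != 0;
    }

    public static void main(String[] args) {
        BatchBookkeepingSketch sketch = new BatchBookkeepingSketch();
        int expected = sketch.scheduleRead(4);
        for (int i = 0; i < expected; i++) {
            sketch.readComplete();
        }
        System.out.println("read still scheduled: " + sketch.readScheduled()); // prints false
    }
}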


@ -73,7 +73,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
private static final int WRITE_SCHEDULED = 1 << 4;
private static final int READ_SCHEDULED = 1 << 5;
private static final int CONNECT_SCHEDULED = 1 << 6;
private int ioState;
// A byte is enough for now.
private byte ioState;
// It's possible that multiple reads / writes are issued. We need to keep track of these.
// Let's limit the number of pending writes and reads to Short.MAX_VALUE. Maybe Byte.MAX_VALUE would also be good
// enough, but let's be a bit more flexible for now.
private short numOutstandingWrites;
private short numOutstandingReads;
private ChannelPromise delayedClose;
private boolean inputClosedSeenErrorOnRead;
@ -263,34 +270,37 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
if ((ioState & WRITE_SCHEDULED) != 0) {
return;
}
scheduleWrite(in);
if (scheduleWrite(in) > 0) {
ioState |= WRITE_SCHEDULED;
}
}
private void scheduleWrite(ChannelOutboundBuffer in) {
if (delayedClose != null) {
return;
private int scheduleWrite(ChannelOutboundBuffer in) {
if (delayedClose != null || numOutstandingWrites == Short.MAX_VALUE) {
return 0;
}
if (in == null) {
return;
return 0;
}
int msgCount = in.size();
if (msgCount == 0) {
return;
return 0;
}
Object msg = in.current();
assert (ioState & WRITE_SCHEDULED) == 0;
if (msgCount > 1) {
ioUringUnsafe().scheduleWriteMultiple(in);
numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
} else if ((msg instanceof ByteBuf) && ((ByteBuf) msg).nioBufferCount() > 1 ||
((msg instanceof ByteBufHolder) && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
// We also need some special handling for CompositeByteBuf
ioUringUnsafe().scheduleWriteMultiple(in);
numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteMultiple(in);
} else {
ioUringUnsafe().scheduleWriteSingle(msg);
numOutstandingWrites = (short) ioUringUnsafe().scheduleWriteSingle(msg);
}
ioState |= WRITE_SCHEDULED;
// Ensure we never overflow
assert numOutstandingWrites > 0;
return numOutstandingWrites;
}
private void schedulePollOut() {
@ -316,14 +326,16 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
private IOUringRecvByteAllocatorHandle allocHandle;
/**
* Schedule the write of multiple messages in the {@link ChannelOutboundBuffer}.
* Schedule the write of multiple messages in the {@link ChannelOutboundBuffer} and return the number of
* {@link #writeComplete(int, int)} calls that are expected because of the scheduled write.
*/
protected abstract void scheduleWriteMultiple(ChannelOutboundBuffer in);
protected abstract int scheduleWriteMultiple(ChannelOutboundBuffer in);
/**
* Schedule the write of a single message.
* Schedule the write of a single message and return the number of {@link #writeComplete(int, int)} calls
* that are expected because of the scheduled write.
*/
protected abstract void scheduleWriteSingle(Object msg);
protected abstract int scheduleWriteSingle(Object msg);
@Override
public void close(ChannelPromise promise) {
@ -463,16 +475,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
}
}
final void readComplete(int res) {
ioState &= ~READ_SCHEDULED;
final void readComplete(int res, int data) {
assert numOutstandingReads > 0;
if (--numOutstandingReads == 0) {
ioState &= ~READ_SCHEDULED;
}
readComplete0(res);
readComplete0(res, data, numOutstandingReads);
}
/**
* Called once a read was completed.
*/
protected abstract void readComplete0(int res);
protected abstract void readComplete0(int res, int data, int outstandingCompletes);
/**
* Called once POLLRDHUP event is ready to be processed
@ -486,9 +501,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
recvBufAllocHandle().rdHupReceived();
if (isActive()) {
if ((ioState & READ_SCHEDULED) == 0) {
scheduleFirstRead();
}
scheduleFirstReadIfNeeded();
} else {
// Just to be safe make sure the input marked as closed.
shutdownInput(true);
@ -505,6 +518,10 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
return;
}
scheduleFirstReadIfNeeded();
}
private void scheduleFirstReadIfNeeded() {
if ((ioState & READ_SCHEDULED) == 0) {
scheduleFirstRead();
}
@ -520,16 +537,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
protected final void scheduleRead() {
// Only schedule another read if the fd is still open.
if (delayedClose == null && fd().isOpen()) {
ioState |= READ_SCHEDULED;
scheduleRead0();
if (delayedClose == null && fd().isOpen() && (ioState & READ_SCHEDULED) == 0) {
numOutstandingReads = (short) scheduleRead0();
if (numOutstandingReads > 0) {
ioState |= READ_SCHEDULED;
}
}
}
/**
* A read should be scheduled.
* Schedule a read and return the number of {@link #readComplete(int, int)} calls that are expected because of
* the scheduled read.
*/
protected abstract void scheduleRead0();
protected abstract int scheduleRead0();
/**
* Called once POLLOUT event is ready to be processed
@ -578,33 +598,32 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
/**
* Called once a write was completed.
*/
final void writeComplete(int res) {
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (res >= 0) {
removeFromOutboundBuffer(channelOutboundBuffer, res);
// We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
// while still removing messages internally in removeBytes(...) which then may corrupt state.
final void writeComplete(int res, int data) {
assert numOutstandingWrites > 0;
--numOutstandingWrites;
boolean writtenAll = writeComplete0(res, data, numOutstandingWrites);
if (!writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
// We were not able to write everything, let's register for POLLOUT
schedulePollOut();
}
// We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
// while still removing messages internally in removeBytes(...) which then may corrupt state.
if (numOutstandingWrites == 0) {
ioState &= ~WRITE_SCHEDULED;
doWrite(channelOutboundBuffer);
} else {
ioState &= ~WRITE_SCHEDULED;
try {
if (ioResult("io_uring write", res) == 0) {
// We were not able to write everything, let's register for POLLOUT
schedulePollOut();
}
} catch (Throwable cause) {
handleWriteError(cause);
// If we could write all and we did not schedule a pollout yet let us try to write again
if (writtenAll && (ioState & POLL_OUT_SCHEDULED) == 0) {
doWrite(unsafe().outboundBuffer());
}
}
}
/**
* Called once a write completed and we should remove message(s) from the {@link ChannelOutboundBuffer}
* Called once a write was completed.
*/
protected void removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int bytes) {
outboundBuffer.removeBytes(bytes);
}
abstract boolean writeComplete0(int res, int data, int outstanding);
/**
* Connect was completed.


@ -75,26 +75,33 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe {
@Override
protected void scheduleWriteMultiple(ChannelOutboundBuffer in) {
// Do nothing
protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
throw new UnsupportedOperationException();
}
@Override
protected void scheduleWriteSingle(Object msg) {
// Do nothing
protected int scheduleWriteSingle(Object msg) {
throw new UnsupportedOperationException();
}
@Override
protected void scheduleRead0() {
boolean writeComplete0(int res, int data, int outstanding) {
throw new UnsupportedOperationException();
}
@Override
protected int scheduleRead0() {
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.attemptedBytesRead(1);
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addAccept(fd().intValue(),
acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, (short) 0);
return 1;
}
protected void readComplete0(int res) {
@Override
protected void readComplete0(int res, int data, int outstanding) {
final IOUringRecvByteAllocatorHandle allocHandle =
(IOUringRecvByteAllocatorHandle) unsafe()
.recvBufAllocHandle();


@ -211,7 +211,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
private ByteBuf readBuffer;
@Override
protected void scheduleWriteMultiple(ChannelOutboundBuffer in) {
protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
final IovArray iovecArray = ((IOUringEventLoop) eventLoop()).iovArray();
try {
int offset = iovecArray.count();
@ -222,18 +222,20 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
// This should never happen; anyway, fall back to a single write.
scheduleWriteSingle(in.current());
}
return 1;
}
@Override
protected void scheduleWriteSingle(Object msg) {
protected int scheduleWriteSingle(Object msg) {
ByteBuf buf = (ByteBuf) msg;
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
buf.writerIndex(), (short) 0);
return 1;
}
@Override
protected void scheduleRead0() {
protected int scheduleRead0() {
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
ByteBuf byteBuf = allocHandle.allocate(alloc());
IOUringSubmissionQueue submissionQueue = submissionQueue();
@ -244,10 +246,11 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
byteBuf.writerIndex(), byteBuf.capacity(), (short) 0);
return 1;
}
@Override
protected void readComplete0(int res) {
protected void readComplete0(int res, int data, int outstanding) {
boolean close = false;
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
@ -315,5 +318,21 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
shutdownInput(false);
}
}
@Override
boolean writeComplete0(int res, int data, int outstanding) {
if (res >= 0) {
unsafe().outboundBuffer().removeBytes(res);
} else {
try {
if (ioResult("io_uring write", res) == 0) {
return false;
}
} catch (Throwable cause) {
handleWriteError(cause);
}
}
return true;
}
}
}


@ -41,4 +41,6 @@ public class IOUringChannelOption<T> extends UnixChannelOption<T> {
ChannelOption.valueOf(IOUringChannelOption.class, "TCP_DEFER_ACCEPT");
public static final ChannelOption<Boolean> TCP_QUICKACK = valueOf(IOUringChannelOption.class, "TCP_QUICKACK");
public static final ChannelOption<Map<InetAddress, byte[]>> TCP_MD5SIG = valueOf("TCP_MD5SIG");
public static final ChannelOption<Integer> MAX_DATAGRAM_PAYLOAD_SIZE = valueOf("MAX_DATAGRAM_PAYLOAD_SIZE");
}


@ -26,12 +26,10 @@ import io.netty.channel.DefaultAddressedEnvelope;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.Buffer;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.channel.unix.Socket;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import java.io.IOException;
@ -41,7 +39,6 @@ import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.PortUnreachableException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import static io.netty.channel.unix.Errors.ioResult;
@ -326,75 +323,78 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme
}
final class IOUringDatagramChannelUnsafe extends AbstractUringUnsafe {
private ByteBuf readBuffer;
private boolean recvMsg;
// These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg
//
// TODO: Alternative we could also allocate these everytime from the ByteBufAllocator or we could use
// some sort of other pool. Let's keep it simple for now.
private ByteBuffer recvmsgBuffer;
private long recvmsgBufferAddr = -1;
private ByteBuffer sendmsgBuffer;
private long sendmsgBufferAddr = -1;
//
// Consider exposing some configuration for that.
private final MsgHdrMemoryArray recvmsgHdrs = new MsgHdrMemoryArray(256);
private final MsgHdrMemoryArray sendmsgHdrs = new MsgHdrMemoryArray(256);
private final int[] sendmsgResArray = new int[sendmsgHdrs.capacity()];
private final WriteProcessor writeProcessor = new WriteProcessor();
private long sendmsgBufferAddr() {
long address = this.sendmsgBufferAddr;
if (address == -1) {
assert sendmsgBuffer == null;
int length = Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE + Native.SIZEOF_IOVEC;
sendmsgBuffer = Buffer.allocateDirectWithNativeOrder(length);
sendmsgBufferAddr = address = Buffer.memoryAddress(sendmsgBuffer);
private ByteBuf readBuffer;
// memset once
PlatformDependent.setMemory(address, length, (byte) 0);
private final class WriteProcessor implements ChannelOutboundBuffer.MessageProcessor {
private int written;
@Override
public boolean processMessage(Object msg) {
if (scheduleWrite(msg, true)) {
written++;
return true;
}
return false;
}
return address;
}
private long recvmsgBufferAddr() {
long address = this.recvmsgBufferAddr;
if (address == -1) {
assert recvmsgBuffer == null;
int length = Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE + Native.SIZEOF_IOVEC;
recvmsgBuffer = Buffer.allocateDirectWithNativeOrder(length);
recvmsgBufferAddr = address = Buffer.memoryAddress(recvmsgBuffer);
// memset once
PlatformDependent.setMemory(address, length, (byte) 0);
int write(ChannelOutboundBuffer in) {
written = 0;
try {
in.forEachFlushedMessage(this);
} catch (Exception e) {
// This should never happen as our processMessage(...) never throws.
throw new IllegalStateException(e);
}
return written;
}
return address;
}
void releaseBuffers() {
if (sendmsgBuffer != null) {
Buffer.free(sendmsgBuffer);
sendmsgBuffer = null;
sendmsgBufferAddr = -1;
}
if (recvmsgBuffer != null) {
Buffer.free(recvmsgBuffer);
recvmsgBuffer = null;
recvmsgBufferAddr = -1;
}
sendmsgHdrs.release();
recvmsgHdrs.release();
}
@Override
protected void readComplete0(int res) {
protected void readComplete0(int res, int data, int outstanding) {
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
final ChannelPipeline pipeline = pipeline();
ByteBuf byteBuf = this.readBuffer;
this.readBuffer = null;
assert byteBuf != null;
boolean recvmsg = this.recvMsg;
this.recvMsg = false;
try {
if (data == -1) {
assert outstanding == 0;
// data == -1 means that we did a read(...) and not a recvmmsg(...)
readComplete(pipeline, allocHandle, byteBuf, res);
} else {
recvmsgComplete(pipeline, allocHandle, byteBuf, res, data, outstanding);
}
} catch (Throwable t) {
if (connected && t instanceof NativeIoException) {
t = translateForConnected((NativeIoException) t);
}
pipeline.fireExceptionCaught(t);
}
}
private void readComplete(ChannelPipeline pipeline, IOUringRecvByteAllocatorHandle allocHandle,
ByteBuf byteBuf, int res) throws IOException {
try {
this.readBuffer = null;
if (res < 0) {
// If res is negative we should pass it to ioResult(...) which will either throw
// or convert it to 0 if we could not read because the socket was not readable.
allocHandle.lastBytesRead(ioResult("io_uring read / recvmsg", res));
allocHandle.lastBytesRead(ioResult("io_uring read", res));
} else if (res > 0) {
byteBuf.writerIndex(byteBuf.writerIndex() + res);
allocHandle.lastBytesRead(res);
@ -410,26 +410,12 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme
pipeline.fireChannelReadComplete();
return;
}
DatagramPacket packet;
if (!recvmsg) {
packet = new DatagramPacket(byteBuf, IOUringDatagramChannel.this.localAddress(),
IOUringDatagramChannel.this.remoteAddress());
} else {
long sockaddrAddress = recvmsgBufferAddr() + Native.SIZEOF_MSGHDR;
final InetSocketAddress remote;
if (socket.isIpv6()) {
byte[] bytes = ((IOUringEventLoop) eventLoop()).inet6AddressArray();
remote = SockaddrIn.readIPv6(sockaddrAddress, bytes);
} else {
byte[] bytes = ((IOUringEventLoop) eventLoop()).inet4AddressArray();
remote = SockaddrIn.readIPv4(sockaddrAddress, bytes);
}
packet = new DatagramPacket(byteBuf,
IOUringDatagramChannel.this.localAddress(), remote);
}
allocHandle.incMessagesRead(1);
pipeline.fireChannelRead(packet);
pipeline.fireChannelRead(new DatagramPacket(byteBuf, IOUringDatagramChannel.this.localAddress(),
IOUringDatagramChannel.this.remoteAddress()));
byteBuf = null;
if (allocHandle.continueReading()) {
// Let's schedule another read.
scheduleRead();
@ -438,11 +424,6 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
} catch (Throwable t) {
if (connected && t instanceof NativeIoException) {
t = translateForConnected((NativeIoException) t);
}
pipeline.fireExceptionCaught(t);
} finally {
if (byteBuf != null) {
byteBuf.release();
@ -450,44 +431,134 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme
}
}
@Override
protected void scheduleRead0() {
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
ByteBuf byteBuf = allocHandle.allocate(alloc());
IOUringSubmissionQueue submissionQueue = submissionQueue();
private void recvmsgComplete(ChannelPipeline pipeline, IOUringRecvByteAllocatorHandle allocHandle,
ByteBuf byteBuf, int res, int idx, int outstanding) throws IOException {
MsgHdrMemory hdr = recvmsgHdrs.hdr(idx);
assert readBuffer == null;
readBuffer = byteBuf;
recvMsg = !isConnected();
long bufferAddress = byteBuf.memoryAddress();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
if (!recvMsg) {
submissionQueue.addRead(socket.intValue(), bufferAddress,
byteBuf.writerIndex(), byteBuf.capacity(), (short) 0);
if (res < 0) {
// If res is negative we should pass it to ioResult(...) which will either throw
// or convert it to 0 if we could not read because the socket was not readable.
allocHandle.lastBytesRead(ioResult("io_uring recvmsg", res));
} else if (res > 0) {
allocHandle.lastBytesRead(res);
allocHandle.incMessagesRead(1);
DatagramPacket packet = hdr.read(IOUringDatagramChannel.this, byteBuf, res);
pipeline.fireChannelRead(packet);
} else {
int addrLen = addrLen();
long recvmsgBufferAddr = recvmsgBufferAddr();
long sockaddrAddress = recvmsgBufferAddr + Native.SIZEOF_MSGHDR;
long iovecAddress = sockaddrAddress + addrLen;
allocHandle.lastBytesRead(0);
}
Iov.write(iovecAddress, bufferAddress + byteBuf.writerIndex(), byteBuf.writableBytes());
MsgHdr.write(recvmsgBufferAddr, sockaddrAddress, addrLen, iovecAddress, 1);
submissionQueue.addRecvmsg(socket.intValue(), recvmsgBufferAddr, (short) 0);
if (outstanding == 0) {
// There are no outstanding completion events, release the readBuffer and see if we need to schedule
// another one or if the user will do it.
this.readBuffer.release();
this.readBuffer = null;
recvmsgHdrs.clear();
if (allocHandle.continueReading()) {
// Let's schedule another read.
scheduleRead();
} else {
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
}
}
private int addrLen() {
return socket.isIpv6() ? Native.SIZEOF_SOCKADDR_IN6 :
Native.SIZEOF_SOCKADDR_IN;
@Override
protected int scheduleRead0() {
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
ByteBuf byteBuf = allocHandle.allocate(alloc());
assert readBuffer == null;
readBuffer = byteBuf;
int writable = byteBuf.writableBytes();
allocHandle.attemptedBytesRead(writable);
int datagramSize = config().getMaxDatagramPayloadSize();
int numDatagram = datagramSize == 0 ? 1 : Math.max(1, byteBuf.writableBytes() / datagramSize);
if (isConnected() && numDatagram <= 1) {
submissionQueue().addRead(socket.intValue(), byteBuf.memoryAddress(),
byteBuf.writerIndex(), byteBuf.capacity(), (short) -1);
return 1;
} else {
int scheduled = scheduleRecvmsg(byteBuf, numDatagram, datagramSize);
if (scheduled == 0) {
// We could not schedule any recvmmsg so we need to release the buffer as there will be no
// completion event.
readBuffer = null;
byteBuf.release();
}
return scheduled;
}
}
private int scheduleRecvmsg(ByteBuf byteBuf, int numDatagram, int datagramSize) {
int writable = byteBuf.writableBytes();
IOUringSubmissionQueue submissionQueue = submissionQueue();
long bufferAddress = byteBuf.memoryAddress() + byteBuf.writerIndex();
if (numDatagram <= 1) {
return scheduleRecvmsg0(submissionQueue, bufferAddress, writable) ? 1 : 0;
}
int i = 0;
// Add multiple IORING_OP_RECVMSG to the submission queue. This basically emulates recvmmsg(...)
for (; i < numDatagram && writable >= datagramSize; i++) {
if (!scheduleRecvmsg0(submissionQueue, bufferAddress, datagramSize)) {
break;
}
bufferAddress += datagramSize;
writable -= datagramSize;
}
return i;
}
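// Illustrative numbers (not defaults): with a 64 KiB writable buffer and getMaxDatagramPayloadSize() == 2048,
// numDatagram is 32, so up to 32 IORING_OP_RECVMSG entries are submitted, each pointing at its own
// 2048 byte slice of the same ByteBuf:
//
//   bufferAddress +  0 * 2048 -> recvmsg #0  (data idx 0)
//   bufferAddress +  1 * 2048 -> recvmsg #1  (data idx 1)
//   ...
//   bufferAddress + 31 * 2048 -> recvmsg #31 (data idx 31)
//
// Each completion carries its idx back, so recvmsgComplete(...) can look up the matching MsgHdrMemory
// and turn that slice into a DatagramPacket.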
private boolean scheduleRecvmsg0(IOUringSubmissionQueue submissionQueue, long bufferAddress, int bufferLength) {
MsgHdrMemory msgHdrMemory = recvmsgHdrs.nextHdr();
if (msgHdrMemory == null) {
// We can not continue reading until the already submitted recvmsg(s) have completed and their results were processed.
return false;
}
msgHdrMemory.write(socket, null, bufferAddress, bufferLength);
// We always use idx here so we can detect if no idx was used by checking if data < 0 in
// readComplete0(...)
submissionQueue.addRecvmsg(socket.intValue(), msgHdrMemory.address(), (short) msgHdrMemory.idx());
return true;
}
@Override
protected void removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int bytes) {
// When using Datagram we should consider the message written as long as there were any bytes written.
boolean removed = outboundBuffer.remove();
assert removed;
boolean writeComplete0(int res, int data, int outstanding) {
ChannelOutboundBuffer outboundBuffer = outboundBuffer();
if (data == -1) {
assert outstanding == 0;
// data == -1 means that we did a write(...) and not a sendmsg(...) operation
return removeFromOutboundBuffer(outboundBuffer, res, "io_uring write");
}
// Store the result so we can handle it as soon as we have no outstanding writes anymore.
sendmsgResArray[data] = res;
if (outstanding == 0) {
// All writes are done as part of a batch. Let's remove these from the ChannelOutboundBuffer
boolean writtenSomething = false;
int numWritten = sendmsgHdrs.length();
sendmsgHdrs.clear();
for (int i = 0; i < numWritten; i++) {
writtenSomething |= removeFromOutboundBuffer(
outboundBuffer, sendmsgResArray[i], "io_uring sendmsg");
}
return writtenSomething;
}
return true;
}
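// Illustrative walk-through: if three sendmsg operations were submitted with idx 0..2, their completions
// may arrive in any order. Each result is stashed in sendmsgResArray[idx]; only once the last completion
// arrived (outstanding == 0) are the messages removed from the ChannelOutboundBuffer, in submission order,
// via removeFromOutboundBuffer(...).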
private boolean removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int res, String errormsg) {
if (res >= 0) {
// When using Datagram we should consider the message written as long as res is not negative.
return outboundBuffer.remove();
}
try {
return ioResult(errormsg, res) != 0;
} catch (Throwable cause) {
return outboundBuffer.remove(cause);
}
}
@Override
@ -499,15 +570,18 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme
}
@Override
protected void scheduleWriteMultiple(ChannelOutboundBuffer in) {
// We always just use scheduleWriteSingle for now.
scheduleWriteSingle(in.current());
protected int scheduleWriteMultiple(ChannelOutboundBuffer in) {
return writeProcessor.write(in);
}
@Override
protected void scheduleWriteSingle(Object msg) {
protected int scheduleWriteSingle(Object msg) {
return scheduleWrite(msg, false) ? 1 : 0;
}
private boolean scheduleWrite(Object msg, boolean forceSendmsg) {
final ByteBuf data;
InetSocketAddress remoteAddress;
final InetSocketAddress remoteAddress;
if (msg instanceof AddressedEnvelope) {
@SuppressWarnings("unchecked")
AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
@ -522,23 +596,31 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme
long bufferAddress = data.memoryAddress();
IOUringSubmissionQueue submissionQueue = submissionQueue();
if (remoteAddress == null) {
if (forceSendmsg) {
return scheduleSendmsg(
IOUringDatagramChannel.this.remoteAddress(), bufferAddress, data.readableBytes());
}
submissionQueue.addWrite(socket.intValue(), bufferAddress, data.readerIndex(),
data.writerIndex(), (short) 0);
} else {
int addrLen = addrLen();
long sendmsgBufferAddr = sendmsgBufferAddr();
long sockaddrAddress = sendmsgBufferAddr + Native.SIZEOF_MSGHDR;
long iovecAddress = sockaddrAddress + Native.SIZEOF_SOCKADDR_STORAGE;
SockaddrIn.write(socket.isIpv6(), sockaddrAddress, remoteAddress);
Iov.write(iovecAddress, bufferAddress + data.readerIndex(), data.readableBytes());
MsgHdr.write(sendmsgBufferAddr, sockaddrAddress, addrLen, iovecAddress, 1);
submissionQueue.addSendmsg(socket.intValue(), sendmsgBufferAddr, (short) 0);
data.writerIndex(), (short) -1);
return true;
}
return scheduleSendmsg(remoteAddress, bufferAddress, data.readableBytes());
}
private boolean scheduleSendmsg(InetSocketAddress remoteAddress, long bufferAddress, int bufferLength) {
MsgHdrMemory hdr = sendmsgHdrs.nextHdr();
if (hdr == null) {
// There is no MsgHdrMemory left to use. We need to submit and wait for the writes to complete
// before we can write again.
return false;
}
hdr.write(socket, remoteAddress, bufferAddress, bufferLength);
submissionQueue().addSendmsg(socket.intValue(), hdr.address(), (short) hdr.idx());
return true;
}
}
private IOException translateForConnected(NativeIoException e) {
private static IOException translateForConnected(NativeIoException e) {
// We need to correctly translate connect errors to match NIO behaviour.
if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
PortUnreachableException error = new PortUnreachableException(e.getMessage());


@ -15,6 +15,7 @@
*/
package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelOption;
@ -24,6 +25,7 @@ import io.netty.channel.MessageSizeEstimator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.DatagramChannelConfig;
import io.netty.util.internal.ObjectUtil;
import java.io.IOException;
import java.net.InetAddress;
@ -33,6 +35,7 @@ import java.util.Map;
public final class IOUringDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig {
private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048);
private boolean activeOnOpen;
private volatile int maxDatagramSize;
IOUringDatagramChannelConfig(AbstractIOUringChannel channel) {
super(channel);
@ -49,7 +52,7 @@ public final class IOUringDatagramChannelConfig extends DefaultChannelConfig imp
ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL,
ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION,
IOUringChannelOption.SO_REUSEPORT, IOUringChannelOption.IP_FREEBIND,
IOUringChannelOption.IP_TRANSPARENT);
IOUringChannelOption.IP_TRANSPARENT, IOUringChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE);
}
@SuppressWarnings({ "unchecked", "deprecation" })
@ -94,6 +97,9 @@ public final class IOUringDatagramChannelConfig extends DefaultChannelConfig imp
if (option == IOUringChannelOption.IP_FREEBIND) {
return (T) Boolean.valueOf(isFreeBind());
}
if (option == IOUringChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) {
return (T) Integer.valueOf(getMaxDatagramPayloadSize());
}
return super.getOption(option);
}
@ -128,6 +134,8 @@ public final class IOUringDatagramChannelConfig extends DefaultChannelConfig imp
setFreeBind((Boolean) value);
} else if (option == IOUringChannelOption.IP_TRANSPARENT) {
setIpTransparent((Boolean) value);
} else if (option == IOUringChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) {
setMaxDatagramPayloadSize((Integer) value);
} else {
return super.setOption(option, value);
}
@ -463,4 +471,25 @@ public final class IOUringDatagramChannelConfig extends DefaultChannelConfig imp
throw new ChannelException(e);
}
}
/**
* Set the maximum {@link io.netty.channel.socket.DatagramPacket} size. This will be used to determine if
* a batch of {@code IORING_OP_RECVMSG} submissions should be used when reading from the underlying socket.
* When such batching is used we may be able to read multiple
* {@link io.netty.channel.socket.DatagramPacket}s with one submission batch and so greatly improve
* performance. This number will be used to slice {@link ByteBuf}s returned by the used
* {@link RecvByteBufAllocator}. You can use {@code 0} to disable batching; any positive value will enable it.
*/
public IOUringDatagramChannelConfig setMaxDatagramPayloadSize(int maxDatagramSize) {
this.maxDatagramSize = ObjectUtil.checkPositiveOrZero(maxDatagramSize, "maxDatagramSize");
return this;
}
/**
* Get the maximum {@link io.netty.channel.socket.DatagramPacket} size.
*/
public int getMaxDatagramPayloadSize() {
return maxDatagramSize;
}
}
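
For reference, a minimal sketch of how an application could opt into the batching (assumptions: the
transport's IOUringEventLoopGroup is used for bootstrapping, and the 32 KiB FixedRecvByteBufAllocator is
only an example; the number of batched recvmsg submissions per read is writableBytes / maxDatagramPayloadSize):

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.uring.IOUringChannelOption;
import io.netty.channel.uring.IOUringDatagramChannel;
import io.netty.channel.uring.IOUringEventLoopGroup;

public final class DatagramBatchingExample {
    public static void main(String[] args) {
        EventLoopGroup group = new IOUringEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(IOUringDatagramChannel.class)
                    // Allocate 32 KiB per read so that, with a 2 KiB maximum datagram payload,
                    // up to 16 IORING_OP_RECVMSG entries can be submitted in one batch.
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(32 * 1024))
                    .option(IOUringChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE, 2048)
                    .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
                        @Override
                        protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) {
                            // Handle the inbound datagram.
                        }
                    });
            Channel channel = bootstrap.bind(0).syncUninterruptibly().channel();
            channel.close().syncUninterruptibly();
        } finally {
            group.shutdownGracefully().syncUninterruptibly();
        }
    }
}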


@ -235,10 +235,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCom
return;
}
if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT || op == Native.IORING_OP_RECVMSG) {
handleRead(channel, res);
handleRead(channel, res, data);
} else if (op == Native.IORING_OP_WRITEV ||
op == Native.IORING_OP_WRITE || op == Native.IORING_OP_SENDMSG) {
handleWrite(channel, res);
handleWrite(channel, res, data);
} else if (op == Native.IORING_OP_POLL_ADD) {
handlePollAdd(channel, res, data);
} else if (op == Native.IORING_OP_POLL_REMOVE) {
@ -259,12 +259,12 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCom
}
}
private void handleRead(AbstractIOUringChannel channel, int res) {
channel.ioUringUnsafe().readComplete(res);
private void handleRead(AbstractIOUringChannel channel, int res, int data) {
channel.ioUringUnsafe().readComplete(res, data);
}
private void handleWrite(AbstractIOUringChannel channel, int res) {
channel.ioUringUnsafe().writeComplete(res);
private void handleWrite(AbstractIOUringChannel channel, int res, int data) {
channel.ioUringUnsafe().writeComplete(res, data);
}
private void handlePollAdd(AbstractIOUringChannel channel, int res, int pollMask) {


@ -37,4 +37,20 @@ final class Iov {
PlatformDependent.putLong(iovAddress + Native.IOVEC_OFFSETOF_IOV_LEN, length);
}
}
static long readBufferAddress(long iovAddress) {
if (Native.SIZEOF_SIZE_T == 4) {
return PlatformDependent.getInt(iovAddress + Native.IOVEC_OFFSETOF_IOV_BASE);
}
assert Native.SIZEOF_SIZE_T == 8;
return PlatformDependent.getLong(iovAddress + Native.IOVEC_OFFSETOF_IOV_BASE);
}
static int readBufferLength(long iovAddress) {
if (Native.SIZEOF_SIZE_T == 4) {
return PlatformDependent.getInt(iovAddress + Native.IOVEC_OFFSETOF_IOV_LEN);
}
assert Native.SIZEOF_SIZE_T == 8;
return (int) PlatformDependent.getLong(iovAddress + Native.IOVEC_OFFSETOF_IOV_LEN);
}
}


@ -0,0 +1,83 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.internal.PlatformDependent;
import java.net.InetSocketAddress;
final class MsgHdrMemory {
private final long memory;
private final int idx;
MsgHdrMemory(int idx) {
this.idx = idx;
int size = Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE + Native.SIZEOF_IOVEC;
memory = PlatformDependent.allocateMemory(size);
PlatformDependent.setMemory(memory, size, (byte) 0);
}
void write(LinuxSocket socket, InetSocketAddress address, long bufferAddress, int length) {
long sockAddress = memory + Native.SIZEOF_MSGHDR;
long iovAddress = sockAddress + Native.SIZEOF_SOCKADDR_STORAGE;
int addressLength;
if (address == null) {
addressLength = socket.isIpv6() ? Native.SIZEOF_SOCKADDR_IN6 : Native.SIZEOF_SOCKADDR_IN;
PlatformDependent.setMemory(sockAddress, Native.SIZEOF_SOCKADDR_STORAGE, (byte) 0);
} else {
addressLength = SockaddrIn.write(socket.isIpv6(), sockAddress, address);
}
Iov.write(iovAddress, bufferAddress, length);
MsgHdr.write(memory, sockAddress, addressLength, iovAddress, 1);
}
DatagramPacket read(IOUringDatagramChannel channel, ByteBuf buffer, int bytesRead) {
long sockAddress = memory + Native.SIZEOF_MSGHDR;
IOUringEventLoop eventLoop = (IOUringEventLoop) channel.eventLoop();
InetSocketAddress sender;
if (channel.socket.isIpv6()) {
byte[] bytes = eventLoop.inet6AddressArray();
sender = SockaddrIn.readIPv6(sockAddress, bytes);
} else {
byte[] bytes = eventLoop.inet4AddressArray();
sender = SockaddrIn.readIPv4(sockAddress, bytes);
}
long iovAddress = memory + Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE;
long bufferAddress = Iov.readBufferAddress(iovAddress);
int bufferLength = Iov.readBufferLength(iovAddress);
// reconstruct the reader index based on the memoryAddress of the buffer and the bufferAddress that was used
// in the iovec.
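// For example (illustrative, assuming a freshly allocated read buffer): if this msghdr's iov_base was
// written as memoryAddress() + 2 * maxDatagramPayloadSize at submit time, readerIndex resolves to
// 2 * maxDatagramPayloadSize, i.e. the third slice of the shared read buffer.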
int readerIndex = (int) (bufferAddress - buffer.memoryAddress());
ByteBuf slice = buffer.slice(readerIndex, bufferLength)
.writerIndex(bytesRead);
return new DatagramPacket(slice.retain(), channel.localAddress(), sender);
}
int idx() {
return idx;
}
long address() {
return memory;
}
void release() {
PlatformDependent.freeMemory(memory);
}
}


@ -0,0 +1,59 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
final class MsgHdrMemoryArray {
private int idx;
private final MsgHdrMemory[] hdrs;
private final int capacity;
MsgHdrMemoryArray(int capacity) {
this.capacity = capacity;
hdrs = new MsgHdrMemory[capacity];
for (int i = 0; i < hdrs.length; i++) {
hdrs[i] = new MsgHdrMemory(i);
}
}
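// Hands out the next pre-allocated msghdr, or null when no slot is left to hand out; callers then stop
// batching and work with what was already submitted. clear() resets the cursor once all completions of
// the batch were processed.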
MsgHdrMemory nextHdr() {
if (idx == hdrs.length - 1) {
return null;
}
return hdrs[idx++];
}
MsgHdrMemory hdr(int idx) {
return hdrs[idx];
}
void clear() {
idx = 0;
}
int length() {
return idx;
}
void release() {
for (MsgHdrMemory hdr: hdrs) {
hdr.release();
}
}
int capacity() {
return capacity;
}
}


@ -29,11 +29,11 @@ final class SockaddrIn {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff };
private SockaddrIn() { }
static void write(boolean ipv6, long memory, InetSocketAddress address) {
static int write(boolean ipv6, long memory, InetSocketAddress address) {
if (ipv6) {
SockaddrIn.writeIPv6(memory, address.getAddress(), address.getPort());
return SockaddrIn.writeIPv6(memory, address.getAddress(), address.getPort());
} else {
SockaddrIn.writeIPv4(memory, address.getAddress(), address.getPort());
return SockaddrIn.writeIPv4(memory, address.getAddress(), address.getPort());
}
}