Add IOUringDatagramChannel and so also support UDP (#10588)
Motivation: We can also support UDP / Datagram based on io_uring, so we should do it for maximal performance Modifications: - Add IOUringDatagramChannel - Add tests based on our transport testsuite for it Result: UDP / Datagram is supported via io_uring as well now
This commit is contained in:
parent
0421c9c751
commit
09a0b78a81
@ -323,6 +323,56 @@ static jint netty_io_uring_in6AddressOffsetofS6Addr(JNIEnv* env, jclass clazz) {
|
||||
return offsetof(struct in6_addr, s6_addr);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_sizeofSockaddrStorage(JNIEnv* env, jclass clazz) {
|
||||
return sizeof(struct sockaddr_storage);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_sizeofSizeT(JNIEnv* env, jclass clazz) {
|
||||
return sizeof(size_t);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_sizeofIovec(JNIEnv* env, jclass clazz) {
|
||||
return sizeof(struct iovec);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_iovecOffsetofIovBase(JNIEnv* env, jclass clazz) {
|
||||
return offsetof(struct iovec, iov_base);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_iovecOffsetofIovLen(JNIEnv* env, jclass clazz) {
|
||||
return offsetof(struct iovec, iov_len);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_sizeofMsghdr(JNIEnv* env, jclass clazz) {
|
||||
return sizeof(struct msghdr);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_msghdrOffsetofMsgName(JNIEnv* env, jclass clazz) {
|
||||
return offsetof(struct msghdr, msg_name);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_msghdrOffsetofMsgNamelen(JNIEnv* env, jclass clazz) {
|
||||
return offsetof(struct msghdr, msg_namelen);
|
||||
}
|
||||
static jint netty_io_uring_msghdrOffsetofMsgIov(JNIEnv* env, jclass clazz) {
|
||||
return offsetof(struct msghdr, msg_iov);
|
||||
}
|
||||
static jint netty_io_uring_msghdrOffsetofMsgIovlen(JNIEnv* env, jclass clazz) {
|
||||
return offsetof(struct msghdr, msg_iovlen);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_msghdrOffsetofMsgControl(JNIEnv* env, jclass clazz) {
|
||||
return offsetof(struct msghdr, msg_control);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_msghdrOffsetofMsgControllen(JNIEnv* env, jclass clazz) {
|
||||
return offsetof(struct msghdr, msg_controllen);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_msghdrOffsetofMsgFlags(JNIEnv* env, jclass clazz) {
|
||||
return offsetof(struct msghdr, msg_flags);
|
||||
}
|
||||
|
||||
static jint netty_io_uring_etime(JNIEnv* env, jclass clazz) {
|
||||
return ETIME;
|
||||
}
|
||||
@ -379,6 +429,14 @@ static jint netty_io_uring_ioringOpClose(JNIEnv* env, jclass clazz) {
|
||||
return IORING_OP_CLOSE;
|
||||
}
|
||||
|
||||
static jint netty_io_uring_ioringOpSendmsg(JNIEnv* env, jclass clazz) {
|
||||
return IORING_OP_SENDMSG;
|
||||
}
|
||||
|
||||
static jint netty_io_uring_ioringOpRecvmsg(JNIEnv* env, jclass clazz) {
|
||||
return IORING_OP_RECVMSG;
|
||||
}
|
||||
|
||||
static jint netty_io_uring_ioringEnterGetevents(JNIEnv* env, jclass clazz) {
|
||||
return IORING_ENTER_GETEVENTS;
|
||||
}
|
||||
@ -406,6 +464,19 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
|
||||
{ "sockaddrIn6OffsetofSin6Addr", "()I", (void *) netty_io_uring_sockaddrIn6OffsetofSin6Addr },
|
||||
{ "sockaddrIn6OffsetofSin6ScopeId", "()I", (void *) netty_io_uring_sockaddrIn6OffsetofSin6ScopeId },
|
||||
{ "in6AddressOffsetofS6Addr", "()I", (void *) netty_io_uring_in6AddressOffsetofS6Addr },
|
||||
{ "sizeofSockaddrStorage", "()I", (void *) netty_io_uring_sizeofSockaddrStorage },
|
||||
{ "sizeofSizeT", "()I", (void *) netty_io_uring_sizeofSizeT },
|
||||
{ "sizeofIovec", "()I", (void *) netty_io_uring_sizeofIovec },
|
||||
{ "iovecOffsetofIovBase", "()I", (void *) netty_io_uring_iovecOffsetofIovBase },
|
||||
{ "iovecOffsetofIovLen", "()I", (void *) netty_io_uring_iovecOffsetofIovLen },
|
||||
{ "sizeofMsghdr", "()I", (void *) netty_io_uring_sizeofMsghdr },
|
||||
{ "msghdrOffsetofMsgName", "()I", (void *) netty_io_uring_msghdrOffsetofMsgName },
|
||||
{ "msghdrOffsetofMsgNamelen", "()I", (void *) netty_io_uring_msghdrOffsetofMsgNamelen },
|
||||
{ "msghdrOffsetofMsgIov", "()I", (void *) netty_io_uring_msghdrOffsetofMsgIov },
|
||||
{ "msghdrOffsetofMsgIovlen", "()I", (void *) netty_io_uring_msghdrOffsetofMsgIovlen },
|
||||
{ "msghdrOffsetofMsgControl", "()I", (void *) netty_io_uring_msghdrOffsetofMsgControl },
|
||||
{ "msghdrOffsetofMsgControllen", "()I", (void *) netty_io_uring_msghdrOffsetofMsgControllen },
|
||||
{ "msghdrOffsetofMsgFlags", "()I", (void *) netty_io_uring_msghdrOffsetofMsgFlags },
|
||||
{ "etime", "()I", (void *) netty_io_uring_etime },
|
||||
{ "ecanceled", "()I", (void *) netty_io_uring_ecanceled },
|
||||
{ "pollin", "()I", (void *) netty_io_uring_pollin },
|
||||
@ -420,6 +491,8 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
|
||||
{ "ioringOpWrite", "()I", (void *) netty_io_uring_ioringOpWrite },
|
||||
{ "ioringOpConnect", "()I", (void *) netty_io_uring_ioringOpConnect },
|
||||
{ "ioringOpClose", "()I", (void *) netty_io_uring_ioringOpClose },
|
||||
{ "ioringOpSendmsg", "()I", (void *) netty_io_uring_ioringOpSendmsg },
|
||||
{ "ioringOpRecvmsg", "()I", (void *) netty_io_uring_ioringOpRecvmsg },
|
||||
{ "ioringEnterGetevents", "()I", (void *) netty_io_uring_ioringEnterGetevents },
|
||||
{ "iosqeAsync", "()I", (void *) netty_io_uring_iosqeAsync }
|
||||
};
|
||||
|
@ -17,6 +17,7 @@ package io.netty.channel.uring;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.ByteBufHolder;
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.AbstractChannel;
|
||||
@ -29,7 +30,6 @@ import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.ChannelPromiseNotifier;
|
||||
import io.netty.channel.ConnectTimeoutException;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
@ -38,9 +38,6 @@ import io.netty.channel.socket.SocketChannelConfig;
|
||||
import io.netty.channel.unix.Buffer;
|
||||
import io.netty.channel.unix.Errors;
|
||||
import io.netty.channel.unix.FileDescriptor;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.channel.unix.NativeInetAddress;
|
||||
import io.netty.channel.unix.Socket;
|
||||
import io.netty.channel.unix.UnixChannel;
|
||||
import io.netty.channel.unix.UnixChannelUtil;
|
||||
import io.netty.util.ReferenceCountUtil;
|
||||
@ -48,8 +45,6 @@ import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
@ -82,7 +77,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
|
||||
private ChannelPromise delayedClose;
|
||||
private boolean inputClosedSeenErrorOnRead;
|
||||
static final int SOCK_ADDR_LEN = 128;
|
||||
|
||||
/**
|
||||
* The future of the current connection attempt. If not null, subsequent connection attempts will fail.
|
||||
@ -257,8 +251,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
}
|
||||
|
||||
//deregister
|
||||
// Channel/ChannelHandlerContext.read() was called
|
||||
@Override
|
||||
protected void doBeginRead() {
|
||||
if ((ioState & POLL_IN_SCHEDULED) == 0) {
|
||||
@ -286,39 +278,21 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
if (msgCount == 0) {
|
||||
return;
|
||||
}
|
||||
ByteBuf msg = (ByteBuf) in.current();
|
||||
if (msgCount > 1 ||
|
||||
// We also need some special handling for CompositeByteBuf
|
||||
msg.nioBufferCount() > 1) {
|
||||
doWriteMultiple(in);
|
||||
} else if (msgCount == 1) {
|
||||
doWriteSingle(msg);
|
||||
}
|
||||
}
|
||||
Object msg = in.current();
|
||||
|
||||
private void doWriteMultiple(ChannelOutboundBuffer in) {
|
||||
final IovArray iovecArray = ((IOUringEventLoop) eventLoop()).iovArray();
|
||||
try {
|
||||
int offset = iovecArray.count();
|
||||
in.forEachFlushedMessage(iovecArray);
|
||||
submissionQueue().addWritev(socket.intValue(),
|
||||
iovecArray.memoryAddress(offset), iovecArray.count() - offset);
|
||||
ioState |= WRITE_SCHEDULED;
|
||||
} catch (Exception e) {
|
||||
// This should never happen, anyway fallback to single write.
|
||||
doWriteSingle((ByteBuf) in.current());
|
||||
}
|
||||
}
|
||||
|
||||
protected final void doWriteSingle(ByteBuf buf) {
|
||||
assert (ioState & WRITE_SCHEDULED) == 0;
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
|
||||
buf.writerIndex());
|
||||
if (msgCount > 1) {
|
||||
ioUringUnsafe().scheduleWriteMultiple(in);
|
||||
} else if ((msg instanceof ByteBuf) && ((ByteBuf) msg).nioBufferCount() > 1 ||
|
||||
((msg instanceof ByteBufHolder) && ((ByteBufHolder) msg).content().nioBufferCount() > 1)) {
|
||||
// We also need some special handling for CompositeByteBuf
|
||||
ioUringUnsafe().scheduleWriteMultiple(in);
|
||||
} else {
|
||||
ioUringUnsafe().scheduleWriteSingle(msg);
|
||||
}
|
||||
ioState |= WRITE_SCHEDULED;
|
||||
}
|
||||
|
||||
//POLLOUT
|
||||
private void schedulePollOut() {
|
||||
assert (ioState & POLL_OUT_SCHEDULED) == 0;
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
@ -326,16 +300,31 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
ioState |= POLL_OUT_SCHEDULED;
|
||||
}
|
||||
|
||||
void schedulePollRdHup() {
|
||||
final void schedulePollRdHup() {
|
||||
assert (ioState & POLL_RDHUP_SCHEDULED) == 0;
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addPollRdHup(fd().intValue());
|
||||
ioState |= POLL_RDHUP_SCHEDULED;
|
||||
}
|
||||
|
||||
final void resetCachedAddresses() {
|
||||
local = socket.localAddress();
|
||||
remote = socket.remoteAddress();
|
||||
}
|
||||
|
||||
abstract class AbstractUringUnsafe extends AbstractUnsafe {
|
||||
private IOUringRecvByteAllocatorHandle allocHandle;
|
||||
|
||||
/**
|
||||
* Schedule the write of multiple messages in the {@link ChannelOutboundBuffer}.
|
||||
*/
|
||||
protected abstract void scheduleWriteMultiple(ChannelOutboundBuffer in);
|
||||
|
||||
/**
|
||||
* Schedule the write of a singe message.
|
||||
*/
|
||||
protected abstract void scheduleWriteSingle(Object msg);
|
||||
|
||||
@Override
|
||||
public void close(ChannelPromise promise) {
|
||||
if ((ioState & (WRITE_SCHEDULED | READ_SCHEDULED | CONNECT_SCHEDULED)) == 0) {
|
||||
@ -414,19 +403,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
}
|
||||
|
||||
IOUringRecvByteAllocatorHandle newIOUringHandle(RecvByteBufAllocator.ExtendedHandle handle) {
|
||||
final IOUringRecvByteAllocatorHandle newIOUringHandle(RecvByteBufAllocator.ExtendedHandle handle) {
|
||||
return new IOUringRecvByteAllocatorHandle(handle);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringRecvByteAllocatorHandle recvBufAllocHandle() {
|
||||
public final IOUringRecvByteAllocatorHandle recvBufAllocHandle() {
|
||||
if (allocHandle == null) {
|
||||
allocHandle = newIOUringHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
|
||||
}
|
||||
return allocHandle;
|
||||
}
|
||||
|
||||
void shutdownInput(boolean rdHup) {
|
||||
final void shutdownInput(boolean rdHup) {
|
||||
logger.trace("shutdownInput Fd: {}", fd().intValue());
|
||||
if (!socket.isInputShutdown()) {
|
||||
if (isAllowHalfClosure(config())) {
|
||||
@ -456,7 +445,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
close(voidPromise());
|
||||
}
|
||||
|
||||
void schedulePollIn() {
|
||||
final void schedulePollIn() {
|
||||
assert (ioState & POLL_IN_SCHEDULED) == 0;
|
||||
if (!isActive() || shouldBreakIoUringInReady(config())) {
|
||||
return;
|
||||
@ -466,7 +455,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
submissionQueue.addPollIn(socket.intValue());
|
||||
}
|
||||
|
||||
void processDelayedClose() {
|
||||
final void processDelayedClose() {
|
||||
ChannelPromise promise = delayedClose;
|
||||
if (promise != null && (ioState & (READ_SCHEDULED | WRITE_SCHEDULED | CONNECT_SCHEDULED)) == 0) {
|
||||
delayedClose = null;
|
||||
@ -480,6 +469,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
readComplete0(res);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called once a read was completed.
|
||||
*/
|
||||
protected abstract void readComplete0(int res);
|
||||
|
||||
/**
|
||||
@ -534,6 +526,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A read should be scheduled.
|
||||
*/
|
||||
protected abstract void scheduleRead0();
|
||||
|
||||
/**
|
||||
@ -574,16 +569,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
schedulePollOut();
|
||||
}
|
||||
}
|
||||
} else if (!getSocket().isOutputShutdown()) {
|
||||
} else if (!socket.isOutputShutdown()) {
|
||||
// Try writing again
|
||||
super.flush0();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called once a write was completed.
|
||||
*/
|
||||
final void writeComplete(int res) {
|
||||
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
|
||||
if (res >= 0) {
|
||||
channelOutboundBuffer.removeBytes(res);
|
||||
removeFromOutboundBuffer(channelOutboundBuffer, res);
|
||||
// We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
|
||||
// while still removing messages internally in removeBytes(...) which then may corrupt state.
|
||||
ioState &= ~WRITE_SCHEDULED;
|
||||
@ -601,7 +599,17 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
}
|
||||
|
||||
final void connectComplete(int res) {
|
||||
/**
|
||||
* Called once a write completed and we should remove message(s) from the {@link ChannelOutboundBuffer}
|
||||
*/
|
||||
protected void removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int bytes) {
|
||||
outboundBuffer.removeBytes(bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect was completed.
|
||||
*/
|
||||
void connectComplete(int res) {
|
||||
ioState &= ~CONNECT_SCHEDULED;
|
||||
freeRemoteAddressMemory();
|
||||
|
||||
@ -646,19 +654,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
doConnect(remoteAddress, localAddress);
|
||||
InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
|
||||
|
||||
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
|
||||
remoteAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
|
||||
long remoteAddressMemoryAddress = Buffer.memoryAddress(remoteAddressMemory);
|
||||
|
||||
if (socket.isIpv6()) {
|
||||
SockaddrIn.writeIPv6(remoteAddressMemoryAddress, inetSocketAddress.getAddress(),
|
||||
inetSocketAddress.getPort());
|
||||
} else {
|
||||
SockaddrIn.writeIPv4(remoteAddressMemoryAddress, inetSocketAddress.getAddress(),
|
||||
inetSocketAddress.getPort());
|
||||
}
|
||||
SockaddrIn.write(socket.isIpv6(), remoteAddressMemoryAddress, inetSocketAddress);
|
||||
|
||||
final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue();
|
||||
ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN);
|
||||
ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress,
|
||||
Native.SIZEOF_SOCKADDR_STORAGE);
|
||||
ioState |= CONNECT_SCHEDULED;
|
||||
} catch (Throwable t) {
|
||||
closeIfClosed();
|
||||
@ -714,7 +717,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDeregister() {
|
||||
protected final void doDeregister() {
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
|
||||
if (submissionQueue != null) {
|
||||
@ -735,7 +738,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doBind(final SocketAddress local) throws Exception {
|
||||
protected void doBind(final SocketAddress local) throws Exception {
|
||||
if (local instanceof InetSocketAddress) {
|
||||
checkResolvable((InetSocketAddress) local);
|
||||
}
|
||||
@ -749,9 +752,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract DefaultChannelConfig config();
|
||||
|
||||
@Override
|
||||
protected SocketAddress localAddress0() {
|
||||
return local;
|
||||
@ -762,10 +762,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
return remote;
|
||||
}
|
||||
|
||||
protected Socket getSocket() {
|
||||
return socket;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to the remote peer
|
||||
*/
|
||||
|
@ -39,12 +39,12 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
|
||||
protected AbstractIOUringServerChannel(LinuxSocket socket, boolean active) {
|
||||
super(null, socket, active);
|
||||
|
||||
acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(SOCK_ADDR_LEN);
|
||||
acceptedAddressMemory = Buffer.allocateDirectWithNativeOrder(Native.SIZEOF_SOCKADDR_STORAGE);
|
||||
acceptedAddressMemoryAddress = Buffer.memoryAddress(acceptedAddressMemory);
|
||||
acceptedAddressLengthMemory = Buffer.allocateDirectWithNativeOrder(Long.BYTES);
|
||||
// Needs to be initialized to the size of acceptedAddressMemory.
|
||||
// See https://man7.org/linux/man-pages/man2/accept.2.html
|
||||
acceptedAddressLengthMemory.putLong(0, SOCK_ADDR_LEN);
|
||||
acceptedAddressLengthMemory.putLong(0, Native.SIZEOF_SOCKADDR_STORAGE);
|
||||
acceptedAddressLengthMemoryAddress = Buffer.memoryAddress(acceptedAddressLengthMemory);
|
||||
}
|
||||
|
||||
@ -74,6 +74,16 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
|
||||
|
||||
final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe {
|
||||
|
||||
@Override
|
||||
protected void scheduleWriteMultiple(ChannelOutboundBuffer in) {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scheduleWriteSingle(Object msg) {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scheduleRead0() {
|
||||
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
|
||||
|
@ -19,10 +19,12 @@ import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.socket.DuplexChannel;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
@ -208,6 +210,28 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
||||
|
||||
private ByteBuf readBuffer;
|
||||
|
||||
@Override
|
||||
protected void scheduleWriteMultiple(ChannelOutboundBuffer in) {
|
||||
final IovArray iovecArray = ((IOUringEventLoop) eventLoop()).iovArray();
|
||||
try {
|
||||
int offset = iovecArray.count();
|
||||
in.forEachFlushedMessage(iovecArray);
|
||||
submissionQueue().addWritev(socket.intValue(),
|
||||
iovecArray.memoryAddress(offset), iovecArray.count() - offset);
|
||||
} catch (Exception e) {
|
||||
// This should never happen, anyway fallback to single write.
|
||||
scheduleWriteSingle(in.current());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scheduleWriteSingle(Object msg) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
|
||||
buf.writerIndex());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scheduleRead0() {
|
||||
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
|
||||
|
@ -0,0 +1,550 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.AddressedEnvelope;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelMetadata;
|
||||
import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.DefaultAddressedEnvelope;
|
||||
import io.netty.channel.socket.DatagramChannel;
|
||||
import io.netty.channel.socket.DatagramPacket;
|
||||
import io.netty.channel.socket.InternetProtocolFamily;
|
||||
import io.netty.channel.unix.Buffer;
|
||||
import io.netty.channel.unix.Errors;
|
||||
import io.netty.channel.unix.Errors.NativeIoException;
|
||||
import io.netty.channel.unix.Socket;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.Inet4Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.net.PortUnreachableException;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import static io.netty.channel.unix.Errors.ioResult;
|
||||
|
||||
public final class IOUringDatagramChannel extends AbstractIOUringChannel implements DatagramChannel {
|
||||
private static final ChannelMetadata METADATA = new ChannelMetadata(true);
|
||||
private static final String EXPECTED_TYPES =
|
||||
" (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
|
||||
StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
|
||||
StringUtil.simpleClassName(ByteBuf.class) + ", " +
|
||||
StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
|
||||
StringUtil.simpleClassName(ByteBuf.class) + ')';
|
||||
|
||||
private final IOUringDatagramChannelConfig config;
|
||||
private volatile boolean connected;
|
||||
|
||||
/**
|
||||
* Create a new instance which selects the {@link InternetProtocolFamily} to use depending
|
||||
* on the Operation Systems default which will be chosen.
|
||||
*/
|
||||
public IOUringDatagramChannel() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance using the given {@link InternetProtocolFamily}. If {@code null} is used it will depend
|
||||
* on the Operation Systems default which will be chosen.
|
||||
*/
|
||||
public IOUringDatagramChannel(InternetProtocolFamily family) {
|
||||
this(family == null ?
|
||||
LinuxSocket.newSocketDgram(Socket.isIPv6Preferred()) :
|
||||
LinuxSocket.newSocketDgram(family == InternetProtocolFamily.IPv6), false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance which selects the {@link InternetProtocolFamily} to use depending
|
||||
* on the Operation Systems default which will be chosen.
|
||||
*/
|
||||
public IOUringDatagramChannel(int fd) {
|
||||
this(new LinuxSocket(fd), true);
|
||||
}
|
||||
|
||||
private IOUringDatagramChannel(LinuxSocket fd, boolean active) {
|
||||
super(null, fd, active);
|
||||
config = new IOUringDatagramChannelConfig(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress remoteAddress() {
|
||||
return (InetSocketAddress) super.remoteAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetSocketAddress localAddress() {
|
||||
return (InetSocketAddress) super.localAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelMetadata metadata() {
|
||||
return METADATA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive() {
|
||||
return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isConnected() {
|
||||
return connected;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture joinGroup(InetAddress multicastAddress) {
|
||||
return joinGroup(multicastAddress, newPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
|
||||
try {
|
||||
return joinGroup(
|
||||
multicastAddress,
|
||||
NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
|
||||
} catch (IOException e) {
|
||||
promise.setFailure(e);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture joinGroup(
|
||||
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
|
||||
return joinGroup(multicastAddress, networkInterface, newPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture joinGroup(
|
||||
InetSocketAddress multicastAddress, NetworkInterface networkInterface,
|
||||
ChannelPromise promise) {
|
||||
return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture joinGroup(
|
||||
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
|
||||
return joinGroup(multicastAddress, networkInterface, source, newPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture joinGroup(
|
||||
final InetAddress multicastAddress, final NetworkInterface networkInterface,
|
||||
final InetAddress source, final ChannelPromise promise) {
|
||||
|
||||
ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
|
||||
ObjectUtil.checkNotNull(networkInterface, "networkInterface");
|
||||
|
||||
try {
|
||||
socket.joinGroup(multicastAddress, networkInterface, source);
|
||||
promise.setSuccess();
|
||||
} catch (IOException e) {
|
||||
promise.setFailure(e);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture leaveGroup(InetAddress multicastAddress) {
|
||||
return leaveGroup(multicastAddress, newPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
|
||||
try {
|
||||
return leaveGroup(
|
||||
multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
|
||||
} catch (IOException e) {
|
||||
promise.setFailure(e);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture leaveGroup(
|
||||
InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
|
||||
return leaveGroup(multicastAddress, networkInterface, newPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture leaveGroup(
|
||||
InetSocketAddress multicastAddress,
|
||||
NetworkInterface networkInterface, ChannelPromise promise) {
|
||||
return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture leaveGroup(
|
||||
InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
|
||||
return leaveGroup(multicastAddress, networkInterface, source, newPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture leaveGroup(
|
||||
final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
|
||||
final ChannelPromise promise) {
|
||||
ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
|
||||
ObjectUtil.checkNotNull(networkInterface, "networkInterface");
|
||||
|
||||
try {
|
||||
socket.leaveGroup(multicastAddress, networkInterface, source);
|
||||
promise.setSuccess();
|
||||
} catch (IOException e) {
|
||||
promise.setFailure(e);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture block(
|
||||
InetAddress multicastAddress, NetworkInterface networkInterface,
|
||||
InetAddress sourceToBlock) {
|
||||
return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture block(
|
||||
final InetAddress multicastAddress, final NetworkInterface networkInterface,
|
||||
final InetAddress sourceToBlock, final ChannelPromise promise) {
|
||||
ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
|
||||
ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
|
||||
ObjectUtil.checkNotNull(networkInterface, "networkInterface");
|
||||
|
||||
promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
|
||||
return promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
|
||||
return block(multicastAddress, sourceToBlock, newPromise());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture block(
|
||||
InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
|
||||
try {
|
||||
return block(
|
||||
multicastAddress,
|
||||
NetworkInterface.getByInetAddress(localAddress().getAddress()),
|
||||
sourceToBlock, promise);
|
||||
} catch (Throwable e) {
|
||||
promise.setFailure(e);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AbstractUringUnsafe newUnsafe() {
|
||||
return new IOUringDatagramChannelUnsafe();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doBind(SocketAddress localAddress) throws Exception {
|
||||
if (localAddress instanceof InetSocketAddress) {
|
||||
InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
|
||||
if (socketAddress.getAddress().isAnyLocalAddress() &&
|
||||
socketAddress.getAddress() instanceof Inet4Address && Socket.isIPv6Preferred()) {
|
||||
localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
|
||||
}
|
||||
}
|
||||
super.doBind(localAddress);
|
||||
active = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object filterOutboundMessage(Object msg) {
|
||||
if (msg instanceof DatagramPacket) {
|
||||
DatagramPacket packet = (DatagramPacket) msg;
|
||||
ByteBuf content = packet.content();
|
||||
return !content.hasMemoryAddress() ?
|
||||
new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
|
||||
}
|
||||
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
return !buf.hasMemoryAddress()? newDirectBuffer(buf) : buf;
|
||||
}
|
||||
|
||||
if (msg instanceof AddressedEnvelope) {
|
||||
@SuppressWarnings("unchecked")
|
||||
AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
|
||||
if (e.content() instanceof ByteBuf &&
|
||||
(e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
|
||||
|
||||
ByteBuf content = (ByteBuf) e.content();
|
||||
return !content.hasMemoryAddress()?
|
||||
new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
|
||||
newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
|
||||
}
|
||||
}
|
||||
|
||||
throw new UnsupportedOperationException(
|
||||
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig config() {
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doDisconnect() throws Exception {
|
||||
// TODO: use io_uring for this too...
|
||||
socket.disconnect();
|
||||
connected = active = false;
|
||||
|
||||
resetCachedAddresses();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doClose() throws Exception {
|
||||
super.doClose();
|
||||
((IOUringDatagramChannelUnsafe) unsafe()).releaseBuffers();
|
||||
connected = false;
|
||||
}
|
||||
|
||||
final class IOUringDatagramChannelUnsafe extends AbstractUringUnsafe {
|
||||
private ByteBuf readBuffer;
|
||||
private boolean recvMsg;
|
||||
|
||||
// These buffers are used for msghdr, iov, sockaddr_in / sockaddr_in6 when doing recvmsg / sendmsg
|
||||
//
|
||||
// TODO: Alternative we could also allocate these everytime from the ByteBufAllocator or we could use
|
||||
// some sort of other pool. Let's keep it simple for now.
|
||||
private ByteBuffer recvmsgBuffer;
|
||||
private long recvmsgBufferAddr = -1;
|
||||
private ByteBuffer sendmsgBuffer;
|
||||
private long sendmsgBufferAddr = -1;
|
||||
|
||||
private long sendmsgBufferAddr() {
|
||||
long address = this.sendmsgBufferAddr;
|
||||
if (address == -1) {
|
||||
assert sendmsgBuffer == null;
|
||||
int length = Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE + Native.SIZEOF_IOVEC;
|
||||
sendmsgBuffer = Buffer.allocateDirectWithNativeOrder(length);
|
||||
sendmsgBufferAddr = address = Buffer.memoryAddress(sendmsgBuffer);
|
||||
|
||||
// memset once
|
||||
PlatformDependent.setMemory(address, length, (byte) 0);
|
||||
}
|
||||
return address;
|
||||
}
|
||||
|
||||
private long recvmsgBufferAddr() {
|
||||
long address = this.recvmsgBufferAddr;
|
||||
if (address == -1) {
|
||||
assert recvmsgBuffer == null;
|
||||
int length = Native.SIZEOF_MSGHDR + Native.SIZEOF_SOCKADDR_STORAGE + Native.SIZEOF_IOVEC;
|
||||
recvmsgBuffer = Buffer.allocateDirectWithNativeOrder(length);
|
||||
recvmsgBufferAddr = address = Buffer.memoryAddress(recvmsgBuffer);
|
||||
|
||||
// memset once
|
||||
PlatformDependent.setMemory(address, length, (byte) 0);
|
||||
}
|
||||
return address;
|
||||
}
|
||||
|
||||
void releaseBuffers() {
|
||||
if (sendmsgBuffer != null) {
|
||||
Buffer.free(sendmsgBuffer);
|
||||
sendmsgBuffer = null;
|
||||
sendmsgBufferAddr = -1;
|
||||
}
|
||||
|
||||
if (recvmsgBuffer != null) {
|
||||
Buffer.free(recvmsgBuffer);
|
||||
recvmsgBuffer = null;
|
||||
recvmsgBufferAddr = -1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void readComplete0(int res) {
|
||||
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
|
||||
final ChannelPipeline pipeline = pipeline();
|
||||
ByteBuf byteBuf = this.readBuffer;
|
||||
this.readBuffer = null;
|
||||
assert byteBuf != null;
|
||||
boolean recvmsg = this.recvMsg;
|
||||
this.recvMsg = false;
|
||||
|
||||
try {
|
||||
if (res < 0) {
|
||||
// If res is negative we should pass it to ioResult(...) which will either throw
|
||||
// or convert it to 0 if we could not read because the socket was not readable.
|
||||
allocHandle.lastBytesRead(ioResult("io_uring read / recvmsg", res));
|
||||
} else if (res > 0) {
|
||||
byteBuf.writerIndex(byteBuf.writerIndex() + res);
|
||||
allocHandle.lastBytesRead(res);
|
||||
} else {
|
||||
allocHandle.lastBytesRead(-1);
|
||||
}
|
||||
if (allocHandle.lastBytesRead() <= 0) {
|
||||
// nothing was read, release the buffer.
|
||||
byteBuf.release();
|
||||
byteBuf = null;
|
||||
|
||||
allocHandle.readComplete();
|
||||
pipeline.fireChannelReadComplete();
|
||||
return;
|
||||
}
|
||||
DatagramPacket packet;
|
||||
if (!recvmsg) {
|
||||
packet = new DatagramPacket(byteBuf, IOUringDatagramChannel.this.localAddress(),
|
||||
IOUringDatagramChannel.this.remoteAddress());
|
||||
} else {
|
||||
long sockaddrAddress = recvmsgBufferAddr() + Native.SIZEOF_MSGHDR;
|
||||
final InetSocketAddress remote;
|
||||
if (socket.isIpv6()) {
|
||||
byte[] bytes = ((IOUringEventLoop) eventLoop()).inet6AddressArray();
|
||||
remote = SockaddrIn.readIPv6(sockaddrAddress, bytes);
|
||||
} else {
|
||||
byte[] bytes = ((IOUringEventLoop) eventLoop()).inet4AddressArray();
|
||||
remote = SockaddrIn.readIPv4(sockaddrAddress, bytes);
|
||||
}
|
||||
packet = new DatagramPacket(byteBuf,
|
||||
IOUringDatagramChannel.this.localAddress(), remote);
|
||||
}
|
||||
allocHandle.incMessagesRead(1);
|
||||
pipeline.fireChannelRead(packet);
|
||||
byteBuf = null;
|
||||
if (allocHandle.continueReading()) {
|
||||
// Let's schedule another read.
|
||||
scheduleRead();
|
||||
} else {
|
||||
// We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
|
||||
allocHandle.readComplete();
|
||||
pipeline.fireChannelReadComplete();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
if (connected && t instanceof NativeIoException) {
|
||||
t = translateForConnected((NativeIoException) t);
|
||||
}
|
||||
pipeline.fireExceptionCaught(t);
|
||||
} finally {
|
||||
if (byteBuf != null) {
|
||||
byteBuf.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scheduleRead0() {
|
||||
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
|
||||
ByteBuf byteBuf = allocHandle.allocate(alloc());
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
|
||||
assert readBuffer == null;
|
||||
readBuffer = byteBuf;
|
||||
|
||||
recvMsg = !isConnected();
|
||||
long bufferAddress = byteBuf.memoryAddress();
|
||||
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
|
||||
|
||||
if (!recvMsg) {
|
||||
submissionQueue.addRead(socket.intValue(), bufferAddress,
|
||||
byteBuf.writerIndex(), byteBuf.capacity());
|
||||
} else {
|
||||
int addrLen = addrLen();
|
||||
long recvmsgBufferAddr = recvmsgBufferAddr();
|
||||
long sockaddrAddress = recvmsgBufferAddr + Native.SIZEOF_MSGHDR;
|
||||
long iovecAddress = sockaddrAddress + addrLen;
|
||||
|
||||
Iov.write(iovecAddress, bufferAddress + byteBuf.writerIndex(), byteBuf.writableBytes());
|
||||
MsgHdr.write(recvmsgBufferAddr, sockaddrAddress, addrLen, iovecAddress, 1);
|
||||
submissionQueue.addRecvmsg(socket.intValue(), recvmsgBufferAddr);
|
||||
}
|
||||
}
|
||||
|
||||
private int addrLen() {
|
||||
return socket.isIpv6() ? Native.SIZEOF_SOCKADDR_IN6 :
|
||||
Native.SIZEOF_SOCKADDR_IN;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void removeFromOutboundBuffer(ChannelOutboundBuffer outboundBuffer, int bytes) {
|
||||
// When using Datagram we should consider the message written as long as there were any bytes written.
|
||||
boolean removed = outboundBuffer.remove();
|
||||
assert removed;
|
||||
}
|
||||
|
||||
@Override
|
||||
void connectComplete(int res) {
|
||||
if (res >= 0) {
|
||||
connected = true;
|
||||
}
|
||||
super.connectComplete(res);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scheduleWriteMultiple(ChannelOutboundBuffer in) {
|
||||
// We always just use scheduleWriteSingle for now.
|
||||
scheduleWriteSingle(in.current());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void scheduleWriteSingle(Object msg) {
|
||||
final ByteBuf data;
|
||||
InetSocketAddress remoteAddress;
|
||||
if (msg instanceof AddressedEnvelope) {
|
||||
@SuppressWarnings("unchecked")
|
||||
AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
|
||||
(AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
|
||||
data = envelope.content();
|
||||
remoteAddress = envelope.recipient();
|
||||
} else {
|
||||
data = (ByteBuf) msg;
|
||||
remoteAddress = null;
|
||||
}
|
||||
|
||||
long bufferAddress = data.memoryAddress();
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
if (remoteAddress == null) {
|
||||
submissionQueue.addWrite(socket.intValue(), bufferAddress, data.readerIndex(),
|
||||
data.writerIndex());
|
||||
} else {
|
||||
int addrLen = addrLen();
|
||||
long sendmsgBufferAddr = sendmsgBufferAddr();
|
||||
long sockaddrAddress = sendmsgBufferAddr + Native.SIZEOF_MSGHDR;
|
||||
long iovecAddress = sockaddrAddress + Native.SIZEOF_SOCKADDR_STORAGE;
|
||||
|
||||
SockaddrIn.write(socket.isIpv6(), sockaddrAddress, remoteAddress);
|
||||
Iov.write(iovecAddress, bufferAddress + data.readerIndex(), data.readableBytes());
|
||||
MsgHdr.write(sendmsgBufferAddr, sockaddrAddress, addrLen, iovecAddress, 1);
|
||||
submissionQueue.addSendmsg(socket.intValue(), sendmsgBufferAddr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private IOException translateForConnected(NativeIoException e) {
|
||||
// We need to correctly translate connect errors to match NIO behaviour.
|
||||
if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
|
||||
PortUnreachableException error = new PortUnreachableException(e.getMessage());
|
||||
error.initCause(e);
|
||||
return error;
|
||||
}
|
||||
return e;
|
||||
}
|
||||
}
|
@ -0,0 +1,466 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.FixedRecvByteBufAllocator;
|
||||
import io.netty.channel.MessageSizeEstimator;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.WriteBufferWaterMark;
|
||||
import io.netty.channel.socket.DatagramChannelConfig;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.NetworkInterface;
|
||||
import java.util.Map;
|
||||
|
||||
public final class IOUringDatagramChannelConfig extends DefaultChannelConfig implements DatagramChannelConfig {
|
||||
private static final RecvByteBufAllocator DEFAULT_RCVBUF_ALLOCATOR = new FixedRecvByteBufAllocator(2048);
|
||||
private boolean activeOnOpen;
|
||||
|
||||
IOUringDatagramChannelConfig(AbstractIOUringChannel channel) {
|
||||
super(channel);
|
||||
setRecvByteBufAllocator(DEFAULT_RCVBUF_ALLOCATOR);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public Map<ChannelOption<?>, Object> getOptions() {
|
||||
return getOptions(
|
||||
super.getOptions(),
|
||||
ChannelOption.SO_BROADCAST, ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF,
|
||||
ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED,
|
||||
ChannelOption.IP_MULTICAST_ADDR, ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL,
|
||||
ChannelOption.IP_TOS, ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION,
|
||||
IOUringChannelOption.SO_REUSEPORT, IOUringChannelOption.IP_FREEBIND,
|
||||
IOUringChannelOption.IP_TRANSPARENT);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "deprecation" })
|
||||
@Override
|
||||
public <T> T getOption(ChannelOption<T> option) {
|
||||
if (option == ChannelOption.SO_BROADCAST) {
|
||||
return (T) Boolean.valueOf(isBroadcast());
|
||||
}
|
||||
if (option == ChannelOption.SO_RCVBUF) {
|
||||
return (T) Integer.valueOf(getReceiveBufferSize());
|
||||
}
|
||||
if (option == ChannelOption.SO_SNDBUF) {
|
||||
return (T) Integer.valueOf(getSendBufferSize());
|
||||
}
|
||||
if (option == ChannelOption.SO_REUSEADDR) {
|
||||
return (T) Boolean.valueOf(isReuseAddress());
|
||||
}
|
||||
if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) {
|
||||
return (T) Boolean.valueOf(isLoopbackModeDisabled());
|
||||
}
|
||||
if (option == ChannelOption.IP_MULTICAST_ADDR) {
|
||||
return (T) getInterface();
|
||||
}
|
||||
if (option == ChannelOption.IP_MULTICAST_IF) {
|
||||
return (T) getNetworkInterface();
|
||||
}
|
||||
if (option == ChannelOption.IP_MULTICAST_TTL) {
|
||||
return (T) Integer.valueOf(getTimeToLive());
|
||||
}
|
||||
if (option == ChannelOption.IP_TOS) {
|
||||
return (T) Integer.valueOf(getTrafficClass());
|
||||
}
|
||||
if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
|
||||
return (T) Boolean.valueOf(activeOnOpen);
|
||||
}
|
||||
if (option == IOUringChannelOption.SO_REUSEPORT) {
|
||||
return (T) Boolean.valueOf(isReusePort());
|
||||
}
|
||||
if (option == IOUringChannelOption.IP_TRANSPARENT) {
|
||||
return (T) Boolean.valueOf(isIpTransparent());
|
||||
}
|
||||
if (option == IOUringChannelOption.IP_FREEBIND) {
|
||||
return (T) Boolean.valueOf(isFreeBind());
|
||||
}
|
||||
return super.getOption(option);
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("deprecation")
|
||||
public <T> boolean setOption(ChannelOption<T> option, T value) {
|
||||
validate(option, value);
|
||||
|
||||
if (option == ChannelOption.SO_BROADCAST) {
|
||||
setBroadcast((Boolean) value);
|
||||
} else if (option == ChannelOption.SO_RCVBUF) {
|
||||
setReceiveBufferSize((Integer) value);
|
||||
} else if (option == ChannelOption.SO_SNDBUF) {
|
||||
setSendBufferSize((Integer) value);
|
||||
} else if (option == ChannelOption.SO_REUSEADDR) {
|
||||
setReuseAddress((Boolean) value);
|
||||
} else if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) {
|
||||
setLoopbackModeDisabled((Boolean) value);
|
||||
} else if (option == ChannelOption.IP_MULTICAST_ADDR) {
|
||||
setInterface((InetAddress) value);
|
||||
} else if (option == ChannelOption.IP_MULTICAST_IF) {
|
||||
setNetworkInterface((NetworkInterface) value);
|
||||
} else if (option == ChannelOption.IP_MULTICAST_TTL) {
|
||||
setTimeToLive((Integer) value);
|
||||
} else if (option == ChannelOption.IP_TOS) {
|
||||
setTrafficClass((Integer) value);
|
||||
} else if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
|
||||
setActiveOnOpen((Boolean) value);
|
||||
} else if (option == IOUringChannelOption.SO_REUSEPORT) {
|
||||
setReusePort((Boolean) value);
|
||||
} else if (option == IOUringChannelOption.IP_FREEBIND) {
|
||||
setFreeBind((Boolean) value);
|
||||
} else if (option == IOUringChannelOption.IP_TRANSPARENT) {
|
||||
setIpTransparent((Boolean) value);
|
||||
} else {
|
||||
return super.setOption(option, value);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void setActiveOnOpen(boolean activeOnOpen) {
|
||||
if (channel.isRegistered()) {
|
||||
throw new IllegalStateException("Can only changed before channel was registered");
|
||||
}
|
||||
this.activeOnOpen = activeOnOpen;
|
||||
}
|
||||
|
||||
boolean getActiveOnOpen() {
|
||||
return activeOnOpen;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
|
||||
super.setMessageSizeEstimator(estimator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public IOUringDatagramChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
|
||||
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public IOUringDatagramChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
|
||||
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
|
||||
super.setWriteBufferWaterMark(writeBufferWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setAutoClose(boolean autoClose) {
|
||||
super.setAutoClose(autoClose);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setAutoRead(boolean autoRead) {
|
||||
super.setAutoRead(autoRead);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
|
||||
super.setRecvByteBufAllocator(allocator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setWriteSpinCount(int writeSpinCount) {
|
||||
super.setWriteSpinCount(writeSpinCount);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setAllocator(ByteBufAllocator allocator) {
|
||||
super.setAllocator(allocator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
|
||||
super.setConnectTimeoutMillis(connectTimeoutMillis);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public IOUringDatagramChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
|
||||
super.setMaxMessagesPerRead(maxMessagesPerRead);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSendBufferSize() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.getSendBufferSize();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setSendBufferSize(int sendBufferSize) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setSendBufferSize(sendBufferSize);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getReceiveBufferSize() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.getReceiveBufferSize();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setReceiveBufferSize(int receiveBufferSize) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setReceiveBufferSize(receiveBufferSize);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTrafficClass() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.getTrafficClass();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setTrafficClass(int trafficClass) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setTrafficClass(trafficClass);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReuseAddress() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.isReuseAddress();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setReuseAddress(boolean reuseAddress) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setReuseAddress(reuseAddress);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isBroadcast() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.isBroadcast();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setBroadcast(boolean broadcast) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setBroadcast(broadcast);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLoopbackModeDisabled() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.isLoopbackModeDisabled();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setLoopbackModeDisabled(boolean loopbackModeDisabled) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setLoopbackModeDisabled(loopbackModeDisabled);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTimeToLive() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.getTimeToLive();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setTimeToLive(int ttl) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setTimeToLive(ttl);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetAddress getInterface() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.getInterface();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setInterface(InetAddress interfaceAddress) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setInterface(interfaceAddress);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NetworkInterface getNetworkInterface() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.getNetworkInterface();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringDatagramChannelConfig setNetworkInterface(NetworkInterface networkInterface) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setNetworkInterface(networkInterface);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if the SO_REUSEPORT option is set.
|
||||
*/
|
||||
public boolean isReusePort() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.isReusePort();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the SO_REUSEPORT option on the underlying Channel. This will allow to bind multiple
|
||||
* {@link io.netty.channel.socket.DatagramChannel}s to the same port and so receive datagrams with multiple threads.
|
||||
*
|
||||
* Be aware this method needs be called before
|
||||
* {@link io.netty.channel.socket.DatagramChannel#bind(java.net.SocketAddress)} to have
|
||||
* any affect.
|
||||
*/
|
||||
public IOUringDatagramChannelConfig setReusePort(boolean reusePort) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setReusePort(reusePort);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if <a href="http://man7.org/linux/man-pages/man7/ip.7.html">IP_TRANSPARENT</a> is enabled,
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
public boolean isIpTransparent() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.isIpTransparent();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If {@code true} is used <a href="http://man7.org/linux/man-pages/man7/ip.7.html">IP_TRANSPARENT</a> is enabled,
|
||||
* {@code false} for disable it. Default is disabled.
|
||||
*/
|
||||
public IOUringDatagramChannelConfig setIpTransparent(boolean ipTransparent) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setIpTransparent(ipTransparent);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if <a href="http://man7.org/linux/man-pages/man7/ip.7.html">IP_FREEBIND</a> is enabled,
|
||||
* {@code false} otherwise.
|
||||
*/
|
||||
public boolean isFreeBind() {
|
||||
try {
|
||||
return ((AbstractIOUringChannel) channel).socket.isIpFreeBind();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If {@code true} is used <a href="http://man7.org/linux/man-pages/man7/ip.7.html">IP_FREEBIND</a> is enabled,
|
||||
* {@code false} for disable it. Default is disabled.
|
||||
*/
|
||||
public IOUringDatagramChannelConfig setFreeBind(boolean freeBind) {
|
||||
try {
|
||||
((AbstractIOUringChannel) channel).socket.setIpFreeBind(freeBind);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
}
|
@ -233,9 +233,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
if (channel == null) {
|
||||
return;
|
||||
}
|
||||
if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT) {
|
||||
if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT || op == Native.IORING_OP_RECVMSG) {
|
||||
handleRead(channel, res);
|
||||
} else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_WRITE) {
|
||||
} else if (op == Native.IORING_OP_WRITEV ||
|
||||
op == Native.IORING_OP_WRITE || op == Native.IORING_OP_SENDMSG) {
|
||||
handleWrite(channel, res);
|
||||
} else if (op == Native.IORING_OP_POLL_ADD) {
|
||||
handlePollAdd(channel, res, data);
|
||||
|
@ -156,7 +156,14 @@ final class IOUringSubmissionQueue {
|
||||
return enqueueSqe(Native.IORING_OP_POLL_ADD, pollMask, fd, 0, 0, 0, pollMask);
|
||||
}
|
||||
|
||||
//return true -> submit() was called
|
||||
boolean addRecvmsg(int fd, long msgHdr) {
|
||||
return enqueueSqe(Native.IORING_OP_RECVMSG, 0, fd, msgHdr, 1, 0, 0);
|
||||
}
|
||||
|
||||
boolean addSendmsg(int fd, long msgHdr) {
|
||||
return enqueueSqe(Native.IORING_OP_SENDMSG, 0, fd, msgHdr, 1, 0, 0);
|
||||
}
|
||||
|
||||
boolean addRead(int fd, long bufferAddress, int pos, int limit) {
|
||||
return enqueueSqe(Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0, 0);
|
||||
}
|
||||
|
@ -0,0 +1,40 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
/**
|
||||
* struct iovec {
|
||||
* void *iov_base; // Starting address
|
||||
* size_t iov_len; // Number of bytes to transfer
|
||||
* };
|
||||
*/
|
||||
final class Iov {
|
||||
|
||||
private Iov() { }
|
||||
|
||||
static void write(long iovAddress, long bufferAddress, int length) {
|
||||
if (Native.SIZEOF_SIZE_T == 4) {
|
||||
PlatformDependent.putInt(iovAddress + Native.IOVEC_OFFSETOF_IOV_BASE, (int) bufferAddress);
|
||||
PlatformDependent.putInt(iovAddress + Native.IOVEC_OFFSETOF_IOV_LEN, length);
|
||||
} else {
|
||||
assert Native.SIZEOF_SIZE_T == 8;
|
||||
PlatformDependent.putLong(iovAddress + Native.IOVEC_OFFSETOF_IOV_BASE, bufferAddress);
|
||||
PlatformDependent.putLong(iovAddress + Native.IOVEC_OFFSETOF_IOV_LEN, length);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
/**
|
||||
* struct msghdr {
|
||||
* void *msg_name; // optional address
|
||||
* socklen_t msg_namelen; // size of address
|
||||
* struct iovec*msg_iov; // scatter/gather array
|
||||
* size_t msg_iovlen; // # elements in msg_iov
|
||||
* void* msg_control; // ancillary data, see below
|
||||
* size_t msg_controllen; // ancillary data buffer len
|
||||
* int msg_flags; // flags on received message
|
||||
* };
|
||||
*/
|
||||
final class MsgHdr {
|
||||
|
||||
private MsgHdr() { }
|
||||
|
||||
static void write(long memoryAddress, long address, int addressSize, long iovAddress, int iovLength) {
|
||||
PlatformDependent.putInt(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_NAMELEN, addressSize);
|
||||
|
||||
if (Native.SIZEOF_SIZE_T == 4) {
|
||||
PlatformDependent.putInt(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_NAME, (int) address);
|
||||
PlatformDependent.putInt(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_IOV, (int) iovAddress);
|
||||
PlatformDependent.putInt(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_IOVLEN, iovLength);
|
||||
} else {
|
||||
assert Native.SIZEOF_SIZE_T == 8;
|
||||
PlatformDependent.putLong(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_NAME, address);
|
||||
PlatformDependent.putLong(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_IOV, iovAddress);
|
||||
PlatformDependent.putLong(memoryAddress + Native.MSGHDR_OFFSETOF_MSG_IOVLEN, iovLength);
|
||||
}
|
||||
|
||||
// No msg_control and flags (we assume the memory was memset before)
|
||||
}
|
||||
}
|
@ -66,6 +66,7 @@ final class Native {
|
||||
static final int SOCK_CLOEXEC = NativeStaticallyReferencedJniMethods.sockCloexec();
|
||||
static final short AF_INET = (short) NativeStaticallyReferencedJniMethods.afInet();
|
||||
static final short AF_INET6 = (short) NativeStaticallyReferencedJniMethods.afInet6();
|
||||
static final int SIZEOF_SOCKADDR_STORAGE = NativeStaticallyReferencedJniMethods.sizeofSockaddrStorage();
|
||||
static final int SIZEOF_SOCKADDR_IN = NativeStaticallyReferencedJniMethods.sizeofSockaddrIn();
|
||||
static final int SIZEOF_SOCKADDR_IN6 = NativeStaticallyReferencedJniMethods.sizeofSockaddrIn6();
|
||||
static final int SOCKADDR_IN_OFFSETOF_SIN_FAMILY =
|
||||
@ -73,7 +74,6 @@ final class Native {
|
||||
static final int SOCKADDR_IN_OFFSETOF_SIN_PORT = NativeStaticallyReferencedJniMethods.sockaddrInOffsetofSinPort();
|
||||
static final int SOCKADDR_IN_OFFSETOF_SIN_ADDR = NativeStaticallyReferencedJniMethods.sockaddrInOffsetofSinAddr();
|
||||
static final int IN_ADDRESS_OFFSETOF_S_ADDR = NativeStaticallyReferencedJniMethods.inAddressOffsetofSAddr();
|
||||
|
||||
static final int SOCKADDR_IN6_OFFSETOF_SIN6_FAMILY =
|
||||
NativeStaticallyReferencedJniMethods.sockaddrIn6OffsetofSin6Family();
|
||||
static final int SOCKADDR_IN6_OFFSETOF_SIN6_PORT =
|
||||
@ -85,7 +85,19 @@ final class Native {
|
||||
static final int SOCKADDR_IN6_OFFSETOF_SIN6_SCOPE_ID =
|
||||
NativeStaticallyReferencedJniMethods.sockaddrIn6OffsetofSin6ScopeId();
|
||||
static final int IN6_ADDRESS_OFFSETOF_S6_ADDR = NativeStaticallyReferencedJniMethods.in6AddressOffsetofS6Addr();
|
||||
|
||||
static final int SIZEOF_SIZE_T = NativeStaticallyReferencedJniMethods.sizeofSizeT();
|
||||
static final int SIZEOF_IOVEC = NativeStaticallyReferencedJniMethods.sizeofIovec();
|
||||
static final int IOVEC_OFFSETOF_IOV_BASE = NativeStaticallyReferencedJniMethods.iovecOffsetofIovBase();
|
||||
static final int IOVEC_OFFSETOF_IOV_LEN = NativeStaticallyReferencedJniMethods.iovecOffsetofIovLen();
|
||||
static final int SIZEOF_MSGHDR = NativeStaticallyReferencedJniMethods.sizeofMsghdr();
|
||||
static final int MSGHDR_OFFSETOF_MSG_NAME = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgName();
|
||||
static final int MSGHDR_OFFSETOF_MSG_NAMELEN = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgNamelen();
|
||||
static final int MSGHDR_OFFSETOF_MSG_IOV = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgIov();
|
||||
static final int MSGHDR_OFFSETOF_MSG_IOVLEN = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgIovlen();
|
||||
static final int MSGHDR_OFFSETOF_MSG_CONTROL = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgControl();
|
||||
static final int MSGHDR_OFFSETOF_MSG_CONTROLLEN =
|
||||
NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgControllen();
|
||||
static final int MSGHDR_OFFSETOF_MSG_FLAGS = NativeStaticallyReferencedJniMethods.msghdrOffsetofMsgFlags();
|
||||
static final int POLLIN = NativeStaticallyReferencedJniMethods.pollin();
|
||||
static final int POLLOUT = NativeStaticallyReferencedJniMethods.pollout();
|
||||
static final int POLLRDHUP = NativeStaticallyReferencedJniMethods.pollrdhup();
|
||||
@ -100,6 +112,8 @@ final class Native {
|
||||
static final int IORING_OP_CONNECT = NativeStaticallyReferencedJniMethods.ioringOpConnect();
|
||||
static final int IORING_OP_CLOSE = NativeStaticallyReferencedJniMethods.ioringOpClose();
|
||||
static final int IORING_OP_WRITEV = NativeStaticallyReferencedJniMethods.ioringOpWritev();
|
||||
static final int IORING_OP_SENDMSG = NativeStaticallyReferencedJniMethods.ioringOpSendmsg();
|
||||
static final int IORING_OP_RECVMSG = NativeStaticallyReferencedJniMethods.ioringOpRecvmsg();
|
||||
static final int IORING_ENTER_GETEVENTS = NativeStaticallyReferencedJniMethods.ioringEnterGetevents();
|
||||
static final int IOSQE_ASYNC = NativeStaticallyReferencedJniMethods.iosqeAsync();
|
||||
|
||||
|
@ -46,6 +46,19 @@ final class NativeStaticallyReferencedJniMethods {
|
||||
static native int sockaddrIn6OffsetofSin6Addr();
|
||||
static native int sockaddrIn6OffsetofSin6ScopeId();
|
||||
static native int in6AddressOffsetofS6Addr();
|
||||
static native int sizeofSockaddrStorage();
|
||||
static native int sizeofSizeT();
|
||||
static native int sizeofIovec();
|
||||
static native int iovecOffsetofIovBase();
|
||||
static native int iovecOffsetofIovLen();
|
||||
static native int sizeofMsghdr();
|
||||
static native int msghdrOffsetofMsgName();
|
||||
static native int msghdrOffsetofMsgNamelen();
|
||||
static native int msghdrOffsetofMsgIov();
|
||||
static native int msghdrOffsetofMsgIovlen();
|
||||
static native int msghdrOffsetofMsgControl();
|
||||
static native int msghdrOffsetofMsgControllen();
|
||||
static native int msghdrOffsetofMsgFlags();
|
||||
static native int etime();
|
||||
static native int ecanceled();
|
||||
static native int pollin();
|
||||
@ -60,6 +73,8 @@ final class NativeStaticallyReferencedJniMethods {
|
||||
static native int ioringOpWrite();
|
||||
static native int ioringOpConnect();
|
||||
static native int ioringOpClose();
|
||||
static native int ioringOpSendmsg();
|
||||
static native int ioringOpRecvmsg();
|
||||
static native int ioringEnterGetevents();
|
||||
static native int iosqeAsync();
|
||||
}
|
||||
|
@ -29,6 +29,14 @@ final class SockaddrIn {
|
||||
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, (byte) 0xff, (byte) 0xff };
|
||||
private SockaddrIn() { }
|
||||
|
||||
static void write(boolean ipv6, long memory, InetSocketAddress address) {
|
||||
if (ipv6) {
|
||||
SockaddrIn.writeIPv6(memory, address.getAddress(), address.getPort());
|
||||
} else {
|
||||
SockaddrIn.writeIPv4(memory, address.getAddress(), address.getPort());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* struct sockaddr_in {
|
||||
|
@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.DatagramConnectNotExistsTest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class IOUringDatagramConnectNotExistsTest extends DatagramConnectNotExistsTest {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapFactory<Bootstrap>> newFactories() {
|
||||
return IOUringSocketTestPermutation.INSTANCE.datagramSocket();
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.DatagramMulticastIPv6Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class IOUringDatagramMulticastIPv6Test extends DatagramMulticastIPv6Test {
|
||||
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
|
||||
return IOUringSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.DatagramMulticastTest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class IOUringDatagramMulticastTest extends DatagramMulticastTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
|
||||
return IOUringSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory;
|
||||
import io.netty.testsuite.transport.socket.DatagramUnicastIPv6MappedTest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class IOUringDatagramUnicastIPv6MappedTest extends DatagramUnicastIPv6MappedTest {
|
||||
@Override
|
||||
protected List<BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
|
||||
return IOUringSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.DatagramUnicastIPv6Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class IOUringDatagramUnicastIPv6Test extends DatagramUnicastIPv6Test {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
|
||||
return IOUringSocketTestPermutation.INSTANCE.datagram(internetProtocolFamily());
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.channel.socket.InternetProtocolFamily;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
import io.netty.testsuite.transport.socket.DatagramUnicastTest;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class IOUringDatagramUnicastTest extends DatagramUnicastTest {
|
||||
@Override
|
||||
protected List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> newFactories() {
|
||||
return IOUringSocketTestPermutation.INSTANCE.datagram(InternetProtocolFamily.IPv4);
|
||||
}
|
||||
}
|
@ -17,7 +17,11 @@ package io.netty.channel.uring;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFactory;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.socket.InternetProtocolFamily;
|
||||
import io.netty.channel.socket.nio.NioDatagramChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||
@ -110,6 +114,49 @@ public class IOUringSocketTestPermutation extends SocketTestPermutation {
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TestsuitePermutation.BootstrapComboFactory<Bootstrap, Bootstrap>> datagram(
|
||||
final InternetProtocolFamily family) {
|
||||
// Make the list of Bootstrap factories.
|
||||
List<BootstrapFactory<Bootstrap>> bfs = Arrays.<BootstrapFactory<Bootstrap>>asList(
|
||||
new BootstrapFactory<Bootstrap>() {
|
||||
@Override
|
||||
public Bootstrap newInstance() {
|
||||
return new Bootstrap().group(IO_URING_WORKER_GROUP)
|
||||
.channelFactory(new ChannelFactory<Channel>() {
|
||||
@Override
|
||||
public Channel newChannel() {
|
||||
return new IOUringDatagramChannel(family);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return InternetProtocolFamily.class.getSimpleName() + ".class";
|
||||
}
|
||||
});
|
||||
}
|
||||
},
|
||||
new BootstrapFactory<Bootstrap>() {
|
||||
@Override
|
||||
public Bootstrap newInstance() {
|
||||
return new Bootstrap().group(nioWorkerGroup).channelFactory(new ChannelFactory<Channel>() {
|
||||
@Override
|
||||
public Channel newChannel() {
|
||||
return new NioDatagramChannel(family);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return NioDatagramChannel.class.getSimpleName() + ".class";
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return combo(bfs, bfs);
|
||||
}
|
||||
|
||||
public boolean isServerFastOpen() {
|
||||
return AccessController.doPrivileged(new PrivilegedAction<Integer>() {
|
||||
@Override
|
||||
|
Loading…
x
Reference in New Issue
Block a user