diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index d81a984ce2..ada5b73ce9 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -42,13 +42,11 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements final LinuxSocket socket; protected volatile boolean active; boolean uringInReadyPending; - private final long ioUring; - AbstractIOUringChannel(final Channel parent, LinuxSocket fd, boolean active, final long ioUring) { + AbstractIOUringChannel(final Channel parent, LinuxSocket fd) { super(parent); this.socket = checkNotNull(fd, "fd"); - this.active = active; - this.ioUring = ioUring; + this.active = true; } public boolean isOpen() { @@ -75,6 +73,8 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements public void doReadBytes(ByteBuf byteBuf) { IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); + IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); + unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes()); if (byteBuf.hasMemoryAddress()) { @@ -82,13 +82,10 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements final Event event = new Event(); event.setId(eventId); event.setOp(EventType.READ); - - //Todo - //int error = socket.readEvent(ioUring, eventId, byteBuf.memoryAddress(), byteBuf.writerIndex(), - // byteBuf.capacity()); - // if (error == 0) { - // ioUringEventLoop.addNewEvent(event); - // } + event.setReadBuffer(byteBuf); + event.setAbstractIOUringChannel(this); + submissionQueue.add(eventId, EventType.READ, socket.getFd(), byteBuf.memoryAddress(), + byteBuf.writerIndex(), byteBuf.capacity()); } } @@ -145,34 +142,25 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { - Object msg = in.current(); - if (msg == null) { - // nothing left to write - return; - } - if (msg instanceof ByteBuf) { - ByteBuf buf = (ByteBuf) msg; - int readableBytes = buf.readableBytes(); - while (readableBytes > 0) { - doWriteBytes(buf); - - // have to move it to the eventloop - int newReadableBytes = buf.readableBytes(); - in.progress(readableBytes - newReadableBytes); - readableBytes = newReadableBytes; + if (in.size() == 1) { + Object msg = in.current(); + if (msg instanceof ByteBuf) { + doWriteBytes((ByteBuf) msg); } - in.remove(); } } protected final void doWriteBytes(ByteBuf buf) throws Exception { if (buf.hasMemoryAddress()) { IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); + IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); final Event event = new Event(); long eventId = ioUringEventLoop.incrementEventIdCounter(); event.setId(eventId); event.setOp(EventType.WRITE); - //socket.writeEvent(ioUring, eventId, buf.memoryAddress(), buf.readerIndex(), buf.writerIndex()); + event.setAbstractIOUringChannel(this); + submissionQueue.add(eventId, EventType.WRITE, socket.getFd(), buf.memoryAddress(), buf.readerIndex(), + buf.writerIndex()); } } @@ -205,7 +193,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements @Override public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, - final ChannelPromise promise) { + final ChannelPromise promise) { } final void executeUringReadOperator() { @@ -222,7 +210,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements protected Object filterOutboundMessage(Object msg) { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; - return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf; + return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf; } throw new UnsupportedOperationException("unsupported message type"); @@ -243,9 +231,8 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements } } - public long getIoUring() { - return ioUring; - } + @Override + public abstract IOUringChannelConfig config(); @Override protected SocketAddress localAddress0() { @@ -256,4 +243,8 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements protected SocketAddress remoteAddress0() { return null; } -} + + public LinuxSocket getSocket() { + return socket; + } +} \ No newline at end of file diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index c895b3465e..9433dcb3a0 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -24,17 +24,16 @@ import io.netty.channel.unix.FileDescriptor; import java.net.SocketAddress; -public class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel { +public abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel { private volatile SocketAddress local; - AbstractIOUringServerChannel(final Channel parent, final LinuxSocket fd, final boolean active, final long ioUring) { - super(parent, fd, active, ioUring); + AbstractIOUringServerChannel(int fd) { + super(null, new LinuxSocket(fd)); } - @Override - public ChannelConfig config() { - return null; + AbstractIOUringServerChannel(LinuxSocket fd) { + super(null, fd); } @Override @@ -62,29 +61,35 @@ public class AbstractIOUringServerChannel extends AbstractIOUringChannel impleme return null; } + public AbstractIOUringChannel getChannel() { + return this; + } + + abstract Channel newChildChannel(int fd, IOUringSubmissionQueue submissionQueue) throws Exception; + final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { private final byte[] acceptedAddress = new byte[26]; @Override public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, - final ChannelPromise promise) { + final ChannelPromise promise) { promise.setFailure(new UnsupportedOperationException()); } @Override public void uringEventExecution() { final IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); + IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); long eventId = ioUringEventLoop.incrementEventIdCounter(); final Event event = new Event(); event.setId(eventId); event.setOp(EventType.ACCEPT); + event.setAbstractIOUringChannel(getChannel()); - //Todo - // if (socket.acceptEvent(getIoUring(), eventId, acceptedAddress) == 0) { - // ioUringEventLoop.addNewEvent(event); - // Native.ioUringSubmit(getIoUring()); - // } + //todo get network addresses + submissionQueue.add(eventId, EventType.ACCEPT, getChannel().getSocket().getFd(), 0, 0, 0); + ioUringEventLoop.addNewEvent(event); } } -} +} \ No newline at end of file diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java index 3edb12e33d..c4df4d0c9c 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Event.java @@ -15,9 +15,31 @@ */ package io.netty.channel.uring; +import io.netty.buffer.ByteBuf; + public class Event { private long id; + private ByteBuf readBuffer; + private AbstractIOUringChannel abstractIOUringChannel; + private EventType op; + + public AbstractIOUringChannel getAbstractIOUringChannel() { + return abstractIOUringChannel; + } + + public void setAbstractIOUringChannel(AbstractIOUringChannel abstractIOUringChannel) { + this.abstractIOUringChannel = abstractIOUringChannel; + } + + public ByteBuf getReadBuffer() { + return readBuffer; + } + + public void setReadBuffer(ByteBuf readBuffer) { + this.readBuffer = readBuffer; + } + public long getId() { return id; } @@ -33,6 +55,5 @@ public class Event { public void setOp(final EventType op) { this.op = op; } - - private EventType op; } + diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelConfig.java new file mode 100644 index 0000000000..4d313ad6f4 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringChannelConfig.java @@ -0,0 +1,15 @@ +package io.netty.channel.uring; + +import io.netty.channel.Channel; +import io.netty.channel.DefaultChannelConfig; +import io.netty.channel.RecvByteBufAllocator; + +public class IOUringChannelConfig extends DefaultChannelConfig { + public IOUringChannelConfig(Channel channel) { + super(channel); + } + + protected IOUringChannelConfig(Channel channel, RecvByteBufAllocator allocator) { + super(channel, allocator); + } +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 7e95b57434..2b281def5c 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -31,10 +31,11 @@ class IOUringEventLoop extends SingleThreadEventLoop { // events should be unique to identify which event type that was private long eventIdCounter; private final LongObjectHashMap events = new LongObjectHashMap(); + private final RingBuffer ringBuffer; - protected IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp, - final int maxPendingTasks, final RejectedExecutionHandler rejectedExecutionHandler) { - super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); + protected IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { + super(parent, executor, addTaskWakesUp); + ringBuffer = Native.createRingBuffer(32); } public long incrementEventIdCounter() { @@ -71,4 +72,8 @@ class IOUringEventLoop extends SingleThreadEventLoop { // processing Tasks } } + + public RingBuffer getRingBuffer() { + return ringBuffer; + } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java new file mode 100644 index 0000000000..43720a1032 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java @@ -0,0 +1,116 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.channel.DefaultSelectStrategyFactory; +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopTaskQueueFactory; +import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.SelectStrategyFactory; +import io.netty.util.concurrent.EventExecutorChooserFactory; +import io.netty.util.concurrent.RejectedExecutionHandler; +import io.netty.util.concurrent.RejectedExecutionHandlers; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; + +public class IOUringEventLoopGroup extends MultithreadEventLoopGroup { + + + public IOUringEventLoopGroup() { + this(0); + } + + + public IOUringEventLoopGroup(int nThreads) { + this(nThreads, (ThreadFactory) null); + } + + + @SuppressWarnings("deprecation") + public IOUringEventLoopGroup(ThreadFactory threadFactory) { + this(0, threadFactory, 0); + } + + + @SuppressWarnings("deprecation") + public IOUringEventLoopGroup(int nThreads, SelectStrategyFactory selectStrategyFactory) { + this(nThreads, (ThreadFactory) null, selectStrategyFactory); + } + + + @SuppressWarnings("deprecation") + public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory) { + this(nThreads, threadFactory, 0); + } + + public IOUringEventLoopGroup(int nThreads, Executor executor) { + this(nThreads, executor, DefaultSelectStrategyFactory.INSTANCE); + } + + @SuppressWarnings("deprecation") + public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, + SelectStrategyFactory selectStrategyFactory) { + this(nThreads, threadFactory, 0, selectStrategyFactory); + } + + @Deprecated + public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) { + this(nThreads, threadFactory, maxEventsAtOnce, DefaultSelectStrategyFactory.INSTANCE); + } + + @Deprecated + public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce, + SelectStrategyFactory selectStrategyFactory) { + super(nThreads, threadFactory, maxEventsAtOnce, selectStrategyFactory, RejectedExecutionHandlers.reject()); + } + + public IOUringEventLoopGroup(int nThreads, Executor executor, SelectStrategyFactory selectStrategyFactory) { + super(nThreads, executor, 0, selectStrategyFactory, RejectedExecutionHandlers.reject()); + } + + public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + SelectStrategyFactory selectStrategyFactory) { + super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, RejectedExecutionHandlers.reject()); + } + + public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + SelectStrategyFactory selectStrategyFactory, + RejectedExecutionHandler rejectedExecutionHandler) { + super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler); + } + + public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + SelectStrategyFactory selectStrategyFactory, + RejectedExecutionHandler rejectedExecutionHandler, + EventLoopTaskQueueFactory queueFactory) { + super(nThreads, executor, chooserFactory, 0, + selectStrategyFactory, rejectedExecutionHandler, queueFactory); + } + + @Deprecated + public void setIoRatio(int ioRatio) { + if (ioRatio <= 0 || ioRatio > 100) { + throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); + } + } + + @Override + protected EventLoop newChild(Executor executor, Object... args) throws Exception { + //EventLoopTaskQueueFactory queueFactory = args.length == 4? (EventLoopTaskQueueFactory) args[3] : null; + return new IOUringEventLoop(this, executor, false); + } +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerChannelConfig.java new file mode 100644 index 0000000000..4aebe8ff64 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerChannelConfig.java @@ -0,0 +1,144 @@ +package io.netty.channel.uring; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelOption; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.socket.ServerSocketChannelConfig; +import io.netty.util.NetUtil; + +import java.io.IOException; +import java.util.Map; + +import static io.netty.channel.ChannelOption.*; +import static io.netty.util.internal.ObjectUtil.*; + +public class IOUringServerChannelConfig extends IOUringChannelConfig implements ServerSocketChannelConfig { + private volatile int backlog = NetUtil.SOMAXCONN; + private volatile int pendingFastOpenRequestsThreshold; + + IOUringServerChannelConfig(AbstractIOUringServerChannel channel) { + super(channel); + } + + @Override + public boolean isReuseAddress() { + try { + return ((IOUringSocketChannel) channel).socket.isReuseAddress(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringServerChannelConfig setReuseAddress(boolean reuseAddress) { + try { + ((IOUringSocketChannel) channel).socket.setReuseAddress(reuseAddress); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getReceiveBufferSize() { + try { + return ((IOUringSocketChannel) channel).socket.getReceiveBufferSize(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringServerChannelConfig setReceiveBufferSize(int receiveBufferSize) { + try { + ((IOUringSocketChannel) channel).socket.setReceiveBufferSize(receiveBufferSize); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getBacklog() { + return backlog; + } + + @Override + public IOUringServerChannelConfig setBacklog(int backlog) { + checkPositiveOrZero(backlog, "backlog"); + this.backlog = backlog; + return this; + } + + @Override + public IOUringServerChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) { + return this; + } + + @Override + public IOUringServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + @Deprecated + public IOUringServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public IOUringServerChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public IOUringServerChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public IOUringServerChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public IOUringServerChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + @Deprecated + public IOUringServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + @Deprecated + public IOUringServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public IOUringServerChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { + super.setWriteBufferWaterMark(writeBufferWaterMark); + return this; + } + + @Override + public IOUringServerChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java index 769d9b6186..d83332278d 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannel.java @@ -17,28 +17,39 @@ package io.netty.channel.uring; import io.netty.channel.Channel; import io.netty.channel.socket.ServerSocketChannel; -import io.netty.channel.socket.ServerSocketChannelConfig; +import io.netty.channel.unix.Socket; + import java.net.InetSocketAddress; import java.net.SocketAddress; public class IOUringServerSocketChannel extends AbstractIOUringServerChannel implements ServerSocketChannel { - IOUringServerSocketChannel(Channel parent, LinuxSocket fd, boolean active, long ioUring) { - super(parent, fd, active, ioUring); + private final IOUringServerSocketChannelConfig config; + + public IOUringServerSocketChannel() { + super(Socket.newSocketStream().getFd()); + this.config = new IOUringServerSocketChannelConfig(this); } + @Override public void doBind(SocketAddress localAddress) throws Exception { super.doBind(localAddress); } + @Override + public IOUringServerSocketChannelConfig config() { + return config; + } + @Override public boolean isOpen() { return false; } + @Override - public ServerSocketChannelConfig config() { - return null; + Channel newChildChannel(int fd, IOUringSubmissionQueue submissionQueue) throws Exception { + return new IOUringSocketChannel(this, new LinuxSocket(fd)); } @Override @@ -55,4 +66,4 @@ public class IOUringServerSocketChannel extends AbstractIOUringServerChannel imp public InetSocketAddress localAddress() { return (InetSocketAddress) super.localAddress(); } -} +} \ No newline at end of file diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java new file mode 100644 index 0000000000..fc208dec66 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringServerSocketChannelConfig.java @@ -0,0 +1,10 @@ +package io.netty.channel.uring; + +import io.netty.channel.socket.ServerSocketChannelConfig; + +public class IOUringServerSocketChannelConfig extends IOUringServerChannelConfig implements ServerSocketChannelConfig { + + IOUringServerSocketChannelConfig(AbstractIOUringServerChannel channel) { + super(channel); + } +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java index 455828bf14..5d84142331 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannel.java @@ -31,9 +31,11 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; public class IOUringSocketChannel extends AbstractIOUringChannel implements SocketChannel { + private final IOUringSocketChannelConfig config; - IOUringSocketChannel(final Channel parent, final LinuxSocket fd, final boolean active, final long ioUring) { - super(parent, fd, active, ioUring); + IOUringSocketChannel(final Channel parent, final LinuxSocket fd) { + super(parent, fd); + this.config = new IOUringSocketChannelConfig(this); } @Override @@ -42,8 +44,8 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock } @Override - public SocketChannelConfig config() { - return null; + public IOUringSocketChannelConfig config() { + return config; } @Override diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java new file mode 100644 index 0000000000..976e444cd3 --- /dev/null +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSocketChannelConfig.java @@ -0,0 +1,242 @@ +package io.netty.channel.uring; + +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelException; +import io.netty.channel.MessageSizeEstimator; +import io.netty.channel.RecvByteBufAllocator; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.socket.SocketChannelConfig; + +import java.io.IOException; + + +public class IOUringSocketChannelConfig extends IOUringChannelConfig implements SocketChannelConfig { + private volatile boolean allowHalfClosure; + + /** + * Creates a new instance. + */ + IOUringSocketChannelConfig(IOUringSocketChannel channel) { + super(channel); + } + + + @Override + public int getReceiveBufferSize() { + try { + return ((IOUringSocketChannel) channel).socket.getReceiveBufferSize(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getSendBufferSize() { + try { + return ((IOUringSocketChannel) channel).socket.getSendBufferSize(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getSoLinger() { + try { + return ((IOUringSocketChannel) channel).socket.getSoLinger(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public int getTrafficClass() { + try { + return ((IOUringSocketChannel) channel).socket.getTrafficClass(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isKeepAlive() { + try { + return ((IOUringSocketChannel) channel).socket.isKeepAlive(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isReuseAddress() { + try { + return ((IOUringSocketChannel) channel).socket.isReuseAddress(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isTcpNoDelay() { + try { + return ((IOUringSocketChannel) channel).socket.isTcpNoDelay(); + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringSocketChannelConfig setKeepAlive(boolean keepAlive) { + try { + ((IOUringSocketChannel) channel).socket.setKeepAlive(keepAlive); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringSocketChannelConfig setPerformancePreferences( + int connectionTime, int latency, int bandwidth) { + return this; + } + + @Override + public IOUringSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) { + try { + ((IOUringSocketChannel) channel).socket.setReceiveBufferSize(receiveBufferSize); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringSocketChannelConfig setReuseAddress(boolean reuseAddress) { + try { + ((IOUringSocketChannel) channel).socket.setReuseAddress(reuseAddress); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringSocketChannelConfig setSendBufferSize(int sendBufferSize) { + try { + ((IOUringSocketChannel) channel).socket.setSendBufferSize(sendBufferSize); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringSocketChannelConfig setSoLinger(int soLinger) { + try { + ((IOUringSocketChannel) channel).socket.setSoLinger(soLinger); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) { + try { + ((IOUringSocketChannel) channel).socket.setTcpNoDelay(tcpNoDelay); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public IOUringSocketChannelConfig setTrafficClass(int trafficClass) { + try { + ((IOUringSocketChannel) channel).socket.setTrafficClass(trafficClass); + return this; + } catch (IOException e) { + throw new ChannelException(e); + } + } + + @Override + public boolean isAllowHalfClosure() { + return allowHalfClosure; + } + + @Override + public IOUringSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) { + this.allowHalfClosure = allowHalfClosure; + return this; + } + + @Override + public IOUringSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) { + super.setConnectTimeoutMillis(connectTimeoutMillis); + return this; + } + + @Override + @Deprecated + public IOUringSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) { + super.setMaxMessagesPerRead(maxMessagesPerRead); + return this; + } + + @Override + public IOUringSocketChannelConfig setWriteSpinCount(int writeSpinCount) { + super.setWriteSpinCount(writeSpinCount); + return this; + } + + @Override + public IOUringSocketChannelConfig setAllocator(ByteBufAllocator allocator) { + super.setAllocator(allocator); + return this; + } + + @Override + public IOUringSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) { + super.setRecvByteBufAllocator(allocator); + return this; + } + + @Override + public IOUringSocketChannelConfig setAutoRead(boolean autoRead) { + super.setAutoRead(autoRead); + return this; + } + + @Override + public IOUringSocketChannelConfig setAutoClose(boolean autoClose) { + super.setAutoClose(autoClose); + return this; + } + + @Override + @Deprecated + public IOUringSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + super.setWriteBufferHighWaterMark(writeBufferHighWaterMark); + return this; + } + + @Override + @Deprecated + public IOUringSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + super.setWriteBufferLowWaterMark(writeBufferLowWaterMark); + return this; + } + + @Override + public IOUringSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) { + super.setWriteBufferWaterMark(writeBufferWaterMark); + return this; + } + + @Override + public IOUringSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) { + super.setMessageSizeEstimator(estimator); + return this; + } +} diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java index 0b0a55ee58..4c1c6ff0f8 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java @@ -231,5 +231,10 @@ public class FileDescriptor { private static native int read(int fd, ByteBuffer buf, int pos, int limit); private static native int readAddress(int fd, long address, int pos, int limit); + //only temporary + public int getFd() { + return fd; + } + private static native long newPipe(); }