Add channel configs
Motivation: -missing channel configs -we dont store byteBuf and channel for the read operation to execute channel.fireChannelReadComplete Modifications: -add channels configs -new Event properties for the processing completionQueue Result: with these changes it will be much easier to implement the eventloop
This commit is contained in:
parent
8a56cd1959
commit
247e14a2b1
@ -42,13 +42,11 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
|
||||
final LinuxSocket socket;
|
||||
protected volatile boolean active;
|
||||
boolean uringInReadyPending;
|
||||
private final long ioUring;
|
||||
|
||||
AbstractIOUringChannel(final Channel parent, LinuxSocket fd, boolean active, final long ioUring) {
|
||||
AbstractIOUringChannel(final Channel parent, LinuxSocket fd) {
|
||||
super(parent);
|
||||
this.socket = checkNotNull(fd, "fd");
|
||||
this.active = active;
|
||||
this.ioUring = ioUring;
|
||||
this.active = true;
|
||||
}
|
||||
|
||||
public boolean isOpen() {
|
||||
@ -75,6 +73,8 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
|
||||
|
||||
public void doReadBytes(ByteBuf byteBuf) {
|
||||
IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
|
||||
IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
|
||||
|
||||
unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
|
||||
|
||||
if (byteBuf.hasMemoryAddress()) {
|
||||
@ -82,13 +82,10 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
|
||||
final Event event = new Event();
|
||||
event.setId(eventId);
|
||||
event.setOp(EventType.READ);
|
||||
|
||||
//Todo
|
||||
//int error = socket.readEvent(ioUring, eventId, byteBuf.memoryAddress(), byteBuf.writerIndex(),
|
||||
// byteBuf.capacity());
|
||||
// if (error == 0) {
|
||||
// ioUringEventLoop.addNewEvent(event);
|
||||
// }
|
||||
event.setReadBuffer(byteBuf);
|
||||
event.setAbstractIOUringChannel(this);
|
||||
submissionQueue.add(eventId, EventType.READ, socket.getFd(), byteBuf.memoryAddress(),
|
||||
byteBuf.writerIndex(), byteBuf.capacity());
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,34 +142,25 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
|
||||
|
||||
@Override
|
||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||
Object msg = in.current();
|
||||
if (msg == null) {
|
||||
// nothing left to write
|
||||
return;
|
||||
}
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
int readableBytes = buf.readableBytes();
|
||||
while (readableBytes > 0) {
|
||||
doWriteBytes(buf);
|
||||
|
||||
// have to move it to the eventloop
|
||||
int newReadableBytes = buf.readableBytes();
|
||||
in.progress(readableBytes - newReadableBytes);
|
||||
readableBytes = newReadableBytes;
|
||||
if (in.size() == 1) {
|
||||
Object msg = in.current();
|
||||
if (msg instanceof ByteBuf) {
|
||||
doWriteBytes((ByteBuf) msg);
|
||||
}
|
||||
in.remove();
|
||||
}
|
||||
}
|
||||
|
||||
protected final void doWriteBytes(ByteBuf buf) throws Exception {
|
||||
if (buf.hasMemoryAddress()) {
|
||||
IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
|
||||
IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
|
||||
final Event event = new Event();
|
||||
long eventId = ioUringEventLoop.incrementEventIdCounter();
|
||||
event.setId(eventId);
|
||||
event.setOp(EventType.WRITE);
|
||||
//socket.writeEvent(ioUring, eventId, buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
|
||||
event.setAbstractIOUringChannel(this);
|
||||
submissionQueue.add(eventId, EventType.WRITE, socket.getFd(), buf.memoryAddress(), buf.readerIndex(),
|
||||
buf.writerIndex());
|
||||
}
|
||||
}
|
||||
|
||||
@ -205,7 +193,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
|
||||
|
||||
@Override
|
||||
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
|
||||
final ChannelPromise promise) {
|
||||
final ChannelPromise promise) {
|
||||
}
|
||||
|
||||
final void executeUringReadOperator() {
|
||||
@ -222,7 +210,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
|
||||
protected Object filterOutboundMessage(Object msg) {
|
||||
if (msg instanceof ByteBuf) {
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
|
||||
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
|
||||
}
|
||||
|
||||
throw new UnsupportedOperationException("unsupported message type");
|
||||
@ -243,9 +231,8 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
|
||||
}
|
||||
}
|
||||
|
||||
public long getIoUring() {
|
||||
return ioUring;
|
||||
}
|
||||
@Override
|
||||
public abstract IOUringChannelConfig config();
|
||||
|
||||
@Override
|
||||
protected SocketAddress localAddress0() {
|
||||
@ -256,4 +243,8 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
|
||||
protected SocketAddress remoteAddress0() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public LinuxSocket getSocket() {
|
||||
return socket;
|
||||
}
|
||||
}
|
@ -24,17 +24,16 @@ import io.netty.channel.unix.FileDescriptor;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
|
||||
public class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel {
|
||||
public abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel {
|
||||
|
||||
private volatile SocketAddress local;
|
||||
|
||||
AbstractIOUringServerChannel(final Channel parent, final LinuxSocket fd, final boolean active, final long ioUring) {
|
||||
super(parent, fd, active, ioUring);
|
||||
AbstractIOUringServerChannel(int fd) {
|
||||
super(null, new LinuxSocket(fd));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelConfig config() {
|
||||
return null;
|
||||
AbstractIOUringServerChannel(LinuxSocket fd) {
|
||||
super(null, fd);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -62,29 +61,35 @@ public class AbstractIOUringServerChannel extends AbstractIOUringChannel impleme
|
||||
return null;
|
||||
}
|
||||
|
||||
public AbstractIOUringChannel getChannel() {
|
||||
return this;
|
||||
}
|
||||
|
||||
abstract Channel newChildChannel(int fd, IOUringSubmissionQueue submissionQueue) throws Exception;
|
||||
|
||||
final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe {
|
||||
private final byte[] acceptedAddress = new byte[26];
|
||||
|
||||
@Override
|
||||
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
|
||||
final ChannelPromise promise) {
|
||||
final ChannelPromise promise) {
|
||||
promise.setFailure(new UnsupportedOperationException());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void uringEventExecution() {
|
||||
final IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
|
||||
IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
|
||||
|
||||
long eventId = ioUringEventLoop.incrementEventIdCounter();
|
||||
final Event event = new Event();
|
||||
event.setId(eventId);
|
||||
event.setOp(EventType.ACCEPT);
|
||||
event.setAbstractIOUringChannel(getChannel());
|
||||
|
||||
//Todo
|
||||
// if (socket.acceptEvent(getIoUring(), eventId, acceptedAddress) == 0) {
|
||||
// ioUringEventLoop.addNewEvent(event);
|
||||
// Native.ioUringSubmit(getIoUring());
|
||||
// }
|
||||
//todo get network addresses
|
||||
submissionQueue.add(eventId, EventType.ACCEPT, getChannel().getSocket().getFd(), 0, 0, 0);
|
||||
ioUringEventLoop.addNewEvent(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -15,9 +15,31 @@
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
public class Event {
|
||||
private long id;
|
||||
|
||||
private ByteBuf readBuffer;
|
||||
private AbstractIOUringChannel abstractIOUringChannel;
|
||||
private EventType op;
|
||||
|
||||
public AbstractIOUringChannel getAbstractIOUringChannel() {
|
||||
return abstractIOUringChannel;
|
||||
}
|
||||
|
||||
public void setAbstractIOUringChannel(AbstractIOUringChannel abstractIOUringChannel) {
|
||||
this.abstractIOUringChannel = abstractIOUringChannel;
|
||||
}
|
||||
|
||||
public ByteBuf getReadBuffer() {
|
||||
return readBuffer;
|
||||
}
|
||||
|
||||
public void setReadBuffer(ByteBuf readBuffer) {
|
||||
this.readBuffer = readBuffer;
|
||||
}
|
||||
|
||||
public long getId() {
|
||||
return id;
|
||||
}
|
||||
@ -33,6 +55,5 @@ public class Event {
|
||||
public void setOp(final EventType op) {
|
||||
this.op = op;
|
||||
}
|
||||
|
||||
private EventType op;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,15 @@
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.DefaultChannelConfig;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
|
||||
public class IOUringChannelConfig extends DefaultChannelConfig {
|
||||
public IOUringChannelConfig(Channel channel) {
|
||||
super(channel);
|
||||
}
|
||||
|
||||
protected IOUringChannelConfig(Channel channel, RecvByteBufAllocator allocator) {
|
||||
super(channel, allocator);
|
||||
}
|
||||
}
|
@ -31,10 +31,11 @@ class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
// events should be unique to identify which event type that was
|
||||
private long eventIdCounter;
|
||||
private final LongObjectHashMap<Event> events = new LongObjectHashMap<Event>();
|
||||
private final RingBuffer ringBuffer;
|
||||
|
||||
protected IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp,
|
||||
final int maxPendingTasks, final RejectedExecutionHandler rejectedExecutionHandler) {
|
||||
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
|
||||
protected IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
|
||||
super(parent, executor, addTaskWakesUp);
|
||||
ringBuffer = Native.createRingBuffer(32);
|
||||
}
|
||||
|
||||
public long incrementEventIdCounter() {
|
||||
@ -71,4 +72,8 @@ class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
// processing Tasks
|
||||
}
|
||||
}
|
||||
|
||||
public RingBuffer getRingBuffer() {
|
||||
return ringBuffer;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,116 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.channel.DefaultSelectStrategyFactory;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopTaskQueueFactory;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.SelectStrategyFactory;
|
||||
import io.netty.util.concurrent.EventExecutorChooserFactory;
|
||||
import io.netty.util.concurrent.RejectedExecutionHandler;
|
||||
import io.netty.util.concurrent.RejectedExecutionHandlers;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
public class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
|
||||
|
||||
public IOUringEventLoopGroup() {
|
||||
this(0);
|
||||
}
|
||||
|
||||
|
||||
public IOUringEventLoopGroup(int nThreads) {
|
||||
this(nThreads, (ThreadFactory) null);
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public IOUringEventLoopGroup(ThreadFactory threadFactory) {
|
||||
this(0, threadFactory, 0);
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public IOUringEventLoopGroup(int nThreads, SelectStrategyFactory selectStrategyFactory) {
|
||||
this(nThreads, (ThreadFactory) null, selectStrategyFactory);
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
|
||||
this(nThreads, threadFactory, 0);
|
||||
}
|
||||
|
||||
public IOUringEventLoopGroup(int nThreads, Executor executor) {
|
||||
this(nThreads, executor, DefaultSelectStrategyFactory.INSTANCE);
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory,
|
||||
SelectStrategyFactory selectStrategyFactory) {
|
||||
this(nThreads, threadFactory, 0, selectStrategyFactory);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) {
|
||||
this(nThreads, threadFactory, maxEventsAtOnce, DefaultSelectStrategyFactory.INSTANCE);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce,
|
||||
SelectStrategyFactory selectStrategyFactory) {
|
||||
super(nThreads, threadFactory, maxEventsAtOnce, selectStrategyFactory, RejectedExecutionHandlers.reject());
|
||||
}
|
||||
|
||||
public IOUringEventLoopGroup(int nThreads, Executor executor, SelectStrategyFactory selectStrategyFactory) {
|
||||
super(nThreads, executor, 0, selectStrategyFactory, RejectedExecutionHandlers.reject());
|
||||
}
|
||||
|
||||
public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
|
||||
SelectStrategyFactory selectStrategyFactory) {
|
||||
super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, RejectedExecutionHandlers.reject());
|
||||
}
|
||||
|
||||
public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
|
||||
SelectStrategyFactory selectStrategyFactory,
|
||||
RejectedExecutionHandler rejectedExecutionHandler) {
|
||||
super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler);
|
||||
}
|
||||
|
||||
public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
|
||||
SelectStrategyFactory selectStrategyFactory,
|
||||
RejectedExecutionHandler rejectedExecutionHandler,
|
||||
EventLoopTaskQueueFactory queueFactory) {
|
||||
super(nThreads, executor, chooserFactory, 0,
|
||||
selectStrategyFactory, rejectedExecutionHandler, queueFactory);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void setIoRatio(int ioRatio) {
|
||||
if (ioRatio <= 0 || ioRatio > 100) {
|
||||
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
|
||||
//EventLoopTaskQueueFactory queueFactory = args.length == 4? (EventLoopTaskQueueFactory) args[3] : null;
|
||||
return new IOUringEventLoop(this, executor, false);
|
||||
}
|
||||
}
|
@ -0,0 +1,144 @@
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.MessageSizeEstimator;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.WriteBufferWaterMark;
|
||||
import io.netty.channel.socket.ServerSocketChannelConfig;
|
||||
import io.netty.util.NetUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import static io.netty.channel.ChannelOption.*;
|
||||
import static io.netty.util.internal.ObjectUtil.*;
|
||||
|
||||
public class IOUringServerChannelConfig extends IOUringChannelConfig implements ServerSocketChannelConfig {
|
||||
private volatile int backlog = NetUtil.SOMAXCONN;
|
||||
private volatile int pendingFastOpenRequestsThreshold;
|
||||
|
||||
IOUringServerChannelConfig(AbstractIOUringServerChannel channel) {
|
||||
super(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReuseAddress() {
|
||||
try {
|
||||
return ((IOUringSocketChannel) channel).socket.isReuseAddress();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setReuseAddress(boolean reuseAddress) {
|
||||
try {
|
||||
((IOUringSocketChannel) channel).socket.setReuseAddress(reuseAddress);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getReceiveBufferSize() {
|
||||
try {
|
||||
return ((IOUringSocketChannel) channel).socket.getReceiveBufferSize();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setReceiveBufferSize(int receiveBufferSize) {
|
||||
try {
|
||||
((IOUringSocketChannel) channel).socket.setReceiveBufferSize(receiveBufferSize);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBacklog() {
|
||||
return backlog;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setBacklog(int backlog) {
|
||||
checkPositiveOrZero(backlog, "backlog");
|
||||
this.backlog = backlog;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setPerformancePreferences(int connectionTime, int latency, int bandwidth) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
|
||||
super.setConnectTimeoutMillis(connectTimeoutMillis);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public IOUringServerChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
|
||||
super.setMaxMessagesPerRead(maxMessagesPerRead);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setWriteSpinCount(int writeSpinCount) {
|
||||
super.setWriteSpinCount(writeSpinCount);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setAllocator(ByteBufAllocator allocator) {
|
||||
super.setAllocator(allocator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
|
||||
super.setRecvByteBufAllocator(allocator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setAutoRead(boolean autoRead) {
|
||||
super.setAutoRead(autoRead);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public IOUringServerChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
|
||||
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public IOUringServerChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
|
||||
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
|
||||
super.setWriteBufferWaterMark(writeBufferWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
|
||||
super.setMessageSizeEstimator(estimator);
|
||||
return this;
|
||||
}
|
||||
}
|
@ -17,28 +17,39 @@ package io.netty.channel.uring;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.socket.ServerSocketChannel;
|
||||
import io.netty.channel.socket.ServerSocketChannelConfig;
|
||||
import io.netty.channel.unix.Socket;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
public class IOUringServerSocketChannel extends AbstractIOUringServerChannel implements ServerSocketChannel {
|
||||
IOUringServerSocketChannel(Channel parent, LinuxSocket fd, boolean active, long ioUring) {
|
||||
super(parent, fd, active, ioUring);
|
||||
private final IOUringServerSocketChannelConfig config;
|
||||
|
||||
public IOUringServerSocketChannel() {
|
||||
super(Socket.newSocketStream().getFd());
|
||||
this.config = new IOUringServerSocketChannelConfig(this);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void doBind(SocketAddress localAddress) throws Exception {
|
||||
super.doBind(localAddress);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringServerSocketChannelConfig config() {
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ServerSocketChannelConfig config() {
|
||||
return null;
|
||||
Channel newChildChannel(int fd, IOUringSubmissionQueue submissionQueue) throws Exception {
|
||||
return new IOUringSocketChannel(this, new LinuxSocket(fd));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -55,4 +66,4 @@ public class IOUringServerSocketChannel extends AbstractIOUringServerChannel imp
|
||||
public InetSocketAddress localAddress() {
|
||||
return (InetSocketAddress) super.localAddress();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,10 @@
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.channel.socket.ServerSocketChannelConfig;
|
||||
|
||||
public class IOUringServerSocketChannelConfig extends IOUringServerChannelConfig implements ServerSocketChannelConfig {
|
||||
|
||||
IOUringServerSocketChannelConfig(AbstractIOUringServerChannel channel) {
|
||||
super(channel);
|
||||
}
|
||||
}
|
@ -31,9 +31,11 @@ import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
|
||||
public class IOUringSocketChannel extends AbstractIOUringChannel implements SocketChannel {
|
||||
private final IOUringSocketChannelConfig config;
|
||||
|
||||
IOUringSocketChannel(final Channel parent, final LinuxSocket fd, final boolean active, final long ioUring) {
|
||||
super(parent, fd, active, ioUring);
|
||||
IOUringSocketChannel(final Channel parent, final LinuxSocket fd) {
|
||||
super(parent, fd);
|
||||
this.config = new IOUringSocketChannelConfig(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -42,8 +44,8 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock
|
||||
}
|
||||
|
||||
@Override
|
||||
public SocketChannelConfig config() {
|
||||
return null;
|
||||
public IOUringSocketChannelConfig config() {
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,242 @@
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.MessageSizeEstimator;
|
||||
import io.netty.channel.RecvByteBufAllocator;
|
||||
import io.netty.channel.WriteBufferWaterMark;
|
||||
import io.netty.channel.socket.SocketChannelConfig;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
||||
public class IOUringSocketChannelConfig extends IOUringChannelConfig implements SocketChannelConfig {
|
||||
private volatile boolean allowHalfClosure;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
*/
|
||||
IOUringSocketChannelConfig(IOUringSocketChannel channel) {
|
||||
super(channel);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getReceiveBufferSize() {
|
||||
try {
|
||||
return ((IOUringSocketChannel) channel).socket.getReceiveBufferSize();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSendBufferSize() {
|
||||
try {
|
||||
return ((IOUringSocketChannel) channel).socket.getSendBufferSize();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSoLinger() {
|
||||
try {
|
||||
return ((IOUringSocketChannel) channel).socket.getSoLinger();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTrafficClass() {
|
||||
try {
|
||||
return ((IOUringSocketChannel) channel).socket.getTrafficClass();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isKeepAlive() {
|
||||
try {
|
||||
return ((IOUringSocketChannel) channel).socket.isKeepAlive();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReuseAddress() {
|
||||
try {
|
||||
return ((IOUringSocketChannel) channel).socket.isReuseAddress();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTcpNoDelay() {
|
||||
try {
|
||||
return ((IOUringSocketChannel) channel).socket.isTcpNoDelay();
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setKeepAlive(boolean keepAlive) {
|
||||
try {
|
||||
((IOUringSocketChannel) channel).socket.setKeepAlive(keepAlive);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setPerformancePreferences(
|
||||
int connectionTime, int latency, int bandwidth) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {
|
||||
try {
|
||||
((IOUringSocketChannel) channel).socket.setReceiveBufferSize(receiveBufferSize);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setReuseAddress(boolean reuseAddress) {
|
||||
try {
|
||||
((IOUringSocketChannel) channel).socket.setReuseAddress(reuseAddress);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setSendBufferSize(int sendBufferSize) {
|
||||
try {
|
||||
((IOUringSocketChannel) channel).socket.setSendBufferSize(sendBufferSize);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setSoLinger(int soLinger) {
|
||||
try {
|
||||
((IOUringSocketChannel) channel).socket.setSoLinger(soLinger);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
|
||||
try {
|
||||
((IOUringSocketChannel) channel).socket.setTcpNoDelay(tcpNoDelay);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setTrafficClass(int trafficClass) {
|
||||
try {
|
||||
((IOUringSocketChannel) channel).socket.setTrafficClass(trafficClass);
|
||||
return this;
|
||||
} catch (IOException e) {
|
||||
throw new ChannelException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAllowHalfClosure() {
|
||||
return allowHalfClosure;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setAllowHalfClosure(boolean allowHalfClosure) {
|
||||
this.allowHalfClosure = allowHalfClosure;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setConnectTimeoutMillis(int connectTimeoutMillis) {
|
||||
super.setConnectTimeoutMillis(connectTimeoutMillis);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public IOUringSocketChannelConfig setMaxMessagesPerRead(int maxMessagesPerRead) {
|
||||
super.setMaxMessagesPerRead(maxMessagesPerRead);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setWriteSpinCount(int writeSpinCount) {
|
||||
super.setWriteSpinCount(writeSpinCount);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setAllocator(ByteBufAllocator allocator) {
|
||||
super.setAllocator(allocator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setRecvByteBufAllocator(RecvByteBufAllocator allocator) {
|
||||
super.setRecvByteBufAllocator(allocator);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setAutoRead(boolean autoRead) {
|
||||
super.setAutoRead(autoRead);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setAutoClose(boolean autoClose) {
|
||||
super.setAutoClose(autoClose);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public IOUringSocketChannelConfig setWriteBufferHighWaterMark(int writeBufferHighWaterMark) {
|
||||
super.setWriteBufferHighWaterMark(writeBufferHighWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public IOUringSocketChannelConfig setWriteBufferLowWaterMark(int writeBufferLowWaterMark) {
|
||||
super.setWriteBufferLowWaterMark(writeBufferLowWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
|
||||
super.setWriteBufferWaterMark(writeBufferWaterMark);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IOUringSocketChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator) {
|
||||
super.setMessageSizeEstimator(estimator);
|
||||
return this;
|
||||
}
|
||||
}
|
@ -231,5 +231,10 @@ public class FileDescriptor {
|
||||
private static native int read(int fd, ByteBuffer buf, int pos, int limit);
|
||||
private static native int readAddress(int fd, long address, int pos, int limit);
|
||||
|
||||
//only temporary
|
||||
public int getFd() {
|
||||
return fd;
|
||||
}
|
||||
|
||||
private static native long newPipe();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user