Draft - Add native io_uring transport

Motivation:
to get a better feeling of how it could be implemented

Result:
- cant be build yet -> still work in progress
This commit is contained in:
Josef Grieb 2020-06-18 09:05:16 +02:00
parent 4dc6764d7b
commit 07e2e23a7d
15 changed files with 1030 additions and 0 deletions

View File

@ -419,6 +419,7 @@
<module>transport-blockhound-tests</module>
<module>microbench</module>
<module>bom</module>
<module>transport-native-io_uring</module>
</modules>
<dependencyManagement>

View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2014 The Netty Project
~
~ The Netty Project licenses this file to you under the Apache License,
~ version 2.0 (the "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at:
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.1.51.Final-SNAPSHOT</version>
</parent>
<artifactId>netty-transport-native-io_uring</artifactId>
<name>Netty/Transport/Native/io_uring</name>
<packaging>jar</packaging>
<properties>
<javaModuleName>io.netty.transport.io_uring</javaModuleName>
<!-- Needed as we use SelfSignedCertificate in our tests -->
<argLine.java9.extras>--add-exports java.base/sun.security.x509=ALL-UNNAMED</argLine.java9.extras>
<unix.common.lib.name>netty-unix-common</unix.common.lib.name>
<unix.common.lib.dir>${project.build.directory}/unix-common-lib</unix.common.lib.dir>
<unix.common.lib.unpacked.dir>${unix.common.lib.dir}/META-INF/native/lib</unix.common.lib.unpacked.dir>
<unix.common.include.unpacked.dir>${unix.common.lib.dir}/META-INF/native/include</unix.common.include.unpacked.dir>
<jni.compiler.args.cflags>CFLAGS=-O3 -Werror -fno-omit-frame-pointer -Wunused-variable -fvisibility=hidden
-I${unix.common.include.unpacked.dir}
</jni.compiler.args.cflags>
<jni.compiler.args.ldflags>LDFLAGS=-L${unix.common.lib.unpacked.dir} -Wl,--no-as-needed -lrt -Wl,--whole-archive
-l${unix.common.lib.name} -Wl,--no-whole-archive
</jni.compiler.args.ldflags>
<nativeSourceDirectory>${project.basedir}/src/main/c</nativeSourceDirectory>
<skipTests>true</skipTests>
</properties>
</project>

View File

@ -0,0 +1,110 @@
/* SPDX-License-Identifier: MIT */
#include "barrier.h"
#include <linux/io_uring.h>
#include <stdio.h>
#include <stddef.h>
#include <stdint.h>
#ifndef LIB_TEST
#define LIB_TEST
struct io_uring_sq {
unsigned *khead;
unsigned *ktail;
unsigned *kring_mask;
unsigned *kring_entries;
unsigned *kflags;
unsigned *kdropped;
unsigned *array;
struct io_uring_sqe *sqes;
unsigned sqe_head;
unsigned sqe_tail;
size_t ring_sz;
void *ring_ptr;
};
struct io_uring_cq {
unsigned *khead;
unsigned *ktail;
unsigned *kring_mask;
unsigned *kring_entries;
unsigned *koverflow;
struct io_uring_cqe *cqes;
size_t ring_sz;
void *ring_ptr;
};
struct io_uring {
struct io_uring_sq sq;
struct io_uring_cq cq;
unsigned flags;
int ring_fd;
};
void io_uring_unmap_rings(struct io_uring_sq *sq, struct io_uring_cq *cq) {
munmap(sq->ring_ptr, sq->ring_sz);
if (cq->ring_ptr && cq->ring_ptr != sq->ring_ptr)
munmap(cq->ring_ptr, cq->ring_sz);
}
int io_uring_mmap(int fd, struct io_uring_params *p, struct io_uring_sq *sq,
struct io_uring_cq *cq) {
size_t size;
int ret;
sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);
if (p->features & IORING_FEAT_SINGLE_MMAP) {
if (cq->ring_sz > sq->ring_sz)
sq->ring_sz = cq->ring_sz;
cq->ring_sz = sq->ring_sz;
}
sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
if (sq->ring_ptr == MAP_FAILED)
return -errno;
if (p->features & IORING_FEAT_SINGLE_MMAP) {
cq->ring_ptr = sq->ring_ptr;
} else {
cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
if (cq->ring_ptr == MAP_FAILED) {
cq->ring_ptr = NULL;
ret = -errno;
goto err;
}
}
sq->khead = sq->ring_ptr + p->sq_off.head;
sq->ktail = sq->ring_ptr + p->sq_off.tail;
sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask;
sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries;
sq->kflags = sq->ring_ptr + p->sq_off.flags;
sq->kdropped = sq->ring_ptr + p->sq_off.dropped;
sq->array = sq->ring_ptr + p->sq_off.array;
size = p->sq_entries * sizeof(struct io_uring_sqe);
sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
fd, IORING_OFF_SQES);
if (sq->sqes == MAP_FAILED) {
ret = -errno;
err:
io_uring_unmap_rings(sq, cq);
return ret;
}
cq->khead = cq->ring_ptr + p->cq_off.head;
cq->ktail = cq->ring_ptr + p->cq_off.tail;
cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask;
cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries;
cq->koverflow = cq->ring_ptr + p->cq_off.overflow;
cq->cqes = cq->ring_ptr + p->cq_off.cqes;
return 0;
}
#endif

View File

@ -0,0 +1,21 @@
#include <jni.h>
#include <stdint.h>
#include <stdlib.h>
#include <errno.h>
#include "io_uring.h"
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>

View File

@ -0,0 +1,268 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.unix.UnixChannel;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.ReferenceCountUtil;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.UnresolvedAddressException;
import static io.netty.util.internal.ObjectUtil.*;
public abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel {
private volatile SocketAddress local;
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
final LinuxSocket socket;
protected volatile boolean active;
boolean uringInReadyPending;
private final long ioUring;
AbstractIOUringChannel(final Channel parent, LinuxSocket fd, boolean active, final long ioUring) {
super(parent);
this.socket = checkNotNull(fd, "fd");
this.active = active;
this.ioUring = ioUring;
}
public boolean isOpen() {
return socket.isOpen();
}
@Override
public boolean isActive() {
return active;
}
@Override
public ChannelMetadata metadata() {
return METADATA;
}
@Override
protected abstract AbstractUringUnsafe newUnsafe();
@Override
protected boolean isCompatible(final EventLoop loop) {
return loop instanceof IOUringEventLoop;
}
public void doReadBytes(ByteBuf byteBuf) {
IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
if (byteBuf.hasMemoryAddress()) {
long eventId = ioUringEventLoop.incrementEventIdCounter();
final Event event = new Event();
event.setId(eventId);
event.setOp(EventType.READ);
int error = socket.readEvent(ioUring, eventId, byteBuf.memoryAddress(), byteBuf.writerIndex(),
byteBuf.capacity());
if (error == 0) {
ioUringEventLoop.addNewEvent(event);
}
}
}
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
return newDirectBuffer(buf, buf);
}
protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.release(holder);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
return newDirectBuffer0(holder, buf, alloc, readableBytes);
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf == null) {
return newDirectBuffer0(holder, buf, alloc, readableBytes);
}
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
final ByteBuf directBuf = alloc.directBuffer(capacity);
directBuf.writeBytes(buf, buf.readerIndex(), capacity);
ReferenceCountUtil.safeRelease(holder);
return directBuf;
}
@Override
protected void doDisconnect() throws Exception {
}
@Override
protected void doClose() throws Exception {
}
//Channel/ChannelHandlerContext.read() was called
@Override
protected void doBeginRead() throws Exception {
final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe();
if (!uringInReadyPending) {
uringInReadyPending = true;
unsafe.executeUringReadOperator();
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current();
if (msg == null) {
// nothing left to write
return;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
while (readableBytes > 0) {
doWriteBytes(buf);
//have to move it to the eventloop
int newReadableBytes = buf.readableBytes();
in.progress(readableBytes - newReadableBytes);
readableBytes = newReadableBytes;
}
in.remove();
}
}
protected final void doWriteBytes(ByteBuf buf) throws Exception {
if (buf.hasMemoryAddress()) {
IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
final Event event = new Event();
long eventId = ioUringEventLoop.incrementEventIdCounter();
event.setId(eventId);
event.setOp(EventType.WRITE);
socket.writeEvent(ioUring, eventId, buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
}
}
abstract class AbstractUringUnsafe extends AbstractUnsafe {
private IOUringRecvByteAllocatorHandle allocHandle;
private final Runnable readRunnable = new Runnable() {
@Override
public void run() {
uringEventExecution();
}
};
/**
* Create a new {@link } instance.
*
* @param handle The handle to wrap with EPOLL specific logic.
*/
IOUringRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
return new IOUringRecvByteAllocatorHandle(handle);
}
@Override
public IOUringRecvByteAllocatorHandle recvBufAllocHandle() {
if (allocHandle == null) {
allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
}
return allocHandle;
}
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelPromise promise) {
}
final void executeUringReadOperator() {
if (!isActive()) {
return;
}
eventLoop().execute(readRunnable);
}
public abstract void uringEventExecution();
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
}
throw new UnsupportedOperationException(
"unsupported message type");
}
@Override
public void doBind(final SocketAddress localAddress) throws Exception {
if (local instanceof InetSocketAddress) {
checkResolvable((InetSocketAddress) local);
}
socket.bind(local);
this.local = socket.localAddress();
}
protected static void checkResolvable(InetSocketAddress addr) {
if (addr.isUnresolved()) {
throw new UnresolvedAddressException();
}
}
public long getIoUring() {
return ioUring;
}
@Override
protected SocketAddress localAddress0() {
return null;
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ServerChannel;
import io.netty.channel.unix.FileDescriptor;
import java.net.SocketAddress;
public class AbstractIOUringServerChannel extends AbstractIOUringChannel implements ServerChannel {
private volatile SocketAddress local;
AbstractIOUringServerChannel(final Channel parent, final LinuxSocket fd, final boolean active, final long ioUring) {
super(parent, fd, active, ioUring);
}
@Override
public ChannelConfig config() {
return null;
}
@Override
protected AbstractUringUnsafe newUnsafe() {
return new UringServerChannelUnsafe();
}
@Override
protected SocketAddress localAddress0() {
return null;
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
throw new UnsupportedOperationException();
}
@Override
public FileDescriptor fd() {
return null;
}
final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe {
private final byte[] acceptedAddress = new byte[26];
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
}
@Override
public void uringEventExecution() {
final IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
long eventId = ioUringEventLoop.incrementEventIdCounter();
final Event event = new Event();
event.setId(eventId);
event.setOp(EventType.ACCEPT);
if (socket.acceptEvent(getIoUring(), eventId, acceptedAddress) == 0) {
ioUringEventLoop.addNewEvent(event);
Native.submit(getIoUring());
}
}
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
public class Event {
private long id;
public long getId() {
return id;
}
public void setId(final long id) {
this.id = id;
}
public EventType getOp() {
return op;
}
public void setOp(final EventType op) {
this.op = op;
}
private EventType op;
}

View File

@ -0,0 +1,22 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
public enum EventType {
ACCEPT,
READ,
WRITE
}

View File

@ -0,0 +1,76 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.RejectedExecutionHandler;
import java.util.HashMap;
import java.util.concurrent.Executor;
class IOUringEventLoop extends SingleThreadEventLoop {
//C pointer
private final long io_uring;
private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096);
//events should be unique to identify which event type that was
private long eventIdCounter;
private HashMap<Long, Event> events = new HashMap<Long, Event>();
protected IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp,
final int maxPendingTasks,
final RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.io_uring = Native.io_uring_setup(100);
}
public long incrementEventIdCounter() {
long eventId = eventIdCounter;
eventIdCounter++;
return eventId;
}
public void addNewEvent(Event event) {
events.put(event.getId(), event);
}
@Override
protected void run() {
for (; ; ) {
//wait until an event has finished
final long cqe = Native.wait_cqe(io_uring);
final Event event = events.get(Native.getEventId(cqe));
final int ret = Native.getRes(cqe);
switch (event.getOp()) {
case ACCEPT:
//serverChannel is necessary to call newChildchannel
//create a new accept event
break;
case READ:
//need to save the Bytebuf before I execute the read operation fireChannelRead(byteBuf)
break;
case WRITE:
//you have to store Bytebuf to continue writing
break;
}
//processing Tasks
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.unix.PreferredDirectByteBufAllocator;
import io.netty.util.UncheckedBooleanSupplier;
public class IOUringRecvByteAllocatorHandle extends RecvByteBufAllocator.DelegatingHandle
implements RecvByteBufAllocator.ExtendedHandle {
private final PreferredDirectByteBufAllocator preferredDirectByteBufAllocator =
new PreferredDirectByteBufAllocator();
private final UncheckedBooleanSupplier defaultMaybeMoreDataSupplier = new UncheckedBooleanSupplier() {
@Override
public boolean get() {
return true;
}
};
IOUringRecvByteAllocatorHandle(RecvByteBufAllocator.ExtendedHandle handle) {
super(handle);
}
@Override
public final ByteBuf allocate(ByteBufAllocator alloc) {
// We need to ensure we always allocate a direct ByteBuf as we can only use a direct buffer to read via JNI.
preferredDirectByteBufAllocator.updateAllocator(alloc);
return delegate().allocate(preferredDirectByteBufAllocator);
}
@Override
public final boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return ((RecvByteBufAllocator.ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier);
}
@Override
public final boolean continueReading() {
return false;
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.channel.Channel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelConfig;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public class IOUringServerSocketChannel extends AbstractIOUringServerChannel implements ServerSocketChannel {
IOUringServerSocketChannel(Channel parent, LinuxSocket fd, boolean active,
long ioUring) {
super(parent, fd, active, ioUring);
}
@Override
public void doBind(SocketAddress localAddress) throws Exception {
super.doBind(localAddress);
}
@Override
public boolean isOpen() {
return false;
}
@Override
public ServerSocketChannelConfig config() {
return null;
}
@Override
public ServerSocketChannel parent() {
return (ServerSocketChannel) super.parent();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
}

View File

@ -0,0 +1,135 @@
package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.unix.FileDescriptor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public class IOUringSocketChannel extends AbstractIOUringChannel implements SocketChannel {
IOUringSocketChannel(final Channel parent, final LinuxSocket fd, final boolean active, final long ioUring) {
super(parent, fd, active, ioUring);
}
@Override
public ServerSocketChannel parent() {
return (ServerSocketChannel) super.parent();
}
@Override
public SocketChannelConfig config() {
return null;
}
@Override
protected AbstractUringUnsafe newUnsafe() {
return new AbstractUringUnsafe() {
@Override
public void uringEventExecution() {
final ChannelConfig config = config();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = allocHandle.allocate(allocator);
doReadBytes(byteBuf);
}
};
}
@Override
public void doBind(SocketAddress localAddress) throws Exception {
}
@Override
public boolean isInputShutdown() {
return false;
}
@Override
public ChannelFuture shutdownInput() {
return null;
}
@Override
public ChannelFuture shutdownInput(ChannelPromise promise) {
return null;
}
@Override
public boolean isOutputShutdown() {
return false;
}
@Override
public ChannelFuture shutdownOutput() {
return null;
}
@Override
public ChannelFuture shutdownOutput(ChannelPromise promise) {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public ChannelFuture shutdown() {
return null;
}
@Override
public ChannelFuture shutdown(ChannelPromise promise) {
return null;
}
@Override
public FileDescriptor fd() {
return null;
}
@Override
protected SocketAddress localAddress0() {
return null;
}
@Override
protected SocketAddress remoteAddress0() {
return null;
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
}
@Override
public InetSocketAddress localAddress() {
return (InetSocketAddress) super.localAddress();
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.channel.unix.Socket;
public class LinuxSocket extends Socket {
private long fd;
public LinuxSocket(final int fd) {
super(fd);
this.fd = fd;
}
public int readEvent(long ring, long eventId, long bufferAddress, int pos, int limit) {
return Native.read(ring, fd, eventId, bufferAddress, pos, limit);
}
public int writeEvent(long ring, long eventId, long bufferAddress, int pos, int limit) {
return Native.write(ring, fd, eventId, bufferAddress, pos, limit);
}
public int acceptEvent(long ring, long eventId, byte[] addr) {
return Native.accept(ring, eventId, addr);
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
public final class Native {
public static native long io_uring_setup(int entries);
public static native long getSQE(long io_uring);
public static native long getQC(long io_uring);
public static native int read(long io_uring, long fd, long eventId, long bufferAddress, int pos,
int limit);
public static native int write(long io_uring, long fd, long eventId, long bufferAddress, int pos,
int limit);
public static native int accept(long io_uring, long fd, byte[] addr);
//return id
public static native long wait_cqe(long io_uring);
public static native long deleteCqe(long io_uring, long cqeAddress);
public static native long getEventId(long cqeAddress);
public static native int getRes(long cqeAddress);
public static native long close(long io_uring);
public static native long submit(long io_uring);
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* io_uring
*/
package io.netty.channel.uring;