Added io_uring JNI

Motivation:

prototype is not buildable and JNI io_uring implementation is missing

Modifications:

-added io_uring implementation(source from https://github.com/axboe/liburing)
-eventloop stores netty io_uring pointer which is used for two ring buffers to execute events like read and write operations in JNI
-memory barriers already included in JNI(will be changed in the future)
-pom file adopted from native epoll

Result:

prototype can finally be built
This commit is contained in:
Josef Grieb 2020-06-26 11:23:41 +02:00
parent 187ec6dffd
commit 962a3433ca
17 changed files with 1847 additions and 746 deletions

View File

@ -419,7 +419,7 @@
<module>transport-blockhound-tests</module>
<module>microbench</module>
<module>bom</module>
<module>transport-native-io_uring</module>
<module>transport-native-io_uring</module>
</modules>
<dependencyManagement>

File diff suppressed because it is too large Load Diff

View File

@ -35,16 +35,334 @@
<unix.common.lib.dir>${project.build.directory}/unix-common-lib</unix.common.lib.dir>
<unix.common.lib.unpacked.dir>${unix.common.lib.dir}/META-INF/native/lib</unix.common.lib.unpacked.dir>
<unix.common.include.unpacked.dir>${unix.common.lib.dir}/META-INF/native/include</unix.common.include.unpacked.dir>
<jni.compiler.args.cflags>CFLAGS=-O3 -Werror -fno-omit-frame-pointer -Wunused-variable -fvisibility=hidden
-I${unix.common.include.unpacked.dir}
<jni.compiler.args.cflags>CFLAGS=-O3 -Werror -fno-omit-frame-pointer -Wunused-variable -fvisibility=hidden -I${unix.common.include.unpacked.dir}
</jni.compiler.args.cflags>
<jni.compiler.args.ldflags>LDFLAGS=-L${unix.common.lib.unpacked.dir} -Wl,--no-as-needed -lrt -Wl,--whole-archive
-l${unix.common.lib.name} -Wl,--no-whole-archive
<jni.compiler.args.ldflags>LDFLAGS=-L${unix.common.lib.unpacked.dir} -Wl,--no-as-needed -lrt -Wl,--whole-archive -l${unix.common.lib.name} -Wl,--no-whole-archive
</jni.compiler.args.ldflags>
<nativeSourceDirectory>${project.basedir}/src/main/c</nativeSourceDirectory>
<skipTests>true</skipTests>
</properties>
<profiles>
<!--
Netty must be released from RHEL 6.8 x86_64 or compatible so that:
1) we ship x86_64 version of epoll transport officially, and
2) we ensure the ABI compatibility with older GLIBC versions.
The shared library built on a distribution with newer GLIBC
will not run on older distributions.
-->
<profile>
<id>restricted-release-io_uring</id>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.4.1</version>
<dependencies>
<!-- Provides the 'requireFilesContent' enforcer rule. -->
<dependency>
<groupId>com.ceilfors.maven.plugin</groupId>
<artifactId>enforcer-rules</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
<id>enforce-release-environment</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireProperty>
<regexMessage>
Release process must be performed on linux-x86_64.
</regexMessage>
<property>os.detected.classifier</property>
<regex>^linux-x86_64$</regex>
</requireProperty>
<requireFilesContent>
<message>
Release process must be performed on RHEL 6.8 or its derivatives.
</message>
<files>
<file>/etc/redhat-release</file>
</files>
<content>release 6.9</content>
</requireFilesContent>
</rules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>linux</id>
<activation>
<os>
<family>linux</family>
</os>
</activation>
<properties>
<skipTests>false</skipTests>
</properties>
<build>
<plugins>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<!-- unpack the unix-common static library and include files -->
<execution>
<id>unpack</id>
<phase>generate-sources</phase>
<goals>
<goal>unpack-dependencies</goal>
</goals>
<configuration>
<includeGroupIds>${project.groupId}</includeGroupIds>
<includeArtifactIds>netty-transport-native-unix-common</includeArtifactIds>
<classifier>${jni.classifier}</classifier>
<outputDirectory>${unix.common.lib.dir}</outputDirectory>
<includes>META-INF/native/**</includes>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.fusesource.hawtjni</groupId>
<artifactId>maven-hawtjni-plugin</artifactId>
<executions>
<execution>
<id>build-native-lib</id>
<configuration>
<name>netty_transport_native_io_uring_${os.detected.arch}</name>
<nativeSourceDirectory>${nativeSourceDirectory}</nativeSourceDirectory>
<libDirectory>${project.build.outputDirectory}</libDirectory>
<!-- We use Maven's artifact classifier instead.
This hack will make the hawtjni plugin to put the native library
under 'META-INF/native' rather than 'META-INF/native/${platform}'. -->
<platform>.</platform>
<configureArgs>
<arg>${jni.compiler.args.ldflags}</arg>
<arg>${jni.compiler.args.cflags}</arg>
<configureArg>--libdir=${project.build.directory}/native-build/target/lib</configureArg>
</configureArgs>
</configuration>
<goals>
<goal>generate</goal>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<!-- Generate the JAR that contains the native library in it. -->
<execution>
<id>native-jar</id>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
</manifest>
<manifestEntries>
<Bundle-NativeCode>META-INF/native/libnetty_transport_native_io_uring_${os.detected.arch}.so; osname=Linux; processor=${os.detected.arch},*</Bundle-NativeCode>
<Automatic-Module-Name>${javaModuleName}</Automatic-Module-Name>
</manifestEntries>
<index>true</index>
<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
</archive>
<classifier>${jni.classifier}</classifier>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>${project.version}</version>
<classifier>${jni.classifier}</classifier>
<!--
The unix-common with classifier dependency is optional because it is not a runtime dependency, but a build time
dependency to get the static library which is built directly into the shared library generated by this project.
-->
<optional>true</optional>
</dependency>
</dependencies>
</profile>
<profile>
<id>linux-aarch64</id>
<properties>
<jni.classifier>${os.detected.name}-aarch64</jni.classifier>
</properties>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.4.1</version>
<dependencies>
<!-- Provides the 'requireFilesContent' enforcer rule. -->
<dependency>
<groupId>com.ceilfors.maven.plugin</groupId>
<artifactId>enforcer-rules</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
<id>enforce-release-environment</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireProperty>
<regexMessage>
Cross compile and Release process must be performed on linux-x86_64.
</regexMessage>
<property>os.detected.classifier</property>
<regex>^linux-x86_64.*</regex>
</requireProperty>
<requireFilesContent>
<message>
Cross compile and Release process must be performed on RHEL 7.6 or its derivatives.
</message>
<files>
<file>/etc/redhat-release</file>
</files>
<content>release 7.6</content>
</requireFilesContent>
</rules>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<!-- unpack the unix-common static library and include files -->
<execution>
<id>unpack</id>
<phase>generate-sources</phase>
<goals>
<goal>unpack-dependencies</goal>
</goals>
<configuration>
<includeGroupIds>${project.groupId}</includeGroupIds>
<includeArtifactIds>netty-transport-native-unix-common</includeArtifactIds>
<classifier>${jni.classifier}</classifier>
<outputDirectory>${unix.common.lib.dir}</outputDirectory>
<includes>META-INF/native/**</includes>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>true</overWriteSnapshots>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.fusesource.hawtjni</groupId>
<artifactId>maven-hawtjni-plugin</artifactId>
<executions>
<execution>
<id>build-native-lib</id>
<configuration>
<name>netty_transport_native_io_uring_aarch_64</name>
<nativeSourceDirectory>${nativeSourceDirectory}</nativeSourceDirectory>
<libDirectory>${project.build.outputDirectory}</libDirectory>
<!-- We use Maven's artifact classifier instead.
This hack will make the hawtjni plugin to put the native library
under 'META-INF/native' rather than 'META-INF/native/${platform}'. -->
<platform>.</platform>
<configureArgs>
<arg>${jni.compiler.args.ldflags}</arg>
<arg>${jni.compiler.args.cflags}</arg>
<configureArg>--libdir=${project.build.directory}/native-build/target/lib</configureArg>
<configureArg>--host=aarch64-linux-gnu</configureArg>
</configureArgs>
</configuration>
<goals>
<goal>generate</goal>
<goal>build</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<!-- Generate the JAR that contains the native library in it. -->
<execution>
<id>native-jar</id>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
</manifest>
<manifestEntries>
<Bundle-NativeCode>META-INF/native/libnetty_transport_native_io_uring_aarch_64.so; osname=Linux; processor=aarch_64,*</Bundle-NativeCode>
<Automatic-Module-Name>${javaModuleName}</Automatic-Module-Name>
</manifestEntries>
<index>true</index>
<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
</archive>
<classifier>${jni.classifier}</classifier>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>${project.version}</version>
<classifier>${jni.classifier}</classifier>
<!--
The unix-common with classifier dependency is optional because it is not a runtime dependency, but a build time
dependency to get the static library which is built directly into the shared library generated by this project.
-->
<optional>true</optional>
</dependency>
</dependencies>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
@ -86,4 +404,42 @@
</dependency>
</dependencies>
<build>
<plugins>
<!-- Also include c files in source jar -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${nativeSourceDirectory}</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<!-- Generate the fallback JAR that does not contain the native library. -->
<execution>
<id>default-jar</id>
<configuration>
<excludes>
<exclude>META-INF/native/**</exclude>
</excludes>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,34 @@
/* SPDX-License-Identifier: MIT */
#ifndef LIBURING_BARRIER_H
#define LIBURING_BARRIER_H
#include <stdatomic.h>
/*
From the kernel documentation file refcount-vs-atomic.rst:
A RELEASE memory ordering guarantees that all prior loads and
stores (all po-earlier instructions) on the same CPU are completed
before the operation. It also guarantees that all po-earlier
stores on the same CPU and all propagated stores from other CPUs
must propagate to all other CPUs before the release operation
(A-cumulative property). This is implemented using
:c:func:`smp_store_release`.
An ACQUIRE memory ordering guarantees that all post loads and
stores (all po-later instructions) on the same CPU are
completed after the acquire operation. It also guarantees that all
po-later stores on the same CPU must propagate to all other CPUs
after the acquire operation executes. This is implemented using
:c:func:`smp_acquire__after_ctrl_dep`.
*/
#define IO_URING_WRITE_ONCE(var, val) \
atomic_store_explicit(&(var), (val), memory_order_relaxed)
#define IO_URING_READ_ONCE(var) \
atomic_load_explicit(&(var), memory_order_relaxed)
#define io_uring_smp_store_release(p, v) \
atomic_store_explicit((p), (v), memory_order_release)
#define io_uring_smp_load_acquire(p) \
atomic_load_explicit((p), memory_order_acquire)
#endif /* defined(LIBURING_BARRIER_H) */

View File

@ -1,110 +1,82 @@
/* SPDX-License-Identifier: MIT */
#include "barrier.h"
#include <linux/io_uring.h>
#include <stdio.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#ifndef LIB_TEST
#define LIB_TEST
struct io_uring_sq {
unsigned *khead;
unsigned *ktail;
unsigned *kring_mask;
unsigned *kring_entries;
unsigned *kflags;
unsigned *kdropped;
unsigned *array;
struct io_uring_sqe *sqes;
unsigned *khead;
unsigned *ktail;
unsigned *kring_mask;
unsigned *kring_entries;
unsigned *kflags;
unsigned *kdropped;
unsigned *array;
struct io_uring_sqe *sqes;
unsigned sqe_head;
unsigned sqe_tail;
unsigned sqe_head;
unsigned sqe_tail;
size_t ring_sz;
void *ring_ptr;
size_t ring_sz;
void *ring_ptr;
};
struct io_uring_cq {
unsigned *khead;
unsigned *ktail;
unsigned *kring_mask;
unsigned *kring_entries;
unsigned *koverflow;
struct io_uring_cqe *cqes;
unsigned *khead;
unsigned *ktail;
unsigned *kring_mask;
unsigned *kring_entries;
unsigned *koverflow;
struct io_uring_cqe *cqes;
size_t ring_sz;
void *ring_ptr;
size_t ring_sz;
void *ring_ptr;
};
struct io_uring {
struct io_uring_sq sq;
struct io_uring_cq cq;
unsigned flags;
int ring_fd;
struct io_uring_sq sq;
struct io_uring_cq cq;
unsigned flags;
int ring_fd;
};
void io_uring_unmap_rings(struct io_uring_sq *sq, struct io_uring_cq *cq) {
munmap(sq->ring_ptr, sq->ring_sz);
if (cq->ring_ptr && cq->ring_ptr != sq->ring_ptr)
munmap(cq->ring_ptr, cq->ring_sz);
#define io_uring_for_each_cqe(ring, head, cqe) \
/* \
* io_uring_smp_load_acquire() enforces the order of tail \
* and CQE reads. \
*/ \
for (head = *(ring)->cq.khead; \
(cqe = (head != io_uring_smp_load_acquire((ring)->cq.ktail) \
? &(ring)->cq.cqes[head & (*(ring)->cq.kring_mask)] \
: NULL)); \
head++)
/*
* Must be called after io_uring_for_each_cqe()
*/
static inline void io_uring_cq_advance(struct io_uring *ring, unsigned nr) {
if (nr) {
struct io_uring_cq *cq = &ring->cq;
/*
* Ensure that the kernel only sees the new value of the head
* index after the CQEs have been read.
*/
io_uring_smp_store_release(cq->khead, *cq->khead + nr);
}
}
int io_uring_mmap(int fd, struct io_uring_params *p, struct io_uring_sq *sq,
struct io_uring_cq *cq) {
size_t size;
int ret;
sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);
if (p->features & IORING_FEAT_SINGLE_MMAP) {
if (cq->ring_sz > sq->ring_sz)
sq->ring_sz = cq->ring_sz;
cq->ring_sz = sq->ring_sz;
}
sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
if (sq->ring_ptr == MAP_FAILED)
return -errno;
if (p->features & IORING_FEAT_SINGLE_MMAP) {
cq->ring_ptr = sq->ring_ptr;
} else {
cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
if (cq->ring_ptr == MAP_FAILED) {
cq->ring_ptr = NULL;
ret = -errno;
goto err;
}
}
sq->khead = sq->ring_ptr + p->sq_off.head;
sq->ktail = sq->ring_ptr + p->sq_off.tail;
sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask;
sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries;
sq->kflags = sq->ring_ptr + p->sq_off.flags;
sq->kdropped = sq->ring_ptr + p->sq_off.dropped;
sq->array = sq->ring_ptr + p->sq_off.array;
size = p->sq_entries * sizeof(struct io_uring_sqe);
sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
fd, IORING_OFF_SQES);
if (sq->sqes == MAP_FAILED) {
ret = -errno;
err:
io_uring_unmap_rings(sq, cq);
return ret;
}
cq->khead = cq->ring_ptr + p->cq_off.head;
cq->ktail = cq->ring_ptr + p->cq_off.tail;
cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask;
cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries;
cq->koverflow = cq->ring_ptr + p->cq_off.overflow;
cq->cqes = cq->ring_ptr + p->cq_off.cqes;
return 0;
/*
* Must be called after io_uring_{peek,wait}_cqe() after the cqe has
* been processed by the application.
*/
static void io_uring_cqe_seen(struct io_uring *ring, struct io_uring_cqe *cqe) {
if (cqe)
io_uring_cq_advance(ring, 1);
}
#endif

View File

@ -1,9 +1,29 @@
#include <jni.h>
#include <stdint.h>
#include <stdlib.h>
#include <errno.h>
#define _GNU_SOURCE // RTLD_DEFAULT
#include "io_uring.h"
#include "netty_unix_errors.h"
#include "netty_unix_filedescriptor.h"
#include "netty_unix_jni.h"
#include "netty_unix_socket.h"
#include "netty_unix_util.h"
#include <dlfcn.h>
#include <errno.h>
#include <fcntl.h>
#include <jni.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "syscall.h"
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
@ -13,9 +33,493 @@
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <syscall.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
// From netty jni unix socket
static jsize addressLength(const struct sockaddr_storage *addr) {
int len = netty_unix_socket_ipAddressLength(addr);
if (len == 4) {
// Only encode port into it
return len + 4;
}
// we encode port + scope into it
return len + 8;
}
/*
* Sync internal state with kernel ring state on the SQ side. Returns the
* number of pending items in the SQ ring, for the shared ring.
*/
int io_uring_flush_sq(struct io_uring *ring) {
struct io_uring_sq *sq = &ring->sq;
const unsigned mask = *sq->kring_mask;
unsigned ktail, to_submit;
if (sq->sqe_head == sq->sqe_tail) {
ktail = *sq->ktail;
goto out;
}
/*
* Fill in sqes that we have queued up, adding them to the kernel ring
*/
ktail = *sq->ktail;
to_submit = sq->sqe_tail - sq->sqe_head;
while (to_submit--) {
sq->array[ktail & mask] = sq->sqe_head & mask;
ktail++;
sq->sqe_head++;
}
/*
* Ensure that the kernel sees the SQE updates before it sees the tail
* update.
*/
io_uring_smp_store_release(sq->ktail, ktail);
out:
return ktail - *sq->khead;
}
// From netty unix socket jni
static void initInetSocketAddressArray(JNIEnv *env,
const struct sockaddr_storage *addr,
jbyteArray bArray, int offset,
jsize len) {
int port;
if (addr->ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)addr;
port = ntohs(s->sin_port);
// Encode address and port into the array
unsigned char a[4];
a[0] = port >> 24;
a[1] = port >> 16;
a[2] = port >> 8;
a[3] = port;
(*env)->SetByteArrayRegion(env, bArray, offset, 4,
(jbyte *)&s->sin_addr.s_addr);
(*env)->SetByteArrayRegion(env, bArray, offset + 4, 4, (jbyte *)&a);
} else {
struct sockaddr_in6 *s = (struct sockaddr_in6 *)addr;
port = ntohs(s->sin6_port);
if (len == 8) {
// IPv4-mapped-on-IPv6
// Encode port into the array and write it into the jbyteArray
unsigned char a[4];
a[0] = port >> 24;
a[1] = port >> 16;
a[2] = port >> 8;
a[3] = port;
// we only need the last 4 bytes for mapped address
(*env)->SetByteArrayRegion(env, bArray, offset, 4,
(jbyte *)&(s->sin6_addr.s6_addr[12]));
(*env)->SetByteArrayRegion(env, bArray, offset + 4, 4, (jbyte *)&a);
} else {
// Encode scopeid and port into the array
unsigned char a[8];
a[0] = s->sin6_scope_id >> 24;
a[1] = s->sin6_scope_id >> 16;
a[2] = s->sin6_scope_id >> 8;
a[3] = s->sin6_scope_id;
a[4] = port >> 24;
a[5] = port >> 16;
a[6] = port >> 8;
a[7] = port;
(*env)->SetByteArrayRegion(env, bArray, offset, 16,
(jbyte *)&(s->sin6_addr.s6_addr));
(*env)->SetByteArrayRegion(env, bArray, offset + 16, 8, (jbyte *)&a);
}
}
}
static struct io_uring_sqe *__io_uring_get_sqe(struct io_uring_sq *sq,
unsigned int __head) {
unsigned int __next = (sq)->sqe_tail + 1;
struct io_uring_sqe *__sqe = NULL;
if (__next - __head <= *(sq)->kring_entries) {
__sqe = &(sq)->sqes[(sq)->sqe_tail & *(sq)->kring_mask];
if (!__sqe) {
printf("SQE is null \n");
}
(sq)->sqe_tail = __next;
}
return __sqe;
}
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring) {
struct io_uring_sq *sq = &ring->sq;
return __io_uring_get_sqe(sq, sq->sqe_head);
}
static inline void io_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd,
const void *addr, unsigned len,
__u64 offset) {
sqe->opcode = op;
sqe->flags = 0;
sqe->ioprio = 0;
sqe->fd = fd;
sqe->off = offset;
sqe->addr = (unsigned long)addr;
sqe->len = len;
sqe->rw_flags = 0;
sqe->user_data = 0;
sqe->__pad2[0] = sqe->__pad2[1] = sqe->__pad2[2] = 0;
}
void io_uring_unmap_rings(struct io_uring_sq *sq, struct io_uring_cq *cq) {
munmap(sq->ring_ptr, sq->ring_sz);
if (cq->ring_ptr && cq->ring_ptr != sq->ring_ptr)
munmap(cq->ring_ptr, cq->ring_sz);
}
int io_uring_mmap(int fd, struct io_uring_params *p, struct io_uring_sq *sq,
struct io_uring_cq *cq) {
size_t size;
int ret;
sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);
if (p->features & IORING_FEAT_SINGLE_MMAP) {
if (cq->ring_sz > sq->ring_sz)
sq->ring_sz = cq->ring_sz;
cq->ring_sz = sq->ring_sz;
}
sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
if (sq->ring_ptr == MAP_FAILED)
return -errno;
if (p->features & IORING_FEAT_SINGLE_MMAP) {
cq->ring_ptr = sq->ring_ptr;
} else {
cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
if (cq->ring_ptr == MAP_FAILED) {
cq->ring_ptr = NULL;
ret = -errno;
goto err;
}
}
sq->khead = sq->ring_ptr + p->sq_off.head;
sq->ktail = sq->ring_ptr + p->sq_off.tail;
sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask;
sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries;
sq->kflags = sq->ring_ptr + p->sq_off.flags;
sq->kdropped = sq->ring_ptr + p->sq_off.dropped;
sq->array = sq->ring_ptr + p->sq_off.array;
size = p->sq_entries * sizeof(struct io_uring_sqe);
sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
fd, IORING_OFF_SQES);
if (sq->sqes == MAP_FAILED) {
ret = -errno;
err:
io_uring_unmap_rings(sq, cq);
return ret;
}
cq->khead = cq->ring_ptr + p->cq_off.head;
cq->ktail = cq->ring_ptr + p->cq_off.tail;
cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask;
cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries;
cq->koverflow = cq->ring_ptr + p->cq_off.overflow;
cq->cqes = cq->ring_ptr + p->cq_off.cqes;
return 0;
}
void setup_io_uring(int ring_fd, struct io_uring *io_uring_ring,
struct io_uring_params *p) {
int ret;
ret = io_uring_mmap(ring_fd, p, &io_uring_ring->sq, &io_uring_ring->cq);
if (!ret) {
io_uring_ring->flags = p->flags;
io_uring_ring->ring_fd = ring_fd;
} else {
perror("setup_io_uring error \n");
}
}
void io_uring_prep_write(struct io_uring_sqe *sqe, int fd, const void *buf,
unsigned nbytes, off_t offset) {
io_uring_prep_rw(IORING_OP_WRITE, sqe, fd, buf, nbytes, offset);
}
void io_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf,
unsigned nbytes, off_t offset) {
io_uring_prep_rw(IORING_OP_READ, sqe, fd, buf, nbytes, offset);
}
void io_uring_sqe_set_data(struct io_uring_sqe *sqe, unsigned long data) {
sqe->user_data = (unsigned long)data;
}
void queue_read(int file_fd, struct io_uring *ring, void *buffer, jint event_id,
jint pos, jint limit) {
struct io_uring_sqe *sqe = NULL;
sqe = io_uring_get_sqe(ring);
if (!sqe) {
fprintf(stderr, "Could not get SQE.\n");
return;
}
io_uring_prep_read(sqe, file_fd, buffer + pos, (size_t)(limit - pos), 0);
io_uring_sqe_set_data(sqe, (int)event_id);
}
void queue_write(int file_fd, struct io_uring *ring, void *buffer,
jint event_id, jint pos, jint limit) {
struct io_uring_sqe *sqe;
sqe = io_uring_get_sqe(ring);
if (!sqe) {
fprintf(stderr, "Could not get SQE.\n");
return;
}
io_uring_prep_write(sqe, file_fd, buffer + pos, (size_t)(limit - pos), 0);
io_uring_sqe_set_data(sqe, (unsigned long)event_id);
}
int __io_uring_peek_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr) {
struct io_uring_cqe *cqe;
unsigned head;
int err = 0;
do {
io_uring_for_each_cqe(ring, head, cqe) break;
break;
} while (1);
*cqe_ptr = cqe;
return err;
}
long io_uring_wait_cqe(struct io_uring *ring, unsigned wait_nr) {
struct io_uring_cqe *cqe = NULL;
int ret = 0, err;
unsigned flags = 0;
err = __io_uring_peek_cqe(ring, &cqe);
if (err) {
printf("error peek \n");
return -errno;
}
if (cqe) {
return (long)cqe;
}
flags = IORING_ENTER_GETEVENTS;
ret = sys_io_uring_enter(ring->ring_fd, 0, wait_nr, flags, NULL);
if (ret < 0) {
return -1;
} else if (ret == 0) {
err = __io_uring_peek_cqe(ring, &cqe);
if (err) {
printf("error peek \n");
return -1;
}
if (cqe) {
return (long)cqe;
}
}
return -1;
}
/*
* Submit sqes acquired from io_uring_get_sqe() to the kernel.
*
* Returns number of sqes submitted
*/
int io_uring_submit(struct io_uring *ring) {
int submitted = io_uring_flush_sq(ring);
int ret;
ret = sys_io_uring_enter(ring->ring_fd, submitted, 0, 0, NULL);
if (ret < 0)
return -errno;
return ret;
}
// all jni methods
static jlong netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries) {
struct io_uring_params p;
memset(&p, 0, sizeof(p));
int ring_fd = sys_io_uring_setup((int)entries, &p);
struct io_uring *io_uring_ring =
(struct io_uring *)malloc(sizeof(struct io_uring));
io_uring_ring->flags = 0;
io_uring_ring->sq.sqe_tail = 0;
io_uring_ring->sq.sqe_head = 0;
setup_io_uring(ring_fd, io_uring_ring, &p);
return (long)io_uring_ring;
}
static jint netty_read_operation(JNIEnv *jenv, jclass clazz, jlong uring,
jlong fd, jlong event_id, jlong buffer_address,
jint pos, jint limit) {
queue_read((int)fd, (struct io_uring *)uring, (void *)buffer_address,
event_id, pos, limit);
return 0;
}
static jint netty_write_operation(JNIEnv *jenv, jclass clazz, jlong uring,
jlong fd, jlong event_id,
jlong buffer_address, jint pos, jint limit) {
queue_write((int)fd, (struct io_uring *)uring, (void *)buffer_address,
event_id, pos, limit);
return 0;
}
static jint netty_accept_operation(JNIEnv *env, jclass clazz, jlong uring,
jlong fd, jbyteArray byte_array) {
jint socketFd;
jsize len;
jbyte len_b;
int err;
struct sockaddr_storage addr;
socklen_t address_len = sizeof(addr);
socketFd = accept(fd, (struct sockaddr *)&addr, &address_len);
if ((err = errno) != EINTR) {
return -err;
}
len = addressLength(&addr);
len_b = (jbyte)len;
// Fill in remote address details
(*env)->SetByteArrayRegion(env, byte_array, 0, 1, (jbyte *)&len_b);
initInetSocketAddressArray(env, &addr, byte_array, 1, len);
return socketFd;
}
static jlong netty_wait_cqe(JNIEnv *env, jclass clazz, jlong uring) {
return (jlong)io_uring_wait_cqe((struct io_uring *)uring, 1);
}
static jlong netty_delete_cqe(JNIEnv *env, jclass clazz, jlong uring,
jlong cqe_address) {
struct io_uring_cqe *cqe = (struct io_uring_cqe *)cqe_address;
io_uring_cqe_seen((struct io_uring *)uring, cqe);
return 0;
}
static jlong netty_get_event_id(JNIEnv *env, jclass classz, jlong cqe_address) {
struct io_uring_cqe *cqe = (struct io_uring_cqe *)cqe_address;
return (long)cqe->user_data;
}
static jint netty_get_res(JNIEnv *env, jclass classz, jlong cqe_address) {
struct io_uring_cqe *cqe = (struct io_uring_cqe *)cqe_address;
return (long)cqe->res;
}
static jlong netty_close(JNIEnv *env, jclass classz, jlong io_uring) {
struct io_uring *ring = (struct io_uring *)io_uring;
struct io_uring_sq *sq = &ring->sq;
struct io_uring_cq *cq = &ring->cq;
munmap(sq->sqes, *sq->kring_entries * sizeof(struct io_uring_sqe));
io_uring_unmap_rings(sq, cq);
close(ring->ring_fd);
}
static jlong netty_submit(JNIEnv *jenv, jclass classz, jlong uring) {
return io_uring_submit((struct io_uring *)uring);
}
static jlong netty_create_file(JNIEnv *env, jclass class) {
return open("io-uring-test.txt", O_RDWR | O_TRUNC | O_CREAT, 0644);
}
// end jni methods
static void netty_io_uring_native_JNI_OnUnLoad(JNIEnv *env) {
// OnUnLoad
}
// JNI Registered Methods Begin
static jint netty_io_uring_close(JNIEnv *env, jclass clazz, jint fd) {
return 111;
}
// JNI Registered Methods End
// JNI Method Registration Table Begin
static const JNINativeMethod method_table[] = {
{"ioUringSetup", "(I)J", (void *)netty_io_uring_setup},
{"ioUringClose", "(J)J", (void *)netty_io_uring_close},
{"ioUringRead", "(JJJJII)I", (void *)netty_read_operation},
{"ioUringWrite", "(JJJJII)I", (void *)netty_write_operation},
{"ioUringAccept", "(JJ[B)I", (void *)netty_accept_operation},
{"ioUringWaitCqe", "(J)J", (void *)netty_wait_cqe},
{"ioUringDeleteCqe", "(JJ)J", (void *)netty_delete_cqe},
{"ioUringGetEventId", "(J)J", (void *)netty_get_event_id},
{"ioUringGetRes", "(J)I", (void *)netty_get_res},
{"ioUringSubmit", "(J)J", (void *)netty_submit},
{"createFile", "()J", (void *)netty_create_file}};
static const jint method_table_size =
sizeof(method_table) / sizeof(method_table[0]);
// JNI Method Registration Table End
JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) {
JNIEnv *env;
if ((*vm)->GetEnv(vm, (void **)&env, NETTY_JNI_VERSION) != JNI_OK) {
return JNI_ERR;
}
char *packagePrefix = NULL;
Dl_info dlinfo;
jint status = 0;
if (!dladdr((void *)netty_io_uring_native_JNI_OnUnLoad, &dlinfo)) {
fprintf(stderr,
"FATAL: transport-native-epoll JNI call to dladdr failed!\n");
return JNI_ERR;
}
packagePrefix = netty_unix_util_parse_package_prefix(
dlinfo.dli_fname, "netty_transport_native_io_uring", &status);
if (status == JNI_ERR) {
fprintf(stderr,
"FATAL: netty_transport_native_io_uring JNI encountered unexpected "
"dlinfo.dli_fname: %s\n",
dlinfo.dli_fname);
return JNI_ERR;
}
if (netty_unix_util_register_natives(env, packagePrefix,
"io/netty/channel/uring/Native",
method_table, method_table_size) != 0) {
printf("netty register natives error\n");
}
return NETTY_JNI_VERSION;
}

View File

@ -0,0 +1,50 @@
/* SPDX-License-Identifier: MIT */
/*
* Will go away once libc support is there
*/
#include "syscall.h"
#include <signal.h>
#include <sys/syscall.h>
#include <sys/uio.h>
#include <unistd.h>
#ifdef __alpha__
/*
* alpha is the only exception, all other architectures
* have common numbers for new system calls.
*/
#ifndef __NR_io_uring_setup
#define __NR_io_uring_setup 535
#endif
#ifndef __NR_io_uring_enter
#define __NR_io_uring_enter 536
#endif
#ifndef __NR_io_uring_register
#define __NR_io_uring_register 537
#endif
#else /* !__alpha__ */
#ifndef __NR_io_uring_setup
#define __NR_io_uring_setup 425
#endif
#ifndef __NR_io_uring_enter
#define __NR_io_uring_enter 426
#endif
#ifndef __NR_io_uring_register
#define __NR_io_uring_register 427
#endif
#endif
int sys_io_uring_register(int fd, unsigned opcode, const void *arg,
unsigned nr_args) {
return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args);
}
int sys_io_uring_setup(unsigned entries, struct io_uring_params *p) {
return syscall(__NR_io_uring_setup, entries, p);
}
int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete,
unsigned flags, sigset_t *sig) {
return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, flags, sig,
_NSIG / 8);
}

View File

@ -0,0 +1,17 @@
/* SPDX-License-Identifier: MIT */
#include <linux/io_uring.h>
#include <signal.h>
#ifndef LIBURING_SYSCALL_H
#define LIBURING_SYSCALL_H
/*
* System calls
*/
// extern int sys_io_uring_setup(unsigned entries, struct io_uring_params *p);
extern int sys_io_uring_setup(unsigned entries, struct io_uring_params *p);
extern int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete,
unsigned flags, sigset_t *sig);
extern int sys_io_uring_register(int fd, unsigned int opcode, const void *arg,
unsigned int nr_args);
#endif

View File

@ -84,20 +84,17 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
event.setOp(EventType.READ);
int error = socket.readEvent(ioUring, eventId, byteBuf.memoryAddress(), byteBuf.writerIndex(),
byteBuf.capacity());
byteBuf.capacity());
if (error == 0) {
ioUringEventLoop.addNewEvent(event);
}
}
}
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
return newDirectBuffer(buf, buf);
}
protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
@ -129,15 +126,13 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
@Override
protected void doDisconnect() throws Exception {
}
@Override
protected void doClose() throws Exception {
}
//Channel/ChannelHandlerContext.read() was called
// Channel/ChannelHandlerContext.read() was called
@Override
protected void doBeginRead() throws Exception {
final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe();
@ -160,7 +155,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
while (readableBytes > 0) {
doWriteBytes(buf);
//have to move it to the eventloop
// have to move it to the eventloop
int newReadableBytes = buf.readableBytes();
in.progress(readableBytes - newReadableBytes);
readableBytes = newReadableBytes;
@ -190,7 +185,6 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
}
};
/**
* Create a new {@link } instance.
*
@ -200,7 +194,6 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
return new IOUringRecvByteAllocatorHandle(handle);
}
@Override
public IOUringRecvByteAllocatorHandle recvBufAllocHandle() {
if (allocHandle == null) {
@ -211,8 +204,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelPromise promise) {
final ChannelPromise promise) {
}
final void executeUringReadOperator() {
@ -225,16 +217,14 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
public abstract void uringEventExecution();
}
@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
}
throw new UnsupportedOperationException(
"unsupported message type");
throw new UnsupportedOperationException("unsupported message type");
}
@Override

View File

@ -67,7 +67,7 @@ public class AbstractIOUringServerChannel extends AbstractIOUringChannel impleme
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelPromise promise) {
final ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
}
@ -82,7 +82,7 @@ public class AbstractIOUringServerChannel extends AbstractIOUringChannel impleme
if (socket.acceptEvent(getIoUring(), eventId, acceptedAddress) == 0) {
ioUringEventLoop.addNewEvent(event);
Native.submit(getIoUring());
Native.ioUringSubmit(getIoUring());
}
}
}

View File

@ -19,6 +19,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.LongObjectHashMap;
import io.netty.util.concurrent.RejectedExecutionHandler;
import java.util.HashMap;
@ -26,19 +27,18 @@ import java.util.concurrent.Executor;
class IOUringEventLoop extends SingleThreadEventLoop {
//C pointer
// C pointer
private final long io_uring;
private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096);
//events should be unique to identify which event type that was
// events should be unique to identify which event type that was
private long eventIdCounter;
private HashMap<Long, Event> events = new HashMap<Long, Event>();
private final LongObjectHashMap<Event> events = new LongObjectHashMap<Event>();
protected IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp,
final int maxPendingTasks,
final RejectedExecutionHandler rejectedExecutionHandler) {
final int maxPendingTasks, final RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.io_uring = Native.io_uring_setup(100);
this.io_uring = Native.ioUringSetup(32);
}
public long incrementEventIdCounter() {
@ -53,24 +53,25 @@ class IOUringEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (; ; ) {
//wait until an event has finished
final long cqe = Native.wait_cqe(io_uring);
final Event event = events.get(Native.getEventId(cqe));
final int ret = Native.getRes(cqe);
for (;;) {
// wait until an event has finished
final long cqe = Native.ioUringWaitCqe(io_uring);
final Event event = events.get(Native.ioUringGetEventId(cqe));
final int ret = Native.ioUringGetRes(cqe);
switch (event.getOp()) {
case ACCEPT:
//serverChannel is necessary to call newChildchannel
//create a new accept event
break;
case READ:
//need to save the Bytebuf before I execute the read operation fireChannelRead(byteBuf)
break;
case WRITE:
//you have to store Bytebuf to continue writing
break;
case ACCEPT:
// serverChannel is necessary to call newChildchannel
// create a new accept event
break;
case READ:
// need to save the Bytebuf before I execute the read operation
// fireChannelRead(byteBuf)
break;
case WRITE:
// you have to store Bytebuf to continue writing
break;
}
//processing Tasks
// processing Tasks
}
}
}

View File

@ -18,13 +18,11 @@ package io.netty.channel.uring;
import io.netty.channel.Channel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.ServerSocketChannelConfig;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public class IOUringServerSocketChannel extends AbstractIOUringServerChannel implements ServerSocketChannel {
IOUringServerSocketChannel(Channel parent, LinuxSocket fd, boolean active,
long ioUring) {
IOUringServerSocketChannel(Channel parent, LinuxSocket fd, boolean active, long ioUring) {
super(parent, fd, active, ioUring);
}
@ -33,7 +31,6 @@ public class IOUringServerSocketChannel extends AbstractIOUringServerChannel imp
super.doBind(localAddress);
}
@Override
public boolean isOpen() {
return false;
@ -49,7 +46,6 @@ public class IOUringServerSocketChannel extends AbstractIOUringServerChannel imp
return (ServerSocketChannel) super.parent();
}
@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();

View File

@ -30,7 +30,6 @@ import io.netty.channel.unix.FileDescriptor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public class IOUringSocketChannel extends AbstractIOUringChannel implements SocketChannel {
IOUringSocketChannel(final Channel parent, final LinuxSocket fd, final boolean active, final long ioUring) {
@ -47,14 +46,12 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock
return null;
}
@Override
protected AbstractUringUnsafe newUnsafe() {
return new AbstractUringUnsafe() {
@Override
public void uringEventExecution() {
final ChannelConfig config = config();
final ByteBufAllocator allocator = config.getAllocator();
@ -63,18 +60,14 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock
ByteBuf byteBuf = allocHandle.allocate(allocator);
doReadBytes(byteBuf);
}
};
}
@Override
public void doBind(SocketAddress localAddress) throws Exception {
}
@Override
public boolean isInputShutdown() {
return false;
@ -145,5 +138,3 @@ public class IOUringSocketChannel extends AbstractIOUringChannel implements Sock
return (InetSocketAddress) super.localAddress();
}
}

View File

@ -18,7 +18,7 @@ package io.netty.channel.uring;
import io.netty.channel.unix.Socket;
public class LinuxSocket extends Socket {
private long fd;
private final long fd;
public LinuxSocket(final int fd) {
super(fd);
@ -26,15 +26,15 @@ public class LinuxSocket extends Socket {
}
public int readEvent(long ring, long eventId, long bufferAddress, int pos, int limit) {
return Native.read(ring, fd, eventId, bufferAddress, pos, limit);
return Native.ioUringRead(ring, fd, eventId, bufferAddress, pos, limit);
}
public int writeEvent(long ring, long eventId, long bufferAddress, int pos, int limit) {
return Native.write(ring, fd, eventId, bufferAddress, pos, limit);
return Native.ioUringWrite(ring, fd, eventId, bufferAddress, pos, limit);
}
public int acceptEvent(long ring, long eventId, byte[] addr) {
return Native.accept(ring, eventId, addr);
return Native.ioUringAccept(ring, eventId, addr);
}
}

View File

@ -15,31 +15,76 @@
*/
package io.netty.channel.uring;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
import io.netty.util.internal.NativeLibraryLoader;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Locale;
public final class Native {
public static native long io_uring_setup(int entries);
public static native long getSQE(long io_uring);
static {
loadNativeLibrary();
}
public static native long getQC(long io_uring);
public static native long ioUringSetup(int entries);
public static native int read(long io_uring, long fd, long eventId, long bufferAddress, int pos,
int limit);
public static native int ioUringRead(long io_uring, long fd, long eventId, long bufferAddress, int pos, int limit);
public static native int write(long io_uring, long fd, long eventId, long bufferAddress, int pos,
int limit);
public static native int ioUringWrite(long io_uring, long fd, long eventId, long bufferAddress, int pos, int limit);
public static native int accept(long io_uring, long fd, byte[] addr);
public static native int ioUringAccept(long io_uring, long fd, byte[] addr);
//return id
public static native long wait_cqe(long io_uring);
// return id
public static native long ioUringWaitCqe(long io_uring);
public static native long deleteCqe(long io_uring, long cqeAddress);
public static native long ioUringDeleteCqe(long io_uring, long cqeAddress);
public static native long getEventId(long cqeAddress);
public static native long ioUringGetEventId(long cqeAddress);
public static native int getRes(long cqeAddress);
public static native int ioUringGetRes(long cqeAddress);
public static native long close(long io_uring);
public static native long ioUringClose(long io_uring);
public static native long submit(long io_uring);
public static native long ioUringSubmit(long io_uring);
public static native long ioUringGetSQE(long io_uring);
public static native long ioUringGetQC(long io_uring);
// for testing(it is only temporary)
public static native long createFile();
private Native() {
// utility
}
// From epoll native library
private static void loadNativeLibrary() {
String name = SystemPropertyUtil.get("os.name").toLowerCase(Locale.UK).trim();
if (!name.startsWith("linux")) {
throw new IllegalStateException("Only supported on Linux");
}
String staticLibName = "netty_transport_native_io_uring";
String sharedLibName = staticLibName + '_' + PlatformDependent.normalizedArch();
ClassLoader cl = PlatformDependent.getClassLoader(Native.class);
try {
NativeLibraryLoader.load(sharedLibName, cl);
} catch (UnsatisfiedLinkError e1) {
// try {
// NativeLibraryLoader.load(staticLibName, cl);
// System.out.println("Failed to load io_uring");
// } catch (UnsatisfiedLinkError e2) {
// ThrowableUtil.addSuppressed(e1, e2);
// throw e1;
// }
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.File;
import sun.misc.SharedSecrets;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.buffer.UnpooledUnsafeDirectByteBuf;
import static org.junit.Assert.*;
public class NativeTest {
@Test
public void test_io_uring() {
long uring = Native.ioUringSetup(32);
long fd = Native.createFile();
System.out.println("Fd: " + fd);
ByteBufAllocator allocator = new UnpooledByteBufAllocator(true);
UnpooledUnsafeDirectByteBuf directByteBufPooled = new UnpooledUnsafeDirectByteBuf(allocator, 500, 1000);
System.out.println("MemoryAddress: " + directByteBufPooled.hasMemoryAddress());
String inputString = "Hello World!";
byte[] byteArrray = inputString.getBytes();
directByteBufPooled.writeBytes(byteArrray);
Native.ioUringWrite(uring, fd, 1, directByteBufPooled.memoryAddress(), directByteBufPooled.readerIndex(),
directByteBufPooled.writerIndex());
Native.ioUringSubmit(uring);
long cqe = Native.ioUringWaitCqe(uring);
// ystem.out.println("Res: " + Native.ioUringGetRes(cqe));
assertEquals(12, Native.ioUringGetRes(cqe));
Native.ioUringClose(uring);
}
}