Move ring buffer logic to Java

Motivation:
The JNI ring buffer implementation is inflexible and not very efficient.

Modifications:

- Create the RingBuffer instance, which holds the io_uring ring buffer information, in JNI
- Move the JNI ring buffer logic to Java
- Add TODOs
- Use the Unsafe memory barriers loadFence and storeFence
- Revert the epoll file

Result:

The Java ring buffer implementation is more flexible and efficient.
This commit is contained in:
Josef Grieb 2020-07-07 12:58:25 +02:00
parent 962a3433ca
commit 692238f6da
15 changed files with 1187 additions and 1079 deletions

View File

@ -471,6 +471,18 @@ public final class PlatformDependent {
return new ConcurrentHashMap<K, V>(map);
}
/**
 * Issues a load fence by delegating to {@code PlatformDependent0.loadFence()}.
 * NOTE(review): presumably only usable when {@code sun.misc.Unsafe} is available on the
 * running JVM - confirm before calling from code that must work without Unsafe.
 */
public static void loadFence() {
PlatformDependent0.loadFence();
}
/**
 * Issues a store fence by delegating to {@code PlatformDependent0.storeFence()}.
 * NOTE(review): presumably only usable when {@code sun.misc.Unsafe} is available on the
 * running JVM - confirm before calling from code that must work without Unsafe.
 */
public static void storeFence() {
PlatformDependent0.storeFence();
}
/**
 * Issues a full fence by delegating to {@code PlatformDependent0.fullFence()}.
 * NOTE(review): presumably only usable when {@code sun.misc.Unsafe} is available on the
 * running JVM - confirm before calling from code that must work without Unsafe.
 */
public static void fullFence() {
PlatformDependent0.fullFence();
}
/**
* Try to deallocate the specified direct {@link ByteBuffer}. Please note this method does nothing if
* the current platform does not support this operation or the specified buffer is not a direct buffer.

View File

@ -597,6 +597,18 @@ final class PlatformDependent0 {
UNSAFE.putObject(o, offset, x);
}
/**
 * Calls {@code UNSAFE.loadFence()}.
 * NOTE(review): no availability check is done here; presumably UNSAFE must be non-null and
 * the JVM must provide this method - confirm.
 */
static void loadFence() {
UNSAFE.loadFence();
}
/**
 * Calls {@code UNSAFE.storeFence()}.
 * NOTE(review): no availability check is done here; presumably UNSAFE must be non-null and
 * the JVM must provide this method - confirm.
 */
static void storeFence() {
UNSAFE.storeFence();
}
/**
 * Calls {@code UNSAFE.fullFence()}.
 * NOTE(review): no availability check is done here; presumably UNSAFE must be non-null and
 * the JVM must provide this method - confirm.
 */
static void fullFence() {
UNSAFE.fullFence();
}
static void copyMemory(long srcAddr, long dstAddr, long length) {
// Manual safe-point polling is only needed prior Java9:
// See https://bugs.openjdk.java.net/browse/JDK-8149596

File diff suppressed because it is too large Load Diff

View File

@ -46,141 +46,12 @@
#include <sys/stat.h>
#include <sys/types.h>
// From netty jni unix socket
//
// Returns the number of bytes used to encode `addr`: the IP address length
// reported by netty_unix_socket_ipAddressLength() plus the trailing metadata.
static jsize addressLength(const struct sockaddr_storage *addr) {
  const int ipLen = netty_unix_socket_ipAddressLength(addr);
  // A 4-byte address is followed by the port only (4 bytes); any other
  // address is followed by port + scope (8 bytes).
  const int trailer = (ipLen == 4) ? 4 : 8;
  return ipLen + trailer;
}
/*
 * Publishes the locally queued SQEs (sqe_head..sqe_tail) into the kernel's
 * submission ring array and returns the number of entries pending in the
 * shared ring (kernel tail minus kernel head).
 */
int io_uring_flush_sq(struct io_uring *ring) {
  struct io_uring_sq *sq = &ring->sq;
  const unsigned mask = *sq->kring_mask;
  unsigned ktail = *sq->ktail;

  if (sq->sqe_head != sq->sqe_tail) {
    unsigned remaining;

    /* Copy the index of every queued SQE into the kernel ring array. */
    for (remaining = sq->sqe_tail - sq->sqe_head; remaining > 0; remaining--) {
      sq->array[ktail & mask] = sq->sqe_head & mask;
      ktail++;
      sq->sqe_head++;
    }

    /*
     * The kernel must observe the SQE updates before it observes the new
     * tail, hence the release store.
     */
    io_uring_smp_store_release(sq->ktail, ktail);
  }

  return ktail - *sq->khead;
}
// From netty unix socket jni

// Stores `value` in out[0..3], most significant byte first.
static void putInt32BigEndian(unsigned char *out, unsigned int value) {
  out[0] = value >> 24;
  out[1] = value >> 16;
  out[2] = value >> 8;
  out[3] = value;
}

// Copies the address held in `addr` into `bArray`, starting at `offset`:
//  - AF_INET:                 4 address bytes, then the port (4 bytes)
//  - otherwise, len == 8:     last 4 address bytes (IPv4-mapped), then the port
//  - otherwise:               16 address bytes, then scope id + port (8 bytes)
// Port and scope id are written most significant byte first.
static void initInetSocketAddressArray(JNIEnv *env,
                                       const struct sockaddr_storage *addr,
                                       jbyteArray bArray, int offset,
                                       jsize len) {
  if (addr->ss_family == AF_INET) {
    struct sockaddr_in *in4 = (struct sockaddr_in *)addr;
    unsigned char portBytes[4];

    putInt32BigEndian(portBytes, ntohs(in4->sin_port));
    (*env)->SetByteArrayRegion(env, bArray, offset, 4,
                               (jbyte *)&in4->sin_addr.s_addr);
    (*env)->SetByteArrayRegion(env, bArray, offset + 4, 4, (jbyte *)portBytes);
    return;
  }

  struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr;
  const unsigned int port = ntohs(in6->sin6_port);

  if (len == 8) {
    // IPv4-mapped-on-IPv6: only the last 4 address bytes are needed.
    unsigned char portBytes[4];

    putInt32BigEndian(portBytes, port);
    (*env)->SetByteArrayRegion(env, bArray, offset, 4,
                               (jbyte *)&(in6->sin6_addr.s6_addr[12]));
    (*env)->SetByteArrayRegion(env, bArray, offset + 4, 4, (jbyte *)portBytes);
    return;
  }

  // Full IPv6 address followed by scope id and port.
  unsigned char trailer[8];

  putInt32BigEndian(trailer, in6->sin6_scope_id);
  putInt32BigEndian(trailer + 4, port);
  (*env)->SetByteArrayRegion(env, bArray, offset, 16,
                             (jbyte *)&(in6->sin6_addr.s6_addr));
  (*env)->SetByteArrayRegion(env, bArray, offset + 16, 8, (jbyte *)trailer);
}
/*
 * Reserves the next free submission queue entry and advances the local tail.
 * Returns NULL when reserving one more entry would exceed *kring_entries
 * relative to `__head`.
 */
static struct io_uring_sqe *__io_uring_get_sqe(struct io_uring_sq *sq,
                                               unsigned int __head) {
  const unsigned int nextTail = sq->sqe_tail + 1;
  struct io_uring_sqe *entry;

  /* Queue full: leave the tail untouched. */
  if (nextTail - __head > *sq->kring_entries) {
    return NULL;
  }

  entry = &sq->sqes[sq->sqe_tail & *sq->kring_mask];
  if (entry == NULL) {
    printf("SQE is null \n");
  }
  sq->sqe_tail = nextTail;
  return entry;
}
/*
 * Returns the next free SQE of `ring`, or NULL if the submission queue is
 * full (see __io_uring_get_sqe).
 */
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring) {
  return __io_uring_get_sqe(&ring->sq, ring->sq.sqe_head);
}
/*
 * Fills `sqe` for operation `op` on `fd`, transferring `len` bytes at `addr`
 * with the given `offset`. All remaining fields written here are reset to 0,
 * including user_data.
 */
static inline void io_uring_prep_rw(int op, struct io_uring_sqe *sqe, int fd,
                                    const void *addr, unsigned len,
                                    __u64 offset) {
  int i;

  /* Request description. */
  sqe->opcode = op;
  sqe->fd = fd;
  sqe->addr = (unsigned long)addr;
  sqe->len = len;
  sqe->off = offset;

  /* Everything else starts out cleared. */
  sqe->flags = 0;
  sqe->ioprio = 0;
  sqe->rw_flags = 0;
  sqe->user_data = 0;
  for (i = 0; i < 3; i++) {
    sqe->__pad2[i] = 0;
  }
}
// JNI handles that JNI_OnLoad resolves and netty_io_uring_setup uses to build
// the Java-side RingBuffer, IOUringSubmissionQueue and IOUringCompletionQueue.
// Constructor ("<init>") method ids:
static jmethodID ringBufferMethodId = NULL;
static jmethodID ioUringSubmissionQueueMethodId = NULL;
// NOTE(review): "Commpletion" is misspelled; renaming requires touching every use.
static jmethodID ioUringCommpletionQueueMethodId = NULL;
// Class references matching the method ids above:
static jclass ringBufferClass = NULL;
static jclass ioUringCompletionQueueClass = NULL;
static jclass ioUringSubmissionQueueClass = NULL;
void io_uring_unmap_rings(struct io_uring_sq *sq, struct io_uring_cq *cq) {
munmap(sq->ring_ptr, sq->ring_sz);
@ -258,242 +129,78 @@ void setup_io_uring(int ring_fd, struct io_uring *io_uring_ring,
}
}
void io_uring_prep_write(struct io_uring_sqe *sqe, int fd, const void *buf,
unsigned nbytes, off_t offset) {
io_uring_prep_rw(IORING_OP_WRITE, sqe, fd, buf, nbytes, offset);
static jint netty_io_uring_enter(JNIEnv *env, jclass class1, jint ring_fd, jint to_submit,
jint min_complete, jint flags) {
return sys_io_uring_enter(ring_fd, to_submit, min_complete, flags, NULL);
}
// Prepares `sqe` as an IORING_OP_READ request for `fd`, reading `nbytes` bytes
// into `buf` at `offset`, by delegating to io_uring_prep_rw.
void io_uring_prep_read(struct io_uring_sqe *sqe, int fd, void *buf,
unsigned nbytes, off_t offset) {
io_uring_prep_rw(IORING_OP_READ, sqe, fd, buf, nbytes, offset);
}
// Stores `data` in the user_data field of `sqe`. The cast is redundant since
// `data` is already an unsigned long.
void io_uring_sqe_set_data(struct io_uring_sqe *sqe, unsigned long data) {
sqe->user_data = (unsigned long)data;
}
/*
 * Queues a read of (limit - pos) bytes from `file_fd` into `buffer + pos` and
 * tags the request with `event_id` as user data.
 *
 * If no SQE is available, a message is written to stderr and nothing is
 * queued; the function cannot report this to its caller.
 */
void queue_read(int file_fd, struct io_uring *ring, void *buffer, jint event_id,
                jint pos, jint limit) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  if (!sqe) {
    fprintf(stderr, "Could not get SQE.\n");
    return;
  }
  /* Arithmetic on void * is a GNU extension; go through char * instead. */
  io_uring_prep_read(sqe, file_fd, (char *)buffer + pos,
                     (unsigned)(limit - pos), 0);
  /* Same widening as queue_write, so both tag events identically. */
  io_uring_sqe_set_data(sqe, (unsigned long)event_id);
}
/*
 * Queues a write of (limit - pos) bytes from `buffer + pos` to `file_fd` and
 * tags the request with `event_id` as user data.
 *
 * If no SQE is available, a message is written to stderr and nothing is
 * queued; the function cannot report this to its caller.
 */
void queue_write(int file_fd, struct io_uring *ring, void *buffer,
                 jint event_id, jint pos, jint limit) {
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
  if (!sqe) {
    fprintf(stderr, "Could not get SQE.\n");
    return;
  }
  /* Arithmetic on void * is a GNU extension; go through char * instead. */
  io_uring_prep_write(sqe, file_fd, (const char *)buffer + pos,
                      (unsigned)(limit - pos), 0);
  io_uring_sqe_set_data(sqe, (unsigned long)event_id);
}
/*
 * Non-blocking look at the completion queue; the result is stored in *cqe_ptr.
 *
 * Both the io_uring_for_each_cqe body and the enclosing do/while leave on
 * their first pass, so `cqe` holds whatever the macro assigned initially -
 * presumably the entry at the ring head, or NULL when the queue is empty;
 * verify against the macro definition, which is not part of this section.
 *
 * Always returns 0: `err` is never modified.
 */
int __io_uring_peek_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr) {
struct io_uring_cqe *cqe;
unsigned head;
int err = 0;
do {
io_uring_for_each_cqe(ring, head, cqe) break;
break;
} while (1);
*cqe_ptr = cqe;
return err;
}
/*
 * Returns the address of an available CQE cast to long. If none is available
 * yet, enters the kernel once with IORING_ENTER_GETEVENTS, asking for
 * `wait_nr` completions, and peeks again.
 *
 * Returns -errno if the first peek reports an error, and -1 for every other
 * failure (syscall error, non-zero syscall result, or still no CQE).
 */
long io_uring_wait_cqe(struct io_uring *ring, unsigned wait_nr) {
  struct io_uring_cqe *cqe = NULL;
  int rc;

  /* Fast path: something is already in the completion queue. */
  if (__io_uring_peek_cqe(ring, &cqe)) {
    printf("error peek \n");
    return -errno;
  }
  if (cqe != NULL) {
    return (long)cqe;
  }

  rc = sys_io_uring_enter(ring->ring_fd, 0, wait_nr, IORING_ENTER_GETEVENTS,
                          NULL);
  if (rc != 0) {
    /* Only a result of exactly 0 leads to a second peek. */
    return -1;
  }

  if (__io_uring_peek_cqe(ring, &cqe)) {
    printf("error peek \n");
    return -1;
  }
  return cqe != NULL ? (long)cqe : -1;
}
/*
 * Hands the SQEs acquired through io_uring_get_sqe() to the kernel: flushes
 * them into the shared ring, then calls io_uring_enter with the pending count.
 *
 * Returns the syscall result (number of SQEs submitted), or -errno on failure.
 */
int io_uring_submit(struct io_uring *ring) {
  const int pending = io_uring_flush_sq(ring);
  const int rc = sys_io_uring_enter(ring->ring_fd, pending, 0, 0, NULL);

  return rc < 0 ? -errno : rc;
}
// all jni methods
static jlong netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries) {
static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries) {
struct io_uring_params p;
memset(&p, 0, sizeof(p));
int ring_fd = sys_io_uring_setup((int)entries, &p);
struct io_uring *io_uring_ring =
(struct io_uring *)malloc(sizeof(struct io_uring));
io_uring_ring->flags = 0;
io_uring_ring->sq.sqe_tail = 0;
io_uring_ring->sq.sqe_head = 0;
setup_io_uring(ring_fd, io_uring_ring, &p);
return (long)io_uring_ring;
}
static jint netty_read_operation(JNIEnv *jenv, jclass clazz, jlong uring,
jlong fd, jlong event_id, jlong buffer_address,
jint pos, jint limit) {
queue_read((int)fd, (struct io_uring *)uring, (void *)buffer_address,
event_id, pos, limit);
return 0;
}
static jint netty_write_operation(JNIEnv *jenv, jclass clazz, jlong uring,
jlong fd, jlong event_id,
jlong buffer_address, jint pos, jint limit) {
queue_write((int)fd, (struct io_uring *)uring, (void *)buffer_address,
event_id, pos, limit);
return 0;
}
static jint netty_accept_operation(JNIEnv *env, jclass clazz, jlong uring,
jlong fd, jbyteArray byte_array) {
jint socketFd;
jsize len;
jbyte len_b;
int err;
struct sockaddr_storage addr;
socklen_t address_len = sizeof(addr);
socketFd = accept(fd, (struct sockaddr *)&addr, &address_len);
if ((err = errno) != EINTR) {
return -err;
//Todo
if (ring_fd < -1) {
//throw Exception
return NULL;
}
len = addressLength(&addr);
len_b = (jbyte)len;
struct io_uring io_uring_ring;
//memset instead
io_uring_ring.flags = 0;
io_uring_ring.sq.sqe_tail = 0;
io_uring_ring.sq.sqe_head = 0;
setup_io_uring(ring_fd, &io_uring_ring, &p);
// Fill in remote address details
(*env)->SetByteArrayRegion(env, byte_array, 0, 1, (jbyte *)&len_b);
initInetSocketAddressArray(env, &addr, byte_array, 1, len);
return socketFd;
}
jobject ioUringSubmissionQueue = (*env)->NewObject(
env, ioUringSubmissionQueueClass, ioUringSubmissionQueueMethodId,
(jlong)io_uring_ring.sq.khead, (jlong)io_uring_ring.sq.ktail,
(jlong)io_uring_ring.sq.kring_mask,
(jlong)io_uring_ring.sq.kring_entries, (jlong)io_uring_ring.sq.kflags,
(jlong)io_uring_ring.sq.kdropped, (jlong)io_uring_ring.sq.array,
(jlong)io_uring_ring.sq.sqes, (jlong)io_uring_ring.sq.ring_sz,
(jlong)io_uring_ring.cq.ring_ptr, (jint)ring_fd);
static jlong netty_wait_cqe(JNIEnv *env, jclass clazz, jlong uring) {
return (jlong)io_uring_wait_cqe((struct io_uring *)uring, 1);
}
jobject ioUringCompletionQueue = (*env)->NewObject(
env, ioUringCompletionQueueClass, ioUringCommpletionQueueMethodId,
(jlong)io_uring_ring.cq.khead, (jlong)io_uring_ring.cq.ktail,
(jlong)io_uring_ring.cq.kring_mask,
(jlong)io_uring_ring.cq.kring_entries,
(jlong)io_uring_ring.cq.koverflow, (jlong)io_uring_ring.cq.cqes,
(jlong)io_uring_ring.cq.ring_sz, (jlong)io_uring_ring.cq.ring_ptr,
(jint)ring_fd);
static jlong netty_delete_cqe(JNIEnv *env, jclass clazz, jlong uring,
jlong cqe_address) {
struct io_uring_cqe *cqe = (struct io_uring_cqe *)cqe_address;
io_uring_cqe_seen((struct io_uring *)uring, cqe);
return 0;
}
jobject ringBuffer =
(*env)->NewObject(env, ringBufferClass, ringBufferMethodId,
ioUringSubmissionQueue, ioUringCompletionQueue);
static jlong netty_get_event_id(JNIEnv *env, jclass classz, jlong cqe_address) {
struct io_uring_cqe *cqe = (struct io_uring_cqe *)cqe_address;
return (long)cqe->user_data;
}
static jint netty_get_res(JNIEnv *env, jclass classz, jlong cqe_address) {
struct io_uring_cqe *cqe = (struct io_uring_cqe *)cqe_address;
return (long)cqe->res;
}
static jlong netty_close(JNIEnv *env, jclass classz, jlong io_uring) {
struct io_uring *ring = (struct io_uring *)io_uring;
struct io_uring_sq *sq = &ring->sq;
struct io_uring_cq *cq = &ring->cq;
munmap(sq->sqes, *sq->kring_entries * sizeof(struct io_uring_sqe));
io_uring_unmap_rings(sq, cq);
close(ring->ring_fd);
}
static jlong netty_submit(JNIEnv *jenv, jclass classz, jlong uring) {
return io_uring_submit((struct io_uring *)uring);
return ringBuffer;
}
// Opens (creating or truncating) "io-uring-test.txt" in the current working
// directory for reading and writing with mode 0644, and returns the result of
// open() - the file descriptor, or -1 on failure - without further checks.
static jlong netty_create_file(JNIEnv *env, jclass class) {
return open("io-uring-test.txt", O_RDWR | O_TRUNC | O_CREAT, 0644);
}
// end jni methods
// Unload hook; currently a no-op. Nothing in this section releases the class
// references or method ids cached at load time.
static void netty_io_uring_native_JNI_OnUnLoad(JNIEnv *env) {
// OnUnLoad
}
// JNI Registered Methods Begin
// Placeholder: ignores `fd`, closes nothing and returns the constant 111.
// NOTE(review): a method-table entry in this section registers it as
// "ioUringClose" with signature "(J)J", which does not match the jint
// parameter and return type used here - confirm which one is intended.
static jint netty_io_uring_close(JNIEnv *env, jclass clazz, jint fd) {
return 111;
}
// JNI Registered Methods End
// JNI Method Registration Table Begin
static const JNINativeMethod method_table[] = {
{"ioUringSetup", "(I)J", (void *)netty_io_uring_setup},
{"ioUringClose", "(J)J", (void *)netty_io_uring_close},
{"ioUringRead", "(JJJJII)I", (void *)netty_read_operation},
{"ioUringWrite", "(JJJJII)I", (void *)netty_write_operation},
{"ioUringAccept", "(JJ[B)I", (void *)netty_accept_operation},
{"ioUringWaitCqe", "(J)J", (void *)netty_wait_cqe},
{"ioUringDeleteCqe", "(JJ)J", (void *)netty_delete_cqe},
{"ioUringGetEventId", "(J)J", (void *)netty_get_event_id},
{"ioUringGetRes", "(J)I", (void *)netty_get_res},
{"ioUringSubmit", "(J)J", (void *)netty_submit},
{"createFile", "()J", (void *)netty_create_file}};
{"ioUringSetup", "(I)Lio/netty/channel/uring/RingBuffer;",
(void *)netty_io_uring_setup},
{"createFile", "()J", (void *)netty_create_file},
{"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter}};
static const jint method_table_size =
sizeof(method_table) / sizeof(method_table[0]);
// JNI Method Registration Table End
JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) {
JNIEnv *env;
char *nettyClassName = NULL;
if ((*vm)->GetEnv(vm, (void **)&env, NETTY_JNI_VERSION) != JNI_OK) {
return JNI_ERR;
}
@ -521,5 +228,31 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) {
method_table, method_table_size) != 0) {
printf("netty register natives error\n");
}
NETTY_PREPEND(packagePrefix, "io/netty/channel/uring/RingBuffer",
nettyClassName, done);
NETTY_LOAD_CLASS(env, ringBufferClass, nettyClassName, done);
NETTY_GET_METHOD(env, ringBufferClass, ringBufferMethodId, "<init>",
"(Lio/netty/channel/uring/IOUringSubmissionQueue;Lio/netty/"
"channel/uring/IOUringCompletionQueue;)V",
done);
NETTY_PREPEND(packagePrefix, "io/netty/channel/uring/IOUringSubmissionQueue",
nettyClassName, done);
NETTY_LOAD_CLASS(env, ioUringSubmissionQueueClass, nettyClassName, done);
NETTY_GET_METHOD(env, ioUringSubmissionQueueClass,
ioUringSubmissionQueueMethodId, "<init>", "(JJJJJJJJIJI)V",
done);
NETTY_PREPEND(packagePrefix, "io/netty/channel/uring/IOUringCompletionQueue",
nettyClassName, done);
NETTY_LOAD_CLASS(env, ioUringCompletionQueueClass, nettyClassName, done);
NETTY_GET_METHOD(env, ioUringCompletionQueueClass,
ioUringCommpletionQueueMethodId, "<init>", "(JJJJJJIJI)V",
done);
done:
//unload
return NETTY_JNI_VERSION;
}

View File

@ -83,11 +83,12 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
event.setId(eventId);
event.setOp(EventType.READ);
int error = socket.readEvent(ioUring, eventId, byteBuf.memoryAddress(), byteBuf.writerIndex(),
byteBuf.capacity());
if (error == 0) {
ioUringEventLoop.addNewEvent(event);
}
//Todo
//int error = socket.readEvent(ioUring, eventId, byteBuf.memoryAddress(), byteBuf.writerIndex(),
// byteBuf.capacity());
// if (error == 0) {
// ioUringEventLoop.addNewEvent(event);
// }
}
}
@ -171,7 +172,7 @@ public abstract class AbstractIOUringChannel extends AbstractChannel implements
long eventId = ioUringEventLoop.incrementEventIdCounter();
event.setId(eventId);
event.setOp(EventType.WRITE);
socket.writeEvent(ioUring, eventId, buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
//socket.writeEvent(ioUring, eventId, buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
}
}

View File

@ -80,10 +80,11 @@ public class AbstractIOUringServerChannel extends AbstractIOUringChannel impleme
event.setId(eventId);
event.setOp(EventType.ACCEPT);
if (socket.acceptEvent(getIoUring(), eventId, acceptedAddress) == 0) {
ioUringEventLoop.addNewEvent(event);
Native.ioUringSubmit(getIoUring());
}
//Todo
// if (socket.acceptEvent(getIoUring(), eventId, acceptedAddress) == 0) {
// ioUringEventLoop.addNewEvent(event);
// Native.ioUringSubmit(getIoUring());
// }
}
}
}

View File

@ -16,7 +16,15 @@
package io.netty.channel.uring;
public enum EventType {
ACCEPT,
READ,
WRITE
ACCEPT(13),
READ(22),
WRITE(23);
private final int op;
EventType(int op) {
this.op = op;
}
public int getOp() {
return op;
}
}

View File

@ -0,0 +1,138 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.util.internal.PlatformDependent;
/**
 * Java-side view of an io_uring completion queue.
 *
 * <p>The instance only stores raw native addresses handed over by the JNI setup code and reads
 * the ring through {@link PlatformDependent}. It does no synchronization of its own.
 */
public class IOUringCompletionQueue {

    // Byte offsets of the fields inside one completion queue entry (CQE) and the entry size.
    private static final int CQE_USER_DATA_FIELD = 0;
    private static final int CQE_RES_FIELD = 8;
    private static final int CQE_FLAGS_FIELD = 12;
    private static final int CQE_SIZE = 16;

    // Flag passed to io_uring_enter when waiting for completions.
    private static final int IORING_ENTER_GETEVENTS = 1;

    // (k -> kernel)
    private final long kHeadAddress;
    private final long kTailAddress;
    private final long kringMaskAddress;
    // NOTE(review): presumably also an address (of the kernel's ring-entries counter) - confirm.
    private final long kringEntries;
    private final long kOverflowAddress;

    private final long completionQueueArrayAddress;

    private final int ringSize;
    private final long ringAddress;
    private final int ringFd;

    /**
     * Creates the queue view from the values produced by the native setup call.
     *
     * @param kHeadAddress address of the ring head index
     * @param kTailAddress address of the ring tail index
     * @param kringMaskAddress address of the ring index mask
     * @param kringEntries value describing the ring entries (see the field note)
     * @param kOverflowAddress address of the overflow counter
     * @param completionQueueArrayAddress address of the first CQE
     * @param ringSize size of the mapped ring
     * @param ringAddress address of the mapped ring
     * @param ringFd file descriptor of the io_uring instance
     */
    public IOUringCompletionQueue(long kHeadAddress, long kTailAddress, long kringMaskAddress, long kringEntries,
            long kOverflowAddress, long completionQueueArrayAddress, int ringSize, long ringAddress, int ringFd) {
        this.kHeadAddress = kHeadAddress;
        this.kTailAddress = kTailAddress;
        this.kringMaskAddress = kringMaskAddress;
        this.kringEntries = kringEntries;
        this.kOverflowAddress = kOverflowAddress;
        this.completionQueueArrayAddress = completionQueueArrayAddress;
        this.ringSize = ringSize;
        this.ringAddress = ringAddress;
        this.ringFd = ringFd;
    }

    /**
     * Takes the entry at the ring head, if there is one, and advances the head.
     *
     * @return a copy of the consumed CQE, or {@code null} if head equals tail
     */
    private IOUringCqe peek() {
        long head = toUnsignedLong(PlatformDependent.getInt(kHeadAddress));
        long tail = toUnsignedLong(PlatformDependent.getInt(kTailAddress));
        if (head == tail) {
            return null;
        }

        // Acquire barrier (https://openjdk.java.net/jeps/171): it has to sit between the tail
        // load and the CQE loads so the entry is not read before the tail that publishes it.
        PlatformDependent.loadFence();

        long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress));
        long cqe = index * CQE_SIZE + completionQueueArrayAddress;

        long eventId = PlatformDependent.getLong(cqe + CQE_USER_DATA_FIELD);
        int res = PlatformDependent.getInt(cqe + CQE_RES_FIELD);
        long flags = toUnsignedLong(PlatformDependent.getInt(cqe + CQE_FLAGS_FIELD));

        // Ensure that the kernel only sees the new value of the head index after the CQE has been read.
        PlatformDependent.storeFence();
        PlatformDependent.putInt(kHeadAddress, (int) (head + 1));

        return new IOUringCqe(eventId, res, flags);
    }

    /**
     * Returns the next completion. If none is queued, calls {@code Native.ioUringEnter} once with
     * {@code IORING_ENTER_GETEVENTS} and a minimum of one completion, then looks again.
     *
     * @return the next CQE, or {@code null} if the native call returned a non-zero value or no
     *         entry was available afterwards
     */
    public IOUringCqe ioUringWaitCqe() {
        IOUringCqe ioUringCqe = peek();
        if (ioUringCqe != null) {
            return ioUringCqe;
        }

        int ret = Native.ioUringEnter(ringFd, 0, 1, IORING_ENTER_GETEVENTS);
        if (ret < 0) {
            //Todo throw exception!
            return null;
        }
        if (ret == 0) {
            ioUringCqe = peek();
            if (ioUringCqe != null) {
                return ioUringCqe;
            }
        }
        //Todo throw Exception!
        return null;
    }

    public long getKHeadAddress() {
        return this.kHeadAddress;
    }

    public long getKTailAddress() {
        return this.kTailAddress;
    }

    public long getKringMaskAddress() {
        return this.kringMaskAddress;
    }

    public long getKringEntries() {
        return this.kringEntries;
    }

    public long getKOverflowAddress() {
        return this.kOverflowAddress;
    }

    public long getCompletionQueueArrayAddress() {
        return this.completionQueueArrayAddress;
    }

    public int getRingSize() {
        return this.ringSize;
    }

    public long getRingAddress() {
        return this.ringAddress;
    }

    /**
     * Interprets {@code x} as an unsigned 32-bit value.
     */
    //Todo Integer.toUnsignedLong -> maven checkstyle error
    public static long toUnsignedLong(int x) {
        return ((long) x) & 0xffffffffL;
    }
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
/**
 * Immutable snapshot of one io_uring completion queue entry: the event id,
 * the result code and the flags.
 */
public class IOUringCqe {

    private final long eventId;
    private final int res;
    private final long flags;

    /**
     * @param eventId id stored with the entry
     * @param res result value of the entry
     * @param flags flags of the entry
     */
    public IOUringCqe(long eventId, int res, long flags) {
        this.flags = flags;
        this.res = res;
        this.eventId = eventId;
    }

    /** @return the event id given at construction */
    public long getEventId() {
        return eventId;
    }

    /** @return the result value given at construction */
    public int getRes() {
        return res;
    }

    /** @return the flags given at construction */
    public long getFlags() {
        return flags;
    }
}

View File

@ -27,8 +27,6 @@ import java.util.concurrent.Executor;
class IOUringEventLoop extends SingleThreadEventLoop {
// C pointer
private final long io_uring;
private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096);
// events should be unique to identify which event type that was
@ -38,7 +36,6 @@ class IOUringEventLoop extends SingleThreadEventLoop {
protected IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp,
final int maxPendingTasks, final RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
this.io_uring = Native.ioUringSetup(32);
}
public long incrementEventIdCounter() {
@ -53,24 +50,25 @@ class IOUringEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
//Todo
for (;;) {
// wait until an event has finished
final long cqe = Native.ioUringWaitCqe(io_uring);
final Event event = events.get(Native.ioUringGetEventId(cqe));
final int ret = Native.ioUringGetRes(cqe);
switch (event.getOp()) {
case ACCEPT:
// serverChannel is necessary to call newChildchannel
// create a new accept event
break;
case READ:
// need to save the Bytebuf before I execute the read operation
// fireChannelRead(byteBuf)
break;
case WRITE:
// you have to store Bytebuf to continue writing
break;
}
//final long cqe = Native.ioUringWaitCqe(io_uring);
//final Event event = events.get(Native.ioUringGetEventId(cqe));
//final int ret = Native.ioUringGetRes(cqe);
// switch (event.getOp()) {
// case ACCEPT:
// // serverChannel is necessary to call newChildchannel
// // create a new accept event
// break;
// case READ:
// // need to save the Bytebuf before I execute the read operation
// // fireChannelRead(byteBuf)
// break;
// case WRITE:
// // you have to store Bytebuf to continue writing
// break;
// }
// processing Tasks
}
}

View File

@ -0,0 +1,218 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.util.internal.PlatformDependent;
/**
 * Java-side view of an io_uring submission queue.
 *
 * <p>Entries are first reserved and filled locally ({@link #add}); {@link #submit()} then
 * publishes them to the shared ring and calls {@code Native.ioUringEnter}. The instance only
 * stores raw native addresses and does no synchronization of its own.
 */
public class IOUringSubmissionQueue {

    private static final int SQE_SIZE = 64;
    private static final int INT_SIZE = 4;

    // Byte offsets of the fields inside one submission queue entry (SQE).
    private static final int SQE_OP_CODE_FIELD = 0;
    private static final int SQE_FLAGS_FIELD = 1;
    private static final int SQE_IOPRIO_FIELD = 2; // u16
    private static final int SQE_FD_FIELD = 4; // s32
    private static final int SQE_OFFSET_FIELD = 8;
    private static final int SQE_ADDRESS_FIELD = 16;
    private static final int SQE_LEN_FIELD = 24;
    private static final int SQE_RW_FLAGS_FIELD = 28;
    private static final int SQE_USER_DATA_FIELD = 32;
    private static final int SQE_PAD_FIELD = 40;

    // Number of 8-byte words from SQE_PAD_FIELD up to SQE_SIZE.
    private static final int SQE_PAD_WORDS = 3;

    // (k -> kernel)
    private final long kHeadAddress;
    private final long kTailAddress;
    private final long kRingMaskAddress;
    private final long kRingEntriesAddress;
    private final long fFlagsAdress;
    private final long kDroppedAddress;
    private final long arrayAddress;

    private final long submissionQueueArrayAddress;

    // Local (not yet published) head and tail of the reserved entries.
    private long sqeHead;
    private long sqeTail;

    private final int ringSize;
    private final long ringAddress;
    private final int ringFd;

    /**
     * Creates the queue view from the values produced by the native setup call.
     *
     * @param kHeadAddress address of the ring head index
     * @param kTailAddress address of the ring tail index
     * @param kRingMaskAddress address of the ring index mask
     * @param kRingEntriesAddress address of the ring entries counter
     * @param fFlagsAdress address of the ring flags
     * @param kDroppedAddress address of the dropped counter
     * @param arrayAddress address of the index array shared with the kernel
     * @param submissionQueueArrayAddress address of the first SQE
     * @param ringSize size of the mapped ring
     * @param ringAddress address of the mapped ring
     * @param ringFd file descriptor of the io_uring instance
     */
    public IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
            long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress, int ringSize,
            long ringAddress, int ringFd) {
        this.kHeadAddress = kHeadAddress;
        this.kTailAddress = kTailAddress;
        this.kRingMaskAddress = kRingMaskAddress;
        this.kRingEntriesAddress = kRingEntriesAddress;
        this.fFlagsAdress = fFlagsAdress;
        this.kDroppedAddress = kDroppedAddress;
        this.arrayAddress = arrayAddress;
        this.submissionQueueArrayAddress = submissionQueueArrayAddress;
        this.ringSize = ringSize;
        this.ringAddress = ringAddress;
        this.ringFd = ringFd;
    }

    /**
     * Reserves the next free SQE and advances the local tail.
     *
     * @return the address of the reserved entry, or {@code 0} if the queue is full
     */
    public long getSqe() {
        long next = sqeTail + 1;
        long kRingEntries = toUnsignedLong(PlatformDependent.getInt(kRingEntriesAddress));
        long sqe = 0;
        if ((next - sqeHead) <= kRingEntries) {
            long index = sqeTail & toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress));
            sqe = SQE_SIZE * index + submissionQueueArrayAddress;
            sqeTail = next;
        }
        return sqe;
    }

    /**
     * Writes every field of the entry at {@code sqe}; flags, ioprio, rw_flags and the padding
     * are set to zero.
     */
    private void setData(long sqe, long eventId, EventType type, int fd, long bufferAddress, int length, long offset) {
        //Todo cleaner
        PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) type.getOp());
        PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
        PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0);
        PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
        PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset);
        PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress);
        PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length);
        PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
        PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId);

        // pad field array -> all fields should be zero
        for (int i = 0; i < SQE_PAD_WORDS; i++) {
            PlatformDependent.putLong(sqe + SQE_PAD_FIELD + i * 8L, 0);
        }
    }

    /**
     * Reserves an entry and fills it for the given operation. The buffer range is
     * {@code bufferAddress + pos} with a length of {@code limit - pos}; the offset is always 0.
     *
     * @return {@code false} if no entry could be reserved, {@code true} otherwise
     */
    public boolean add(long eventId, EventType type, int fd, long bufferAddress, int pos, int limit) {
        long sqe = getSqe();
        if (sqe == 0) {
            return false;
        }
        setData(sqe, eventId, type, fd, bufferAddress + pos, limit - pos, 0);
        return true;
    }

    /**
     * Publishes the locally reserved entries to the shared ring.
     *
     * @return the shared ring's tail minus its head after publishing
     */
    private int flushSqe() {
        long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress));
        long kHead = toUnsignedLong(PlatformDependent.getInt(kHeadAddress));
        long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress));

        if (sqeHead == sqeTail) {
            return (int) (kTail - kHead);
        }

        // Record the index of every reserved entry in the shared index array.
        for (long toSubmit = sqeTail - sqeHead; toSubmit > 0; toSubmit--) {
            long index = kTail & kRingMask;
            PlatformDependent.putInt(arrayAddress + index * INT_SIZE, (int) (sqeHead & kRingMask));
            sqeHead++;
            kTail++;
        }

        // Release barrier: the entries must be visible before the new tail is.
        PlatformDependent.storeFence();
        PlatformDependent.putInt(kTailAddress, (int) kTail);

        return (int) (kTail - kHead);
    }

    /**
     * Publishes all reserved entries and calls {@code Native.ioUringEnter} for them.
     *
     * @throws RuntimeException if the native call returns a negative value
     */
    public void submit() {
        int submitted = flushSqe();

        int ret = Native.ioUringEnter(ringFd, submitted, 0, 0);
        if (ret < 0) {
            throw new RuntimeException("ioUringEnter syscall failed: " + ret);
        }
    }

    public void setSqeHead(long sqeHead) {
        this.sqeHead = sqeHead;
    }

    public void setSqeTail(long sqeTail) {
        this.sqeTail = sqeTail;
    }

    public long getKHeadAddress() {
        return this.kHeadAddress;
    }

    public long getKTailAddress() {
        return this.kTailAddress;
    }

    public long getKRingMaskAddress() {
        return this.kRingMaskAddress;
    }

    public long getKRingEntriesAddress() {
        return this.kRingEntriesAddress;
    }

    public long getFFlagsAdress() {
        return this.fFlagsAdress;
    }

    public long getKDroppedAddress() {
        return this.kDroppedAddress;
    }

    public long getArrayAddress() {
        return this.arrayAddress;
    }

    public long getSubmissionQueueArrayAddress() {
        return this.submissionQueueArrayAddress;
    }

    public long getSqeHead() {
        return this.sqeHead;
    }

    public long getSqeTail() {
        return this.sqeTail;
    }

    public int getRingSize() {
        return this.ringSize;
    }

    public long getRingAddress() {
        return this.ringAddress;
    }

    /**
     * Interprets {@code x} as an unsigned 32-bit value.
     */
    //Todo Integer.toUnsignedLong -> maven checkstyle error
    public static long toUnsignedLong(int x) {
        return ((long) x) & 0xffffffffL;
    }
}

View File

@ -18,23 +18,25 @@ package io.netty.channel.uring;
import io.netty.channel.unix.Socket;
public class LinuxSocket extends Socket {
private final long fd;
private final int fd;
public LinuxSocket(final int fd) {
super(fd);
this.fd = fd;
}
public int readEvent(long ring, long eventId, long bufferAddress, int pos, int limit) {
return Native.ioUringRead(ring, fd, eventId, bufferAddress, pos, limit);
}
//Todo
public int writeEvent(long ring, long eventId, long bufferAddress, int pos, int limit) {
return Native.ioUringWrite(ring, fd, eventId, bufferAddress, pos, limit);
}
// public int readEvent(long ring, long eventId, long bufferAddress, int pos, int limit) {
// return Native.ioUringRead(ring, fd, eventId, bufferAddress, pos, limit);
// }
public int acceptEvent(long ring, long eventId, byte[] addr) {
return Native.ioUringAccept(ring, eventId, addr);
}
// public int writeEvent(long ring, long eventId, long bufferAddress, int pos, int limit) {
// return Native.ioUringWrite(ring, fd, eventId, bufferAddress, pos, limit);
// }
// public int acceptEvent(long ring, long eventId, byte[] addr) {
// return Native.ioUringAccept(ring, eventId, addr);
// }
}

View File

@ -30,34 +30,24 @@ import java.util.Locale;
public final class Native {
private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 32);
static {
loadNativeLibrary();
}
public static native long ioUringSetup(int entries);
public static RingBuffer createRingBuffer(int ringSize) {
//Todo throw Exception if it's null
return ioUringSetup(ringSize);
}
public static native int ioUringRead(long io_uring, long fd, long eventId, long bufferAddress, int pos, int limit);
public static RingBuffer createRingBuffer() {
//Todo throw Exception if it's null
return ioUringSetup(DEFAULT_RING_SIZE);
}
public static native int ioUringWrite(long io_uring, long fd, long eventId, long bufferAddress, int pos, int limit);
private static native RingBuffer ioUringSetup(int entries);
public static native int ioUringAccept(long io_uring, long fd, byte[] addr);
// return id
public static native long ioUringWaitCqe(long io_uring);
public static native long ioUringDeleteCqe(long io_uring, long cqeAddress);
public static native long ioUringGetEventId(long cqeAddress);
public static native int ioUringGetRes(long cqeAddress);
public static native long ioUringClose(long io_uring);
public static native long ioUringSubmit(long io_uring);
public static native long ioUringGetSQE(long io_uring);
public static native long ioUringGetQC(long io_uring);
public static native int ioUringEnter(int ringFd, int toSubmit, int minComplete, int flags);
// for testing(it is only temporary)
public static native long createFile();
@ -66,9 +56,9 @@ public final class Native {
// utility
}
// From epoll native library
// From io_uring native library
private static void loadNativeLibrary() {
String name = SystemPropertyUtil.get("os.name").toLowerCase(Locale.UK).trim();
String name = PlatformDependent.normalizedOs().toLowerCase(Locale.UK).trim();
if (!name.startsWith("linux")) {
throw new IllegalStateException("Only supported on Linux");
}
@ -78,13 +68,13 @@ public final class Native {
try {
NativeLibraryLoader.load(sharedLibName, cl);
} catch (UnsatisfiedLinkError e1) {
// try {
// NativeLibraryLoader.load(staticLibName, cl);
// System.out.println("Failed to load io_uring");
// } catch (UnsatisfiedLinkError e2) {
// ThrowableUtil.addSuppressed(e1, e2);
// throw e1;
// }
try {
NativeLibraryLoader.load(staticLibName, cl);
System.out.println("Failed to load io_uring");
} catch (UnsatisfiedLinkError e2) {
ThrowableUtil.addSuppressed(e1, e2);
throw e1;
}
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.util.internal.PlatformDependent;
/**
 * Immutable holder for the submission queue and the completion queue that are
 * passed to its constructor.
 * <p>
 * NOTE(review): according to the commit description, instances are created from JNI.
 * Confirm against the native code before changing the constructor signature or the
 * field names.
 */
public class RingBuffer {

    private final IOUringSubmissionQueue ioUringSubmissionQueue;
    private final IOUringCompletionQueue ioUringCompletionQueue;

    /**
     * Stores both queues as given; no validation or copying is performed.
     *
     * @param ioUringSubmissionQueue queue later returned by {@link #getIoUringSubmissionQueue()}
     * @param ioUringCompletionQueue queue later returned by {@link #getIoUringCompletionQueue()}
     */
    public RingBuffer(
            IOUringSubmissionQueue ioUringSubmissionQueue,
            IOUringCompletionQueue ioUringCompletionQueue) {
        this.ioUringCompletionQueue = ioUringCompletionQueue;
        this.ioUringSubmissionQueue = ioUringSubmissionQueue;
    }

    /**
     * @return the submission queue supplied at construction time
     */
    public IOUringSubmissionQueue getIoUringSubmissionQueue() {
        return ioUringSubmissionQueue;
    }

    /**
     * @return the completion queue supplied at construction time
     */
    public IOUringCompletionQueue getIoUringCompletionQueue() {
        return ioUringCompletionQueue;
    }
}

View File

@ -18,8 +18,8 @@ package io.netty.channel.uring;
import org.junit.Test;
import java.io.FileInputStream;
import java.io.File;
import sun.misc.SharedSecrets;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
@ -29,30 +29,35 @@ import static org.junit.Assert.*;
// Test that writes a short string to a file descriptor via io_uring and checks the completion result.
// NOTE(review): this listing appears to interleave statements from two versions of the test
// (calls taking a raw `uring` handle and calls going through RingBuffer); confirm which lines are current.
public class NativeTest {
@Test
public void test_io_uring() {
long uring = Native.ioUringSetup(32);
long fd = Native.createFile();
System.out.println("Fd: " + fd);
public void canWriteFile() {
//Todo add read operation test
final long eventId = 1;
// Direct buffer (initial capacity 500, max capacity 1000) whose memory address is handed to the write request.
ByteBufAllocator allocator = new UnpooledByteBufAllocator(true);
UnpooledUnsafeDirectByteBuf directByteBufPooled = new UnpooledUnsafeDirectByteBuf(allocator, 500, 1000);
System.out.println("MemoryAddress: " + directByteBufPooled.hasMemoryAddress());
String inputString = "Hello World!";
byte[] byteArrray = inputString.getBytes();
directByteBufPooled.writeBytes(byteArrray);
Native.ioUringWrite(uring, fd, 1, directByteBufPooled.memoryAddress(), directByteBufPooled.readerIndex(),
directByteBufPooled.writerIndex());
int fd = (int) Native.createFile();
System.out.println("Filedescriptor: " + fd);
Native.ioUringSubmit(uring);
// Obtain the ring buffer and its two queues from the native layer.
RingBuffer ringBuffer = Native.createRingBuffer(32);
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
long cqe = Native.ioUringWaitCqe(uring);
assertNotNull(ringBuffer);
assertNotNull(submissionQueue);
assertNotNull(completionQueue);
// System.out.println("Res: " + Native.ioUringGetRes(cqe));
assertEquals(12, Native.ioUringGetRes(cqe));
// Queue a WRITE request covering readerIndex..writerIndex of the buffer for fd, then submit it.
assertTrue(submissionQueue.add(eventId, EventType.WRITE, fd, directByteBufPooled.memoryAddress(),
directByteBufPooled.readerIndex(), directByteBufPooled.writerIndex()));
submissionQueue.submit();
Native.ioUringClose(uring);
// The completion must report the input string's length as its result and event id 1.
IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe();
assertNotNull(ioUringCqe);
assertEquals(inputString.length(), ioUringCqe.getRes());
assertEquals(1, ioUringCqe.getEventId());
}
}