From 7bc4521a99c17683fa5407e5b5edc61af13ab3d8 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 16 Sep 2020 11:57:55 +0200 Subject: [PATCH] Cleanup code (#10581) Motivation: Just some cleanup needed in general Modifications: - Make methods package-private when the class is package-private - Use spaces and not tabs everywhere - Fix eventfd_write usage as the implementation was only needed like this for EPOLL when used with edge-triggered - Correctly handle EINTR Result: Cleaner code --- .../src/main/c/netty_io_uring_native.c | 136 ++++++++---------- .../src/main/c/syscall.c | 5 +- .../src/main/c/syscall.h | 3 +- .../java/io/netty/channel/uring/IOUring.java | 2 +- .../channel/uring/IOUringCompletionQueue.java | 9 +- .../netty/channel/uring/IOUringEventLoop.java | 8 +- .../channel/uring/IOUringSubmissionQueue.java | 27 ++-- .../io/netty/channel/uring/LinuxSocket.java | 3 +- .../java/io/netty/channel/uring/Native.java | 4 - .../io/netty/channel/uring/RingBuffer.java | 2 +- 10 files changed, 84 insertions(+), 115 deletions(-) diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_native.c b/transport-native-io_uring/src/main/c/netty_io_uring_native.c index b9be31b17b..86319e4722 100644 --- a/transport-native-io_uring/src/main/c/netty_io_uring_native.c +++ b/transport-native-io_uring/src/main/c/netty_io_uring_native.c @@ -78,67 +78,61 @@ void io_uring_unmap_rings(struct io_uring_sq *sq, struct io_uring_cq *cq) { } } -static int io_uring_mmap(int fd, struct io_uring_params *p, - struct io_uring_sq *sq, struct io_uring_cq *cq) -{ - size_t size; - int ret; +static int io_uring_mmap(int fd, struct io_uring_params *p, struct io_uring_sq *sq, struct io_uring_cq *cq) { + size_t size; + int ret; - sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned); - cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe); + sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned); + cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe); - if ((p->features & IORING_FEAT_SINGLE_MMAP) == 1) { - if (cq->ring_sz > sq->ring_sz) { - sq->ring_sz = cq->ring_sz; - } - cq->ring_sz = sq->ring_sz; - } - sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, - MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING); - if (sq->ring_ptr == MAP_FAILED) { - return -errno; + if ((p->features & IORING_FEAT_SINGLE_MMAP) == 1) { + if (cq->ring_sz > sq->ring_sz) { + sq->ring_sz = cq->ring_sz; + } + cq->ring_sz = sq->ring_sz; + } + sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING); + if (sq->ring_ptr == MAP_FAILED) { + return -errno; } - if ((p->features & IORING_FEAT_SINGLE_MMAP) == 1) { - cq->ring_ptr = sq->ring_ptr; - } else { - cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, - MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING); - if (cq->ring_ptr == MAP_FAILED) { - cq->ring_ptr = NULL; - ret = -errno; - goto err; - } - } + if ((p->features & IORING_FEAT_SINGLE_MMAP) == 1) { + cq->ring_ptr = sq->ring_ptr; + } else { + cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING); + if (cq->ring_ptr == MAP_FAILED) { + cq->ring_ptr = NULL; + ret = -errno; + goto err; + } + } - sq->khead = sq->ring_ptr + p->sq_off.head; - sq->ktail = sq->ring_ptr + p->sq_off.tail; - sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask; - sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries; - sq->kflags = sq->ring_ptr + p->sq_off.flags; - sq->kdropped = sq->ring_ptr + p->sq_off.dropped; - sq->array = sq->ring_ptr + p->sq_off.array; + sq->khead = sq->ring_ptr + p->sq_off.head; + sq->ktail = sq->ring_ptr + p->sq_off.tail; + sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask; + sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries; + sq->kflags = sq->ring_ptr + p->sq_off.flags; + sq->kdropped = sq->ring_ptr + p->sq_off.dropped; + sq->array = sq->ring_ptr + p->sq_off.array; - size = p->sq_entries * sizeof(struct io_uring_sqe); - sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE, - MAP_SHARED | MAP_POPULATE, fd, - IORING_OFF_SQES); - if (sq->sqes == MAP_FAILED) { - ret = -errno; - goto err; - } + size = p->sq_entries * sizeof(struct io_uring_sqe); + sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES); + if (sq->sqes == MAP_FAILED) { + ret = -errno; + goto err; + } - cq->khead = cq->ring_ptr + p->cq_off.head; - cq->ktail = cq->ring_ptr + p->cq_off.tail; - cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask; - cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries; - cq->koverflow = cq->ring_ptr + p->cq_off.overflow; - cq->cqes = cq->ring_ptr + p->cq_off.cqes; + cq->khead = cq->ring_ptr + p->cq_off.head; + cq->ktail = cq->ring_ptr + p->cq_off.tail; + cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask; + cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries; + cq->koverflow = cq->ring_ptr + p->cq_off.overflow; + cq->cqes = cq->ring_ptr + p->cq_off.cqes; - return 0; + return 0; err: - io_uring_unmap_rings(sq, cq); - return ret; + io_uring_unmap_rings(sq, cq); + return ret; } int setup_io_uring(int ring_fd, struct io_uring *io_uring_ring, @@ -155,7 +149,7 @@ static jint netty_io_uring_enter(JNIEnv *env, jclass class1, jint ring_fd, jint if (result >= 0) { return result; } - } while((err = errno) == EINTR); + } while ((err = errno) == EINTR); return -err; } @@ -170,28 +164,15 @@ static jint netty_epoll_native_blocking_event_fd(JNIEnv* env, jclass clazz) { } static void netty_io_uring_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) { - uint64_t val; - - for (;;) { - jint ret = eventfd_write(fd, (eventfd_t) value); - - if (ret < 0) { - // We need to read before we can write again, let's try to read and then write again and if this - // fails we will bail out. - // - // See http://man7.org/linux/man-pages/man2/eventfd.2.html. - if (errno == EAGAIN) { - if (eventfd_read(fd, &val) == 0 || errno == EAGAIN) { - // Try again - continue; - } - netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_read(...) failed: ", errno); - } else { - netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write(...) failed: ", errno); - } + int result; + int err; + do { + result = eventfd_write(fd, (eventfd_t) value); + if (result >= 0) { + return; } - break; - } + } while ((err = errno) == EINTR); + netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write(...) failed: ", err); } static void netty_io_uring_ring_buffer_exit(JNIEnv *env, jclass clazz, @@ -370,7 +351,6 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = { { "ioringOpClose", "()I", (void *) netty_io_uring_ioringOpClose }, { "ioringEnterGetevents", "()I", (void *) netty_io_uring_ioringEnterGetevents }, { "iosqeAsync", "()I", (void *) netty_io_uring_iosqeAsync } - }; static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]); @@ -378,10 +358,10 @@ static const JNINativeMethod method_table[] = { {"ioUringSetup", "(I)[[J", (void *) netty_io_uring_setup}, {"ioUringExit", "(JIJIJII)V", (void *) netty_io_uring_ring_buffer_exit}, {"createFile", "()I", (void *) netty_create_file}, - {"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter}, + {"ioUringEnter", "(IIII)I", (void *) netty_io_uring_enter}, {"blockingEventFd", "()I", (void *) netty_epoll_native_blocking_event_fd}, {"eventFdWrite", "(IJ)V", (void *) netty_io_uring_eventFdWrite } - }; +}; static const jint method_table_size = sizeof(method_table) / sizeof(method_table[0]); // JNI Method Registration Table End @@ -432,7 +412,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) { if (netty_unix_util_register_natives(env, packagePrefix, "io/netty/channel/uring/Native", method_table, method_table_size) != 0) { - printf("netty register natives error\n"); + goto done; } // Load all c modules that we depend upon diff --git a/transport-native-io_uring/src/main/c/syscall.c b/transport-native-io_uring/src/main/c/syscall.c index f6946f7228..62d2173ff8 100644 --- a/transport-native-io_uring/src/main/c/syscall.c +++ b/transport-native-io_uring/src/main/c/syscall.c @@ -29,7 +29,6 @@ int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete, _NSIG / 8); } -int sys_io_uring_register(int fd, unsigned opcode, const void *arg, - unsigned nr_args) { - return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args); +int sys_io_uring_register(int fd, unsigned opcode, const void *arg, unsigned nr_args) { + return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args); } diff --git a/transport-native-io_uring/src/main/c/syscall.h b/transport-native-io_uring/src/main/c/syscall.h index ec5cb3d62d..f3d875f60c 100644 --- a/transport-native-io_uring/src/main/c/syscall.h +++ b/transport-native-io_uring/src/main/c/syscall.h @@ -24,6 +24,5 @@ extern int sys_io_uring_setup(unsigned entries, struct io_uring_params *p); extern int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete, unsigned flags, sigset_t *sig); -extern int sys_io_uring_register(int fd, unsigned int opcode, const void *arg, - unsigned int nr_args); +extern int sys_io_uring_register(int fd, unsigned int opcode, const void *arg, unsigned int nr_args); #endif diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java index 042f55a9ee..86223c0fb3 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java @@ -18,7 +18,7 @@ package io.netty.channel.uring; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; -final class IOUring { +public final class IOUring { private static final Throwable UNAVAILABILITY_CAUSE; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java index e14765a303..f29d91b9b2 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java @@ -63,7 +63,7 @@ final class IOUringCompletionQueue { return ringHead != PlatformDependent.getIntVolatile(kTailAddress); } - public int process(IOUringCompletionQueueCallback callback) { + int process(IOUringCompletionQueueCallback callback) { int tail = PlatformDependent.getIntVolatile(kTailAddress); int i = 0; while (ringHead != tail) { @@ -92,7 +92,7 @@ final class IOUringCompletionQueue { void handle(int fd, int res, int flags, int op, int mask); } - public boolean ioUringWaitCqe() { + boolean ioUringWaitCqe() { //IORING_ENTER_GETEVENTS -> wait until an event is completely processed int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS); if (ret < 0) { @@ -104,9 +104,4 @@ final class IOUringCompletionQueue { //Todo throw Exception! return false; } - - //Todo Integer.toUnsignedLong -> maven checkstyle error - public static long toUnsignedLong(int x) { - return ((long) x) & 0xffffffffL; - } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 459fd328d5..88704d0a24 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -97,14 +97,14 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements : PlatformDependent.newMpscQueue(maxPendingTasks); } - public void add(AbstractIOUringChannel ch) { + void add(AbstractIOUringChannel ch) { logger.trace("Add Channel: {} ", ch.socket.intValue()); int fd = ch.socket.intValue(); channels.put(fd, ch); } - public void remove(AbstractIOUringChannel ch) { + void remove(AbstractIOUringChannel ch) { logger.trace("Remove Channel: {}", ch.socket.intValue()); int fd = ch.socket.intValue(); @@ -298,7 +298,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements PlatformDependent.freeMemory(eventfdReadBuf); } - public RingBuffer getRingBuffer() { + RingBuffer getRingBuffer() { return ringBuffer; } @@ -310,7 +310,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements } } - public IovArray iovArray() { + IovArray iovArray() { IovArray iovArray = iovArrays.next(); if (iovArray == null) { ringBuffer.ioUringSubmissionQueue().submit(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index a2d5be7001..ce024a9a8f 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -135,20 +135,20 @@ final class IOUringSubmissionQueue { logger.trace("Offset: {}", offset); } - public boolean addTimeout(long nanoSeconds) { + boolean addTimeout(long nanoSeconds) { setTimeout(nanoSeconds); return enqueueSqe(Native.IORING_OP_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0); } - public boolean addPollIn(int fd) { + boolean addPollIn(int fd) { return addPoll(fd, Native.POLLIN); } - public boolean addPollRdHup(int fd) { + boolean addPollRdHup(int fd) { return addPoll(fd, Native.POLLRDHUP); } - public boolean addPollOut(int fd) { + boolean addPollOut(int fd) { return addPoll(fd, Native.POLLOUT); } @@ -157,43 +157,43 @@ final class IOUringSubmissionQueue { } //return true -> submit() was called - public boolean addRead(int fd, long bufferAddress, int pos, int limit) { + boolean addRead(int fd, long bufferAddress, int pos, int limit) { return enqueueSqe(Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0); } - public boolean addWrite(int fd, long bufferAddress, int pos, int limit) { + boolean addWrite(int fd, long bufferAddress, int pos, int limit) { return enqueueSqe(Native.IORING_OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0); } - public boolean addAccept(int fd, long address, long addressLength) { + boolean addAccept(int fd, long address, long addressLength) { return enqueueSqe(Native.IORING_OP_ACCEPT, Native.SOCK_NONBLOCK | Native.SOCK_CLOEXEC, fd, address, 0, addressLength); } //fill the address which is associated with server poll link user_data - public boolean addPollRemove(int fd, int pollMask) { + boolean addPollRemove(int fd, int pollMask) { return enqueueSqe(Native.IORING_OP_POLL_REMOVE, 0, fd, convertToUserData(Native.IORING_OP_POLL_ADD, fd, pollMask), 0, 0); } - public boolean addConnect(int fd, long socketAddress, long socketAddressLength) { + boolean addConnect(int fd, long socketAddress, long socketAddressLength) { return enqueueSqe(Native.IORING_OP_CONNECT, 0, fd, socketAddress, 0, socketAddressLength); } - public boolean addWritev(int fd, long iovecArrayAddress, int length) { + boolean addWritev(int fd, long iovecArrayAddress, int length) { return enqueueSqe(Native.IORING_OP_WRITEV, 0, fd, iovecArrayAddress, length, 0); } - public boolean addClose(int fd) { + boolean addClose(int fd) { return enqueueSqe(Native.IORING_OP_CLOSE, 0, fd, 0, 0, 0); } - public int submit() { + int submit() { int submit = tail - head; return submit > 0 ? submit(submit, 0, 0) : 0; } - public int submitAndWait() { + int submitAndWait() { int submit = tail - head; if (submit > 0) { return submit(submit, 1, Native.IORING_ENTER_GETEVENTS); @@ -223,7 +223,6 @@ final class IOUringSubmissionQueue { private void setTimeout(long timeoutNanoSeconds) { long seconds, nanoSeconds; - //Todo if (timeoutNanoSeconds == 0) { seconds = 0; nanoSeconds = 0; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java index a49de22436..0143cdb864 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/LinuxSocket.java @@ -49,6 +49,7 @@ final class LinuxSocket extends Socket { return ipv6 ? InternetProtocolFamily.IPv6 : InternetProtocolFamily.IPv4; } + @Override public boolean markClosed() { return super.markClosed(); } @@ -71,7 +72,7 @@ final class LinuxSocket extends Socket { setInterface(intValue(), ipv6, nativeAddress.address(), nativeAddress.scopeId(), interfaceIndex(netInterface)); } - public int initAddress(byte[] address, int scopeId, int port, long addressMemory) { + int initAddress(byte[] address, int scopeId, int port, long addressMemory) { return initAddress(intValue(), ipv6, address, scopeId, port, addressMemory); } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index 9c7bda4844..c4e3ce5070 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -149,10 +149,6 @@ final class Native { // for testing(it is only temporary) public static native int createFile(); - public static Socket newSocketStream() { - return Socket.newSocketStream(); - } - private Native() { // utility } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java index a933a69bee..67f5bfcfb2 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/RingBuffer.java @@ -25,7 +25,7 @@ final class RingBuffer { this.ioUringCompletionQueue = ioUringCompletionQueue; } - IOUringSubmissionQueue ioUringSubmissionQueue() { + IOUringSubmissionQueue ioUringSubmissionQueue() { return this.ioUringSubmissionQueue; }