Cleanup code (#10581)

Motivation:

Just some cleanup needed in general

Modifications:

- Make methods package-private when the class is package-private
- Use spaces and not tabs everywhere
- Fix eventfd_write usage, as the previous implementation was only needed
for EPOLL when used in edge-triggered mode
- Correctly handle EINTR

Result:

Cleaner code
This commit is contained in:
Norman Maurer 2020-09-16 11:57:55 +02:00
parent d2219f089e
commit 7bc4521a99
10 changed files with 84 additions and 115 deletions

View File

@ -78,67 +78,61 @@ void io_uring_unmap_rings(struct io_uring_sq *sq, struct io_uring_cq *cq) {
}
}
static int io_uring_mmap(int fd, struct io_uring_params *p,
struct io_uring_sq *sq, struct io_uring_cq *cq)
{
size_t size;
int ret;
static int io_uring_mmap(int fd, struct io_uring_params *p, struct io_uring_sq *sq, struct io_uring_cq *cq) {
size_t size;
int ret;
sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);
sq->ring_sz = p->sq_off.array + p->sq_entries * sizeof(unsigned);
cq->ring_sz = p->cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);
if ((p->features & IORING_FEAT_SINGLE_MMAP) == 1) {
if (cq->ring_sz > sq->ring_sz) {
sq->ring_sz = cq->ring_sz;
}
cq->ring_sz = sq->ring_sz;
}
sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
if (sq->ring_ptr == MAP_FAILED) {
return -errno;
if ((p->features & IORING_FEAT_SINGLE_MMAP) == 1) {
if (cq->ring_sz > sq->ring_sz) {
sq->ring_sz = cq->ring_sz;
}
cq->ring_sz = sq->ring_sz;
}
sq->ring_ptr = mmap(0, sq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQ_RING);
if (sq->ring_ptr == MAP_FAILED) {
return -errno;
}
if ((p->features & IORING_FEAT_SINGLE_MMAP) == 1) {
cq->ring_ptr = sq->ring_ptr;
} else {
cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
if (cq->ring_ptr == MAP_FAILED) {
cq->ring_ptr = NULL;
ret = -errno;
goto err;
}
}
if ((p->features & IORING_FEAT_SINGLE_MMAP) == 1) {
cq->ring_ptr = sq->ring_ptr;
} else {
cq->ring_ptr = mmap(0, cq->ring_sz, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_CQ_RING);
if (cq->ring_ptr == MAP_FAILED) {
cq->ring_ptr = NULL;
ret = -errno;
goto err;
}
}
sq->khead = sq->ring_ptr + p->sq_off.head;
sq->ktail = sq->ring_ptr + p->sq_off.tail;
sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask;
sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries;
sq->kflags = sq->ring_ptr + p->sq_off.flags;
sq->kdropped = sq->ring_ptr + p->sq_off.dropped;
sq->array = sq->ring_ptr + p->sq_off.array;
sq->khead = sq->ring_ptr + p->sq_off.head;
sq->ktail = sq->ring_ptr + p->sq_off.tail;
sq->kring_mask = sq->ring_ptr + p->sq_off.ring_mask;
sq->kring_entries = sq->ring_ptr + p->sq_off.ring_entries;
sq->kflags = sq->ring_ptr + p->sq_off.flags;
sq->kdropped = sq->ring_ptr + p->sq_off.dropped;
sq->array = sq->ring_ptr + p->sq_off.array;
size = p->sq_entries * sizeof(struct io_uring_sqe);
sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, fd,
IORING_OFF_SQES);
if (sq->sqes == MAP_FAILED) {
ret = -errno;
goto err;
}
size = p->sq_entries * sizeof(struct io_uring_sqe);
sq->sqes = mmap(0, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, IORING_OFF_SQES);
if (sq->sqes == MAP_FAILED) {
ret = -errno;
goto err;
}
cq->khead = cq->ring_ptr + p->cq_off.head;
cq->ktail = cq->ring_ptr + p->cq_off.tail;
cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask;
cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries;
cq->koverflow = cq->ring_ptr + p->cq_off.overflow;
cq->cqes = cq->ring_ptr + p->cq_off.cqes;
cq->khead = cq->ring_ptr + p->cq_off.head;
cq->ktail = cq->ring_ptr + p->cq_off.tail;
cq->kring_mask = cq->ring_ptr + p->cq_off.ring_mask;
cq->kring_entries = cq->ring_ptr + p->cq_off.ring_entries;
cq->koverflow = cq->ring_ptr + p->cq_off.overflow;
cq->cqes = cq->ring_ptr + p->cq_off.cqes;
return 0;
return 0;
err:
io_uring_unmap_rings(sq, cq);
return ret;
io_uring_unmap_rings(sq, cq);
return ret;
}
int setup_io_uring(int ring_fd, struct io_uring *io_uring_ring,
@ -155,7 +149,7 @@ static jint netty_io_uring_enter(JNIEnv *env, jclass class1, jint ring_fd, jint
if (result >= 0) {
return result;
}
} while((err = errno) == EINTR);
} while ((err = errno) == EINTR);
return -err;
}
@ -170,28 +164,15 @@ static jint netty_epoll_native_blocking_event_fd(JNIEnv* env, jclass clazz) {
}
static void netty_io_uring_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) {
uint64_t val;
for (;;) {
jint ret = eventfd_write(fd, (eventfd_t) value);
if (ret < 0) {
// We need to read before we can write again, let's try to read and then write again and if this
// fails we will bail out.
//
// See http://man7.org/linux/man-pages/man2/eventfd.2.html.
if (errno == EAGAIN) {
if (eventfd_read(fd, &val) == 0 || errno == EAGAIN) {
// Try again
continue;
}
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_read(...) failed: ", errno);
} else {
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write(...) failed: ", errno);
}
int result;
int err;
do {
result = eventfd_write(fd, (eventfd_t) value);
if (result >= 0) {
return;
}
break;
}
} while ((err = errno) == EINTR);
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write(...) failed: ", err);
}
static void netty_io_uring_ring_buffer_exit(JNIEnv *env, jclass clazz,
@ -370,7 +351,6 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "ioringOpClose", "()I", (void *) netty_io_uring_ioringOpClose },
{ "ioringEnterGetevents", "()I", (void *) netty_io_uring_ioringEnterGetevents },
{ "iosqeAsync", "()I", (void *) netty_io_uring_iosqeAsync }
};
static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]);
@ -378,10 +358,10 @@ static const JNINativeMethod method_table[] = {
{"ioUringSetup", "(I)[[J", (void *) netty_io_uring_setup},
{"ioUringExit", "(JIJIJII)V", (void *) netty_io_uring_ring_buffer_exit},
{"createFile", "()I", (void *) netty_create_file},
{"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter},
{"ioUringEnter", "(IIII)I", (void *) netty_io_uring_enter},
{"blockingEventFd", "()I", (void *) netty_epoll_native_blocking_event_fd},
{"eventFdWrite", "(IJ)V", (void *) netty_io_uring_eventFdWrite }
};
};
static const jint method_table_size =
sizeof(method_table) / sizeof(method_table[0]);
// JNI Method Registration Table End
@ -432,7 +412,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) {
if (netty_unix_util_register_natives(env, packagePrefix,
"io/netty/channel/uring/Native",
method_table, method_table_size) != 0) {
printf("netty register natives error\n");
goto done;
}
// Load all c modules that we depend upon

View File

@ -29,7 +29,6 @@ int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete,
_NSIG / 8);
}
int sys_io_uring_register(int fd, unsigned opcode, const void *arg,
unsigned nr_args) {
return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args);
int sys_io_uring_register(int fd, unsigned opcode, const void *arg, unsigned nr_args) {
return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args);
}

View File

@ -24,6 +24,5 @@
extern int sys_io_uring_setup(unsigned entries, struct io_uring_params *p);
extern int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete,
unsigned flags, sigset_t *sig);
extern int sys_io_uring_register(int fd, unsigned int opcode, const void *arg,
unsigned int nr_args);
extern int sys_io_uring_register(int fd, unsigned int opcode, const void *arg, unsigned int nr_args);
#endif

View File

@ -18,7 +18,7 @@ package io.netty.channel.uring;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
final class IOUring {
public final class IOUring {
private static final Throwable UNAVAILABILITY_CAUSE;

View File

@ -63,7 +63,7 @@ final class IOUringCompletionQueue {
return ringHead != PlatformDependent.getIntVolatile(kTailAddress);
}
public int process(IOUringCompletionQueueCallback callback) {
int process(IOUringCompletionQueueCallback callback) {
int tail = PlatformDependent.getIntVolatile(kTailAddress);
int i = 0;
while (ringHead != tail) {
@ -92,7 +92,7 @@ final class IOUringCompletionQueue {
void handle(int fd, int res, int flags, int op, int mask);
}
public boolean ioUringWaitCqe() {
boolean ioUringWaitCqe() {
//IORING_ENTER_GETEVENTS -> wait until an event is completely processed
int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
if (ret < 0) {
@ -104,9 +104,4 @@ final class IOUringCompletionQueue {
//Todo throw Exception!
return false;
}
//Todo Integer.toUnsignedLong -> maven checkstyle error
public static long toUnsignedLong(int x) {
return ((long) x) & 0xffffffffL;
}
}

View File

@ -97,14 +97,14 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
public void add(AbstractIOUringChannel ch) {
void add(AbstractIOUringChannel ch) {
logger.trace("Add Channel: {} ", ch.socket.intValue());
int fd = ch.socket.intValue();
channels.put(fd, ch);
}
public void remove(AbstractIOUringChannel ch) {
void remove(AbstractIOUringChannel ch) {
logger.trace("Remove Channel: {}", ch.socket.intValue());
int fd = ch.socket.intValue();
@ -298,7 +298,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
PlatformDependent.freeMemory(eventfdReadBuf);
}
public RingBuffer getRingBuffer() {
RingBuffer getRingBuffer() {
return ringBuffer;
}
@ -310,7 +310,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
}
}
public IovArray iovArray() {
IovArray iovArray() {
IovArray iovArray = iovArrays.next();
if (iovArray == null) {
ringBuffer.ioUringSubmissionQueue().submit();

View File

@ -135,20 +135,20 @@ final class IOUringSubmissionQueue {
logger.trace("Offset: {}", offset);
}
public boolean addTimeout(long nanoSeconds) {
boolean addTimeout(long nanoSeconds) {
setTimeout(nanoSeconds);
return enqueueSqe(Native.IORING_OP_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0);
}
public boolean addPollIn(int fd) {
boolean addPollIn(int fd) {
return addPoll(fd, Native.POLLIN);
}
public boolean addPollRdHup(int fd) {
boolean addPollRdHup(int fd) {
return addPoll(fd, Native.POLLRDHUP);
}
public boolean addPollOut(int fd) {
boolean addPollOut(int fd) {
return addPoll(fd, Native.POLLOUT);
}
@ -157,43 +157,43 @@ final class IOUringSubmissionQueue {
}
//return true -> submit() was called
public boolean addRead(int fd, long bufferAddress, int pos, int limit) {
boolean addRead(int fd, long bufferAddress, int pos, int limit) {
return enqueueSqe(Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0);
}
public boolean addWrite(int fd, long bufferAddress, int pos, int limit) {
boolean addWrite(int fd, long bufferAddress, int pos, int limit) {
return enqueueSqe(Native.IORING_OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0);
}
public boolean addAccept(int fd, long address, long addressLength) {
boolean addAccept(int fd, long address, long addressLength) {
return enqueueSqe(Native.IORING_OP_ACCEPT, Native.SOCK_NONBLOCK | Native.SOCK_CLOEXEC, fd,
address, 0, addressLength);
}
//fill the address which is associated with server poll link user_data
public boolean addPollRemove(int fd, int pollMask) {
boolean addPollRemove(int fd, int pollMask) {
return enqueueSqe(Native.IORING_OP_POLL_REMOVE, 0, fd,
convertToUserData(Native.IORING_OP_POLL_ADD, fd, pollMask), 0, 0);
}
public boolean addConnect(int fd, long socketAddress, long socketAddressLength) {
boolean addConnect(int fd, long socketAddress, long socketAddressLength) {
return enqueueSqe(Native.IORING_OP_CONNECT, 0, fd, socketAddress, 0, socketAddressLength);
}
public boolean addWritev(int fd, long iovecArrayAddress, int length) {
boolean addWritev(int fd, long iovecArrayAddress, int length) {
return enqueueSqe(Native.IORING_OP_WRITEV, 0, fd, iovecArrayAddress, length, 0);
}
public boolean addClose(int fd) {
boolean addClose(int fd) {
return enqueueSqe(Native.IORING_OP_CLOSE, 0, fd, 0, 0, 0);
}
public int submit() {
int submit() {
int submit = tail - head;
return submit > 0 ? submit(submit, 0, 0) : 0;
}
public int submitAndWait() {
int submitAndWait() {
int submit = tail - head;
if (submit > 0) {
return submit(submit, 1, Native.IORING_ENTER_GETEVENTS);
@ -223,7 +223,6 @@ final class IOUringSubmissionQueue {
private void setTimeout(long timeoutNanoSeconds) {
long seconds, nanoSeconds;
//Todo
if (timeoutNanoSeconds == 0) {
seconds = 0;
nanoSeconds = 0;

View File

@ -49,6 +49,7 @@ final class LinuxSocket extends Socket {
return ipv6 ? InternetProtocolFamily.IPv6 : InternetProtocolFamily.IPv4;
}
@Override
public boolean markClosed() {
return super.markClosed();
}
@ -71,7 +72,7 @@ final class LinuxSocket extends Socket {
setInterface(intValue(), ipv6, nativeAddress.address(), nativeAddress.scopeId(), interfaceIndex(netInterface));
}
public int initAddress(byte[] address, int scopeId, int port, long addressMemory) {
int initAddress(byte[] address, int scopeId, int port, long addressMemory) {
return initAddress(intValue(), ipv6, address, scopeId, port, addressMemory);
}

View File

@ -149,10 +149,6 @@ final class Native {
// for testing(it is only temporary)
public static native int createFile();
public static Socket newSocketStream() {
return Socket.newSocketStream();
}
private Native() {
// utility
}

View File

@ -25,7 +25,7 @@ final class RingBuffer {
this.ioUringCompletionQueue = ioUringCompletionQueue;
}
IOUringSubmissionQueue ioUringSubmissionQueue() {
IOUringSubmissionQueue ioUringSubmissionQueue() {
return this.ioUringSubmissionQueue;
}