Obtain static fields via JNI and not duplicate their values

Motivation:

To ensure we use the correct values when passing values from Java to C and the other way around it is better to use JNI to lookup the values.

Modifications:

Add NativeStaticallyRefererencedJniMethods and use it (just as we do in kqueue / epoll)

Results:

More robust code
This commit is contained in:
Norman Maurer 2020-09-02 14:14:29 +02:00
parent 44f2cba67a
commit 74fd0c1375
8 changed files with 226 additions and 115 deletions

View File

@ -57,6 +57,7 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/eventfd.h>
#include <poll.h>
static jmethodID ringBufferMethodId = NULL;
static jmethodID ioUringSubmissionQueueMethodId = NULL;
@ -362,7 +363,76 @@ static jint netty_create_file(JNIEnv *env, jclass class) {
return open("io-uring-test.txt", O_RDWR | O_TRUNC | O_CREAT, 0644);
}
static jint netty_io_uring_sockNonblock(JNIEnv* env, jclass clazz) {
return SOCK_NONBLOCK;
}
static jint netty_io_uring_sockCloexec(JNIEnv* env, jclass clazz) {
return SOCK_CLOEXEC;
}
static jint netty_io_uring_pollin(JNIEnv* env, jclass clazz) {
return POLLIN;
}
static jint netty_io_uring_pollout(JNIEnv* env, jclass clazz) {
return POLLOUT;
}
static jint netty_io_uring_pollrdhup(JNIEnv* env, jclass clazz) {
return POLLRDHUP;
}
static jint netty_io_uring_ioringOpWritev(JNIEnv* env, jclass clazz) {
return IORING_OP_WRITEV;
}
static jint netty_io_uring_ioringOpPollAdd(JNIEnv* env, jclass clazz) {
return IORING_OP_POLL_ADD;
}
static jint netty_io_uring_ioringOpPollRemove(JNIEnv* env, jclass clazz) {
return IORING_OP_POLL_REMOVE;
}
static jint netty_io_uring_ioringOpTimeout(JNIEnv* env, jclass clazz) {
return IORING_OP_TIMEOUT;
}
static jint netty_io_uring_ioringOpAccept(JNIEnv* env, jclass clazz) {
return IORING_OP_ACCEPT;
}
static jint netty_io_uring_ioringOpRead(JNIEnv* env, jclass clazz) {
return IORING_OP_READ;
}
static jint netty_io_uring_ioringOpWrite(JNIEnv* env, jclass clazz) {
return IORING_OP_WRITE;
}
static jint netty_io_uring_ioringOpConnect(JNIEnv* env, jclass clazz) {
return IORING_OP_CONNECT;
}
// JNI Method Registration Table Begin
static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "sockNonblock", "()I", (void *) netty_io_uring_sockNonblock },
{ "sockCloexec", "()I", (void *) netty_io_uring_sockCloexec },
{ "pollin", "()I", (void *) netty_io_uring_pollin },
{ "pollout", "()I", (void *) netty_io_uring_pollout },
{ "pollrdhup", "()I", (void *) netty_io_uring_pollrdhup },
{ "ioringOpWritev", "()I", (void *) netty_io_uring_ioringOpWritev },
{ "ioringOpPollAdd", "()I", (void *) netty_io_uring_ioringOpPollAdd },
{ "ioringOpPollRemove", "()I", (void *) netty_io_uring_ioringOpPollRemove },
{ "ioringOpTimeout", "()I", (void *) netty_io_uring_ioringOpTimeout },
{ "ioringOpAccept", "()I", (void *) netty_io_uring_ioringOpAccept },
{ "ioringOpRead", "()I", (void *) netty_io_uring_ioringOpRead },
{ "ioringOpWrite", "()I", (void *) netty_io_uring_ioringOpWrite },
{ "ioringOpConnect", "()I", (void *) netty_io_uring_ioringOpConnect }
};
static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]);
static const JNINativeMethod method_table[] = {
{"ioUringSetup", "(I)Lio/netty/channel/uring/RingBuffer;", (void *) netty_io_uring_setup},
{"ioUringExit", "(Lio/netty/channel/uring/RingBuffer;)V", (void *) netty_io_uring_ring_buffer_exit},
@ -413,6 +483,15 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) {
return JNI_ERR;
}
// We must register the statically referenced methods first!
if (netty_unix_util_register_natives(env,
packagePrefix,
"io/netty/channel/uring/NativeStaticallyReferencedJniMethods",
statically_referenced_fixed_method_table,
statically_referenced_fixed_method_table_size) != 0) {
goto done;
}
if (netty_unix_util_register_natives(env, packagePrefix,
"io/netty/channel/uring/Native",
method_table, method_table_size) != 0) {

View File

@ -218,14 +218,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
if (isRegistered()) {
IOUringSubmissionQueue submissionQueue = submissionQueue();
if ((ioState & POLL_IN_SCHEDULED) != 0) {
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_IN);
submissionQueue.addPollRemove(socket.intValue(), Native.POLLIN);
ioState &= ~POLL_IN_SCHEDULED;
}
if ((ioState & POLL_OUT_SCHEDULED) != 0) {
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_OUT);
submissionQueue.addPollRemove(socket.intValue(), Native.POLLOUT);
ioState &= ~POLL_OUT_SCHEDULED;
}
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_RDHUP);
submissionQueue.addPollRemove(socket.intValue(), Native.POLLRDHUP);
submissionQueue.submit();
}

View File

@ -20,18 +20,6 @@ import io.netty.util.internal.SystemPropertyUtil;
final class IOUring {
private static final Throwable UNAVAILABILITY_CAUSE;
static final int OP_WRITEV = 2;
static final int IO_POLL = 6;
static final int IO_TIMEOUT = 11;
static final int OP_ACCEPT = 13;
static final int OP_READ = 22;
static final int OP_WRITE = 23;
static final int OP_POLL_REMOVE = 7;
static final int OP_CONNECT = 16;
static final int POLLMASK_IN = 1;
static final int POLLMASK_OUT = 4;
static final int POLLMASK_RDHUP = 8192;
static {
Throwable cause = null;

View File

@ -17,6 +17,7 @@ package io.netty.channel.uring;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.unix.Errors;
import io.netty.channel.unix.FileDescriptor;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
@ -35,7 +36,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
//Todo set config ring buffer size
private static final int ringSize = 32;
private static final int ENOENT = -2;
private static final long ETIME = -62;
static final long ECANCELED = -125;
@ -168,83 +168,70 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
}
}
@Override
public boolean handle(int fd, int res, long flags, int op, int pollMask) {
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
switch (op) {
case IOUring.OP_ACCEPT:
// Fall-through
case IOUring.OP_READ:
AbstractIOUringChannel readChannel = channels.get(fd);
if (readChannel == null) {
break;
}
((AbstractIOUringChannel.AbstractUringUnsafe) readChannel.unsafe()).readComplete(res);
break;
case IOUring.OP_WRITEV:
// Fall-through
case IOUring.OP_WRITE:
AbstractIOUringChannel writeChannel = channels.get(fd);
if (writeChannel == null) {
break;
}
((AbstractIOUringChannel.AbstractUringUnsafe) writeChannel.unsafe()).writeComplete(res);
break;
case IOUring.IO_TIMEOUT:
if (res == ETIME) {
prevDeadlineNanos = NONE;
}
break;
case IOUring.IO_POLL:
if (res == ECANCELED) {
logger.trace("IO_POLL cancelled");
break;
}
if (eventfd.intValue() == fd) {
pendingWakeup = false;
handleEventFd(submissionQueue);
} else {
AbstractIOUringChannel channel = channels.get(fd);
if (channel == null) {
break;
}
if ((pollMask & IOUring.POLLMASK_OUT) != 0) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollOut(res);
}
if ((pollMask & IOUring.POLLMASK_IN) != 0) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollIn(res);
}
if ((pollMask & IOUring.POLLMASK_RDHUP) != 0) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollRdHup(res);
}
}
break;
case IOUring.OP_POLL_REMOVE:
if (res == ENOENT) {
logger.trace("POLL_REMOVE not successful");
} else if (res == 0) {
logger.trace("POLL_REMOVE successful");
}
break;
case IOUring.OP_CONNECT:
AbstractIOUringChannel channel = channels.get(fd);
if (channel != null) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).connectComplete(res);
}
break;
default:
break;
if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT) {
handleRead(fd, res);
} else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_WRITE) {
handleWrite(fd, res);
} else if (op == Native.IORING_OP_POLL_ADD) {
if (res == ECANCELED) {
logger.trace("IORING_POLL_ADD cancelled");
return true;
}
if (eventfd.intValue() == fd) {
pendingWakeup = false;
handleEventFd(submissionQueue);
} else {
handlePollAdd(fd, res, pollMask);
}
} else if (op == Native.IORING_OP_POLL_REMOVE) {
if (res == Errors.ERRNO_ENOENT_NEGATIVE) {
logger.trace("IORING_POLL_REMOVE not successful");
} else if (res == 0) {
logger.trace("IORING_POLL_REMOVE successful");
}
} else if (op == Native.IORING_OP_CONNECT) {
handleConnect(fd, res);
} else if (op == Native.IORING_OP_TIMEOUT) {
if (res == ETIME) {
prevDeadlineNanos = NONE;
}
}
return true;
}
private void handleRead(int fd, int res) {
AbstractIOUringChannel readChannel = channels.get(fd);
if (readChannel != null) {
((AbstractIOUringChannel.AbstractUringUnsafe) readChannel.unsafe()).readComplete(res);
}
}
private void handleWrite(int fd, int res) {
AbstractIOUringChannel writeChannel = channels.get(fd);
if (writeChannel != null) {
((AbstractIOUringChannel.AbstractUringUnsafe) writeChannel.unsafe()).writeComplete(res);
}
}
private void handlePollAdd(int fd, int res, int pollMask) {
AbstractIOUringChannel channel = channels.get(fd);
if (channel != null) {
if ((pollMask & Native.POLLOUT) != 0) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollOut(res);
}
if ((pollMask & Native.POLLIN) != 0) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollIn(res);
}
if ((pollMask & Native.POLLRDHUP) != 0) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).pollRdHup(res);
}
}
}
private void handleEventFd(IOUringSubmissionQueue submissionQueue) {
// We need to consume the data as otherwise we would see another event
// in the completionQueue without
@ -256,6 +243,13 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
submissionQueue.submit();
}
private void handleConnect(int fd, int res) {
AbstractIOUringChannel channel = channels.get(fd);
if (channel != null) {
((AbstractIOUringChannel.AbstractUringUnsafe) channel.unsafe()).connectComplete(res);
}
}
@Override
protected void cleanup() {
try {

View File

@ -32,8 +32,6 @@ final class IOUringSubmissionQueue {
private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
private static final int IOSQE_IO_LINK = 4;
//these offsets are used to access specific properties
//SQE https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L21
private static final int SQE_OP_CODE_FIELD = 0;
@ -68,9 +66,6 @@ final class IOUringSubmissionQueue {
private final long ringAddress;
private final int ringFd;
private static final int SOCK_NONBLOCK = 2048;
private static final int SOCK_CLOEXEC = 524288;
private final ByteBuffer timeoutMemory;
private final long timeoutMemoryAddress;
@ -123,9 +118,9 @@ final class IOUringSubmissionQueue {
PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length);
//user_data should be same as POLL_LINK fd
if (op == IOUring.OP_POLL_REMOVE) {
if (op == Native.IORING_OP_POLL_REMOVE) {
PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1);
long uData = convertToUserData((byte) IOUring.IO_POLL, fd, pollMask);
long uData = convertToUserData((byte) Native.IORING_OP_POLL_ADD, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, convertToUserData(op, fd, 0));
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
@ -133,11 +128,11 @@ final class IOUringSubmissionQueue {
long uData = convertToUserData(op, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData);
//c union set Rw-Flags or accept_flags
if (op != IOUring.OP_ACCEPT) {
if (op != Native.IORING_OP_ACCEPT) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
} else {
//accept_flags set NON_BLOCKING
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC);
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, Native.SOCK_NONBLOCK | Native.SOCK_CLOEXEC);
}
}
@ -164,20 +159,20 @@ final class IOUringSubmissionQueue {
return false;
}
setTimeout(nanoSeconds);
setData(sqe, (byte) IOUring.IO_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0);
setData(sqe, (byte) Native.IORING_OP_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0);
return true;
}
public boolean addPollIn(int fd) {
return addPoll(fd, IOUring.POLLMASK_IN);
return addPoll(fd, Native.POLLIN);
}
public boolean addPollRdHup(int fd) {
return addPoll(fd, IOUring.POLLMASK_RDHUP);
return addPoll(fd, Native.POLLRDHUP);
}
public boolean addPollOut(int fd) {
return addPoll(fd, IOUring.POLLMASK_OUT);
return addPoll(fd, Native.POLLOUT);
}
private boolean addPoll(int fd, int pollMask) {
@ -192,7 +187,7 @@ final class IOUringSubmissionQueue {
}
}
setData(sqe, (byte) IOUring.IO_POLL, pollMask, fd, 0, 0, 0);
setData(sqe, (byte) Native.IORING_OP_POLL_ADD, pollMask, fd, 0, 0, 0);
return submitted;
}
@ -208,7 +203,7 @@ final class IOUringSubmissionQueue {
submitted = true;
}
}
setData(sqe, (byte) IOUring.OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0);
setData(sqe, (byte) Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0);
return submitted;
}
@ -223,7 +218,7 @@ final class IOUringSubmissionQueue {
submitted = true;
}
}
setData(sqe, (byte) IOUring.OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0);
setData(sqe, (byte) Native.IORING_OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0);
return submitted;
}
@ -238,7 +233,7 @@ final class IOUringSubmissionQueue {
submitted = true;
}
}
setData(sqe, (byte) IOUring.OP_ACCEPT, 0, fd, 0, 0, 0);
setData(sqe, (byte) Native.IORING_OP_ACCEPT, 0, fd, 0, 0, 0);
return submitted;
}
@ -254,7 +249,7 @@ final class IOUringSubmissionQueue {
submitted = true;
}
}
setData(sqe, (byte) IOUring.OP_POLL_REMOVE, pollMask, fd, 0, 0, 0);
setData(sqe, (byte) Native.IORING_OP_POLL_REMOVE, pollMask, fd, 0, 0, 0);
return submitted;
}
@ -270,7 +265,7 @@ final class IOUringSubmissionQueue {
submitted = true;
}
}
setData(sqe, (byte) IOUring.OP_CONNECT, 0, fd, socketAddress, 0, socketAddressLength);
setData(sqe, (byte) Native.IORING_OP_CONNECT, 0, fd, socketAddress, 0, socketAddressLength);
return submitted;
}
@ -286,7 +281,7 @@ final class IOUringSubmissionQueue {
submitted = true;
}
}
setData(sqe, (byte) IOUring.OP_WRITEV, 0, fd, iovecArrayAddress, length, 0);
setData(sqe, (byte) Native.IORING_OP_WRITEV, 0, fd, iovecArrayAddress, length, 0);
return submitted;
}

View File

@ -15,9 +15,7 @@
*/
package io.netty.channel.uring;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.PeerCredentials;
import io.netty.channel.unix.Socket;
import io.netty.util.internal.NativeLibraryLoader;
import io.netty.util.internal.PlatformDependent;
@ -30,11 +28,9 @@ import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Locale;
import static io.netty.channel.unix.Socket.isIPv6Preferred;
final class Native {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class);
private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 32);
static {
@ -66,8 +62,20 @@ final class Native {
}
Socket.initialize();
}
static final int SOCK_NONBLOCK = NativeStaticallyReferencedJniMethods.sockNonblock();
static final int SOCK_CLOEXEC = NativeStaticallyReferencedJniMethods.sockCloexec();
static final int POLLIN = NativeStaticallyReferencedJniMethods.pollin();
static final int POLLOUT = NativeStaticallyReferencedJniMethods.pollout();
static final int POLLRDHUP = NativeStaticallyReferencedJniMethods.pollrdhup();
static final int IORING_OP_POLL_ADD = NativeStaticallyReferencedJniMethods.ioringOpPollAdd();
static final int IORING_OP_TIMEOUT = NativeStaticallyReferencedJniMethods.ioringOpTimeout();
static final int IORING_OP_ACCEPT = NativeStaticallyReferencedJniMethods.ioringOpAccept();
static final int IORING_OP_READ = NativeStaticallyReferencedJniMethods.ioringOpRead();
static final int IORING_OP_WRITE = NativeStaticallyReferencedJniMethods.ioringOpWrite();
static final int IORING_OP_POLL_REMOVE = NativeStaticallyReferencedJniMethods.ioringOpPollRemove();
static final int IORING_OP_CONNECT = NativeStaticallyReferencedJniMethods.ioringOpConnect();
static final int IORING_OP_WRITEV = NativeStaticallyReferencedJniMethods.ioringOpWritev();
public static RingBuffer createRingBuffer(int ringSize) {
//Todo throw Exception if it's null

View File

@ -0,0 +1,47 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
/**
* This class is necessary to break the following cyclic dependency:
* <ol>
* <li>JNI_OnLoad</li>
* <li>JNI Calls FindClass because RegisterNatives (used to register JNI methods) requires a class</li>
* <li>FindClass loads the class, but static members variables of that class attempt to call a JNI method which has not
* yet been registered.</li>
* <li>java.lang.UnsatisfiedLinkError is thrown because native method has not yet been registered.</li>
* </ol>
* Static members which call JNI methods must not be declared in this class!
*/
final class NativeStaticallyReferencedJniMethods {
private NativeStaticallyReferencedJniMethods() { }
static native int sockNonblock();
static native int sockCloexec();
static native int pollin();
static native int pollout();
static native int pollrdhup();
static native int ioringOpWritev();
static native int ioringOpPollAdd();
static native int ioringOpPollRemove();
static native int ioringOpTimeout();
static native int ioringOpAccept();
static native int ioringOpRead();
static native int ioringOpWrite();
static native int ioringOpConnect();
}

View File

@ -229,7 +229,7 @@ public class NativeTest {
FileDescriptor eventFd = Native.newEventFd();
submissionQueue.addPollIn(eventFd.intValue());
submissionQueue.submit();
submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN);
submissionQueue.addPollRemove(eventFd.intValue(), Native.POLLIN);
submissionQueue.submit();
final AtomicReference<AssertionError> errorRef = new AtomicReference<AssertionError>();
@ -238,9 +238,9 @@ public class NativeTest {
new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
if (op == IOUring.IO_POLL) {
if (op == Native.IORING_OP_POLL_ADD) {
assertEquals(IOUringEventLoop.ECANCELED, res);
} else if (op == IOUring.OP_POLL_REMOVE) {
} else if (op == Native.IORING_OP_POLL_REMOVE) {
assertEquals(0, res);
} else {
fail("op " + op);