Add wakeup and timeout
Motivation: wake up the blocking call io_uring which is called when a new task is added(not from the eventloop thread) Modification: -Added timeout operation for scheduled task(not tested yet) -Added Poll operation -Added two tests to reproduce the polling signal issue Result: io_uring_enter doesnt get any polling signal from eventFdWrite if both functions are executed in different threads
This commit is contained in:
parent
d3c28143a8
commit
bc9ada411b
@ -55,6 +55,7 @@
|
||||
#include <fcntl.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/eventfd.h>
|
||||
|
||||
static jmethodID ringBufferMethodId = NULL;
|
||||
static jmethodID ioUringSubmissionQueueMethodId = NULL;
|
||||
@ -152,6 +153,63 @@ static jint netty_io_uring_enter(JNIEnv *env, jclass class1, jint ring_fd, jint
|
||||
return sys_io_uring_enter(ring_fd, to_submit, min_complete, flags, NULL);
|
||||
}
|
||||
|
||||
static jint netty_epoll_native_eventFd(JNIEnv* env, jclass clazz) {
|
||||
jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
|
||||
|
||||
if (eventFD < 0) {
|
||||
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", errno);
|
||||
}
|
||||
return eventFD;
|
||||
}
|
||||
|
||||
static jint netty_io_uring_register_event_fd(JNIEnv *env, jclass class1, jint ring_fd, jint event_fd) {
|
||||
int ret;
|
||||
ret = sys_io_uring_register(ring_fd, IORING_REGISTER_EVENTFD,
|
||||
&event_fd, 1);
|
||||
if (ret < 0) {
|
||||
return -errno;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static jint netty_io_uring_unregister_event_fd(JNIEnv *env, jclass class1, jint ring_fd) {
|
||||
int ret;
|
||||
|
||||
ret = sys_io_uring_register(ring_fd, IORING_UNREGISTER_EVENTFD,
|
||||
NULL, 0);
|
||||
if (ret < 0) {
|
||||
return -errno;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void netty_epoll_native_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) {
|
||||
uint64_t val;
|
||||
|
||||
for (;;) {
|
||||
jint ret = eventfd_write(fd, (eventfd_t) value);
|
||||
|
||||
if (ret < 0) {
|
||||
// We need to read before we can write again, let's try to read and then write again and if this
|
||||
// fails we will bail out.
|
||||
//
|
||||
// See http://man7.org/linux/man-pages/man2/eventfd.2.html.
|
||||
if (errno == EAGAIN) {
|
||||
if (eventfd_read(fd, &val) == 0 || errno == EAGAIN) {
|
||||
// Try again
|
||||
continue;
|
||||
}
|
||||
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_read(...) failed: ", errno);
|
||||
} else {
|
||||
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd_write(...) failed: ", errno);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static int nettyBlockingSocket(int domain, int type, int protocol) {
|
||||
return socket(domain, type, protocol);
|
||||
}
|
||||
@ -209,9 +267,14 @@ static void netty_io_uring_native_JNI_OnUnLoad(JNIEnv *env) {
|
||||
|
||||
// JNI Method Registration Table Begin
|
||||
static const JNINativeMethod method_table[] = {
|
||||
{"ioUringSetup", "(I)Lio/netty/channel/uring/RingBuffer;", (void *)netty_io_uring_setup},
|
||||
{"createFile", "()I", (void *)netty_create_file},
|
||||
{"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter}};
|
||||
{"ioUringSetup", "(I)Lio/netty/channel/uring/RingBuffer;", (void *) netty_io_uring_setup},
|
||||
{"createFile", "()I", (void *) netty_create_file},
|
||||
{"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter},
|
||||
{"eventFd", "()I", (void *) netty_epoll_native_eventFd},
|
||||
{ "eventFdWrite", "(IJ)V", (void *) netty_epoll_native_eventFdWrite },
|
||||
{"ioUringRegisterEventFd", "(II)I", (void *) netty_io_uring_register_event_fd},
|
||||
{"ioUringUnregisterEventFd", "(I)I", (void *) netty_io_uring_unregister_event_fd}
|
||||
};
|
||||
static const jint method_table_size =
|
||||
sizeof(method_table) / sizeof(method_table[0]);
|
||||
// JNI Method Registration Table End
|
||||
|
@ -28,3 +28,8 @@ int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete,
|
||||
return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, flags, sig,
|
||||
_NSIG / 8);
|
||||
}
|
||||
|
||||
int sys_io_uring_register(int fd, unsigned opcode, const void *arg,
|
||||
unsigned nr_args) {
|
||||
return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args);
|
||||
}
|
||||
|
@ -24,5 +24,6 @@
|
||||
extern int sys_io_uring_setup(unsigned entries, struct io_uring_params *p);
|
||||
extern int sys_io_uring_enter(int fd, unsigned to_submit, unsigned min_complete,
|
||||
unsigned flags, sigset_t *sig);
|
||||
|
||||
extern int sys_io_uring_register(int fd, unsigned int opcode, const void *arg,
|
||||
unsigned int nr_args);
|
||||
#endif
|
||||
|
@ -18,7 +18,9 @@ package io.netty.channel.uring;
|
||||
enum EventType {
|
||||
ACCEPT(13),
|
||||
READ(22),
|
||||
WRITE(23);
|
||||
WRITE(23),
|
||||
TIMEOUT(11),
|
||||
POLL(6);
|
||||
private final int op;
|
||||
EventType(int op) {
|
||||
this.op = op;
|
||||
|
@ -20,12 +20,13 @@ import io.netty.channel.ChannelOutboundBuffer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.SingleThreadEventLoop;
|
||||
import io.netty.channel.unix.FileDescriptor;
|
||||
import io.netty.util.collection.IntObjectHashMap;
|
||||
import io.netty.util.collection.IntObjectMap;
|
||||
import io.netty.util.collection.LongObjectHashMap;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static io.netty.channel.unix.Errors.*;
|
||||
|
||||
@ -35,11 +36,34 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
private long eventIdCounter;
|
||||
private final LongObjectHashMap<Event> events = new LongObjectHashMap<Event>();
|
||||
private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096);
|
||||
private RingBuffer ringBuffer;
|
||||
private final RingBuffer ringBuffer;
|
||||
|
||||
private static final long AWAKE = -1L;
|
||||
private static final long NONE = Long.MAX_VALUE;
|
||||
private static long ETIME = -62;
|
||||
|
||||
// nextWakeupNanos is:
|
||||
// AWAKE when EL is awake
|
||||
// NONE when EL is waiting with no wakeup scheduled
|
||||
// other value T when EL is waiting with wakeup scheduled at time T
|
||||
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
|
||||
private final FileDescriptor eventfd;
|
||||
|
||||
private long prevDeadlineNanos = NONE;
|
||||
private boolean pendingWakeup;
|
||||
//private final FileDescriptor eventFd;
|
||||
|
||||
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
|
||||
super(parent, executor, addTaskWakesUp);
|
||||
ringBuffer = Native.createRingBuffer(32);
|
||||
eventfd = Native.newEventFd();
|
||||
long eventId = incrementEventIdCounter();
|
||||
Event event = new Event();
|
||||
event.setOp(EventType.POLL);
|
||||
event.setId(eventId);
|
||||
addNewEvent(event);
|
||||
ringBuffer.getIoUringSubmissionQueue().addPoll(eventfd.intValue(), eventId);
|
||||
ringBuffer.getIoUringSubmissionQueue().submit();
|
||||
}
|
||||
|
||||
public long incrementEventIdCounter() {
|
||||
@ -87,103 +111,45 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
|
||||
@Override
|
||||
protected void run() {
|
||||
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
|
||||
final IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
for (;;) {
|
||||
final IOUringCompletionQueue ioUringCompletionQueue = ringBuffer.getIoUringCompletionQueue();
|
||||
final IOUringCqe ioUringCqe = ioUringCompletionQueue.peek(); // or waiting
|
||||
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
|
||||
if (curDeadlineNanos == -1L) {
|
||||
curDeadlineNanos = NONE; // nothing on the calendar
|
||||
}
|
||||
nextWakeupNanos.set(curDeadlineNanos);
|
||||
long ioStartTime = 0;
|
||||
|
||||
if (ioUringCqe != null) {
|
||||
final Event event = events.get(ioUringCqe.getEventId());
|
||||
System.out.println("Completion EventId: " + ioUringCqe.getEventId());
|
||||
|
||||
if (event != null) {
|
||||
switch (event.getOp()) {
|
||||
case ACCEPT:
|
||||
System.out.println("EventLoop Accept Res: " + ioUringCqe.getRes());
|
||||
if (ioUringCqe.getRes() != -1 && ioUringCqe.getRes() != ERRNO_EAGAIN_NEGATIVE &&
|
||||
ioUringCqe.getRes() != ERRNO_EWOULDBLOCK_NEGATIVE) {
|
||||
AbstractIOUringServerChannel abstractIOUringServerChannel =
|
||||
(AbstractIOUringServerChannel) event.getAbstractIOUringChannel();
|
||||
System.out.println("EventLoop Fd: " + abstractIOUringServerChannel.getSocket().intValue());
|
||||
final IOUringRecvByteAllocatorHandle allocHandle =
|
||||
(IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe()
|
||||
.recvBufAllocHandle();
|
||||
final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline();
|
||||
|
||||
allocHandle.lastBytesRead(ioUringCqe.getRes());
|
||||
if (allocHandle.lastBytesRead() > 0) {
|
||||
allocHandle.incMessagesRead(1);
|
||||
try {
|
||||
pipeline.fireChannelRead(abstractIOUringServerChannel
|
||||
.newChildChannel(allocHandle.lastBytesRead()));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
allocHandle.readComplete();
|
||||
pipeline.fireChannelReadComplete();
|
||||
}
|
||||
}
|
||||
if (!hasTasks()) {
|
||||
try {
|
||||
if (curDeadlineNanos != prevDeadlineNanos) {
|
||||
prevDeadlineNanos = curDeadlineNanos;
|
||||
Event event = new Event();
|
||||
long eventId = incrementEventIdCounter();
|
||||
event.setId(eventId);
|
||||
ringBuffer.getIoUringSubmissionQueue()
|
||||
.add(eventId, EventType.ACCEPT, event.getAbstractIOUringChannel()
|
||||
.getSocket().intValue(),
|
||||
0,
|
||||
0,
|
||||
0);
|
||||
event.setOp(EventType.TIMEOUT);
|
||||
addNewEvent(event);
|
||||
ringBuffer.getIoUringSubmissionQueue().submit();
|
||||
break;
|
||||
case READ:
|
||||
System.out.println("EventLoop Read Res: " + ioUringCqe.getRes());
|
||||
System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().intValue());
|
||||
ByteBuf byteBuf = event.getReadBuffer();
|
||||
int localReadAmount = ioUringCqe.getRes();
|
||||
if (localReadAmount > 0) {
|
||||
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
|
||||
submissionQueue.addTimeout(curDeadlineNanos, eventId);
|
||||
}
|
||||
final IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe();
|
||||
if (ioUringCqe != null) {
|
||||
final Event event = events.get(ioUringCqe.getEventId());
|
||||
System.out.println("Completion EventId: " + ioUringCqe.getEventId());
|
||||
ioStartTime = System.nanoTime();
|
||||
if (event != null) {
|
||||
processEvent(ioUringCqe.getRes(), event);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
||||
final IOUringRecvByteAllocatorHandle allocHandle =
|
||||
(IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe()
|
||||
.recvBufAllocHandle();
|
||||
final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline();
|
||||
|
||||
allocHandle.lastBytesRead(localReadAmount);
|
||||
if (allocHandle.lastBytesRead() <= 0) {
|
||||
// nothing was read, release the buffer.
|
||||
byteBuf.release();
|
||||
byteBuf = null;
|
||||
break;
|
||||
}
|
||||
|
||||
allocHandle.incMessagesRead(1);
|
||||
//readPending = false;
|
||||
pipeline.fireChannelRead(byteBuf);
|
||||
byteBuf = null;
|
||||
allocHandle.readComplete();
|
||||
pipeline.fireChannelReadComplete();
|
||||
event.getAbstractIOUringChannel().executeReadEvent();
|
||||
break;
|
||||
case WRITE:
|
||||
System.out.println("EventLoop Write Res: " + ioUringCqe.getRes());
|
||||
System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().intValue());
|
||||
System.out.println("EventLoop Pipeline: " + event.getAbstractIOUringChannel().eventLoop());
|
||||
ChannelOutboundBuffer channelOutboundBuffer = event
|
||||
.getAbstractIOUringChannel().unsafe().outboundBuffer();
|
||||
//remove bytes
|
||||
int localFlushAmount = ioUringCqe.getRes();
|
||||
if (localFlushAmount > 0) {
|
||||
channelOutboundBuffer.removeBytes(localFlushAmount);
|
||||
}
|
||||
try {
|
||||
event.getAbstractIOUringChannel().doWrite(channelOutboundBuffer);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
break;
|
||||
if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
|
||||
pendingWakeup = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
//run tasks
|
||||
|
||||
//Todo ioRatio?
|
||||
if (hasTasks()) {
|
||||
runAllTasks();
|
||||
}
|
||||
@ -198,15 +164,124 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
} catch (Throwable t) {
|
||||
System.out.println("Exception error " + t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void processEvent(final int res, final Event event) {
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
switch (event.getOp()) {
|
||||
case ACCEPT:
|
||||
System.out.println("EventLoop Accept Res: " + res);
|
||||
if (res != -1 && res != ERRNO_EAGAIN_NEGATIVE &&
|
||||
res != ERRNO_EWOULDBLOCK_NEGATIVE) {
|
||||
AbstractIOUringServerChannel abstractIOUringServerChannel =
|
||||
(AbstractIOUringServerChannel) event.getAbstractIOUringChannel();
|
||||
System.out.println("EventLoop Fd: " + abstractIOUringServerChannel.getSocket().intValue());
|
||||
final IOUringRecvByteAllocatorHandle allocHandle =
|
||||
(IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe()
|
||||
.recvBufAllocHandle();
|
||||
final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline();
|
||||
|
||||
allocHandle.lastBytesRead(res);
|
||||
if (allocHandle.lastBytesRead() > 0) {
|
||||
allocHandle.incMessagesRead(1);
|
||||
try {
|
||||
pipeline.fireChannelRead(abstractIOUringServerChannel
|
||||
.newChildChannel(allocHandle.lastBytesRead()));
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
allocHandle.readComplete();
|
||||
pipeline.fireChannelReadComplete();
|
||||
}
|
||||
}
|
||||
long eventId = incrementEventIdCounter();
|
||||
event.setId(eventId);
|
||||
submissionQueue
|
||||
.add(eventId, EventType.ACCEPT, event.getAbstractIOUringChannel()
|
||||
.getSocket().intValue(),
|
||||
0,
|
||||
0,
|
||||
0);
|
||||
addNewEvent(event);
|
||||
submissionQueue.submit();
|
||||
break;
|
||||
case READ:
|
||||
System.out.println("EventLoop Read Res: " + res);
|
||||
System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().intValue());
|
||||
ByteBuf byteBuf = event.getReadBuffer();
|
||||
int localReadAmount = res;
|
||||
if (localReadAmount > 0) {
|
||||
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
|
||||
}
|
||||
|
||||
final IOUringRecvByteAllocatorHandle allocHandle =
|
||||
(IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe()
|
||||
.recvBufAllocHandle();
|
||||
final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline();
|
||||
|
||||
allocHandle.lastBytesRead(localReadAmount);
|
||||
if (allocHandle.lastBytesRead() <= 0) {
|
||||
// nothing was read, release the buffer.
|
||||
byteBuf.release();
|
||||
byteBuf = null;
|
||||
break;
|
||||
}
|
||||
|
||||
allocHandle.incMessagesRead(1);
|
||||
//readPending = false;
|
||||
pipeline.fireChannelRead(byteBuf);
|
||||
byteBuf = null;
|
||||
allocHandle.readComplete();
|
||||
pipeline.fireChannelReadComplete();
|
||||
event.getAbstractIOUringChannel().executeReadEvent();
|
||||
break;
|
||||
case WRITE:
|
||||
System.out.println("EventLoop Write Res: " + res);
|
||||
System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().intValue());
|
||||
System.out.println("EventLoop Pipeline: " + event.getAbstractIOUringChannel().eventLoop());
|
||||
ChannelOutboundBuffer channelOutboundBuffer = event
|
||||
.getAbstractIOUringChannel().unsafe().outboundBuffer();
|
||||
//remove bytes
|
||||
int localFlushAmount = res;
|
||||
if (localFlushAmount > 0) {
|
||||
channelOutboundBuffer.removeBytes(localFlushAmount);
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
event.getAbstractIOUringChannel().doWrite(channelOutboundBuffer);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
break;
|
||||
case TIMEOUT:
|
||||
if (res == ETIME) {
|
||||
prevDeadlineNanos = NONE;
|
||||
}
|
||||
|
||||
break;
|
||||
case POLL:
|
||||
pendingWakeup = false;
|
||||
//Todo eventId is already used
|
||||
eventId = incrementEventIdCounter();
|
||||
event.setId(eventId);
|
||||
addNewEvent(event);
|
||||
submissionQueue.addPoll(eventfd.intValue(), eventId);
|
||||
|
||||
break;
|
||||
}
|
||||
this.events.remove(event.getId());
|
||||
}
|
||||
|
||||
public RingBuffer getRingBuffer() {
|
||||
return ringBuffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void wakeup(boolean inEventLoop) {
|
||||
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
|
||||
// write to the evfd which will then wake-up epoll_wait(...)
|
||||
Native.eventFdWrite(eventfd.intValue(), 1L);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -15,12 +15,17 @@
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.channel.unix.Buffer;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
final class IOUringSubmissionQueue {
|
||||
|
||||
private static final int SQE_SIZE = 64;
|
||||
private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
|
||||
private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
|
||||
private static final int POLLIN = 1;
|
||||
|
||||
//these offsets are used to access specific properties
|
||||
//SQE https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L21
|
||||
@ -35,6 +40,9 @@ final class IOUringSubmissionQueue {
|
||||
private static final int SQE_USER_DATA_FIELD = 32;
|
||||
private static final int SQE_PAD_FIELD = 40;
|
||||
|
||||
private static final int KERNEL_TIMESPEC_TV_SEC_FIELD = 0;
|
||||
private static final int KERNEL_TIMESPEC_TV_NSEC_FIELD = 8;
|
||||
|
||||
//these unsigned integer pointers(shared with the kernel) will be changed by the kernel
|
||||
private final long kHeadAddress;
|
||||
private final long kTailAddress;
|
||||
@ -56,6 +64,9 @@ final class IOUringSubmissionQueue {
|
||||
private static final int SOCK_NONBLOCK = 2048;
|
||||
private static final int SOCK_CLOEXEC = 524288;
|
||||
|
||||
private final ByteBuffer timeoutMemory;
|
||||
private final long timeoutMemoryAddress;
|
||||
|
||||
IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
|
||||
long fFlagsAdress, long kDroppedAddress, long arrayAddress,
|
||||
long submissionQueueArrayAddress, int ringSize,
|
||||
@ -71,6 +82,9 @@ final class IOUringSubmissionQueue {
|
||||
this.ringSize = ringSize;
|
||||
this.ringAddress = ringAddress;
|
||||
this.ringFd = ringFd;
|
||||
|
||||
timeoutMemory = Buffer.allocateDirectWithNativeOrder(KERNEL_TIMESPEC_SIZE);
|
||||
timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
|
||||
}
|
||||
|
||||
public long getSqe() {
|
||||
@ -119,6 +133,26 @@ final class IOUringSubmissionQueue {
|
||||
System.out.println("Offset: " + PlatformDependent.getLong(sqe + SQE_OFFSET_FIELD));
|
||||
}
|
||||
|
||||
public boolean addTimeout(long nanoSeconds, long eventId) {
|
||||
long sqe = getSqe();
|
||||
if (sqe == 0) {
|
||||
return false;
|
||||
}
|
||||
setTimeout(nanoSeconds);
|
||||
setData(sqe, eventId, EventType.TIMEOUT, -1, timeoutMemoryAddress, 1, 0);
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean addPoll(int fd, long eventId) {
|
||||
long sqe = getSqe();
|
||||
if (sqe == 0) {
|
||||
return false;
|
||||
}
|
||||
setData(sqe, eventId, EventType.POLL, fd, 0, 0, 0);
|
||||
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, POLLIN);
|
||||
return true;
|
||||
}
|
||||
|
||||
//Todo ring buffer errors for example if submission queue is full
|
||||
public boolean add(long eventId, EventType type, int fd, long bufferAddress, int pos, int limit) {
|
||||
long sqe = getSqe();
|
||||
@ -172,6 +206,22 @@ final class IOUringSubmissionQueue {
|
||||
}
|
||||
}
|
||||
|
||||
private void setTimeout(long timeoutNanoSeconds) {
|
||||
long seconds, nanoSeconds;
|
||||
|
||||
//Todo
|
||||
if (timeoutNanoSeconds == 0) {
|
||||
seconds = 0;
|
||||
nanoSeconds = 0;
|
||||
} else {
|
||||
seconds = timeoutNanoSeconds / 1000000000L;
|
||||
nanoSeconds = timeoutNanoSeconds % 1000;
|
||||
}
|
||||
|
||||
PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_SEC_FIELD, seconds);
|
||||
PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
|
||||
}
|
||||
|
||||
public void setSqeHead(long sqeHead) {
|
||||
this.sqeHead = sqeHead;
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.channel.unix.FileDescriptor;
|
||||
import io.netty.channel.unix.Socket;
|
||||
import io.netty.util.internal.NativeLibraryLoader;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
@ -44,6 +45,18 @@ final class Native {
|
||||
|
||||
public static native int ioUringEnter(int ringFd, int toSubmit, int minComplete, int flags);
|
||||
|
||||
public static native int ioUringRegisterEventFd(int ringFd, int eventFd);
|
||||
|
||||
public static native int ioUringUnregisterEventFd(int ringFd);
|
||||
|
||||
public static native void eventFdWrite(int fd, long value);
|
||||
|
||||
public static FileDescriptor newEventFd() {
|
||||
return new FileDescriptor(eventFd());
|
||||
}
|
||||
|
||||
private static native int eventFd();
|
||||
|
||||
// for testing(it is only temporary)
|
||||
public static native int createFile();
|
||||
|
||||
|
@ -15,12 +15,14 @@
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.channel.unix.FileDescriptor;
|
||||
import org.junit.Test;
|
||||
import java.nio.charset.Charset;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import static org.junit.Assert.*;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import org.junit.experimental.theories.suppliers.TestedOn;
|
||||
|
||||
public class NativeTest {
|
||||
|
||||
@ -33,7 +35,7 @@ public class NativeTest {
|
||||
String inputString = "Hello World!";
|
||||
writeEventByteBuf.writeCharSequence(inputString, Charset.forName("UTF-8"));
|
||||
|
||||
int fd = (int) Native.createFile();
|
||||
int fd = Native.createFile();
|
||||
|
||||
RingBuffer ringBuffer = Native.createRingBuffer(32);
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
@ -69,4 +71,96 @@ public class NativeTest {
|
||||
assertArrayEquals(inputString.getBytes(), dataRead);
|
||||
readEventByteBuf.release();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void timeoutTest() {
|
||||
|
||||
RingBuffer ringBuffer = Native.createRingBuffer(32);
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
|
||||
|
||||
assertNotNull(ringBuffer);
|
||||
assertNotNull(submissionQueue);
|
||||
assertNotNull(completionQueue);
|
||||
|
||||
Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
final IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe();
|
||||
assertEquals(-62, ioUringCqe.getRes());
|
||||
assertEquals(1, ioUringCqe.getEventId());
|
||||
}
|
||||
};
|
||||
thread.start();
|
||||
try {
|
||||
Thread.sleep(80);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
submissionQueue.addTimeout(0, 1);
|
||||
submissionQueue.submit();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void eventfdTest() throws InterruptedException {
|
||||
RingBuffer ringBuffer = Native.createRingBuffer(32);
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
|
||||
|
||||
assertNotNull(ringBuffer);
|
||||
assertNotNull(submissionQueue);
|
||||
assertNotNull(completionQueue);
|
||||
|
||||
final FileDescriptor eventFd = Native.newEventFd();
|
||||
assertTrue(submissionQueue.addPoll(eventFd.intValue(), 1));
|
||||
submissionQueue.submit();
|
||||
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
Native.eventFdWrite(eventFd.intValue(), 1L);
|
||||
}
|
||||
}.start();
|
||||
|
||||
IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe();
|
||||
assertEquals(1, ioUringCqe.getRes());
|
||||
assertEquals(1, ioUringCqe.getEventId());
|
||||
}
|
||||
|
||||
//eventfd signal doesnt work when ioUringWaitCqe and eventFdWrite are executed in a thread
|
||||
//created this test to reproduce this "weird" bug
|
||||
@Test(timeout = 8000)
|
||||
public void eventfdNoSignal() throws InterruptedException {
|
||||
|
||||
RingBuffer ringBuffer = Native.createRingBuffer(32);
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
|
||||
|
||||
assertNotNull(ringBuffer);
|
||||
assertNotNull(submissionQueue);
|
||||
assertNotNull(completionQueue);
|
||||
|
||||
Thread waitingCqe = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe();
|
||||
assertEquals(1, ioUringCqe.getRes());
|
||||
assertEquals(1, ioUringCqe.getEventId());
|
||||
}
|
||||
};
|
||||
waitingCqe.start();
|
||||
final FileDescriptor eventFd = Native.newEventFd();
|
||||
assertTrue(submissionQueue.addPoll(eventFd.intValue(), 1));
|
||||
submissionQueue.submit();
|
||||
|
||||
new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
Native.eventFdWrite(eventFd.intValue(), 1L);
|
||||
}
|
||||
}.start();
|
||||
|
||||
waitingCqe.join();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user