Add Poll before the accept/read operation

Motivation:

The problem is that if io_uring accept/read non blocking doesnt return -EAGAIN for non-blocking sockets
in general, then it removes a way for the application to tell if
there's ever any data there.
There is a fix in Kernel 5.8 https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?h=v5.8&id=e697deed834de15d2322d0619d51893022c90ea2 which means we need to add poll before the accept/read event(poll<link>read/accept) to fix in netty as well

Modification:

-add poll before the accept/read event with this flag IOSQE_IO_LINK

Result:

netty prototype works on Kernel 5.8
This commit is contained in:
Josef Grieb 2020-08-10 09:53:38 +02:00
parent bc9ada411b
commit 71c33eaec3
5 changed files with 47 additions and 25 deletions

View File

@ -33,7 +33,6 @@ import io.netty.channel.unix.UnixChannel;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.UnresolvedAddressException;
@ -200,7 +199,18 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
@Override
public void run() {
uringEventExecution();
IOUringEventLoop eventLoop = (IOUringEventLoop) eventLoop();
long eventId = eventLoop.incrementEventIdCounter();
Event event = new Event();
event.setOp(EventType.POLL_LINK);
event.setId(eventId);
event.setAbstractIOUringChannel(AbstractIOUringChannel.this);
eventLoop.getRingBuffer().getIoUringSubmissionQueue()
.addPoll(eventId, socket.intValue(), event.getOp());
((IOUringEventLoop) eventLoop()).addNewEvent(event);
uringEventExecution(); //flush and submit SQE
}
};

View File

@ -20,8 +20,11 @@ enum EventType {
READ(22),
WRITE(23),
TIMEOUT(11),
POLL(6);
POLL_EVENTFD(6),
POLL_LINK(6);
private final int op;
EventType(int op) {
this.op = op;
}

View File

@ -59,10 +59,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
eventfd = Native.newEventFd();
long eventId = incrementEventIdCounter();
Event event = new Event();
event.setOp(EventType.POLL);
event.setOp(EventType.POLL_EVENTFD);
event.setId(eventId);
addNewEvent(event);
ringBuffer.getIoUringSubmissionQueue().addPoll(eventfd.intValue(), eventId);
ringBuffer.getIoUringSubmissionQueue().addPoll(eventId, eventfd.intValue(), event.getOp());
ringBuffer.getIoUringSubmissionQueue().submit();
}
@ -195,16 +195,9 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
pipeline.fireChannelReadComplete();
}
}
long eventId = incrementEventIdCounter();
event.setId(eventId);
submissionQueue
.add(eventId, EventType.ACCEPT, event.getAbstractIOUringChannel()
.getSocket().intValue(),
0,
0,
0);
addNewEvent(event);
submissionQueue.submit();
//Todo refactoring method name
event.getAbstractIOUringChannel().executeReadEvent();
break;
case READ:
System.out.println("EventLoop Read Res: " + res);
@ -260,14 +253,17 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
}
break;
case POLL:
case POLL_EVENTFD:
pendingWakeup = false;
//Todo eventId is already used
eventId = incrementEventIdCounter();
long eventId = incrementEventIdCounter();
event.setId(eventId);
event.setOp(EventType.POLL_EVENTFD);
addNewEvent(event);
submissionQueue.addPoll(eventfd.intValue(), eventId);
submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp());
case POLL_LINK:
//Todo error handling error
System.out.println("POLL_LINK Res: " + res);
break;
}
this.events.remove(event.getId());

View File

@ -26,6 +26,7 @@ final class IOUringSubmissionQueue {
private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
private static final int POLLIN = 1;
private static final int IOSQE_IO_LINK = 4;
//these offsets are used to access specific properties
//SQE https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L21
@ -103,7 +104,6 @@ final class IOUringSubmissionQueue {
//Todo cleaner
//set sqe(submission queue) properties
PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) type.getOp());
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0);
PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset);
@ -111,6 +111,13 @@ final class IOUringSubmissionQueue {
PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId);
//poll<link>read or accept operation
if (type == EventType.READ || type == EventType.ACCEPT || type == EventType.POLL_LINK) {
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) IOSQE_IO_LINK);
} else {
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
}
//c union set Rw-Flags or accept_flags
if (type != EventType.ACCEPT) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
@ -143,12 +150,12 @@ final class IOUringSubmissionQueue {
return true;
}
public boolean addPoll(int fd, long eventId) {
public boolean addPoll(long eventId, int fd, EventType eventType) {
long sqe = getSqe();
if (sqe == 0) {
return false;
}
setData(sqe, eventId, EventType.POLL, fd, 0, 0, 0);
setData(sqe, eventId, eventType, fd, 0, 0, 0);
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, POLLIN);
return true;
}
@ -266,6 +273,10 @@ final class IOUringSubmissionQueue {
return this.sqeHead;
}
public int getRingFd() {
return ringFd;
}
public long getSqeTail() {
return this.sqeTail;
}

View File

@ -102,8 +102,9 @@ public class NativeTest {
submissionQueue.submit();
}
//Todo clean
@Test
public void eventfdTest() throws InterruptedException {
public void eventfdTest() {
RingBuffer ringBuffer = Native.createRingBuffer(32);
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
@ -113,7 +114,7 @@ public class NativeTest {
assertNotNull(completionQueue);
final FileDescriptor eventFd = Native.newEventFd();
assertTrue(submissionQueue.addPoll(eventFd.intValue(), 1));
assertTrue(submissionQueue.addPoll(1, eventFd.intValue(), EventType.POLL_EVENTFD));
submissionQueue.submit();
new Thread() {
@ -128,6 +129,7 @@ public class NativeTest {
assertEquals(1, ioUringCqe.getEventId());
}
//Todo clean
//eventfd signal doesnt work when ioUringWaitCqe and eventFdWrite are executed in a thread
//created this test to reproduce this "weird" bug
@Test(timeout = 8000)
@ -151,7 +153,7 @@ public class NativeTest {
};
waitingCqe.start();
final FileDescriptor eventFd = Native.newEventFd();
assertTrue(submissionQueue.addPoll(eventFd.intValue(), 1));
assertTrue(submissionQueue.addPoll(1, eventFd.intValue(), EventType.POLL_EVENTFD));
submissionQueue.submit();
new Thread() {