Correctly build up entry for POLL_REMOVE so we find the right operation

Motivation:

We did not correctly compute all fields when POLL_REMOVE entry was calculate. Which could lead to not finding the right operation.

Modifications:

- Correctly fill all fields
- Fix unit tests

Result:

Remove IO_POLL operations work again as expected
This commit is contained in:
Norman Maurer 2020-08-31 21:23:45 +02:00
parent e1a582d798
commit 8e5b5400e6
3 changed files with 52 additions and 58 deletions

View File

@ -54,10 +54,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
// other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
private final FileDescriptor eventfd;
private final IovecArrayPool iovecArrayPool;
private long prevDeadlineNanos = NONE;
private boolean pendingWakeup;
private IovecArrayPool iovecArrayPool;
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp);
@ -203,9 +203,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
break;
case IOUring.IO_POLL:
//Todo error handle the res
if (res == ECANCELED) {
logger.trace("POLL_LINK canceled");
logger.trace("IO_POLL cancelled");
break;
}
if (eventfd.intValue() == fd) {
@ -230,9 +229,9 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
case IOUring.OP_POLL_REMOVE:
if (res == ENOENT) {
System.out.println(("POLL_REMOVE OPERATION not permitted"));
logger.trace("POLL_REMOVE not successful");
} else if (res == 0) {
System.out.println(("POLL_REMOVE OPERATION successful"));
logger.trace("POLL_REMOVE successful");
}
break;

View File

@ -122,22 +122,25 @@ final class IOUringSubmissionQueue {
//user_data should be same as POLL_LINK fd
if (op == IOUring.OP_POLL_REMOVE) {
PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1);
long uData = convertToUserData(op, fd, pollMask);
long uData = convertToUserData((byte) IOUring.IO_POLL, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData);
}
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, convertToUserData(op, fd, 0));
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
} else {
long uData = convertToUserData(op, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData);
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
//c union set Rw-Flags or accept_flags
if (op != IOUring.OP_ACCEPT) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
} else {
//accept_flags set NON_BLOCKING
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC);
}
}
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
// pad field array -> all fields should be zero
long offsetIndex = 0;
@ -146,10 +149,6 @@ final class IOUringSubmissionQueue {
offsetIndex += 8;
}
if (pollMask != 0) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
}
logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD));
logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD));
logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD));

View File

@ -20,6 +20,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
@ -187,7 +188,6 @@ public class NativeTest {
@Override
public void run() {
assertTrue(completionQueue.ioUringWaitCqe());
try {
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
@ -195,9 +195,6 @@ public class NativeTest {
return true;
}
}));
} catch (Exception e) {
e.printStackTrace();
}
}
};
waitingCqe.start();
@ -232,47 +229,46 @@ public class NativeTest {
FileDescriptor eventFd = Native.newEventFd();
submissionQueue.addPollIn(eventFd.intValue());
submissionQueue.submit();
Thread.sleep(10);
submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN);
submissionQueue.submit();
final AtomicReference<AssertionError> errorRef = new AtomicReference<AssertionError>();
Thread waitingCqe = new Thread() {
private final IOUringCompletionQueue.IOUringCompletionQueueCallback verifyCallback =
new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
if (op == IOUring.IO_POLL) {
assertEquals(IOUringEventLoop.ECANCELED, res);
} else if (op == IOUring.OP_POLL_REMOVE) {
assertEquals(0, res);
} else {
fail("op " + op);
}
return false;
}
};
@Override
public void run() {
try {
assertTrue(completionQueue.ioUringWaitCqe());
try {
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(IOUringEventLoop.ECANCELED, res);
assertEquals(IOUring.IO_POLL, op);
return true;
}
}));
} catch (Exception e) {
e.printStackTrace();
}
try {
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(0, res);
assertEquals(IOUring.OP_POLL_REMOVE, op);
return true;
}
}));
} catch (Exception e) {
e.printStackTrace();
assertEquals(1, completionQueue.process(verifyCallback));
assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, completionQueue.process(verifyCallback));
} catch (AssertionError error) {
errorRef.set(error);
}
}
};
waitingCqe.start();
waitingCqe.join();
try {
eventFd.close();
AssertionError error = errorRef.get();
if (error != null) {
throw error;
}
} finally {
ringBuffer.close();
}