Merge pull request #12 from normanmaurer/poll_remove_fix

Correctly build up entry for POLL_REMOVE so we find the right operation
This commit is contained in:
Josef Grieb 2020-09-01 01:22:19 +02:00 committed by GitHub
commit 57884e2e05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 52 additions and 58 deletions

View File

@ -54,10 +54,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
// other value T when EL is waiting with wakeup scheduled at time T // other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE); private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
private final FileDescriptor eventfd; private final FileDescriptor eventfd;
private final IovecArrayPool iovecArrayPool;
private long prevDeadlineNanos = NONE; private long prevDeadlineNanos = NONE;
private boolean pendingWakeup; private boolean pendingWakeup;
private IovecArrayPool iovecArrayPool;
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp); super(parent, executor, addTaskWakesUp);
@ -203,9 +203,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
break; break;
case IOUring.IO_POLL: case IOUring.IO_POLL:
//Todo error handle the res
if (res == ECANCELED) { if (res == ECANCELED) {
logger.trace("POLL_LINK canceled"); logger.trace("IO_POLL cancelled");
break; break;
} }
if (eventfd.intValue() == fd) { if (eventfd.intValue() == fd) {
@ -230,9 +229,9 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
case IOUring.OP_POLL_REMOVE: case IOUring.OP_POLL_REMOVE:
if (res == ENOENT) { if (res == ENOENT) {
System.out.println(("POLL_REMOVE OPERATION not permitted")); logger.trace("POLL_REMOVE not successful");
} else if (res == 0) { } else if (res == 0) {
System.out.println(("POLL_REMOVE OPERATION successful")); logger.trace("POLL_REMOVE successful");
} }
break; break;

View File

@ -122,22 +122,25 @@ final class IOUringSubmissionQueue {
//user_data should be same as POLL_LINK fd //user_data should be same as POLL_LINK fd
if (op == IOUring.OP_POLL_REMOVE) { if (op == IOUring.OP_POLL_REMOVE) {
PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1); PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1);
long uData = convertToUserData(op, fd, pollMask); long uData = convertToUserData((byte) IOUring.IO_POLL, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData); PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, convertToUserData(op, fd, 0));
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
} else {
long uData = convertToUserData(op, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData);
//c union set Rw-Flags or accept_flags
if (op != IOUring.OP_ACCEPT) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
} else {
//accept_flags set NON_BLOCKING
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC);
}
} }
long uData = convertToUserData(op, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData);
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
//c union set Rw-Flags or accept_flags
if (op != IOUring.OP_ACCEPT) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
} else {
//accept_flags set NON_BLOCKING
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC);
}
// pad field array -> all fields should be zero // pad field array -> all fields should be zero
long offsetIndex = 0; long offsetIndex = 0;
@ -146,10 +149,6 @@ final class IOUringSubmissionQueue {
offsetIndex += 8; offsetIndex += 8;
} }
if (pollMask != 0) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
}
logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD)); logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD));
logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD)); logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD));
logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD)); logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD));

View File

@ -20,6 +20,7 @@ import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
@ -187,17 +188,13 @@ public class NativeTest {
@Override @Override
public void run() { public void run() {
assertTrue(completionQueue.ioUringWaitCqe()); assertTrue(completionQueue.ioUringWaitCqe());
try { assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { @Override
@Override public boolean handle(int fd, int res, long flags, int op, int mask) {
public boolean handle(int fd, int res, long flags, int op, int mask) { assertEquals(1, res);
assertEquals(1, res); return true;
return true; }
} }));
}));
} catch (Exception e) {
e.printStackTrace();
}
} }
}; };
waitingCqe.start(); waitingCqe.start();
@ -232,47 +229,46 @@ public class NativeTest {
FileDescriptor eventFd = Native.newEventFd(); FileDescriptor eventFd = Native.newEventFd();
submissionQueue.addPollIn(eventFd.intValue()); submissionQueue.addPollIn(eventFd.intValue());
submissionQueue.submit(); submissionQueue.submit();
Thread.sleep(10);
submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN); submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN);
submissionQueue.submit(); submissionQueue.submit();
final AtomicReference<AssertionError> errorRef = new AtomicReference<AssertionError>();
Thread waitingCqe = new Thread() { Thread waitingCqe = new Thread() {
private final IOUringCompletionQueue.IOUringCompletionQueueCallback verifyCallback =
new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
if (op == IOUring.IO_POLL) {
assertEquals(IOUringEventLoop.ECANCELED, res);
} else if (op == IOUring.OP_POLL_REMOVE) {
assertEquals(0, res);
} else {
fail("op " + op);
}
return false;
}
};
@Override @Override
public void run() { public void run() {
assertTrue(completionQueue.ioUringWaitCqe());
try { try {
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { assertTrue(completionQueue.ioUringWaitCqe());
@Override assertEquals(1, completionQueue.process(verifyCallback));
public boolean handle(int fd, int res, long flags, int op, int mask) { assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(IOUringEventLoop.ECANCELED, res); assertEquals(1, completionQueue.process(verifyCallback));
assertEquals(IOUring.IO_POLL, op); } catch (AssertionError error) {
return true; errorRef.set(error);
}
}));
} catch (Exception e) {
e.printStackTrace();
}
try {
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(0, res);
assertEquals(IOUring.OP_POLL_REMOVE, op);
return true;
}
}));
} catch (Exception e) {
e.printStackTrace();
} }
} }
}; };
waitingCqe.start(); waitingCqe.start();
waitingCqe.join(); waitingCqe.join();
try { try {
eventFd.close(); eventFd.close();
AssertionError error = errorRef.get();
if (error != null) {
throw error;
}
} finally { } finally {
ringBuffer.close(); ringBuffer.close();
} }