Fix bitmasking / bitshifting code to encode / decode user data (#10617)
Motivation: Our bitmasking / shifting did not correctly handle negative values. Modifications: - Change methods to allow passing user data - fix bitmasking / bitshifting code - Add unit test Result: Be able to pass in negative values as well
This commit is contained in:
parent
ca8c4538c1
commit
d266af2778
@ -242,7 +242,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
} finally {
|
||||
if (submissionQueue != null) {
|
||||
if (socket.markClosed()) {
|
||||
submissionQueue.addClose(fd().intValue());
|
||||
submissionQueue.addClose(fd().intValue(), (short) 0);
|
||||
}
|
||||
} else {
|
||||
// This one was never registered just use a syscall to close.
|
||||
@ -661,7 +661,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
|
||||
final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue();
|
||||
ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress,
|
||||
Native.SIZEOF_SOCKADDR_STORAGE);
|
||||
Native.SIZEOF_SOCKADDR_STORAGE, (short) 0);
|
||||
ioState |= CONNECT_SCHEDULED;
|
||||
} catch (Throwable t) {
|
||||
closeIfClosed();
|
||||
@ -726,13 +726,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
return;
|
||||
}
|
||||
if ((ioState & POLL_IN_SCHEDULED) != 0) {
|
||||
submissionQueue.addPollRemove(socket.intValue(), Native.POLLIN);
|
||||
submissionQueue.addPollRemove(socket.intValue(), Native.POLLIN, (short) 0);
|
||||
}
|
||||
if ((ioState & POLL_OUT_SCHEDULED) != 0) {
|
||||
submissionQueue.addPollRemove(socket.intValue(), Native.POLLOUT);
|
||||
submissionQueue.addPollRemove(socket.intValue(), Native.POLLOUT, (short) 0);
|
||||
}
|
||||
if ((ioState & POLL_RDHUP_SCHEDULED) != 0) {
|
||||
submissionQueue.addPollRemove(socket.intValue(), Native.POLLRDHUP);
|
||||
submissionQueue.addPollRemove(socket.intValue(), Native.POLLRDHUP, (short) 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
|
||||
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addAccept(fd().intValue(),
|
||||
acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress);
|
||||
acceptedAddressMemoryAddress, acceptedAddressLengthMemoryAddress, (short) 0);
|
||||
}
|
||||
|
||||
protected void readComplete0(int res) {
|
||||
|
@ -217,7 +217,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
||||
int offset = iovecArray.count();
|
||||
in.forEachFlushedMessage(iovecArray);
|
||||
submissionQueue().addWritev(socket.intValue(),
|
||||
iovecArray.memoryAddress(offset), iovecArray.count() - offset);
|
||||
iovecArray.memoryAddress(offset), iovecArray.count() - offset, (short) 0);
|
||||
} catch (Exception e) {
|
||||
// This should never happen, anyway fallback to single write.
|
||||
scheduleWriteSingle(in.current());
|
||||
@ -229,7 +229,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
||||
ByteBuf buf = (ByteBuf) msg;
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
|
||||
buf.writerIndex());
|
||||
buf.writerIndex(), (short) 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -243,7 +243,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
||||
readBuffer = byteBuf;
|
||||
|
||||
submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
|
||||
byteBuf.writerIndex(), byteBuf.capacity());
|
||||
byteBuf.writerIndex(), byteBuf.capacity(), (short) 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -17,6 +17,8 @@ package io.netty.channel.uring;
|
||||
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
import static io.netty.channel.uring.UserData.decode;
|
||||
|
||||
/**
|
||||
* Completion queue implementation for io_uring.
|
||||
*/
|
||||
@ -83,24 +85,13 @@ final class IOUringCompletionQueue {
|
||||
ringHead++;
|
||||
PlatformDependent.putIntOrdered(kHeadAddress, ringHead);
|
||||
|
||||
int fd = (int) (udata >>> 32);
|
||||
int opMask = (int) (udata & 0xFFFFFFFFL);
|
||||
int op = opMask >>> 16;
|
||||
int data = opMask & 0xffff;
|
||||
|
||||
i++;
|
||||
callback.handle(fd, res, flags, op, data);
|
||||
|
||||
decode(res, flags, udata, callback);
|
||||
}
|
||||
return i;
|
||||
}
|
||||
|
||||
interface IOUringCompletionQueueCallback {
|
||||
/**
|
||||
* Called for a completion event that was put into the {@link IOUringCompletionQueue}.
|
||||
*/
|
||||
void handle(int fd, int res, int flags, int op, int data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Block until there is at least one completion ready to be processed.
|
||||
*/
|
||||
|
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
interface IOUringCompletionQueueCallback {
|
||||
/**
|
||||
* Called for a completion event that was put into the {@link IOUringCompletionQueue}.
|
||||
*/
|
||||
void handle(int fd, int res, int flags, int op, short data);
|
||||
}
|
@ -465,7 +465,7 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme
|
||||
|
||||
if (!recvMsg) {
|
||||
submissionQueue.addRead(socket.intValue(), bufferAddress,
|
||||
byteBuf.writerIndex(), byteBuf.capacity());
|
||||
byteBuf.writerIndex(), byteBuf.capacity(), (short) 0);
|
||||
} else {
|
||||
int addrLen = addrLen();
|
||||
long recvmsgBufferAddr = recvmsgBufferAddr();
|
||||
@ -474,7 +474,7 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme
|
||||
|
||||
Iov.write(iovecAddress, bufferAddress + byteBuf.writerIndex(), byteBuf.writableBytes());
|
||||
MsgHdr.write(recvmsgBufferAddr, sockaddrAddress, addrLen, iovecAddress, 1);
|
||||
submissionQueue.addRecvmsg(socket.intValue(), recvmsgBufferAddr);
|
||||
submissionQueue.addRecvmsg(socket.intValue(), recvmsgBufferAddr, (short) 0);
|
||||
}
|
||||
}
|
||||
|
||||
@ -523,7 +523,7 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
if (remoteAddress == null) {
|
||||
submissionQueue.addWrite(socket.intValue(), bufferAddress, data.readerIndex(),
|
||||
data.writerIndex());
|
||||
data.writerIndex(), (short) 0);
|
||||
} else {
|
||||
int addrLen = addrLen();
|
||||
long sendmsgBufferAddr = sendmsgBufferAddr();
|
||||
@ -533,7 +533,7 @@ public final class IOUringDatagramChannel extends AbstractIOUringChannel impleme
|
||||
SockaddrIn.write(socket.isIpv6(), sockaddrAddress, remoteAddress);
|
||||
Iov.write(iovecAddress, bufferAddress + data.readerIndex(), data.readableBytes());
|
||||
MsgHdr.write(sendmsgBufferAddr, sockaddrAddress, addrLen, iovecAddress, 1);
|
||||
submissionQueue.addSendmsg(socket.intValue(), sendmsgBufferAddr);
|
||||
submissionQueue.addSendmsg(socket.intValue(), sendmsgBufferAddr, (short) 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,6 @@ import io.netty.channel.SingleThreadEventLoop;
|
||||
import io.netty.channel.unix.Errors;
|
||||
import io.netty.channel.unix.FileDescriptor;
|
||||
import io.netty.channel.unix.IovArray;
|
||||
import io.netty.channel.uring.IOUringCompletionQueue.IOUringCompletionQueueCallback;
|
||||
import io.netty.util.collection.IntObjectHashMap;
|
||||
import io.netty.util.collection.IntObjectMap;
|
||||
import io.netty.util.concurrent.RejectedExecutionHandler;
|
||||
@ -157,7 +156,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCom
|
||||
if (!hasTasks()) {
|
||||
if (curDeadlineNanos != prevDeadlineNanos) {
|
||||
prevDeadlineNanos = curDeadlineNanos;
|
||||
submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos));
|
||||
submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), (short) 0);
|
||||
}
|
||||
|
||||
// Check there were any completion events to process
|
||||
@ -221,7 +220,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCom
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int data) {
|
||||
public void handle(int fd, int res, int flags, int op, short data) {
|
||||
if (op == Native.IORING_OP_READ && eventfd.intValue() == fd) {
|
||||
pendingWakeup = false;
|
||||
addEventFdRead(ringBuffer.ioUringSubmissionQueue());
|
||||
@ -281,7 +280,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCom
|
||||
}
|
||||
|
||||
private void addEventFdRead(IOUringSubmissionQueue submissionQueue) {
|
||||
submissionQueue.addRead(eventfd.intValue(), eventfdReadBuf, 0, 8);
|
||||
submissionQueue.addRead(eventfd.intValue(), eventfdReadBuf, 0, 8, (short) 0);
|
||||
}
|
||||
|
||||
private void handleConnect(AbstractIOUringChannel channel, int res) {
|
||||
@ -297,7 +296,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCom
|
||||
IOUringCompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
|
||||
IOUringCompletionQueueCallback callback = new IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int data) {
|
||||
public void handle(int fd, int res, int flags, int op, short data) {
|
||||
if (op == Native.IORING_OP_READ && eventfd.intValue() == fd) {
|
||||
pendingWakeup = false;
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import static io.netty.channel.uring.UserData.encode;
|
||||
import static java.lang.Math.max;
|
||||
import static java.lang.Math.min;
|
||||
|
||||
@ -99,7 +100,7 @@ final class IOUringSubmissionQueue {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean enqueueSqe(int op, int rwFlags, int fd, long bufferAddress, int length, long offset, int data) {
|
||||
private boolean enqueueSqe(int op, int rwFlags, int fd, long bufferAddress, int length, long offset, short data) {
|
||||
int pending = tail - head;
|
||||
boolean submit = pending == ringEntries;
|
||||
if (submit) {
|
||||
@ -114,7 +115,8 @@ final class IOUringSubmissionQueue {
|
||||
return submit;
|
||||
}
|
||||
|
||||
private void setData(long sqe, int op, int rwFlags, int fd, long bufferAddress, int length, long offset, int data) {
|
||||
private void setData(long sqe, int op, int rwFlags, int fd, long bufferAddress, int length,
|
||||
long offset, short data) {
|
||||
//set sqe(submission queue) properties
|
||||
|
||||
PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) op);
|
||||
@ -126,7 +128,7 @@ final class IOUringSubmissionQueue {
|
||||
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress);
|
||||
PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length);
|
||||
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, rwFlags);
|
||||
long userData = convertToUserData(fd, op, data);
|
||||
long userData = encode(fd, op, data);
|
||||
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, userData);
|
||||
|
||||
logger.trace("UserDataField: {}", userData);
|
||||
@ -135,9 +137,9 @@ final class IOUringSubmissionQueue {
|
||||
logger.trace("Offset: {}", offset);
|
||||
}
|
||||
|
||||
boolean addTimeout(long nanoSeconds) {
|
||||
boolean addTimeout(long nanoSeconds, short extraData) {
|
||||
setTimeout(nanoSeconds);
|
||||
return enqueueSqe(Native.IORING_OP_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0, 0);
|
||||
return enqueueSqe(Native.IORING_OP_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0, extraData);
|
||||
}
|
||||
|
||||
boolean addPollIn(int fd) {
|
||||
@ -153,46 +155,46 @@ final class IOUringSubmissionQueue {
|
||||
}
|
||||
|
||||
private boolean addPoll(int fd, int pollMask) {
|
||||
return enqueueSqe(Native.IORING_OP_POLL_ADD, pollMask, fd, 0, 0, 0, pollMask);
|
||||
return enqueueSqe(Native.IORING_OP_POLL_ADD, pollMask, fd, 0, 0, 0, (short) pollMask);
|
||||
}
|
||||
|
||||
boolean addRecvmsg(int fd, long msgHdr) {
|
||||
return enqueueSqe(Native.IORING_OP_RECVMSG, 0, fd, msgHdr, 1, 0, 0);
|
||||
boolean addRecvmsg(int fd, long msgHdr, short extraData) {
|
||||
return enqueueSqe(Native.IORING_OP_RECVMSG, 0, fd, msgHdr, 1, 0, extraData);
|
||||
}
|
||||
|
||||
boolean addSendmsg(int fd, long msgHdr) {
|
||||
return enqueueSqe(Native.IORING_OP_SENDMSG, 0, fd, msgHdr, 1, 0, 0);
|
||||
boolean addSendmsg(int fd, long msgHdr, short extraData) {
|
||||
return enqueueSqe(Native.IORING_OP_SENDMSG, 0, fd, msgHdr, 1, 0, extraData);
|
||||
}
|
||||
|
||||
boolean addRead(int fd, long bufferAddress, int pos, int limit) {
|
||||
return enqueueSqe(Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0, 0);
|
||||
boolean addRead(int fd, long bufferAddress, int pos, int limit, short extraData) {
|
||||
return enqueueSqe(Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0, extraData);
|
||||
}
|
||||
|
||||
boolean addWrite(int fd, long bufferAddress, int pos, int limit) {
|
||||
return enqueueSqe(Native.IORING_OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0, 0);
|
||||
boolean addWrite(int fd, long bufferAddress, int pos, int limit, short extraData) {
|
||||
return enqueueSqe(Native.IORING_OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0, extraData);
|
||||
}
|
||||
|
||||
boolean addAccept(int fd, long address, long addressLength) {
|
||||
boolean addAccept(int fd, long address, long addressLength, short extraData) {
|
||||
return enqueueSqe(Native.IORING_OP_ACCEPT, Native.SOCK_NONBLOCK | Native.SOCK_CLOEXEC, fd,
|
||||
address, 0, addressLength, 0);
|
||||
address, 0, addressLength, extraData);
|
||||
}
|
||||
|
||||
//fill the address which is associated with server poll link user_data
|
||||
boolean addPollRemove(int fd, int pollMask) {
|
||||
boolean addPollRemove(int fd, int pollMask, short extraData) {
|
||||
return enqueueSqe(Native.IORING_OP_POLL_REMOVE, 0, fd,
|
||||
convertToUserData(fd, Native.IORING_OP_POLL_ADD, pollMask), 0, 0, 0);
|
||||
encode(fd, Native.IORING_OP_POLL_ADD, (short) pollMask), 0, 0, extraData);
|
||||
}
|
||||
|
||||
boolean addConnect(int fd, long socketAddress, long socketAddressLength) {
|
||||
return enqueueSqe(Native.IORING_OP_CONNECT, 0, fd, socketAddress, 0, socketAddressLength, 0);
|
||||
boolean addConnect(int fd, long socketAddress, long socketAddressLength, short extraData) {
|
||||
return enqueueSqe(Native.IORING_OP_CONNECT, 0, fd, socketAddress, 0, socketAddressLength, extraData);
|
||||
}
|
||||
|
||||
boolean addWritev(int fd, long iovecArrayAddress, int length) {
|
||||
return enqueueSqe(Native.IORING_OP_WRITEV, 0, fd, iovecArrayAddress, length, 0, 0);
|
||||
boolean addWritev(int fd, long iovecArrayAddress, int length, short extraData) {
|
||||
return enqueueSqe(Native.IORING_OP_WRITEV, 0, fd, iovecArrayAddress, length, 0, extraData);
|
||||
}
|
||||
|
||||
boolean addClose(int fd) {
|
||||
return enqueueSqe(Native.IORING_OP_CLOSE, 0, fd, 0, 0, 0, 0);
|
||||
boolean addClose(int fd, short extraData) {
|
||||
return enqueueSqe(Native.IORING_OP_CLOSE, 0, fd, 0, 0, 0, extraData);
|
||||
}
|
||||
|
||||
int submit() {
|
||||
@ -242,13 +244,6 @@ final class IOUringSubmissionQueue {
|
||||
PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
|
||||
}
|
||||
|
||||
private static long convertToUserData(int fd, int op, int data) {
|
||||
assert op <= Short.MAX_VALUE;
|
||||
assert data <= Short.MAX_VALUE;
|
||||
int opMask = op << 16 | (data & 0xFFFF);
|
||||
return (long) fd << 32 | opMask & 0xFFFFFFFFL;
|
||||
}
|
||||
|
||||
public long count() {
|
||||
return tail - head;
|
||||
}
|
||||
|
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
final class UserData {
|
||||
private UserData() {
|
||||
}
|
||||
|
||||
static long encode(int fd, int op, short data) {
|
||||
assert op <= Short.MAX_VALUE;
|
||||
return (long) data << 48 | (long) op << 32 | fd;
|
||||
}
|
||||
|
||||
static void decode(int res, int flags, long udata, IOUringCompletionQueueCallback callback) {
|
||||
int fd = (int) (udata & 0xFFFFFFFFL);
|
||||
int op = (short) ((udata >>>= 32) & 0xFFFFL);
|
||||
short data = (short) (udata >>> 16);
|
||||
callback.handle(fd, res, flags, op, data);
|
||||
}
|
||||
}
|
@ -46,7 +46,7 @@ public class IOUringSubmissionQueueTest {
|
||||
|
||||
long address = Buffer.memoryAddress(buffer);
|
||||
int counter = 0;
|
||||
while (!submissionQueue.addAccept(-1, address, 128)) {
|
||||
while (!submissionQueue.addAccept(-1, address, 128, (short) 0)) {
|
||||
counter++;
|
||||
}
|
||||
assertEquals(8, counter);
|
||||
|
@ -55,13 +55,13 @@ public class NativeTest {
|
||||
assertNotNull(completionQueue);
|
||||
|
||||
assertFalse(submissionQueue.addWrite(fd, writeEventByteBuf.memoryAddress(),
|
||||
writeEventByteBuf.readerIndex(), writeEventByteBuf.writerIndex()));
|
||||
writeEventByteBuf.readerIndex(), writeEventByteBuf.writerIndex(), (short) 0));
|
||||
submissionQueue.submit();
|
||||
|
||||
completionQueue.ioUringWaitCqe();
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int mask) {
|
||||
public void handle(int fd, int res, int flags, int op, short mask) {
|
||||
assertEquals(inputString.length(), res);
|
||||
writeEventByteBuf.release();
|
||||
}
|
||||
@ -69,13 +69,13 @@ public class NativeTest {
|
||||
|
||||
final ByteBuf readEventByteBuf = allocator.directBuffer(100);
|
||||
assertFalse(submissionQueue.addRead(fd, readEventByteBuf.memoryAddress(),
|
||||
readEventByteBuf.writerIndex(), readEventByteBuf.capacity()));
|
||||
readEventByteBuf.writerIndex(), readEventByteBuf.capacity(), (short) 0));
|
||||
submissionQueue.submit();
|
||||
|
||||
completionQueue.ioUringWaitCqe();
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int mask) {
|
||||
public void handle(int fd, int res, int flags, int op, short mask) {
|
||||
assertEquals(inputString.length(), res);
|
||||
readEventByteBuf.writerIndex(res);
|
||||
}
|
||||
@ -105,9 +105,9 @@ public class NativeTest {
|
||||
public void run() {
|
||||
completionQueue.ioUringWaitCqe();
|
||||
try {
|
||||
completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
completionQueue.process(new IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int mask) {
|
||||
public void handle(int fd, int res, int flags, int op, short mask) {
|
||||
assertEquals(-62, res);
|
||||
}
|
||||
});
|
||||
@ -123,7 +123,7 @@ public class NativeTest {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
submissionQueue.addTimeout(0);
|
||||
submissionQueue.addTimeout(0, (short) 0);
|
||||
submissionQueue.submit();
|
||||
|
||||
thread.join();
|
||||
@ -153,9 +153,9 @@ public class NativeTest {
|
||||
}.start();
|
||||
|
||||
completionQueue.ioUringWaitCqe();
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int mask) {
|
||||
public void handle(int fd, int res, int flags, int op, short mask) {
|
||||
assertEquals(1, res);
|
||||
}
|
||||
}));
|
||||
@ -184,9 +184,9 @@ public class NativeTest {
|
||||
@Override
|
||||
public void run() {
|
||||
completionQueue.ioUringWaitCqe();
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int mask) {
|
||||
public void handle(int fd, int res, int flags, int op, short mask) {
|
||||
assertEquals(1, res);
|
||||
}
|
||||
}));
|
||||
@ -229,15 +229,15 @@ public class NativeTest {
|
||||
FileDescriptor eventFd = Native.newBlockingEventFd();
|
||||
submissionQueue.addPollIn(eventFd.intValue());
|
||||
submissionQueue.submit();
|
||||
submissionQueue.addPollRemove(eventFd.intValue(), Native.POLLIN);
|
||||
submissionQueue.addPollRemove(eventFd.intValue(), Native.POLLIN, (short) 0);
|
||||
submissionQueue.submit();
|
||||
|
||||
final AtomicReference<AssertionError> errorRef = new AtomicReference<AssertionError>();
|
||||
Thread waitingCqe = new Thread() {
|
||||
private final IOUringCompletionQueue.IOUringCompletionQueueCallback verifyCallback =
|
||||
new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
|
||||
private final IOUringCompletionQueueCallback verifyCallback =
|
||||
new IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, int mask) {
|
||||
public void handle(int fd, int res, int flags, int op, short mask) {
|
||||
if (op == Native.IORING_OP_POLL_ADD) {
|
||||
assertEquals(Native.ERRNO_ECANCELED_NEGATIVE, res);
|
||||
} else if (op == Native.IORING_OP_POLL_REMOVE) {
|
||||
@ -270,4 +270,31 @@ public class NativeTest {
|
||||
ringBuffer.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserData() {
|
||||
RingBuffer ringBuffer = Native.createRingBuffer(32);
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.ioUringSubmissionQueue();
|
||||
final IOUringCompletionQueue completionQueue = ringBuffer.ioUringCompletionQueue();
|
||||
|
||||
try {
|
||||
// Ensure userdata works with negative and positive values
|
||||
for (int i = Short.MIN_VALUE; i <= Short.MAX_VALUE; i++) {
|
||||
submissionQueue.addWrite(-1, -1, -1, -1, (short) i);
|
||||
assertEquals(1, submissionQueue.submitAndWait());
|
||||
final int expectedData = i;
|
||||
assertEquals(1, completionQueue.process(new IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int fd, int res, int flags, int op, short data) {
|
||||
assertEquals(-1, fd);
|
||||
assertTrue(res < 0);
|
||||
assertEquals(Native.IORING_OP_WRITE, op);
|
||||
assertEquals(expectedData, data);
|
||||
}
|
||||
}));
|
||||
}
|
||||
} finally {
|
||||
ringBuffer.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class UserDataTest {
|
||||
@Test
|
||||
public void testUserData() {
|
||||
// Ensure userdata works with negative and positive values
|
||||
for (int fd : new int[] { 0, 1, 10, Short.MAX_VALUE, Integer.MAX_VALUE }) {
|
||||
for (int op = 0; op < 20; op++) {
|
||||
for (int data = Short.MIN_VALUE; data <= Short.MAX_VALUE; data++) {
|
||||
final int expectedFd = fd;
|
||||
final int expectedOp = op;
|
||||
final short expectedData = (short) data;
|
||||
long udata = UserData.encode(expectedFd, expectedOp, expectedData);
|
||||
UserData.decode(0, 0, udata, new IOUringCompletionQueueCallback() {
|
||||
@Override
|
||||
public void handle(int actualFd, int res, int flags, int actualOp, short actualData) {
|
||||
assertEquals(expectedFd, actualFd);
|
||||
assertEquals(expectedOp, actualOp);
|
||||
assertEquals(expectedData, actualData);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user