Read Event test and documentation
Motivation: missing documentation and read event test Modifications: add documentation and read event test Result: better documentation and tests
This commit is contained in:
parent
692238f6da
commit
8a56cd1959
@ -19,6 +19,8 @@ import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
public class IOUringCompletionQueue {
|
||||
|
||||
//these offsets are used to access specific properties
|
||||
//CQE (https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L162)
|
||||
private final int CQE_USER_DATA_FIELD = 0;
|
||||
private final int CQE_RES_FIELD = 8;
|
||||
private final int CQE_FLAGS_FIELD = 12;
|
||||
@ -27,7 +29,7 @@ public class IOUringCompletionQueue {
|
||||
|
||||
private final int IORING_ENTER_GETEVENTS = 1;
|
||||
|
||||
// (k -> kernel)
|
||||
//these unsigned integer pointers(shared with the kernel) will be changed by the kernel
|
||||
private final long kHeadAddress;
|
||||
private final long kTailAddress;
|
||||
private final long kringMaskAddress;
|
||||
@ -83,6 +85,7 @@ public class IOUringCompletionQueue {
|
||||
return ioUringCqe;
|
||||
}
|
||||
|
||||
//IORING_ENTER_GETEVENTS -> wait until an event is completely processed
|
||||
int ret = Native.ioUringEnter(ringFd, 0, 1, IORING_ENTER_GETEVENTS);
|
||||
if (ret < 0) {
|
||||
//Todo throw exception!
|
||||
@ -130,7 +133,6 @@ public class IOUringCompletionQueue {
|
||||
return this.ringAddress;
|
||||
}
|
||||
|
||||
|
||||
//Todo Integer.toUnsignedLong -> maven checkstyle error
|
||||
public static long toUnsignedLong(int x) {
|
||||
return ((long) x) & 0xffffffffL;
|
||||
|
@ -27,7 +27,6 @@ import java.util.concurrent.Executor;
|
||||
|
||||
class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
|
||||
|
||||
private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096);
|
||||
// events should be unique to identify which event type that was
|
||||
private long eventIdCounter;
|
||||
|
@ -22,6 +22,8 @@ public class IOUringSubmissionQueue {
|
||||
private final int SQE_SIZE = 64;
|
||||
private final int INT_SIZE = 4;
|
||||
|
||||
//these offsets are used to access specific properties
|
||||
//SQE https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L21
|
||||
private final int SQE_OP_CODE_FIELD = 0;
|
||||
private final int SQE_FLAGS_FIELD = 1;
|
||||
private final int SQE_IOPRIO_FIELD = 2; // u16
|
||||
@ -33,7 +35,7 @@ public class IOUringSubmissionQueue {
|
||||
private final int SQE_USER_DATA_FIELD = 32;
|
||||
private final int SQE_PAD_FIELD = 40;
|
||||
|
||||
// (k -> kernel)
|
||||
//these unsigned integer pointers(shared with the kernel) will be changed by the kernel
|
||||
private final long kHeadAddress;
|
||||
private final long kTailAddress;
|
||||
private final long kRingMaskAddress;
|
||||
@ -81,6 +83,7 @@ public class IOUringSubmissionQueue {
|
||||
|
||||
private void setData(long sqe, long eventId, EventType type, int fd, long bufferAddress, int length, long offset) {
|
||||
//Todo cleaner
|
||||
//set sqe(submission queue) properties
|
||||
PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) type.getOp());
|
||||
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
|
||||
PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0);
|
||||
|
@ -25,22 +25,21 @@ import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.buffer.UnpooledUnsafeDirectByteBuf;
|
||||
import static org.junit.Assert.*;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
|
||||
public class NativeTest {
|
||||
|
||||
@Test
|
||||
public void canWriteFile() {
|
||||
//Todo add read operation test
|
||||
final long eventId = 1;
|
||||
|
||||
ByteBufAllocator allocator = new UnpooledByteBufAllocator(true);
|
||||
UnpooledUnsafeDirectByteBuf directByteBufPooled = new UnpooledUnsafeDirectByteBuf(allocator, 500, 1000);
|
||||
ByteBuf writeEventByteBuf = allocator.directBuffer(100);
|
||||
String inputString = "Hello World!";
|
||||
byte[] byteArrray = inputString.getBytes();
|
||||
directByteBufPooled.writeBytes(byteArrray);
|
||||
writeEventByteBuf.writeBytes(byteArrray);
|
||||
|
||||
int fd = (int) Native.createFile();
|
||||
System.out.println("Filedescriptor: " + fd);
|
||||
|
||||
RingBuffer ringBuffer = Native.createRingBuffer(32);
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
@ -50,14 +49,30 @@ public class NativeTest {
|
||||
assertNotNull(submissionQueue);
|
||||
assertNotNull(completionQueue);
|
||||
|
||||
assertTrue(submissionQueue.add(eventId, EventType.WRITE, fd, directByteBufPooled.memoryAddress(),
|
||||
directByteBufPooled.readerIndex(), directByteBufPooled.writerIndex()));
|
||||
assertTrue(submissionQueue.add(eventId, EventType.WRITE, fd, writeEventByteBuf.memoryAddress(),
|
||||
writeEventByteBuf.readerIndex(), writeEventByteBuf.writerIndex()));
|
||||
submissionQueue.submit();
|
||||
|
||||
IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe();
|
||||
|
||||
assertNotNull(ioUringCqe);
|
||||
assertEquals(inputString.length(), ioUringCqe.getRes());
|
||||
assertEquals(1, ioUringCqe.getEventId());
|
||||
writeEventByteBuf.release();
|
||||
|
||||
ByteBuf readEventByteBuf = allocator.directBuffer(100);
|
||||
assertTrue(submissionQueue.add(eventId + 1, EventType.READ, fd, readEventByteBuf.memoryAddress(),
|
||||
readEventByteBuf.writerIndex(), readEventByteBuf.capacity()));
|
||||
submissionQueue.submit();
|
||||
|
||||
ioUringCqe = completionQueue.ioUringWaitCqe();
|
||||
assertEquals(2, ioUringCqe.getEventId());
|
||||
assertEquals(inputString.length(), ioUringCqe.getRes());
|
||||
|
||||
readEventByteBuf.writerIndex(ioUringCqe.getRes());
|
||||
byte[] dataRead = new byte[inputString.length()];
|
||||
readEventByteBuf.readBytes(dataRead);
|
||||
|
||||
assertEquals(inputString, new String(dataRead));
|
||||
readEventByteBuf.release();
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user