From 8a56cd1959fd31b51cf807a10048bd8e60155346 Mon Sep 17 00:00:00 2001 From: Josef Grieb Date: Fri, 10 Jul 2020 09:17:21 +0200 Subject: [PATCH] Read Event test and documentation Motivation: missing documentation and read event test Modifications: add documentation and read event test Result: better documentation and tests --- .../channel/uring/IOUringCompletionQueue.java | 6 ++-- .../netty/channel/uring/IOUringEventLoop.java | 1 - .../channel/uring/IOUringSubmissionQueue.java | 5 +++- .../io/netty/channel/uring/NativeTest.java | 29 ++++++++++++++----- 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java index 79577f0b85..57d17a3acb 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringCompletionQueue.java @@ -19,6 +19,8 @@ import io.netty.util.internal.PlatformDependent; public class IOUringCompletionQueue { + //these offsets are used to access specific properties + //CQE (https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L162) private final int CQE_USER_DATA_FIELD = 0; private final int CQE_RES_FIELD = 8; private final int CQE_FLAGS_FIELD = 12; @@ -27,7 +29,7 @@ public class IOUringCompletionQueue { private final int IORING_ENTER_GETEVENTS = 1; - // (k -> kernel) + //these unsigned integer pointers(shared with the kernel) will be changed by the kernel private final long kHeadAddress; private final long kTailAddress; private final long kringMaskAddress; @@ -83,6 +85,7 @@ public class IOUringCompletionQueue { return ioUringCqe; } + //IORING_ENTER_GETEVENTS -> wait until an event is completely processed int ret = Native.ioUringEnter(ringFd, 0, 1, IORING_ENTER_GETEVENTS); if (ret < 0) { //Todo throw exception! @@ -130,7 +133,6 @@ public class IOUringCompletionQueue { return this.ringAddress; } - //Todo Integer.toUnsignedLong -> maven checkstyle error public static long toUnsignedLong(int x) { return ((long) x) & 0xffffffffL; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 08213a0cf4..7e95b57434 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -27,7 +27,6 @@ import java.util.concurrent.Executor; class IOUringEventLoop extends SingleThreadEventLoop { - private final IntObjectMap channels = new IntObjectHashMap(4096); // events should be unique to identify which event type that was private long eventIdCounter; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 2dd0a0b3d8..67f954ae4f 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -22,6 +22,8 @@ public class IOUringSubmissionQueue { private final int SQE_SIZE = 64; private final int INT_SIZE = 4; + //these offsets are used to access specific properties + //SQE https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L21 private final int SQE_OP_CODE_FIELD = 0; private final int SQE_FLAGS_FIELD = 1; private final int SQE_IOPRIO_FIELD = 2; // u16 @@ -33,7 +35,7 @@ public class IOUringSubmissionQueue { private final int SQE_USER_DATA_FIELD = 32; private final int SQE_PAD_FIELD = 40; - // (k -> kernel) + //these unsigned integer pointers(shared with the kernel) will be changed by the kernel private final long kHeadAddress; private final long kTailAddress; private final long kRingMaskAddress; @@ -81,6 +83,7 @@ public class IOUringSubmissionQueue { private void setData(long sqe, long eventId, EventType type, int fd, long bufferAddress, int length, long offset) { //Todo cleaner + //set sqe(submission queue) properties PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) type.getOp()); PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0); diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index 6be9810750..a14a53dbee 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -25,22 +25,21 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledUnsafeDirectByteBuf; import static org.junit.Assert.*; +import io.netty.buffer.ByteBuf; public class NativeTest { @Test public void canWriteFile() { - //Todo add read operation test final long eventId = 1; ByteBufAllocator allocator = new UnpooledByteBufAllocator(true); - UnpooledUnsafeDirectByteBuf directByteBufPooled = new UnpooledUnsafeDirectByteBuf(allocator, 500, 1000); + ByteBuf writeEventByteBuf = allocator.directBuffer(100); String inputString = "Hello World!"; byte[] byteArrray = inputString.getBytes(); - directByteBufPooled.writeBytes(byteArrray); + writeEventByteBuf.writeBytes(byteArrray); int fd = (int) Native.createFile(); - System.out.println("Filedescriptor: " + fd); RingBuffer ringBuffer = Native.createRingBuffer(32); IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); @@ -50,14 +49,30 @@ public class NativeTest { assertNotNull(submissionQueue); assertNotNull(completionQueue); - assertTrue(submissionQueue.add(eventId, EventType.WRITE, fd, directByteBufPooled.memoryAddress(), - directByteBufPooled.readerIndex(), directByteBufPooled.writerIndex())); + assertTrue(submissionQueue.add(eventId, EventType.WRITE, fd, writeEventByteBuf.memoryAddress(), + writeEventByteBuf.readerIndex(), writeEventByteBuf.writerIndex())); submissionQueue.submit(); IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe(); - assertNotNull(ioUringCqe); assertEquals(inputString.length(), ioUringCqe.getRes()); assertEquals(1, ioUringCqe.getEventId()); + writeEventByteBuf.release(); + + ByteBuf readEventByteBuf = allocator.directBuffer(100); + assertTrue(submissionQueue.add(eventId + 1, EventType.READ, fd, readEventByteBuf.memoryAddress(), + readEventByteBuf.writerIndex(), readEventByteBuf.capacity())); + submissionQueue.submit(); + + ioUringCqe = completionQueue.ioUringWaitCqe(); + assertEquals(2, ioUringCqe.getEventId()); + assertEquals(inputString.length(), ioUringCqe.getRes()); + + readEventByteBuf.writerIndex(ioUringCqe.getRes()); + byte[] dataRead = new byte[inputString.length()]; + readEventByteBuf.readBytes(dataRead); + + assertEquals(inputString, new String(dataRead)); + readEventByteBuf.release(); } }