Merge pull request #1 from normanmaurer/io-uring-gc

Reduce GC by removing creation of objects related to completion queue a…
This commit is contained in:
Josef Grieb 2020-08-26 13:07:22 +02:00 committed by GitHub
commit 63c490470c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 360 additions and 488 deletions

View File

@ -24,6 +24,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelMetadata; import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig; import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
@ -133,26 +134,107 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
return loop instanceof IOUringEventLoop; return loop instanceof IOUringEventLoop;
} }
private ByteBuf readBuffer;
public void doReadBytes(ByteBuf byteBuf) { public void doReadBytes(ByteBuf byteBuf) {
assert readBuffer == null;
IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes()); unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
if (byteBuf.hasMemoryAddress()) { if (byteBuf.hasMemoryAddress()) {
long eventId = ioUringEventLoop.incrementEventIdCounter(); readBuffer = byteBuf;
final Event event = new Event(); submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
event.setId(eventId);
event.setOp(EventType.READ);
event.setReadBuffer(byteBuf);
event.setAbstractIOUringChannel(this);
submissionQueue.add(eventId, EventType.READ, socket.intValue(), byteBuf.memoryAddress(),
byteBuf.writerIndex(), byteBuf.capacity()); byteBuf.writerIndex(), byteBuf.capacity());
ioUringEventLoop.addNewEvent(event);
submissionQueue.submit(); submissionQueue.submit();
} }
} }
/**
 * Handles the completion of a previously submitted write operation.
 *
 * @param res the result reported for the write. A positive value is treated as the number of
 *            bytes that were written; any other value is ignored by this method.
 */
void writeComplete(int res) {
    if (res <= 0) {
        // Nothing was written, so there is nothing to remove from the outbound buffer.
        return;
    }
    ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
    if (channelOutboundBuffer == null) {
        // NOTE(review): the outbound buffer is presumably gone once the channel was closed; a late
        // completion must not fail with a NullPointerException -- confirm against AbstractUnsafe.
        return;
    }
    // Drop the bytes that were written and mark the channel as writeable again.
    channelOutboundBuffer.removeBytes(res);
    setWriteable(true);
    try {
        // Continue with whatever is still queued in the outbound buffer.
        doWrite(channelOutboundBuffer);
    } catch (Exception e) {
        // Report the failure through the pipeline instead of dumping it to stderr.
        pipeline().fireExceptionCaught(e);
    }
}
/**
 * Handles the completion of a previously submitted read operation by handing the filled buffer
 * to the pipeline.
 *
 * @param localReadAmount the result reported for the read. A positive value is treated as the
 *                        number of bytes placed into the pending read buffer.
 */
void readComplete(int localReadAmount) {
    boolean close = false;
    ByteBuf byteBuf = null;
    final IOUringRecvByteAllocatorHandle allocHandle =
            (IOUringRecvByteAllocatorHandle) unsafe().recvBufAllocHandle();
    final ChannelPipeline pipeline = pipeline();
    try {
        logger.trace("EventLoop Read Res: {}", localReadAmount);
        logger.trace("EventLoop Fd: {}", fd().intValue());
        setUringInReadyPending(false);

        // Take ownership of the pending buffer so it cannot be released a second time elsewhere.
        byteBuf = this.readBuffer;
        this.readBuffer = null;
        if (byteBuf == null) {
            // The pending buffer was already released and cleared (this happens when the socket is
            // closed), so there is nothing to deliver for this completion.
            return;
        }

        if (localReadAmount > 0) {
            // Make the newly received bytes visible to readers of the buffer.
            byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
        }

        allocHandle.lastBytesRead(localReadAmount);
        if (allocHandle.lastBytesRead() <= 0) {
            // nothing was read, release the buffer.
            byteBuf.release();
            byteBuf = null;
            // NOTE(review): a negative result is treated as EOF here; io_uring normally reports
            // errors as negative errno values and EOF as 0 -- confirm this mapping.
            close = allocHandle.lastBytesRead() < 0;
            if (close) {
                // There is nothing left to read as we received an EOF.
                shutdownInput(false);
            }
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
            return;
        }

        allocHandle.incMessagesRead(1);
        pipeline.fireChannelRead(byteBuf);
        // The pipeline owns the buffer from here on; do not touch it in the exception path.
        byteBuf = null;
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        logger.trace("READ autoRead {}", config().isAutoRead());
        if (config().isAutoRead()) {
            // Schedule the next read.
            executeReadEvent();
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    }
}
/**
 * Finishes a read that failed with {@code cause}: forwards or releases the buffer, completes the
 * read on the pipeline, reports the failure and then either shuts down the input or schedules
 * the next read.
 *
 * @param pipeline    the pipeline to notify
 * @param byteBuf     the buffer that was being read into, or {@code null} if there is none
 * @param cause       the failure that interrupted the read
 * @param close       {@code true} if the input should be shut down regardless of the cause
 * @param allocHandle the allocator handle used for the read
 */
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
                                 Throwable cause, boolean close,
                                 IOUringRecvByteAllocatorHandle allocHandle) {
    final boolean hasBuffer = byteBuf != null;
    if (hasBuffer && byteBuf.isReadable()) {
        // Data was received before the failure; hand it to the pipeline.
        pipeline.fireChannelRead(byteBuf);
    } else if (hasBuffer) {
        byteBuf.release();
    }

    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
    pipeline.fireExceptionCaught(cause);

    final boolean stopReading = close || cause instanceof IOException;
    if (stopReading) {
        shutdownInput(false);
        return;
    }
    if (config().isAutoRead()) {
        executeReadEvent();
    }
}
protected final ByteBuf newDirectBuffer(ByteBuf buf) { protected final ByteBuf newDirectBuffer(ByteBuf buf) {
return newDirectBuffer(buf, buf); return newDirectBuffer(buf, buf);
} }
@ -236,6 +318,10 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
} finally { } finally {
socket.close(); socket.close();
if (readBuffer != null) {
readBuffer.release();
readBuffer = null;
}
} }
} }
@ -273,14 +359,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
final Event event = new Event(); submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
long eventId = ioUringEventLoop.incrementEventIdCounter();
event.setId(eventId);
event.setOp(EventType.WRITE);
event.setAbstractIOUringChannel(this);
submissionQueue.add(eventId, EventType.WRITE, socket.intValue(), buf.memoryAddress(), buf.readerIndex(),
buf.writerIndex()); buf.writerIndex());
ioUringEventLoop.addNewEvent(event);
submissionQueue.submit(); submissionQueue.submit();
writeable = false; writeable = false;
} }
@ -290,13 +370,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
private void addPollOut() { private void addPollOut() {
IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
final Event event = new Event(); submissionQueue.addPollOut(socket.intValue());
long eventId = ioUringEventLoop.incrementEventIdCounter();
event.setId(eventId);
event.setOp(EventType.POLL_OUT);
event.setAbstractIOUringChannel(this);
submissionQueue.addPoll(eventId, socket.intValue(), EventType.POLL_OUT);
ioUringEventLoop.addNewEvent(event);
submissionQueue.submit(); submissionQueue.submit();
} }

View File

@ -15,11 +15,7 @@
*/ */
package io.netty.channel.uring; package io.netty.channel.uring;
import io.netty.channel.Channel; import io.netty.channel.*;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ServerChannel;
import io.netty.channel.unix.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -49,6 +45,27 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
abstract Channel newChildChannel(int fd) throws Exception; abstract Channel newChildChannel(int fd) throws Exception;
/**
 * Handles the completion of a previously submitted accept operation.
 *
 * @param res the result reported for the accept. A non-negative value is used as the file
 *            descriptor of the accepted connection; a negative value only re-arms the accept.
 */
void acceptComplete(int res) {
    if (res >= 0) {
        final IOUringRecvByteAllocatorHandle allocHandle =
                (IOUringRecvByteAllocatorHandle) unsafe().recvBufAllocHandle();
        final ChannelPipeline pipeline = pipeline();

        allocHandle.incMessagesRead(1);
        try {
            final Channel childChannel = newChildChannel(res);
            pipeline.fireChannelRead(childChannel);
        } catch (Exception e) {
            // Report the failure through the pipeline instead of dumping it to stderr.
            pipeline.fireExceptionCaught(e);
        }
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
    }
    //Todo refactoring method name
    // Always schedule the next accept, whether or not this one succeeded.
    executeReadEvent();
}
final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe {
private final byte[] acceptedAddress = new byte[26]; private final byte[] acceptedAddress = new byte[26];
@ -58,36 +75,15 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
promise.setFailure(new UnsupportedOperationException()); promise.setFailure(new UnsupportedOperationException());
} }
/**
 * Registers a {@link EventType#POLL_LINK} event for this server channel's socket and queues the
 * matching poll on the submission queue of the given event loop.
 *
 * @param ioUringEventLoop the event loop that supplies the event id and the submission queue
 */
private void addPoll(IOUringEventLoop ioUringEventLoop) {
    final long id = ioUringEventLoop.incrementEventIdCounter();

    final Event pollEvent = new Event();
    pollEvent.setOp(EventType.POLL_LINK);
    pollEvent.setId(id);
    pollEvent.setAbstractIOUringChannel(AbstractIOUringServerChannel.this);

    final IOUringSubmissionQueue queue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
    queue.addPoll(id, socket.intValue(), pollEvent.getOp());

    // Track the event so it can be looked up by id later.
    final IOUringEventLoop owner = (IOUringEventLoop) eventLoop();
    owner.addNewEvent(pollEvent);
}
@Override @Override
public void uringEventExecution() { public void uringEventExecution() {
final IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); final IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
submissionQueue.addPollLink(socket.intValue());
addPoll(ioUringEventLoop);
long eventId = ioUringEventLoop.incrementEventIdCounter();
final Event event = new Event();
event.setId(eventId);
event.setOp(EventType.ACCEPT);
event.setAbstractIOUringChannel(getChannel());
//Todo get network addresses //Todo get network addresses
submissionQueue.add(eventId, EventType.ACCEPT, getChannel().getSocket().intValue(), 0, 0, 0); submissionQueue.addAccept(fd().intValue());
ioUringEventLoop.addNewEvent(event);
submissionQueue.submit(); submissionQueue.submit();
} }
} }

View File

@ -1,60 +0,0 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
/**
 * Mutable holder describing one submitted operation: its id, its operation type, the channel it
 * belongs to and, optionally, the buffer a read is performed into.
 */
final class Event {

    /** Identifier of this event. */
    private long id;

    /** Operation type of this event. */
    private EventType op;

    /** Channel this event belongs to. */
    //Todo use fd instead
    private AbstractIOUringChannel abstractIOUringChannel;

    /** Buffer associated with this event, if any. */
    private ByteBuf readBuffer;

    /** Returns the identifier of this event. */
    public long getId() {
        return id;
    }

    /** Sets the identifier of this event. */
    public void setId(long id) {
        this.id = id;
    }

    /** Returns the operation type of this event. */
    public EventType getOp() {
        return op;
    }

    /** Sets the operation type of this event. */
    public void setOp(EventType op) {
        this.op = op;
    }

    /** Returns the channel this event belongs to. */
    public AbstractIOUringChannel getAbstractIOUringChannel() {
        return abstractIOUringChannel;
    }

    /** Sets the channel this event belongs to. */
    public void setAbstractIOUringChannel(AbstractIOUringChannel abstractIOUringChannel) {
        this.abstractIOUringChannel = abstractIOUringChannel;
    }

    /** Returns the buffer associated with this event. */
    public ByteBuf getReadBuffer() {
        return readBuffer;
    }

    /** Sets the buffer associated with this event. */
    public void setReadBuffer(ByteBuf readBuffer) {
        this.readBuffer = readBuffer;
    }
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
/**
 * The kinds of events that can be submitted, each carrying the numeric operation code that is
 * used for it.
 */
enum EventType {
    ACCEPT(13),
    READ(22),
    WRITE(23),
    TIMEOUT(11),
    // Every poll flavour shares the same operation code.
    POLL_EVENTFD(6),
    POLL_LINK(6),
    POLL_RDHUP(6),
    POLL_OUT(6);

    /** Numeric operation code of this event type. */
    private final int opCode;

    EventType(int opCode) {
        this.opCode = opCode;
    }

    /**
     * Returns the numeric operation code of this event type.
     */
    public int getOp() {
        return opCode;
    }
}

View File

@ -21,6 +21,15 @@ import io.netty.util.internal.SystemPropertyUtil;
final class IOUring { final class IOUring {
private static final Throwable UNAVAILABILITY_CAUSE; private static final Throwable UNAVAILABILITY_CAUSE;
// Operation codes placed in the opcode field of a submission queue entry.
// NOTE(review): these presumably mirror the kernel's IORING_OP_* values -- confirm against io_uring.h.
static final int IO_POLL = 6;
static final int IO_TIMEOUT = 11;
static final int OP_ACCEPT = 13;
static final int OP_READ = 22;
static final int OP_WRITE = 23;
// Poll event masks. The values are the same as the POLLIN (1), POLLOUT (4) and POLLRDHUP (8192)
// constants that were previously declared in IOUringSubmissionQueue.
static final int POLLMASK_LINK = 1;
static final int POLLMASK_OUT = 4;
static final int POLLMASK_RDHUP = 8192;
static { static {
Throwable cause = null; Throwable cause = null;

View File

@ -55,46 +55,54 @@ final class IOUringCompletionQueue {
this.ringFd = ringFd; this.ringFd = ringFd;
} }
public IOUringCqe poll() { public int process(IOUringCompletionQueueCallback callback) {
int i = 0;
for (;;) {
long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); long head = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) { if (head != toUnsignedLong(PlatformDependent.getInt(kTailAddress))) {
long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress)); long index = head & toUnsignedLong(PlatformDependent.getInt(kringMaskAddress));
long cqe = index * CQE_SIZE + completionQueueArrayAddress; long cqe = index * CQE_SIZE + completionQueueArrayAddress;
long eventId = PlatformDependent.getLong(cqe + CQE_USER_DATA_FIELD); long udata = PlatformDependent.getLong(cqe + CQE_USER_DATA_FIELD);
int res = PlatformDependent.getInt(cqe + CQE_RES_FIELD); int res = PlatformDependent.getInt(cqe + CQE_RES_FIELD);
long flags = toUnsignedLong(PlatformDependent.getInt(cqe + CQE_FLAGS_FIELD)); long flags = toUnsignedLong(PlatformDependent.getInt(cqe + CQE_FLAGS_FIELD));
//Ensure that the kernel only sees the new value of the head index after the CQEs have been read. //Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
PlatformDependent.putIntOrdered(kHeadAddress, (int) (head + 1)); PlatformDependent.putIntOrdered(kHeadAddress, (int) (head + 1));
return new IOUringCqe(eventId, res, flags); int fd = (int) (udata >> 32);
int opMask = (int) udata;
short op = (short) (opMask >> 16);
short mask = (short) opMask;
i++;
if (!callback.handle(fd, res, flags, op, mask)) {
break;
} }
return null; } else {
if (i == 0) {
return -1;
}
return i;
}
}
return i;
} }
public IOUringCqe ioUringWaitCqe() { interface IOUringCompletionQueueCallback {
IOUringCqe ioUringCqe = poll(); boolean handle(int fd, int res, long flags, int op, int mask);
if (ioUringCqe != null) {
return ioUringCqe;
} }
public boolean ioUringWaitCqe() {
//IORING_ENTER_GETEVENTS -> wait until an event is completely processed //IORING_ENTER_GETEVENTS -> wait until an event is completely processed
int ret = Native.ioUringEnter(ringFd, 0, 1, IORING_ENTER_GETEVENTS); int ret = Native.ioUringEnter(ringFd, 0, 1, IORING_ENTER_GETEVENTS);
if (ret < 0) { if (ret < 0) {
//Todo throw exception! //Todo throw exception!
return null; return false;
} else if (ret == 0) { } else if (ret == 0) {
ioUringCqe = poll(); return true;
if (ioUringCqe != null) {
return ioUringCqe;
}
} }
//Todo throw Exception! //Todo throw Exception!
return null; return false;
} }
public long getKHeadAddress() { public long getKHeadAddress() {

View File

@ -1,40 +0,0 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring;
/**
 * Immutable snapshot of a completion queue entry: the id of the event it belongs to, the result
 * of the operation and the flags reported with it.
 */
final class IOUringCqe {

    /** Result of the completed operation. */
    private final int res;

    /** Flags reported with the completion. */
    private final long flags;

    /** Id of the event this completion belongs to. */
    private final long eventId;

    /**
     * Creates a new entry.
     *
     * @param eventId the id of the event this completion belongs to
     * @param res     the result of the completed operation
     * @param flags   the flags reported with the completion
     */
    IOUringCqe(long eventId, int res, long flags) {
        this.res = res;
        this.flags = flags;
        this.eventId = eventId;
    }

    /** Returns the id of the event this completion belongs to. */
    public long getEventId() {
        return eventId;
    }

    /** Returns the result of the completed operation. */
    public int getRes() {
        return res;
    }

    /** Returns the flags reported with the completion. */
    public long getFlags() {
        return flags;
    }
}

View File

@ -15,16 +15,11 @@
*/ */
package io.netty.channel.uring; package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.LongObjectHashMap;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -36,7 +31,8 @@ import java.util.concurrent.atomic.AtomicLong;
import static io.netty.channel.unix.Errors.*; import static io.netty.channel.unix.Errors.*;
final class IOUringEventLoop extends SingleThreadEventLoop { final class IOUringEventLoop extends SingleThreadEventLoop implements
IOUringCompletionQueue.IOUringCompletionQueueCallback {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class);
//Todo set config ring buffer size //Todo set config ring buffer size
@ -48,7 +44,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
// events should be unique to identify which event type that was // events should be unique to identify which event type that was
private long eventIdCounter; private long eventIdCounter;
private final LongObjectHashMap<Event> events = new LongObjectHashMap<Event>();
private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096); private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096);
private final RingBuffer ringBuffer; private final RingBuffer ringBuffer;
@ -84,13 +79,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
} }
/**
 * Hands out the current event id and advances the counter by one.
 *
 * @return the event id that was current before the counter was advanced
 */
public long incrementEventIdCounter() {
    final long assigned = eventIdCounter;
    logger.trace("incrementEventIdCounter EventId: {}", assigned);
    eventIdCounter = assigned + 1;
    return assigned;
}
public void add(AbstractIOUringChannel ch) { public void add(AbstractIOUringChannel ch) {
logger.trace("Add Channel: {} ", ch.socket.intValue()); logger.trace("Add Channel: {} ", ch.socket.intValue());
int fd = ch.socket.intValue(); int fd = ch.socket.intValue();
@ -123,22 +111,13 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
} }
} }
/**
 * Stores the given event under its own id so it can be looked up later.
 *
 * @param event the event to track
 */
public void addNewEvent(Event event) {
    final long key = event.getId();
    events.put(key, event);
}
@Override @Override
protected void run() { protected void run() {
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue(); final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
final IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); final IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
// Lets add the eventfd related events before starting to do any real work. // Lets add the eventfd related events before starting to do any real work.
long eventId = incrementEventIdCounter(); submissionQueue.addPollLink(eventfd.intValue());
Event event = new Event();
event.setOp(EventType.POLL_EVENTFD);
event.setId(eventId);
addNewEvent(event);
submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp());
submissionQueue.submit(); submissionQueue.submit();
for (;;) { for (;;) {
@ -148,7 +127,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
curDeadlineNanos = NONE; // nothing on the calendar curDeadlineNanos = NONE; // nothing on the calendar
} }
nextWakeupNanos.set(curDeadlineNanos); nextWakeupNanos.set(curDeadlineNanos);
IOUringCqe ioUringCqe;
// Only submit a timeout if there are no tasks to process and do a blocking operation // Only submit a timeout if there are no tasks to process and do a blocking operation
// on the completionQueue. // on the completionQueue.
@ -156,43 +134,24 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
try { try {
if (curDeadlineNanos != prevDeadlineNanos) { if (curDeadlineNanos != prevDeadlineNanos) {
prevDeadlineNanos = curDeadlineNanos; prevDeadlineNanos = curDeadlineNanos;
event = new Event(); submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos));
eventId = incrementEventIdCounter();
event.setId(eventId);
event.setOp(EventType.TIMEOUT);
addNewEvent(event);
submissionQueue.addTimeout(deadlineToDelayNanos(curDeadlineNanos), eventId);
submissionQueue.submit(); submissionQueue.submit();
} }
// Block if there is nothing to process. // Check there were any completion events to process
if (completionQueue.process(this) == -1) {
// Block if there is nothing to process after this try again to call process(....)
logger.trace("ioUringWaitCqe {}", this.toString()); logger.trace("ioUringWaitCqe {}", this.toString());
ioUringCqe = completionQueue.ioUringWaitCqe(); completionQueue.ioUringWaitCqe();
}
} finally { } finally {
if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) { if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
pendingWakeup = true; pendingWakeup = true;
} }
} }
} else {
// Just poll as there are tasks to process so we don't want to block.
logger.trace("poll {}", this.toString());
ioUringCqe = completionQueue.poll();
} }
while (ioUringCqe != null) { completionQueue.process(this);
event = events.get(ioUringCqe.getEventId());
if (event != null) {
logger.trace("EventType Incoming: " + event.getOp().name());
processEvent(ioUringCqe.getRes(), event);
}
// Process one entry after the other until there are none left. This will ensure we process
// all of these before we try to consume tasks.
ioUringCqe = completionQueue.poll();
logger.trace("poll {}", this.toString());
}
if (hasTasks()) { if (hasTasks()) {
runAllTasks(); runAllTasks();
@ -211,145 +170,77 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
} }
} }
private void processEvent(final int res, final Event event) { @Override
// Remove the id first so we not end up with invalid entries in any cases. public boolean handle(int fd, int res, long flags, int op, int pollMask) {
this.events.remove(event.getId());
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
switch (event.getOp()) { switch (op) {
case ACCEPT: case IOUring.OP_ACCEPT:
AbstractIOUringServerChannel acceptChannel = (AbstractIOUringServerChannel) channels.get(fd);
if (acceptChannel == null) {
break;
}
logger.trace("EventLoop Accept filedescriptor: {}", res); logger.trace("EventLoop Accept filedescriptor: {}", res);
event.getAbstractIOUringChannel().setUringInReadyPending(false); acceptChannel.setUringInReadyPending(false);
if (res != -1 && res != ERRNO_EAGAIN_NEGATIVE && if (res != -1 && res != ERRNO_EAGAIN_NEGATIVE &&
res != ERRNO_EWOULDBLOCK_NEGATIVE) { res != ERRNO_EWOULDBLOCK_NEGATIVE) {
AbstractIOUringServerChannel abstractIOUringServerChannel = logger.trace("server filedescriptor Fd: {}", fd);
(AbstractIOUringServerChannel) event.getAbstractIOUringChannel(); acceptChannel.acceptComplete(res);
logger.trace("server filedescriptor Fd: {}", abstractIOUringServerChannel.getSocket().intValue()); pollRdHup(res);
final IOUringRecvByteAllocatorHandle allocHandle =
(IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe()
.recvBufAllocHandle();
final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline();
allocHandle.lastBytesRead(res);
if (allocHandle.lastBytesRead() > 0) {
allocHandle.incMessagesRead(1);
try {
final Channel childChannel =
abstractIOUringServerChannel.newChildChannel(allocHandle.lastBytesRead());
pipeline.fireChannelRead(childChannel);
pollRdHup((AbstractIOUringChannel) childChannel);
} catch (Exception e) {
e.printStackTrace();
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
}
//Todo refactoring method name
event.getAbstractIOUringChannel().executeReadEvent();
break;
case READ:
boolean close = false;
ByteBuf byteBuf = null;
int localReadAmount = res;
final IOUringRecvByteAllocatorHandle allocHandle =
(IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe()
.recvBufAllocHandle();
final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline();
try {
logger.trace("EventLoop Read Res: {}", res);
logger.trace("EventLoop Fd: {}", event.getAbstractIOUringChannel().getSocket().intValue());
event.getAbstractIOUringChannel().setUringInReadyPending(false);
byteBuf = event.getReadBuffer();
if (localReadAmount > 0) {
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
}
allocHandle.lastBytesRead(localReadAmount);
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
event.getAbstractIOUringChannel().shutdownInput(false);
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
break;
}
allocHandle.incMessagesRead(1);
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
logger.trace("READ autoRead {}", event.getAbstractIOUringChannel().config().isAutoRead());
if (event.getAbstractIOUringChannel().config().isAutoRead()) {
event.getAbstractIOUringChannel().executeReadEvent();
}
} catch (Throwable t) {
handleReadException(event.getAbstractIOUringChannel(), pipeline, byteBuf, t, close, allocHandle);
} }
break; break;
case WRITE: case IOUring.OP_READ:
AbstractIOUringChannel readChannel = channels.get(fd);
if (readChannel == null) {
break;
}
readChannel.readComplete(res);
break;
case IOUring.OP_WRITE:
AbstractIOUringChannel writeChannel = channels.get(fd);
if (writeChannel == null) {
break;
}
//localFlushAmount -> res //localFlushAmount -> res
logger.trace("EventLoop Write Res: {}", res); logger.trace("EventLoop Write Res: {}", res);
logger.trace("EventLoop Fd: {}", event.getAbstractIOUringChannel().getSocket().intValue()); logger.trace("EventLoop Fd: {}", fd);
ChannelOutboundBuffer channelOutboundBuffer = event
.getAbstractIOUringChannel().unsafe().outboundBuffer();
AbstractIOUringChannel channel = event.getAbstractIOUringChannel();
if (res == SOCKET_ERROR_EPIPE) { if (res == SOCKET_ERROR_EPIPE) {
event.getAbstractIOUringChannel().shutdownInput(false); writeChannel.shutdownInput(false);
break; } else {
} writeChannel.writeComplete(res);
if (res > 0) {
channelOutboundBuffer.removeBytes(res);
channel.setWriteable(true);
try {
event.getAbstractIOUringChannel().doWrite(channelOutboundBuffer);
} catch (Exception e) {
e.printStackTrace();
}
} }
break; break;
case TIMEOUT: case IOUring.IO_TIMEOUT:
if (res == ETIME) { if (res == ETIME) {
prevDeadlineNanos = NONE; prevDeadlineNanos = NONE;
} }
break; break;
case POLL_EVENTFD: case IOUring.IO_POLL:
if (eventfd.intValue() == fd) {
pendingWakeup = false; pendingWakeup = false;
// We need to consume the data as otherwise we would see another event in the completionQueue without // We need to consume the data as otherwise we would see another event in the completionQueue without
// an extra eventfd_write(....) // an extra eventfd_write(....)
Native.eventFdRead(eventfd.intValue()); Native.eventFdRead(eventfd.intValue());
Event eventfdEvent = new Event(); submissionQueue.addPollLink(eventfd.intValue());
eventfdEvent.setId(incrementEventIdCounter());
eventfdEvent.setOp(EventType.POLL_EVENTFD);
addNewEvent(eventfdEvent);
submissionQueue.addPoll(eventfdEvent.getId(), eventfd.intValue(), eventfdEvent.getOp());
// Submit so its picked up // Submit so its picked up
submissionQueue.submit(); submissionQueue.submit();
break; } else {
case POLL_LINK: if (pollMask == IOUring.POLLMASK_RDHUP) {
AbstractIOUringChannel channel = channels.get(fd);
if (channel != null && !channel.isActive()) {
channel.shutdownInput(true);
}
} else {
//Todo error handling error //Todo error handling error
logger.trace("POLL_LINK Res: {}", res); logger.trace("POLL_LINK Res: {}", res);
break; break;
case POLL_RDHUP:
if (!event.getAbstractIOUringChannel().isActive()) {
event.getAbstractIOUringChannel().shutdownInput(true);
} }
break; }
case POLL_OUT:
logger.trace("POLL_OUT Res: {}", res);
break; break;
} }
return true;
} }
@Override @Override
@ -374,38 +265,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
} }
} }
/**
 * Finishes a read on {@code channel} that failed with {@code cause}: forwards or releases the
 * buffer, completes the read on the pipeline, reports the failure and then either shuts down the
 * channel's input or schedules its next read.
 *
 * @param channel     the channel the read was performed on
 * @param pipeline    the pipeline to notify
 * @param byteBuf     the buffer that was being read into, or {@code null} if there is none
 * @param cause       the failure that interrupted the read
 * @param close       {@code true} if the input should be shut down regardless of the cause
 * @param allocHandle the allocator handle used for the read
 */
private void handleReadException(AbstractIOUringChannel channel, ChannelPipeline pipeline, ByteBuf byteBuf,
                                 Throwable cause, boolean close,
                                 IOUringRecvByteAllocatorHandle allocHandle) {
    final boolean hasBuffer = byteBuf != null;
    if (hasBuffer && byteBuf.isReadable()) {
        // Data was received before the failure; hand it to the pipeline.
        pipeline.fireChannelRead(byteBuf);
    } else if (hasBuffer) {
        byteBuf.release();
    }

    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
    pipeline.fireExceptionCaught(cause);

    final boolean stopReading = close || cause instanceof IOException;
    if (stopReading) {
        channel.shutdownInput(false);
        return;
    }
    if (channel.config().isAutoRead()) {
        channel.executeReadEvent();
    }
}
//to be notified when the filedesciptor is closed //to be notified when the filedesciptor is closed
private void pollRdHup(AbstractIOUringChannel channel) { private void pollRdHup(int fd) {
//all childChannels should poll POLLRDHUP //all childChannels should poll POLLRDHUP
long eventId = incrementEventIdCounter(); ringBuffer.getIoUringSubmissionQueue().addPollRdHup(fd);
Event event = new Event();
event.setOp(EventType.POLL_RDHUP);
event.setId(eventId);
event.setAbstractIOUringChannel(channel);
addNewEvent(event);
ringBuffer.getIoUringSubmissionQueue().addPoll(eventId, channel.socket.intValue(), event.getOp());
ringBuffer.getIoUringSubmissionQueue().submit(); ringBuffer.getIoUringSubmissionQueue().submit();
} }
} }

View File

@ -28,9 +28,6 @@ final class IOUringSubmissionQueue {
private static final int SQE_SIZE = 64; private static final int SQE_SIZE = 64;
private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support? private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
private static final int POLLIN = 1;
private static final int POLLRDHUP = 8192;
private static final int POLLOUT = 4;
private static final int IOSQE_IO_LINK = 4; private static final int IOSQE_IO_LINK = 4;
@ -109,26 +106,31 @@ final class IOUringSubmissionQueue {
return sqe; return sqe;
} }
private void setData(long sqe, long eventId, EventType type, int fd, long bufferAddress, int length, long offset) { private void setData(long sqe, byte op, int pollMask, int fd, long bufferAddress, int length, long offset) {
//Todo cleaner //Todo cleaner
//set sqe(submission queue) properties //set sqe(submission queue) properties
PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) type.getOp()); PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, op);
PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0); PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0);
PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd); PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset); PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress); PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress);
PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length); PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId);
// Store the fd and event type in the user_data field
int opMask = (((short) op) << 16) | (((short) pollMask) & 0xFFFF);
long uData = (long)fd << 32 | opMask & 0xFFFFFFFFL;
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData);
//poll<link>read or accept operation //poll<link>read or accept operation
if (type == EventType.POLL_LINK || type == EventType.POLL_OUT) { if (op == 6 && (pollMask == IOUring.POLLMASK_OUT || pollMask == IOUring.POLLMASK_LINK)) {
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) IOSQE_IO_LINK); PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) IOSQE_IO_LINK);
} else { } else {
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
} }
//c union set Rw-Flags or accept_flags //c union set Rw-Flags or accept_flags
if (type != EventType.ACCEPT) { if (op != IOUring.OP_ACCEPT) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
} else { } else {
//accept_flags set NON_BLOCKING //accept_flags set NON_BLOCKING
@ -142,56 +144,74 @@ final class IOUringSubmissionQueue {
offsetIndex += 8; offsetIndex += 8;
} }
logger.trace("OPField: {}", type.name()); if (pollMask != 0) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
}
logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD)); logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD));
logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD)); logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD));
logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD)); logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD));
logger.trace("Offset: {}", PlatformDependent.getLong(sqe + SQE_OFFSET_FIELD)); logger.trace("Offset: {}", PlatformDependent.getLong(sqe + SQE_OFFSET_FIELD));
} }
/**
 * Fills one submission queue entry with a timeout operation. This method does not call submit().
 *
 * NOTE(review): side-by-side diff rendering; lines hold the pre-change text followed by the post-change
 * text. The post-change signature drops the eventId parameter.
 *
 * @param nanoSeconds value handed to setTimeout(...) before the entry is filled
 * @return false when getSqe() returns 0, true after setData(...) has been called
 */
public boolean addTimeout(long nanoSeconds, long eventId) { public boolean addTimeout(long nanoSeconds) {
long sqe = getSqe(); long sqe = getSqe();
if (sqe == 0) { if (sqe == 0) {
return false; return false;
} }
// fd is passed as -1; the entry points at timeoutMemoryAddress with a length of 1
setTimeout(nanoSeconds); setTimeout(nanoSeconds);
setData(sqe, eventId, EventType.TIMEOUT, -1, timeoutMemoryAddress, 1, 0); setData(sqe, (byte) IOUring.IO_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0);
return true; return true;
} }
/**
 * Adds a poll entry for the given fd using IOUring.POLLMASK_LINK.
 *
 * NOTE(review): the first line below also carries the pre-change addPoll(long, int, EventType)
 * signature from the diff rendering.
 *
 * @param fd file descriptor to poll
 * @return the result of addPoll(fd, IOUring.POLLMASK_LINK)
 */
public boolean addPoll(long eventId, int fd, EventType eventType) { public boolean addPollLink(int fd) {
return addPoll(fd, IOUring.POLLMASK_LINK);
}
/**
 * Adds a poll entry for the given fd using {@code IOUring.POLLMASK_OUT}.
 *
 * @param fd file descriptor to poll
 * @return the result of {@code addPoll(fd, IOUring.POLLMASK_OUT)}
 */
public boolean addPollOut(int fd) {
    final boolean queued = addPoll(fd, IOUring.POLLMASK_OUT);
    return queued;
}
/**
 * Adds a poll entry for the given fd using {@code IOUring.POLLMASK_RDHUP}.
 *
 * @param fd file descriptor to poll
 * @return the result of {@code addPoll(fd, IOUring.POLLMASK_RDHUP)}
 */
public boolean addPollRdHup(int fd) {
    final int mask = IOUring.POLLMASK_RDHUP;
    return addPoll(fd, mask);
}
/**
 * Shared poll helper: obtains a submission queue entry and fills it as an IOUring.IO_POLL operation
 * carrying the given poll mask. Does not call submit().
 *
 * NOTE(review): side-by-side diff rendering. The switch over eventType and the trailing
 * setData/putInt pair are the pre-change body; the post-change body is the single
 * setData(sqe, (byte) IOUring.IO_POLL, pollMask, fd, 0, 0, 0) call.
 *
 * @param fd file descriptor to poll
 * @param pollMask poll mask forwarded to setData(...)
 * @return false when getSqe() returns 0, true once the entry has been filled
 */
private boolean addPoll(int fd, int pollMask) {
long sqe = getSqe(); long sqe = getSqe();
if (sqe == 0) { if (sqe == 0) {
return false; return false;
} }
int pollMask;
switch (eventType) { setData(sqe, (byte) IOUring.IO_POLL, pollMask, fd, 0, 0, 0);
case POLL_EVENTFD:
case POLL_LINK:
pollMask = POLLIN;
break;
case POLL_OUT:
pollMask = POLLOUT;
break;
case POLL_RDHUP:
pollMask = POLLRDHUP;
break;
default:
//Todo exception
return false;
}
setData(sqe, eventId, eventType, fd, 0, 0, 0);
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
return true; return true;
} }
/**
 * Fills one submission queue entry with a read operation. The entry's address is
 * bufferAddress + pos and its length is limit - pos. Does not call submit().
 *
 * NOTE(review): side-by-side diff rendering. The first line below holds the pre-change Todo comment
 * followed by the post-change addRead signature; the second line is the pre-change generic
 * add(...) signature that addRead replaces.
 *
 * @param fd file descriptor to read from
 * @param bufferAddress base memory address of the destination buffer
 * @param pos offset added to bufferAddress
 * @param limit end offset; limit - pos is passed as the length
 * @return false when getSqe() returns 0, true once the entry has been filled
 */
//Todo ring buffer errors for example if submission queue is full public boolean addRead(int fd, long bufferAddress, int pos, int limit) {
public boolean add(long eventId, EventType type, int fd, long bufferAddress, int pos, int limit) {
long sqe = getSqe(); long sqe = getSqe();
if (sqe == 0) { if (sqe == 0) {
return false; return false;
} }
setData(sqe, eventId, type, fd, bufferAddress + pos, limit - pos, 0); setData(sqe, (byte) IOUring.OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0);
return true;
}
/**
 * Fills one submission queue entry with a write operation. The entry's address is
 * {@code bufferAddress + pos} and its length is {@code limit - pos}. Does not call submit().
 *
 * @param fd file descriptor to write to
 * @param bufferAddress base memory address of the source buffer
 * @param pos offset added to {@code bufferAddress}
 * @param limit end offset; {@code limit - pos} is passed as the length
 * @return {@code false} when {@code getSqe()} returns 0, {@code true} once the entry has been filled
 */
public boolean addWrite(int fd, long bufferAddress, int pos, int limit) {
    final long entry = getSqe();
    if (entry != 0) {
        final long startAddress = bufferAddress + pos;
        final int byteCount = limit - pos;
        setData(entry, (byte) IOUring.OP_WRITE, 0, fd, startAddress, byteCount, 0);
        return true;
    }
    return false;
}
/**
 * Fills one submission queue entry with an accept operation for the given fd; address, length and
 * offset are all passed as 0. Does not call submit().
 *
 * NOTE(review): the last two lines below also carry pre-change text from the diff rendering.
 *
 * @param fd file descriptor passed to setData(...) for the accept operation
 * @return false when getSqe() returns 0, true once the entry has been filled
 */
public boolean addAccept(int fd) {
long sqe = getSqe();
if (sqe == 0) {
return false;
}
setData(sqe, (byte) IOUring.OP_ACCEPT, 0, fd, 0,0,0);
return true; return true;
} }

View File

@ -0,0 +1,16 @@
package io.netty.channel.uring;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketFixedLengthEchoTest;
import java.util.List;
/**
 * {@link SocketFixedLengthEchoTest} subclass whose bootstrap factories come from
 * {@code IOUringSocketTestPermutation.INSTANCE.socket()}.
 */
public class IOUringSocketFixedLengthEchoTest extends SocketFixedLengthEchoTest {

    /**
     * @return the server/client bootstrap combinations provided by {@code IOUringSocketTestPermutation}
     */
    @Override
    protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
        final List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> combos =
                IOUringSocketTestPermutation.INSTANCE.socket();
        return combos;
    }
}

View File

@ -31,8 +31,8 @@ public class NativeTest {
final long eventId = 1; final long eventId = 1;
ByteBufAllocator allocator = new UnpooledByteBufAllocator(true); ByteBufAllocator allocator = new UnpooledByteBufAllocator(true);
ByteBuf writeEventByteBuf = allocator.directBuffer(100); final ByteBuf writeEventByteBuf = allocator.directBuffer(100);
String inputString = "Hello World!"; final String inputString = "Hello World!";
writeEventByteBuf.writeCharSequence(inputString, Charset.forName("UTF-8")); writeEventByteBuf.writeCharSequence(inputString, Charset.forName("UTF-8"));
int fd = Native.createFile(); int fd = Native.createFile();
@ -45,26 +45,34 @@ public class NativeTest {
assertNotNull(submissionQueue); assertNotNull(submissionQueue);
assertNotNull(completionQueue); assertNotNull(completionQueue);
assertTrue(submissionQueue.add(eventId, EventType.WRITE, fd, writeEventByteBuf.memoryAddress(), assertTrue(submissionQueue.addWrite(fd, writeEventByteBuf.memoryAddress(),
writeEventByteBuf.readerIndex(), writeEventByteBuf.writerIndex())); writeEventByteBuf.readerIndex(), writeEventByteBuf.writerIndex()));
submissionQueue.submit(); submissionQueue.submit();
IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe(); assertTrue(completionQueue.ioUringWaitCqe());
assertNotNull(ioUringCqe); assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
assertEquals(inputString.length(), ioUringCqe.getRes()); @Override
assertEquals(1, ioUringCqe.getEventId()); public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(inputString.length(), res);
writeEventByteBuf.release(); writeEventByteBuf.release();
return true;
}
}));
ByteBuf readEventByteBuf = allocator.directBuffer(100); final ByteBuf readEventByteBuf = allocator.directBuffer(100);
assertTrue(submissionQueue.add(eventId + 1, EventType.READ, fd, readEventByteBuf.memoryAddress(), assertTrue(submissionQueue.addRead(fd, readEventByteBuf.memoryAddress(),
readEventByteBuf.writerIndex(), readEventByteBuf.capacity())); readEventByteBuf.writerIndex(), readEventByteBuf.capacity()));
submissionQueue.submit(); submissionQueue.submit();
ioUringCqe = completionQueue.ioUringWaitCqe(); assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(2, ioUringCqe.getEventId()); assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
assertEquals(inputString.length(), ioUringCqe.getRes()); @Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
readEventByteBuf.writerIndex(ioUringCqe.getRes()); assertEquals(inputString.length(), res);
readEventByteBuf.writerIndex(res);
return true;
}
}));
byte[] dataRead = new byte[inputString.length()]; byte[] dataRead = new byte[inputString.length()];
readEventByteBuf.readBytes(dataRead); readEventByteBuf.readBytes(dataRead);
@ -86,9 +94,14 @@ public class NativeTest {
Thread thread = new Thread() { Thread thread = new Thread() {
@Override @Override
public void run() { public void run() {
final IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe(); assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(-62, ioUringCqe.getRes()); completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
assertEquals(1, ioUringCqe.getEventId()); @Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(-62, res);
return true;
}
});
} }
}; };
thread.start(); thread.start();
@ -98,7 +111,7 @@ public class NativeTest {
e.printStackTrace(); e.printStackTrace();
} }
submissionQueue.addTimeout(0, 1); submissionQueue.addTimeout(0);
submissionQueue.submit(); submissionQueue.submit();
} }
@ -114,7 +127,7 @@ public class NativeTest {
assertNotNull(completionQueue); assertNotNull(completionQueue);
final FileDescriptor eventFd = Native.newEventFd(); final FileDescriptor eventFd = Native.newEventFd();
assertTrue(submissionQueue.addPoll(1, eventFd.intValue(), EventType.POLL_EVENTFD)); assertTrue(submissionQueue.addPollLink(eventFd.intValue()));
submissionQueue.submit(); submissionQueue.submit();
new Thread() { new Thread() {
@ -124,9 +137,14 @@ public class NativeTest {
} }
}.start(); }.start();
IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe(); assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, ioUringCqe.getRes()); assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
assertEquals(1, ioUringCqe.getEventId()); @Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(1, res);
return true;
}
}));
} }
//Todo clean //Todo clean
@ -146,14 +164,19 @@ public class NativeTest {
Thread waitingCqe = new Thread() { Thread waitingCqe = new Thread() {
@Override @Override
public void run() { public void run() {
IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe(); assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, ioUringCqe.getRes()); assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
assertEquals(1, ioUringCqe.getEventId()); @Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(1, res);
return true;
}
}));
} }
}; };
waitingCqe.start(); waitingCqe.start();
final FileDescriptor eventFd = Native.newEventFd(); final FileDescriptor eventFd = Native.newEventFd();
assertTrue(submissionQueue.addPoll(1, eventFd.intValue(), EventType.POLL_EVENTFD)); assertTrue(submissionQueue.addPollLink(eventFd.intValue()));
submissionQueue.submit(); submissionQueue.submit();
new Thread() { new Thread() {