Add polling POLLOUT

Motivation:

no checks for non writeable sockets

Modifications:

-Added a linked write poll to make sure that the socket does not write if it is not writeable
-Added a new boolean to avoid to submit a second write operation

Result:

writeable socket check
This commit is contained in:
Josef Grieb 2020-08-24 10:33:07 +02:00 committed by josef
parent d3a0395ac2
commit b10b4ca6e7
4 changed files with 72 additions and 23 deletions

View File

@ -40,6 +40,7 @@ import java.nio.channels.UnresolvedAddressException;
import static io.netty.util.internal.ObjectUtil.*; import static io.netty.util.internal.ObjectUtil.*;
abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel { abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIOUringChannel.class);
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false);
final LinuxSocket socket; final LinuxSocket socket;
protected volatile boolean active; protected volatile boolean active;
@ -233,7 +234,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
if (in.size() >= 1) { logger.info("IOUring doWrite message size: {}", in.size());
if (writeable && in.size() >= 1) {
Object msg = in.current(); Object msg = in.current();
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
doWriteBytes((ByteBuf) msg); doWriteBytes((ByteBuf) msg);
@ -243,6 +245,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
protected final void doWriteBytes(ByteBuf buf) { protected final void doWriteBytes(ByteBuf buf) {
if (buf.hasMemoryAddress()) { if (buf.hasMemoryAddress()) {
//link poll<link>write operation
addPollOut();
IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop(); IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue(); IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
final Event event = new Event(); final Event event = new Event();
@ -254,9 +259,24 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
buf.writerIndex()); buf.writerIndex());
ioUringEventLoop.addNewEvent(event); ioUringEventLoop.addNewEvent(event);
submissionQueue.submit(); submissionQueue.submit();
writeable = false;
} }
} }
//POLLOUT
private void addPollOut() {
IOUringEventLoop ioUringEventLoop = (IOUringEventLoop) eventLoop();
IOUringSubmissionQueue submissionQueue = ioUringEventLoop.getRingBuffer().getIoUringSubmissionQueue();
final Event event = new Event();
long eventId = ioUringEventLoop.incrementEventIdCounter();
event.setId(eventId);
event.setOp(EventType.POLL_OUT);
event.setAbstractIOUringChannel(this);
submissionQueue.addPoll(eventId, socket.intValue(), EventType.POLL_OUT);
ioUringEventLoop.addNewEvent(event);
submissionQueue.submit();
}
abstract class AbstractUringUnsafe extends AbstractUnsafe { abstract class AbstractUringUnsafe extends AbstractUnsafe {
private IOUringRecvByteAllocatorHandle allocHandle; private IOUringRecvByteAllocatorHandle allocHandle;
private final Runnable readRunnable = new Runnable() { private final Runnable readRunnable = new Runnable() {

View File

@ -1,3 +1,18 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.uring; package io.netty.channel.uring;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;

View File

@ -16,6 +16,7 @@
package io.netty.channel.uring; package io.netty.channel.uring;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
@ -24,7 +25,10 @@ import io.netty.channel.unix.FileDescriptor;
import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap;
import io.netty.util.collection.LongObjectHashMap; import io.netty.util.collection.LongObjectHashMap;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -63,7 +67,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp); super(parent, executor, addTaskWakesUp);
ringBuffer = Native.createRingBuffer(32);
ringBuffer = Native.createRingBuffer(ringSize);
eventfd = Native.newEventFd(); eventfd = Native.newEventFd();
long eventId = incrementEventIdCounter(); long eventId = incrementEventIdCounter();
Event event = new Event(); Event event = new Event();
@ -241,22 +246,27 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
event.getAbstractIOUringChannel().executeReadEvent(); event.getAbstractIOUringChannel().executeReadEvent();
break; break;
case WRITE: case WRITE:
System.out.println("EventLoop Write Res: " + res); //localFlushAmount -> res
System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().intValue()); logger.info("EventLoop Write Res: {}", res);
System.out.println("EventLoop Pipeline: " + event.getAbstractIOUringChannel().eventLoop()); logger.info("EventLoop Fd: {}", event.getAbstractIOUringChannel().getSocket().intValue());
ChannelOutboundBuffer channelOutboundBuffer = event ChannelOutboundBuffer channelOutboundBuffer = event
.getAbstractIOUringChannel().unsafe().outboundBuffer(); .getAbstractIOUringChannel().unsafe().outboundBuffer();
//remove bytes AbstractIOUringChannel channel = event.getAbstractIOUringChannel();
int localFlushAmount = res;
if (localFlushAmount > 0) { if (res == SOCKET_ERROR_EPIPE) {
channelOutboundBuffer.removeBytes(localFlushAmount); event.getAbstractIOUringChannel().shutdownInput(false);
break;
} }
if (res > 0) {
channelOutboundBuffer.removeBytes(res);
channel.setWriteable(true);
try { try {
event.getAbstractIOUringChannel().doWrite(channelOutboundBuffer); event.getAbstractIOUringChannel().doWrite(channelOutboundBuffer);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
}
break; break;
case TIMEOUT: case TIMEOUT:
if (res == ETIME) { if (res == ETIME) {

View File

@ -21,11 +21,15 @@ import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
final class IOUringSubmissionQueue { final class IOUringSubmissionQueue {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringSubmissionQueue.class);
private static final int SQE_SIZE = 64; private static final int SQE_SIZE = 64;
private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support? private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
private static final int POLLIN = 1; private static final int POLLIN = 1;
private static final int POLLRDHUP = 8192;
private static final int POLLOUT = 4;
private static final int IOSQE_IO_LINK = 4; private static final int IOSQE_IO_LINK = 4;
//these offsets are used to access specific properties //these offsets are used to access specific properties
@ -97,6 +101,9 @@ final class IOUringSubmissionQueue {
sqe = SQE_SIZE * index + submissionQueueArrayAddress; sqe = SQE_SIZE * index + submissionQueueArrayAddress;
sqeTail = next; sqeTail = next;
} }
if (sqe == 0) {
logger.info("sqe is null");
}
return sqe; return sqe;
} }
@ -112,7 +119,7 @@ final class IOUringSubmissionQueue {
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId); PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, eventId);
//poll<link>read or accept operation //poll<link>read or accept operation
if (type == EventType.POLL_LINK) { if (type == EventType.POLL_LINK || type == EventType.POLL_OUT) {
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) IOSQE_IO_LINK); PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) IOSQE_IO_LINK);
} else { } else {
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
@ -182,9 +189,6 @@ final class IOUringSubmissionQueue {
if (sqe == 0) { if (sqe == 0) {
return false; return false;
} }
System.out.println("fd " + fd);
System.out.println("BufferAddress + pos: " + (bufferAddress + pos));
System.out.println("limit + pos " + (limit - pos));
setData(sqe, eventId, type, fd, bufferAddress + pos, limit - pos, 0); setData(sqe, eventId, type, fd, bufferAddress + pos, limit - pos, 0);
return true; return true;
} }
@ -194,10 +198,10 @@ final class IOUringSubmissionQueue {
long kHead = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); long kHead = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress)); long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress));
System.out.println("Ktail: " + kTail); logger.info("Ktail: {}", kTail);
System.out.println("Ktail: " + kHead); logger.info("Ktail: {}", kHead);
System.out.println("SqeHead: " + sqeHead); logger.info("SqeHead: {}", sqeHead);
System.out.println("SqeTail: " + sqeTail); logger.info("SqeTail: {}", sqeTail);
if (sqeHead == sqeTail) { if (sqeHead == sqeTail) {
return (int) (kTail - kHead); return (int) (kTail - kHead);
@ -222,7 +226,7 @@ final class IOUringSubmissionQueue {
public void submit() { public void submit() {
int submitted = flushSqe(); int submitted = flushSqe();
System.out.println("Submitted: " + submitted); logger.info("Submitted: {}", submitted);
int ret = Native.ioUringEnter(ringFd, submitted, 0, 0); int ret = Native.ioUringEnter(ringFd, submitted, 0, 0);
if (ret < 0) { if (ret < 0) {