Add polling RdHup

Motivation:

when the channel connection is lost, we dont get any notification(unless the customer has not submitted a writer or read event)

Modifications:

add rhup polling to be notified when the connection is lost

Result:

the eventloop is notified on connection losts
This commit is contained in:
Josef Grieb 2020-08-24 10:00:48 +02:00 committed by josef
parent 0160284301
commit d3a0395ac2
3 changed files with 55 additions and 8 deletions

View File

@ -21,7 +21,9 @@ enum EventType {
WRITE(23),
TIMEOUT(11),
POLL_EVENTFD(6),
POLL_LINK(6);
POLL_LINK(6),
POLL_RDHUP(6),
POLL_OUT(6);
private final int op;

View File

@ -31,6 +31,14 @@ import java.util.concurrent.atomic.AtomicLong;
import static io.netty.channel.unix.Errors.*;
final class IOUringEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class);
//Todo set config ring buffer size
private final int ringSize = 32;
//just temporary -> Todo use ErrorsStaticallyReferencedJniMethods like in Epoll
private final int SOCKET_ERROR_EPIPE = -32;
private static long ETIME = -62;
// events should be unique to identify which event type that was
private long eventIdCounter;
@ -171,12 +179,13 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
switch (event.getOp()) {
case ACCEPT:
System.out.println("EventLoop Accept Res: " + res);
logger.info("EventLoop Accept filedescriptor: {}", res);
event.getAbstractIOUringChannel().setUringInReadyPending(false);
if (res != -1 && res != ERRNO_EAGAIN_NEGATIVE &&
res != ERRNO_EWOULDBLOCK_NEGATIVE) {
AbstractIOUringServerChannel abstractIOUringServerChannel =
(AbstractIOUringServerChannel) event.getAbstractIOUringChannel();
System.out.println("EventLoop Fd: " + abstractIOUringServerChannel.getSocket().intValue());
logger.info("server filedescriptor Fd: {}", abstractIOUringServerChannel.getSocket().intValue());
final IOUringRecvByteAllocatorHandle allocHandle =
(IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe()
.recvBufAllocHandle();
@ -186,8 +195,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
if (allocHandle.lastBytesRead() > 0) {
allocHandle.incMessagesRead(1);
try {
pipeline.fireChannelRead(abstractIOUringServerChannel
.newChildChannel(allocHandle.lastBytesRead()));
final Channel childChannel =
abstractIOUringServerChannel.newChildChannel(allocHandle.lastBytesRead());
pipeline.fireChannelRead(childChannel);
pollRdHup((AbstractIOUringChannel) childChannel);
} catch (Exception e) {
e.printStackTrace();
}
@ -263,7 +274,12 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp());
case POLL_LINK:
//Todo error handling error
System.out.println("POLL_LINK Res: " + res);
logger.info("POLL_LINK Res: {}", res);
break;
case POLL_RDHUP:
if (!event.getAbstractIOUringChannel().isActive()) {
event.getAbstractIOUringChannel().shutdownInput(true);
}
break;
}
this.events.remove(event.getId());
@ -280,4 +296,17 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
Native.eventFdWrite(eventfd.intValue(), 1L);
}
}
//to be notified when the filedesciptor is closed
private void pollRdHup(AbstractIOUringChannel channel) {
//all childChannels should poll POLLRDHUP
long eventId = incrementEventIdCounter();
Event event = new Event();
event.setOp(EventType.POLL_RDHUP);
event.setId(eventId);
event.setAbstractIOUringChannel(channel);
addNewEvent(event);
ringBuffer.getIoUringSubmissionQueue().addPoll(eventId, channel.socket.intValue(), event.getOp());
ringBuffer.getIoUringSubmissionQueue().submit();
}
}

View File

@ -155,8 +155,24 @@ final class IOUringSubmissionQueue {
if (sqe == 0) {
return false;
}
int pollMask;
switch (eventType) {
case POLL_EVENTFD:
case POLL_LINK:
pollMask = POLLIN;
break;
case POLL_OUT:
pollMask = POLLOUT;
break;
case POLL_RDHUP:
pollMask = POLLRDHUP;
break;
default:
//Todo exeception
return false;
}
setData(sqe, eventId, eventType, fd, 0, 0, 0);
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, POLLIN);
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
return true;
}