Merge pull request #11 from normanmaurer/auto_read

Correctly stop reading when AUTO_READ is set to off and also ensure w…
This commit is contained in:
Josef Grieb 2020-08-31 17:48:05 +02:00 committed by GitHub
commit e1a582d798
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 53 deletions

View File

@ -60,14 +60,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false);
final LinuxSocket socket; final LinuxSocket socket;
protected volatile boolean active; protected volatile boolean active;
private boolean pollInScheduled = false; private boolean pollInPending = false;
private boolean pollOutPending = false;
private boolean writeScheduled = false;
private boolean readScheduled = false;
boolean inputClosedSeenErrorOnRead; boolean inputClosedSeenErrorOnRead;
static final int SOCK_ADDR_LEN = 128; static final int SOCK_ADDR_LEN = 128;
//can only submit one write operation at a time
private boolean writeScheduled = false;
/** /**
* The future of the current connection attempt. If not null, subsequent connection attempts will fail. * The future of the current connection attempt. If not null, subsequent connection attempts will fail.
*/ */
@ -211,7 +211,15 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
active = false; active = false;
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollRemove(socket.intValue()); if (pollInPending) {
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_IN);
pollInPending = false;
}
if (pollOutPending) {
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_OUT);
pollOutPending = false;
}
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_RDHUP);
submissionQueue.submit(); submissionQueue.submit();
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
@ -258,15 +266,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
@Override @Override
protected void doBeginRead() { protected void doBeginRead() {
final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe(); final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe();
if (!pollInScheduled) { if (!pollInPending) {
unsafe.schedulePollIn(); unsafe.schedulePollIn();
} }
} }
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) { protected void doWrite(ChannelOutboundBuffer in) {
logger.trace("IOUring doWrite message size: {}", in.size());
if (writeScheduled) { if (writeScheduled) {
return; return;
} }
@ -280,6 +286,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
private void doWriteMultiple(ChannelOutboundBuffer in) { private void doWriteMultiple(ChannelOutboundBuffer in) {
final IovecArrayPool iovecArray = ((IOUringEventLoop) eventLoop()).getIovecArrayPool(); final IovecArrayPool iovecArray = ((IOUringEventLoop) eventLoop()).getIovecArrayPool();
iovecMemoryAddress = iovecArray.createNewIovecMemoryAddress(); iovecMemoryAddress = iovecArray.createNewIovecMemoryAddress();
@ -313,6 +320,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
//POLLOUT //POLLOUT
private void addPollOut() { private void addPollOut() {
assert !pollOutPending;
pollOutPending = true;
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollOut(socket.intValue()); submissionQueue.addPollOut(socket.intValue());
submissionQueue.submit(); submissionQueue.submit();
@ -326,7 +335,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
// Flush immediately only when there's no pending flush. // Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later, // If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now. // and thus there's no need to call it now.
if (!writeScheduled) { if (!pollOutPending) {
super.flush0(); super.flush0();
} }
} }
@ -418,18 +427,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
void schedulePollIn() { void schedulePollIn() {
assert !pollInScheduled; assert !pollInPending;
if (!isActive() || shouldBreakIoUringInReady(config())) { if (!isActive() || shouldBreakIoUringInReady(config())) {
return; return;
} }
pollInScheduled = true; pollInPending = true;
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollIn(socket.intValue()); submissionQueue.addPollIn(socket.intValue());
submissionQueue.submit(); submissionQueue.submit();
} }
final void readComplete(int res) { final void readComplete(int res) {
pollInScheduled = false; readScheduled = false;
readComplete0(res); readComplete0(res);
} }
@ -440,11 +450,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
*/ */
final void pollRdHup(int res) { final void pollRdHup(int res) {
if (isActive()) { if (isActive()) {
if (!pollInScheduled) { if (!readScheduled) {
// If it is still active, we need to call epollInReady as otherwise we may miss to scheduleRead();
// read pending data from the underlying file descriptor.
// See https://github.com/netty/netty/issues/3709
pollIn(res);
} }
} else { } else {
// Just to be safe make sure the input marked as closed. // Just to be safe make sure the input marked as closed.
@ -452,9 +459,30 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
} }
abstract void pollIn(int res); /**
* Called once POLLIN event is ready to be processed
*/
final void pollIn(int res) {
pollInPending = false;
if (!readScheduled) {
scheduleRead();
}
}
protected final void scheduleRead() {
readScheduled = true;
scheduleRead0();
}
protected abstract void scheduleRead0();
/**
* Called once POLLOUT event is ready to be processed
*/
final void pollOut(int res) { final void pollOut(int res) {
pollOutPending = false;
// pending connect // pending connect
if (connectPromise != null) { if (connectPromise != null) {
// Note this method is invoked by the event loop only if the connection attempt was // Note this method is invoked by the event loop only if the connection attempt was
@ -484,12 +512,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
} }
} else if (!getSocket().isOutputShutdown()) { } else if (!getSocket().isOutputShutdown()) {
doWrite(unsafe().outboundBuffer()); // Try writing again
super.flush0();
} }
} }
final void writeComplete(int res) { final void writeComplete(int res) {
writeScheduled = false; writeScheduled = false;
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (iovecMemoryAddress != -1) { if (iovecMemoryAddress != -1) {
((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress); ((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress);

View File

@ -54,20 +54,14 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
abstract Channel newChildChannel(int fd) throws Exception; abstract Channel newChildChannel(int fd) throws Exception;
final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe {
private final byte[] acceptedAddress = new byte[26]; @Override
protected void scheduleRead0() {
private void acceptSocket() {
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
//Todo get network addresses //Todo get network addresses
submissionQueue.addAccept(fd().intValue()); submissionQueue.addAccept(fd().intValue());
submissionQueue.submit(); submissionQueue.submit();
} }
@Override
void pollIn(int res) {
acceptSocket();
}
// TODO: Respect MAX_MESSAGES_READ // TODO: Respect MAX_MESSAGES_READ
protected void readComplete0(int res) { protected void readComplete0(int res) {
final IOUringRecvByteAllocatorHandle allocHandle = final IOUringRecvByteAllocatorHandle allocHandle =
@ -89,7 +83,9 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
pipeline.fireExceptionCaught(cause); pipeline.fireExceptionCaught(cause);
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
} }
acceptSocket(); if (config().isAutoRead()) {
scheduleRead();
}
} else { } else {
allocHandle.readComplete(); allocHandle.readComplete();
// Check if we did fail because there was nothing to accept atm. // Check if we did fail because there was nothing to accept atm.

View File

@ -198,11 +198,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
private ByteBuf readBuffer; private ByteBuf readBuffer;
@Override @Override
void pollIn(int res) { protected void scheduleRead0() {
readFromSocket();
}
private void readFromSocket() {
final ChannelConfig config = config(); final ChannelConfig config = config();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
@ -215,6 +211,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
assert readBuffer == null; assert readBuffer == null;
readBuffer = byteBuf; readBuffer = byteBuf;
submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(), submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
byteBuf.writerIndex(), byteBuf.capacity()); byteBuf.writerIndex(), byteBuf.capacity());
submissionQueue.submit(); submissionQueue.submit();
@ -232,8 +229,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
this.readBuffer = null; this.readBuffer = null;
assert byteBuf != null; assert byteBuf != null;
boolean writable = true;
try { try {
if (res < 0) { if (res < 0) {
// If res is negative we should pass it to ioResult(...) which will either throw // If res is negative we should pass it to ioResult(...) which will either throw
@ -261,21 +256,20 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
} }
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
writable = byteBuf.isWritable(); boolean writable = byteBuf.isWritable();
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = null; byteBuf = null;
} catch (Throwable t) { if (!writable && config().isAutoRead()) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
}
if (!close) {
if (!writable) {
// Let's schedule another read. // Let's schedule another read.
readFromSocket(); scheduleRead();
} else { } else {
// We did not fill the whole ByteBuf so we should break the "read loop" and try again later. // We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
} }
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} }
} }
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,

View File

@ -177,11 +177,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
switch (op) { switch (op) {
case IOUring.OP_ACCEPT: case IOUring.OP_ACCEPT:
//Todo error handle the res
if (res == ECANCELED) {
logger.trace("POLL_LINK canceled");
break;
}
// Fall-through // Fall-through
case IOUring.OP_READ: case IOUring.OP_READ:

View File

@ -122,8 +122,8 @@ final class IOUringSubmissionQueue {
//user_data should be same as POLL_LINK fd //user_data should be same as POLL_LINK fd
if (op == IOUring.OP_POLL_REMOVE) { if (op == IOUring.OP_POLL_REMOVE) {
PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1); PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1);
long pollLinkuData = convertToUserData((byte) IOUring.IO_POLL, fd, IOUring.POLLMASK_IN); long uData = convertToUserData(op, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, pollLinkuData); PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData);
} }
long uData = convertToUserData(op, fd, pollMask); long uData = convertToUserData(op, fd, pollMask);
@ -241,7 +241,7 @@ final class IOUringSubmissionQueue {
} }
//fill the address which is associated with server poll link user_data //fill the address which is associated with server poll link user_data
public boolean addPollRemove(int fd) { public boolean addPollRemove(int fd, int pollMask) {
long sqe = 0; long sqe = 0;
boolean submitted = false; boolean submitted = false;
while (sqe == 0) { while (sqe == 0) {
@ -252,7 +252,7 @@ final class IOUringSubmissionQueue {
submitted = true; submitted = true;
} }
} }
setData(sqe, (byte) IOUring.OP_POLL_REMOVE, 0, fd, 0, 0, 0); setData(sqe, (byte) IOUring.OP_POLL_REMOVE, pollMask, fd, 0, 0, 0);
return submitted; return submitted;
} }
@ -323,7 +323,6 @@ final class IOUringSubmissionQueue {
public void submit() { public void submit() {
int submitted = flushSqe(); int submitted = flushSqe();
logger.trace("Submitted: {}", submitted); logger.trace("Submitted: {}", submitted);
System.out.println("submitted: " + submitted);
if (submitted > 0) { if (submitted > 0) {
int ret = Native.ioUringEnter(ringFd, submitted, 0, 0); int ret = Native.ioUringEnter(ringFd, submitted, 0, 0);
if (ret < 0) { if (ret < 0) {

View File

@ -235,7 +235,7 @@ public class NativeTest {
Thread.sleep(10); Thread.sleep(10);
submissionQueue.addPollRemove(eventFd.intValue()); submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN);
submissionQueue.submit(); submissionQueue.submit();
Thread waitingCqe = new Thread() { Thread waitingCqe = new Thread() {