Correctly respect RecvByteBufAllocator.Handle when reading

Motivation:

We need to respect RecvByteBufAllocator.Handle.continueReading() so settings like MAX_MESSAGES_PER_READ are respected. This also ensures that AUTO_READ is correctly working in all cases

Modifications:

- Correctly respect continueReading();
- Fix IOUringRecvByteAllocatorHandle
- Cleanup

Result:

Correctly handling reading
This commit is contained in:
Norman Maurer 2020-09-01 10:49:09 +02:00
parent 57884e2e05
commit 663c44cd45
4 changed files with 37 additions and 41 deletions

View File

@ -56,45 +56,53 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe {
@Override @Override
protected void scheduleRead0() { protected void scheduleRead0() {
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config());
allocHandle.attemptedBytesRead(1);
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
//Todo get network addresses //Todo get network addresses
submissionQueue.addAccept(fd().intValue()); submissionQueue.addAccept(fd().intValue());
submissionQueue.submit(); submissionQueue.submit();
} }
// TODO: Respect MAX_MESSAGES_READ
protected void readComplete0(int res) { protected void readComplete0(int res) {
final IOUringRecvByteAllocatorHandle allocHandle = final IOUringRecvByteAllocatorHandle allocHandle =
(IOUringRecvByteAllocatorHandle) unsafe() (IOUringRecvByteAllocatorHandle) unsafe()
.recvBufAllocHandle(); .recvBufAllocHandle();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
if (res >= 0) { allocHandle.lastBytesRead(res);
allocHandle.incMessagesRead(1);
try {
Channel channel = newChildChannel(res);
// Register accepted channel for POLLRDHUP
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollRdHup(res);
submissionQueue.submit();
pipeline.fireChannelRead(channel); if (res >= 0) {
} catch (Throwable cause) { allocHandle.incMessagesRead(1);
try {
Channel channel = newChildChannel(res);
// Register accepted channel for POLLRDHUP
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollRdHup(res);
submissionQueue.submit();
pipeline.fireChannelRead(channel);
if (allocHandle.continueReading()) {
scheduleRead();
} else {
allocHandle.readComplete(); allocHandle.readComplete();
pipeline.fireExceptionCaught(cause);
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
} }
if (config().isAutoRead()) { } catch (Throwable cause) {
scheduleRead();
}
} else {
allocHandle.readComplete(); allocHandle.readComplete();
// Check if we did fail because there was nothing to accept atm.
if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
// Something bad happened. Convert to an exception.
pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
}
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
} }
} else {
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
// Check if we did fail because there was nothing to accept atm.
if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) {
// Something bad happened. Convert to an exception.
pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res));
}
}
} }
@Override @Override

View File

@ -202,12 +202,12 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
final ChannelConfig config = config(); final ChannelConfig config = config();
final ByteBufAllocator allocator = config.getAllocator(); final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config); allocHandle.reset(config);
ByteBuf byteBuf = allocHandle.allocate(allocator); ByteBuf byteBuf = allocHandle.allocate(allocator);
IOUringSubmissionQueue submissionQueue = submissionQueue(); IOUringSubmissionQueue submissionQueue = submissionQueue();
unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes()); allocHandle.attemptedBytesRead(byteBuf.writableBytes());
assert readBuffer == null; assert readBuffer == null;
readBuffer = byteBuf; readBuffer = byteBuf;
@ -217,13 +217,11 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
submissionQueue.submit(); submissionQueue.submit();
} }
// TODO: Respect MAX_MESSAGE_READ. @Override
protected void readComplete0(int res) { protected void readComplete0(int res) {
boolean close = false; boolean close = false;
final IOUringRecvByteAllocatorHandle allocHandle = final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
(IOUringRecvByteAllocatorHandle) unsafe()
.recvBufAllocHandle();
final ChannelPipeline pipeline = pipeline(); final ChannelPipeline pipeline = pipeline();
ByteBuf byteBuf = this.readBuffer; ByteBuf byteBuf = this.readBuffer;
this.readBuffer = null; this.readBuffer = null;
@ -256,10 +254,9 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
} }
allocHandle.incMessagesRead(1); allocHandle.incMessagesRead(1);
boolean writable = byteBuf.isWritable();
pipeline.fireChannelRead(byteBuf); pipeline.fireChannelRead(byteBuf);
byteBuf = null; byteBuf = null;
if (!writable && config().isAutoRead()) { if (allocHandle.continueReading()) {
// Let's schedule another read. // Let's schedule another read.
scheduleRead(); scheduleRead();
} else { } else {
@ -269,7 +266,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
} catch (Throwable t) { } catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle); handleReadException(pipeline, byteBuf, t, close, allocHandle);
} }
} }
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,

View File

@ -34,11 +34,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class);
//Todo set config ring buffer size //Todo set config ring buffer size
private final int ringSize = 32; private static final int ringSize = 32;
private final int ENOENT = -2; private static final int ENOENT = -2;
//just temporary -> Todo use ErrorsStaticallyReferencedJniMethods like in Epoll
private static final int SOCKET_ERROR_EPIPE = -32;
private static final long ETIME = -62; private static final long ETIME = -62;
static final long ECANCELED = -125; static final long ECANCELED = -125;
@ -263,7 +260,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
try { try {
eventfd.close(); eventfd.close();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); logger.warn("Failed to close the event fd.", e);
} }
ringBuffer.close(); ringBuffer.close();
iovecArrayPool.release(); iovecArrayPool.release();

View File

@ -41,9 +41,4 @@ final class IOUringRecvByteAllocatorHandle extends RecvByteBufAllocator.Delegati
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return ((RecvByteBufAllocator.ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier); return ((RecvByteBufAllocator.ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier);
} }
@Override
public boolean continueReading() {
return false;
}
} }