Correctly stop reading when AUTO_READ is set to off and also ensure we cancel the right poll operations
This commit is contained in:
parent
186b9eb6ab
commit
28db67c42b
@ -60,14 +60,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
|
||||
final LinuxSocket socket;
|
||||
protected volatile boolean active;
|
||||
private boolean pollInScheduled = false;
|
||||
|
||||
private boolean pollInPending = false;
|
||||
private boolean pollOutPending = false;
|
||||
private boolean writeScheduled = false;
|
||||
private boolean readScheduled = false;
|
||||
boolean inputClosedSeenErrorOnRead;
|
||||
|
||||
static final int SOCK_ADDR_LEN = 128;
|
||||
|
||||
//can only submit one write operation at a time
|
||||
private boolean writeScheduled = false;
|
||||
/**
|
||||
* The future of the current connection attempt. If not null, subsequent connection attempts will fail.
|
||||
*/
|
||||
@ -211,7 +211,15 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
active = false;
|
||||
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addPollRemove(socket.intValue());
|
||||
if (pollInPending) {
|
||||
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_IN);
|
||||
pollInPending = false;
|
||||
}
|
||||
if (pollOutPending) {
|
||||
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_OUT);
|
||||
pollOutPending = false;
|
||||
}
|
||||
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_RDHUP);
|
||||
submissionQueue.submit();
|
||||
|
||||
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
|
||||
@ -258,15 +266,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
@Override
|
||||
protected void doBeginRead() {
|
||||
final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe();
|
||||
if (!pollInScheduled) {
|
||||
if (!pollInPending) {
|
||||
unsafe.schedulePollIn();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doWrite(ChannelOutboundBuffer in) {
|
||||
logger.trace("IOUring doWrite message size: {}", in.size());
|
||||
|
||||
if (writeScheduled) {
|
||||
return;
|
||||
}
|
||||
@ -280,6 +286,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
|
||||
private void doWriteMultiple(ChannelOutboundBuffer in) {
|
||||
|
||||
final IovecArrayPool iovecArray = ((IOUringEventLoop) eventLoop()).getIovecArrayPool();
|
||||
|
||||
iovecMemoryAddress = iovecArray.createNewIovecMemoryAddress();
|
||||
@ -313,6 +320,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
|
||||
//POLLOUT
|
||||
private void addPollOut() {
|
||||
assert !pollOutPending;
|
||||
pollOutPending = true;
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addPollOut(socket.intValue());
|
||||
submissionQueue.submit();
|
||||
@ -326,7 +335,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
// Flush immediately only when there's no pending flush.
|
||||
// If there's a pending flush operation, event loop will call forceFlush() later,
|
||||
// and thus there's no need to call it now.
|
||||
if (!writeScheduled) {
|
||||
if (!pollOutPending) {
|
||||
super.flush0();
|
||||
}
|
||||
}
|
||||
@ -418,18 +427,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
|
||||
void schedulePollIn() {
|
||||
assert !pollInScheduled;
|
||||
assert !pollInPending;
|
||||
if (!isActive() || shouldBreakIoUringInReady(config())) {
|
||||
return;
|
||||
}
|
||||
pollInScheduled = true;
|
||||
pollInPending = true;
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
submissionQueue.addPollIn(socket.intValue());
|
||||
submissionQueue.submit();
|
||||
}
|
||||
|
||||
final void readComplete(int res) {
|
||||
pollInScheduled = false;
|
||||
readScheduled = false;
|
||||
|
||||
readComplete0(res);
|
||||
}
|
||||
|
||||
@ -440,11 +450,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
*/
|
||||
final void pollRdHup(int res) {
|
||||
if (isActive()) {
|
||||
if (!pollInScheduled) {
|
||||
// If it is still active, we need to call epollInReady as otherwise we may miss to
|
||||
// read pending data from the underlying file descriptor.
|
||||
// See https://github.com/netty/netty/issues/3709
|
||||
pollIn(res);
|
||||
if (!readScheduled) {
|
||||
scheduleRead();
|
||||
}
|
||||
} else {
|
||||
// Just to be safe make sure the input marked as closed.
|
||||
@ -452,9 +459,30 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
}
|
||||
|
||||
abstract void pollIn(int res);
|
||||
/**
|
||||
* Called once POLLIN event is ready to be processed
|
||||
*/
|
||||
final void pollIn(int res) {
|
||||
pollInPending = false;
|
||||
|
||||
if (!readScheduled) {
|
||||
scheduleRead();
|
||||
}
|
||||
}
|
||||
|
||||
protected final void scheduleRead() {
|
||||
readScheduled = true;
|
||||
scheduleRead0();
|
||||
}
|
||||
|
||||
protected abstract void scheduleRead0();
|
||||
|
||||
/**
|
||||
* Called once POLLOUT event is ready to be processed
|
||||
*/
|
||||
final void pollOut(int res) {
|
||||
pollOutPending = false;
|
||||
|
||||
// pending connect
|
||||
if (connectPromise != null) {
|
||||
// Note this method is invoked by the event loop only if the connection attempt was
|
||||
@ -484,12 +512,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
||||
}
|
||||
}
|
||||
} else if (!getSocket().isOutputShutdown()) {
|
||||
doWrite(unsafe().outboundBuffer());
|
||||
// Try writing again
|
||||
super.flush0();
|
||||
}
|
||||
}
|
||||
|
||||
final void writeComplete(int res) {
|
||||
writeScheduled = false;
|
||||
|
||||
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
|
||||
if (iovecMemoryAddress != -1) {
|
||||
((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress);
|
||||
|
@ -54,20 +54,14 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
|
||||
abstract Channel newChildChannel(int fd) throws Exception;
|
||||
|
||||
final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe {
|
||||
private final byte[] acceptedAddress = new byte[26];
|
||||
|
||||
private void acceptSocket() {
|
||||
@Override
|
||||
protected void scheduleRead0() {
|
||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||
//Todo get network addresses
|
||||
submissionQueue.addAccept(fd().intValue());
|
||||
submissionQueue.submit();
|
||||
}
|
||||
|
||||
@Override
|
||||
void pollIn(int res) {
|
||||
acceptSocket();
|
||||
}
|
||||
|
||||
// TODO: Respect MAX_MESSAGES_READ
|
||||
protected void readComplete0(int res) {
|
||||
final IOUringRecvByteAllocatorHandle allocHandle =
|
||||
@ -89,7 +83,9 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
|
||||
pipeline.fireExceptionCaught(cause);
|
||||
pipeline.fireChannelReadComplete();
|
||||
}
|
||||
acceptSocket();
|
||||
if (config().isAutoRead()) {
|
||||
scheduleRead();
|
||||
}
|
||||
} else {
|
||||
allocHandle.readComplete();
|
||||
// Check if we did fail because there was nothing to accept atm.
|
||||
|
@ -198,11 +198,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
||||
private ByteBuf readBuffer;
|
||||
|
||||
@Override
|
||||
void pollIn(int res) {
|
||||
readFromSocket();
|
||||
}
|
||||
|
||||
private void readFromSocket() {
|
||||
protected void scheduleRead0() {
|
||||
final ChannelConfig config = config();
|
||||
|
||||
final ByteBufAllocator allocator = config.getAllocator();
|
||||
@ -215,6 +211,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
||||
|
||||
assert readBuffer == null;
|
||||
readBuffer = byteBuf;
|
||||
|
||||
submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
|
||||
byteBuf.writerIndex(), byteBuf.capacity());
|
||||
submissionQueue.submit();
|
||||
@ -232,8 +229,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
||||
this.readBuffer = null;
|
||||
assert byteBuf != null;
|
||||
|
||||
boolean writable = true;
|
||||
|
||||
try {
|
||||
if (res < 0) {
|
||||
// If res is negative we should pass it to ioResult(...) which will either throw
|
||||
@ -261,21 +256,20 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
||||
}
|
||||
|
||||
allocHandle.incMessagesRead(1);
|
||||
writable = byteBuf.isWritable();
|
||||
boolean writable = byteBuf.isWritable();
|
||||
pipeline.fireChannelRead(byteBuf);
|
||||
byteBuf = null;
|
||||
} catch (Throwable t) {
|
||||
handleReadException(pipeline, byteBuf, t, close, allocHandle);
|
||||
}
|
||||
if (!close) {
|
||||
if (!writable) {
|
||||
if (!writable && config().isAutoRead()) {
|
||||
// Let's schedule another read.
|
||||
readFromSocket();
|
||||
scheduleRead();
|
||||
} else {
|
||||
// We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
|
||||
pipeline.fireChannelReadComplete();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
handleReadException(pipeline, byteBuf, t, close, allocHandle);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
|
||||
|
@ -177,11 +177,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
switch (op) {
|
||||
case IOUring.OP_ACCEPT:
|
||||
//Todo error handle the res
|
||||
if (res == ECANCELED) {
|
||||
logger.trace("POLL_LINK canceled");
|
||||
break;
|
||||
}
|
||||
// Fall-through
|
||||
|
||||
case IOUring.OP_READ:
|
||||
|
@ -122,8 +122,8 @@ final class IOUringSubmissionQueue {
|
||||
//user_data should be same as POLL_LINK fd
|
||||
if (op == IOUring.OP_POLL_REMOVE) {
|
||||
PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1);
|
||||
long pollLinkuData = convertToUserData((byte) IOUring.IO_POLL, fd, IOUring.POLLMASK_IN);
|
||||
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, pollLinkuData);
|
||||
long uData = convertToUserData(op, fd, pollMask);
|
||||
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData);
|
||||
}
|
||||
|
||||
long uData = convertToUserData(op, fd, pollMask);
|
||||
@ -241,7 +241,7 @@ final class IOUringSubmissionQueue {
|
||||
}
|
||||
|
||||
//fill the address which is associated with server poll link user_data
|
||||
public boolean addPollRemove(int fd) {
|
||||
public boolean addPollRemove(int fd, int pollMask) {
|
||||
long sqe = 0;
|
||||
boolean submitted = false;
|
||||
while (sqe == 0) {
|
||||
@ -252,7 +252,7 @@ final class IOUringSubmissionQueue {
|
||||
submitted = true;
|
||||
}
|
||||
}
|
||||
setData(sqe, (byte) IOUring.OP_POLL_REMOVE, 0, fd, 0, 0, 0);
|
||||
setData(sqe, (byte) IOUring.OP_POLL_REMOVE, pollMask, fd, 0, 0, 0);
|
||||
|
||||
return submitted;
|
||||
}
|
||||
@ -323,7 +323,6 @@ final class IOUringSubmissionQueue {
|
||||
public void submit() {
|
||||
int submitted = flushSqe();
|
||||
logger.trace("Submitted: {}", submitted);
|
||||
System.out.println("submitted: " + submitted);
|
||||
if (submitted > 0) {
|
||||
int ret = Native.ioUringEnter(ringFd, submitted, 0, 0);
|
||||
if (ret < 0) {
|
||||
|
@ -235,7 +235,7 @@ public class NativeTest {
|
||||
|
||||
Thread.sleep(10);
|
||||
|
||||
submissionQueue.addPollRemove(eventFd.intValue());
|
||||
submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN);
|
||||
submissionQueue.submit();
|
||||
|
||||
Thread waitingCqe = new Thread() {
|
||||
|
Loading…
Reference in New Issue
Block a user