Add read exception handling to shutdown channels

Motivation:

-at the moment we dont shutdown when we get a read error message
-missing autoread support

Modifications:

-even if autoread is disable, should do check if the read event is already submitted
-added new Handle exception method to shutdown the channels

Result:

EL read event can handle read errors
This commit is contained in:
Josef Grieb 2020-08-24 10:46:47 +02:00 committed by josef
parent b10b4ca6e7
commit d9b3f293a5
6 changed files with 133 additions and 51 deletions

View File

@ -21,21 +21,31 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannel;
import io.netty.channel.unix.UnixChannelUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.ScheduledFuture;
import static io.netty.util.internal.ObjectUtil.*;
@ -63,6 +73,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
super(parent);
this.socket = checkNotNull(socket, "fd");
this.active = true;
this.uringInReadyPending = false;
if (active) {
// Directly cache the remote and local addresses
@ -70,6 +81,12 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
this.local = socket.localAddress();
this.remote = socket.remoteAddress();
}
if (parent != null) {
logger.info("Create Channel Socket: {}", socket.intValue());
} else {
logger.info("Create Server Socket: {}", socket.intValue());
}
}
protected AbstractIOUringChannel(final Channel parent, LinuxSocket socket, boolean active) {
@ -81,6 +98,12 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
this.local = socket.localAddress();
this.remote = socket.remoteAddress();
}
if (parent != null) {
logger.info("Create Channel Socket: {}", socket.intValue());
} else {
logger.info("Create Server Socket: {}", socket.intValue());
}
}
public boolean isOpen() {
@ -220,9 +243,9 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
// Channel/ChannelHandlerContext.read() was called
@Override
protected void doBeginRead() {
logger.info("Begin Read");
final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe();
if (!uringInReadyPending) {
uringInReadyPending = true;
unsafe.executeUringReadOperator();
}
}
@ -291,6 +314,12 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
return new IOUringRecvByteAllocatorHandle(handle);
}
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
}
@Override
public IOUringRecvByteAllocatorHandle recvBufAllocHandle() {
if (allocHandle == null) {
@ -299,16 +328,11 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
return allocHandle;
}
@Override
public void connect(final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException());
}
final void executeUringReadOperator() {
if (!isActive()) {
if (uringInReadyPending || !isActive() || shouldBreakIoUringInReady(config())) {
return;
}
uringInReadyPending = true;
eventLoop().execute(readRunnable);
}

View File

@ -59,6 +59,7 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
}
private void addPoll(IOUringEventLoop ioUringEventLoop) {
long eventId = ioUringEventLoop.incrementEventIdCounter();
Event event = new Event();
event.setOp(EventType.POLL_LINK);

View File

@ -52,7 +52,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;
private static long ETIME = -62;
// nextWakeupNanos is:
// AWAKE when EL is awake
@ -77,24 +76,25 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
addNewEvent(event);
ringBuffer.getIoUringSubmissionQueue().addPoll(eventId, eventfd.intValue(), event.getOp());
ringBuffer.getIoUringSubmissionQueue().submit();
logger.info("New EventLoop: {}", this.toString());
}
public long incrementEventIdCounter() {
long eventId = eventIdCounter;
System.out.println(" incrementEventIdCounter EventId: " + eventId);
logger.info("incrementEventIdCounter EventId: {}", eventId);
eventIdCounter++;
return eventId;
}
public void add(AbstractIOUringChannel ch) {
System.out.println("Add Channel: " + ch.socket.intValue());
logger.info("Add Channel: {} ", ch.socket.intValue());
int fd = ch.socket.intValue();
channels.put(fd, ch);
}
public void remove(AbstractIOUringChannel ch) {
System.out.println("Remove Channel: " + ch.socket.intValue());
logger.info("Remove Channel: {}", ch.socket.intValue());
int fd = ch.socket.intValue();
AbstractIOUringChannel old = channels.remove(fd);
@ -108,7 +108,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
}
private void closeAll() {
System.out.println("CloseAll IOUringEvenloop");
logger.info("CloseAll IOUringEvenloop");
// Using the intermediate collection to prevent ConcurrentModificationException.
// In the `close()` method, the channel is deleted from `channels` map.
AbstractIOUringChannel[] localChannels = channels.values().toArray(new AbstractIOUringChannel[0]);
@ -127,12 +127,12 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
final IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
for (;;) {
logger.info("Run IOUringEventLoop {}", this.toString());
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
long ioStartTime = 0;
if (!hasTasks()) {
try {
@ -146,11 +146,12 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
submissionQueue.addTimeout(curDeadlineNanos, eventId);
}
final IOUringCqe ioUringCqe = completionQueue.ioUringWaitCqe();
logger.info("ioUringWaitCqe {}", this.toString());
if (ioUringCqe != null) {
final Event event = events.get(ioUringCqe.getEventId());
System.out.println("Completion EventId: " + ioUringCqe.getEventId());
ioStartTime = System.nanoTime();
if (event != null) {
logger.info("EventType Incoming: " + event.getOp().name());
processEvent(ioUringCqe.getRes(), event);
}
}
@ -162,7 +163,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
}
}
//Todo ioRatio?
if (hasTasks()) {
runAllTasks();
}
@ -175,7 +175,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
}
}
} catch (Throwable t) {
System.out.println("Exception error " + t);
logger.info("Exception error: {}", t);
}
}
}
@ -214,37 +214,53 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
//Todo refactoring method name
event.getAbstractIOUringChannel().executeReadEvent();
break;
break;
case READ:
System.out.println("EventLoop Read Res: " + res);
System.out.println("EventLoop Fd: " + event.getAbstractIOUringChannel().getSocket().intValue());
ByteBuf byteBuf = event.getReadBuffer();
boolean close = false;
ByteBuf byteBuf = null;
int localReadAmount = res;
if (localReadAmount > 0) {
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
}
final IOUringRecvByteAllocatorHandle allocHandle =
(IOUringRecvByteAllocatorHandle) event.getAbstractIOUringChannel().unsafe()
.recvBufAllocHandle();
final ChannelPipeline pipeline = event.getAbstractIOUringChannel().pipeline();
try {
logger.info("EventLoop Read Res: {}", res);
logger.info("EventLoop Fd: {}", event.getAbstractIOUringChannel().getSocket().intValue());
event.getAbstractIOUringChannel().setUringInReadyPending(false);
byteBuf = event.getReadBuffer();
if (localReadAmount > 0) {
byteBuf.writerIndex(byteBuf.writerIndex() + localReadAmount);
}
allocHandle.lastBytesRead(localReadAmount);
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
allocHandle.lastBytesRead(localReadAmount);
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read, release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
event.getAbstractIOUringChannel().shutdownInput(false);
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
break;
}
allocHandle.incMessagesRead(1);
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
break;
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
allocHandle.incMessagesRead(1);
//readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
event.getAbstractIOUringChannel().executeReadEvent();
break;
logger.info("READ autoRead {}", event.getAbstractIOUringChannel().config().isAutoRead());
if (event.getAbstractIOUringChannel().config().isAutoRead()) {
event.getAbstractIOUringChannel().executeReadEvent();
}
} catch (Throwable t) {
handleReadException(event.getAbstractIOUringChannel(), pipeline, byteBuf, t, close, allocHandle);
}
break;
case WRITE:
//localFlushAmount -> res
logger.info("EventLoop Write Res: {}", res);
@ -273,7 +289,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
prevDeadlineNanos = NONE;
}
break;
break;
case POLL_EVENTFD:
pendingWakeup = false;
//Todo eventId is already used
@ -291,10 +307,23 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
event.getAbstractIOUringChannel().shutdownInput(true);
}
break;
case POLL_OUT:
logger.info("POLL_OUT Res: {}", res);
break;
}
this.events.remove(event.getId());
}
@Override
protected void cleanup() {
try {
eventfd.close();
} catch (IOException e) {
e.printStackTrace();
}
ringBuffer.close();
}
public RingBuffer getRingBuffer() {
return ringBuffer;
}
@ -307,6 +336,28 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
}
}
private void handleReadException(AbstractIOUringChannel channel, ChannelPipeline pipeline, ByteBuf byteBuf,
Throwable cause, boolean close,
IOUringRecvByteAllocatorHandle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
channel.shutdownInput(false);
} else {
if (channel.config().isAutoRead()) {
channel.executeReadEvent();
}
}
}
//to be notified when the filedesciptor is closed
private void pollRdHup(AbstractIOUringChannel channel) {
//all childChannels should poll POLLRDHUP

View File

@ -30,6 +30,7 @@ import java.util.Map;
public final class IOUringServerSocketChannel extends AbstractIOUringServerChannel implements ServerSocketChannel {
private final IOUringServerSocketChannelConfig config;
private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
public IOUringServerSocketChannel() {
super(Socket.newSocketStream().intValue());

View File

@ -17,6 +17,8 @@ package io.netty.channel.uring;
import io.netty.channel.unix.Buffer;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer;
@ -73,9 +75,9 @@ final class IOUringSubmissionQueue {
private final long timeoutMemoryAddress;
IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
long fFlagsAdress, long kDroppedAddress, long arrayAddress,
long submissionQueueArrayAddress, int ringSize,
long ringAddress, int ringFd) {
long fFlagsAdress, long kDroppedAddress, long arrayAddress,
long submissionQueueArrayAddress, int ringSize,
long ringAddress, int ringFd) {
this.kHeadAddress = kHeadAddress;
this.kTailAddress = kTailAddress;
this.kRingMaskAddress = kRingMaskAddress;
@ -122,7 +124,7 @@ final class IOUringSubmissionQueue {
if (type == EventType.POLL_LINK || type == EventType.POLL_OUT) {
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) IOSQE_IO_LINK);
} else {
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);
}
//c union set Rw-Flags or accept_flags
@ -140,11 +142,11 @@ final class IOUringSubmissionQueue {
offsetIndex += 8;
}
System.out.println("OPField: " + PlatformDependent.getByte(sqe + SQE_OP_CODE_FIELD));
System.out.println("UserDataField: " + PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD));
System.out.println("BufferAddress: " + PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD));
System.out.println("Length: " + PlatformDependent.getInt(sqe + SQE_LEN_FIELD));
System.out.println("Offset: " + PlatformDependent.getLong(sqe + SQE_OFFSET_FIELD));
logger.info("OPField: {}", type.name());
logger.info("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD));
logger.info("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD));
logger.info("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD));
logger.info("Offset: {}", PlatformDependent.getLong(sqe + SQE_OFFSET_FIELD));
}
public boolean addTimeout(long nanoSeconds, long eventId) {

View File

@ -23,6 +23,8 @@ import io.netty.util.internal.NativeLibraryLoader;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThrowableUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.nio.channels.Selector;
@ -32,6 +34,7 @@ import static io.netty.channel.unix.Socket.isIPv6Preferred;
final class Native {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class);
private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 32);
static {
@ -117,7 +120,7 @@ final class Native {
} catch (UnsatisfiedLinkError e1) {
try {
NativeLibraryLoader.load(staticLibName, cl);
System.out.println("Failed to load io_uring");
logger.info("Failed to load io_uring");
} catch (UnsatisfiedLinkError e2) {
ThrowableUtil.addSuppressed(e1, e2);
throw e1;