Add abstract stream channel

Motivation:

to shutdown child channels we should create new abstact client class instead of using AbstractIOUringChannel

Modifications:

-Added new child channel abstract class
-Add shutdown methods to close a channel when the connection is lost

Result:

the channels can be closed when the connection is lost
This commit is contained in:
Josef Grieb 2020-08-24 09:45:30 +02:00 committed by josef
parent 96e0e5cc91
commit 0160284301
3 changed files with 298 additions and 78 deletions

View File

@ -44,6 +44,16 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
final LinuxSocket socket;
protected volatile boolean active;
boolean uringInReadyPending;
boolean inputClosedSeenErrorOnRead;
//can only submit one write operation at a time
private boolean writeable = true;
/**
* The future of the current connection attempt. If not null, subsequent connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
private volatile SocketAddress local;
private volatile SocketAddress remote;
@ -158,8 +168,51 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
@Override
protected void doClose() throws Exception {
System.out.println("DoClose Socket: " + socket.intValue());
socket.close();
if (parent() == null) {
logger.info("ServerSocket Close: {}", this.socket.intValue());
}
active = false;
// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
// socket which has not even been connected yet. This has been observed to block during unit tests.
//inputClosedSeenErrorOnRead = true;
try {
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(new ClosedChannelException());
connectPromise = null;
}
ScheduledFuture<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
if (isRegistered()) {
// Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
// if SO_LINGER is used.
//
// See https://github.com/netty/netty/issues/7159
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
doDeregister();
} else {
loop.execute(new Runnable() {
@Override
public void run() {
try {
doDeregister();
} catch (Throwable cause) {
pipeline().fireExceptionCaught(cause);
}
}
});
}
}
} finally {
socket.close();
}
}
//deregister
@ -210,17 +263,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
@Override
public void run() {
IOUringEventLoop eventLoop = (IOUringEventLoop) eventLoop();
long eventId = eventLoop.incrementEventIdCounter();
Event event = new Event();
event.setOp(EventType.POLL_LINK);
event.setId(eventId);
event.setAbstractIOUringChannel(AbstractIOUringChannel.this);
eventLoop.getRingBuffer().getIoUringSubmissionQueue()
.addPoll(eventId, socket.intValue(), event.getOp());
((IOUringEventLoop) eventLoop()).addNewEvent(event);
uringEventExecution(); //flush and submit SQE
}
};
@ -288,6 +330,10 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
}
}
public void setUringInReadyPending(boolean uringInReadyPending) {
this.uringInReadyPending = uringInReadyPending;
}
@Override
public abstract DefaultChannelConfig config();
@ -304,4 +350,47 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
public Socket getSocket() {
return socket;
}
void shutdownInput(boolean rdHup) {
logger.info("shutdownInput Fd: {}", this.socket.intValue());
if (!socket.isInputShutdown()) {
if (isAllowHalfClosure(config())) {
try {
socket.shutdown(true, false);
} catch (IOException ignored) {
// We attempted to shutdown and failed, which means the input has already effectively been
// shutdown.
fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
return;
} catch (NotYetConnectedException ignore) {
// We attempted to shutdown and failed, which means the input has already effectively been
// shutdown.
}
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
} else if (!rdHup) {
inputClosedSeenErrorOnRead = true;
pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
}
}
private static boolean isAllowHalfClosure(ChannelConfig config) {
return config instanceof SocketChannelConfig &&
((SocketChannelConfig) config).isAllowHalfClosure();
}
private void fireEventAndClose(Object evt) {
pipeline().fireUserEventTriggered(evt);
close(voidPromise());
}
final boolean shouldBreakIoUringInReady(ChannelConfig config) {
return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
}
public void setWriteable(boolean writeable) {
this.writeable = writeable;
}
}

View File

@ -0,0 +1,191 @@
package io.netty.channel.uring;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.DuplexChannel;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.concurrent.Executor;
abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel implements DuplexChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIOUringStreamChannel.class);
AbstractIOUringStreamChannel(Channel parent, LinuxSocket socket) {
super(parent, socket);
}
protected AbstractIOUringStreamChannel(Channel parent, LinuxSocket socket, boolean active) {
super(parent, socket, active);
}
@Override
public ChannelFuture shutdown() {
System.out.println("AbstractStreamChannel shutdown");
return shutdown(newPromise());
}
@Override
public ChannelFuture shutdown(final ChannelPromise promise) {
ChannelFuture shutdownOutputFuture = shutdownOutput();
if (shutdownOutputFuture.isDone()) {
shutdownOutputDone(shutdownOutputFuture, promise);
} else {
shutdownOutputFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
shutdownOutputDone(shutdownOutputFuture, promise);
}
});
}
return promise;
}
@UnstableApi
@Override
protected final void doShutdownOutput() throws Exception {
socket.shutdown(false, true);
}
private void shutdownInput0(final ChannelPromise promise) {
try {
socket.shutdown(true, false);
promise.setSuccess();
} catch (Throwable cause) {
promise.setFailure(cause);
}
}
@Override
public boolean isOutputShutdown() {
return socket.isOutputShutdown();
}
@Override
public boolean isInputShutdown() {
return socket.isInputShutdown();
}
@Override
public boolean isShutdown() {
return socket.isShutdown();
}
@Override
public ChannelFuture shutdownOutput() {
return shutdownOutput(newPromise());
}
@Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
}
});
}
return promise;
}
@Override
public ChannelFuture shutdownInput() {
return shutdownInput(newPromise());
}
@Override
public ChannelFuture shutdownInput(final ChannelPromise promise) {
Executor closeExecutor = ((IOUringStreamUnsafe) unsafe()).prepareToClose();
if (closeExecutor != null) {
closeExecutor.execute(new Runnable() {
@Override
public void run() {
shutdownInput0(promise);
}
});
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownInput0(promise);
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownInput0(promise);
}
});
}
}
return promise;
}
private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
ChannelFuture shutdownInputFuture = shutdownInput();
if (shutdownInputFuture.isDone()) {
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
} else {
shutdownInputFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
}
});
}
}
private static void shutdownDone(ChannelFuture shutdownOutputFuture,
ChannelFuture shutdownInputFuture,
ChannelPromise promise) {
System.out.println("AbstractStreamChannel ShutdownDone");
Throwable shutdownOutputCause = shutdownOutputFuture.cause();
Throwable shutdownInputCause = shutdownInputFuture.cause();
if (shutdownOutputCause != null) {
if (shutdownInputCause != null) {
logger.info("Exception suppressed because a previous exception occurred.",
shutdownInputCause);
}
promise.setFailure(shutdownOutputCause);
} else if (shutdownInputCause != null) {
promise.setFailure(shutdownInputCause);
} else {
promise.setSuccess();
}
}
class IOUringStreamUnsafe extends AbstractUringUnsafe {
// Overridden here just to be able to access this method from AbstractEpollStreamChannel
@Override
protected Executor prepareToClose() {
return super.prepareToClose();
}
@Override
public void uringEventExecution() {
final ChannelConfig config = config();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = allocHandle.allocate(allocator);
doReadBytes(byteBuf);
}
}
}

View File

@ -20,6 +20,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.RecvByteBufAllocator;
@ -29,11 +30,13 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket;
import io.netty.channel.uring.AbstractIOUringStreamChannel.IOUringStreamUnsafe;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
public final class IOUringSocketChannel extends AbstractIOUringChannel implements SocketChannel {
public final class IOUringSocketChannel extends AbstractIOUringStreamChannel implements SocketChannel {
private final IOUringSocketChannelConfig config;
public IOUringSocketChannel() {
@ -53,20 +56,7 @@ public final class IOUringSocketChannel extends AbstractIOUringChannel implement
@Override
protected AbstractUringUnsafe newUnsafe() {
return new AbstractUringUnsafe() {
@Override
public void uringEventExecution() {
final ChannelConfig config = config();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = allocHandle.allocate(allocator);
doReadBytes(byteBuf);
}
};
return new IOUringStreamUnsafe();
}
@Override
@ -74,56 +64,6 @@ public final class IOUringSocketChannel extends AbstractIOUringChannel implement
return config;
}
@Override
public boolean isInputShutdown() {
return false;
}
@Override
public ChannelFuture shutdownInput() {
return null;
}
@Override
public ChannelFuture shutdownInput(ChannelPromise promise) {
return null;
}
//Todo
@Override
public boolean isOutputShutdown() {
return false;
}
//Todo
@Override
public ChannelFuture shutdownOutput() {
return null;
}
//Todo
@Override
public ChannelFuture shutdownOutput(ChannelPromise promise) {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
//Todo
@Override
public ChannelFuture shutdown() {
return null;
}
//Todo
@Override
public ChannelFuture shutdown(ChannelPromise promise) {
return null;
}
@Override
public FileDescriptor fd() {
return super.fd();