From 33f75d374091946058fc334c3cdbcd0f0a59d9b3 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 6 Feb 2015 12:14:59 +0100 Subject: [PATCH] Not execute shutdownOutput(...) and close(...) in the EventLoop if SO_LINGER is used. Motivation: If SO_LINGER is used shutdownOutput() and close() syscalls will block until either all data was send or until the timeout exceed. This is a problem when we try to execute them on the EventLoop as this means the EventLoop may be blocked and so can not process any other I/O. Modifications: - Add AbstractUnsafe.closeExecutor() which returns null by default and use this Executor for close if not null. - Override the closeExecutor() in NioSocketChannel and EpollSocketChannel and return GlobalEventExecutor.INSTANCE if getSoLinger() > 0 - use closeExecutor() in shutdownInput(...) in NioSocketChannel and EpollSocketChannel Result: No more blocking of the EventLoop if SO_LINGER is used and shutdownOutput() or close() is called. --- .../epoll/AbstractEpollStreamChannel.java | 25 +++------ .../channel/epoll/EpollSocketChannel.java | 42 ++++++++++++++- .../io/netty/channel/AbstractChannel.java | 51 ++++++++++++++++-- .../channel/nio/AbstractNioByteChannel.java | 4 +- .../channel/socket/nio/NioSocketChannel.java | 53 +++++++++++++++---- 5 files changed, 139 insertions(+), 36 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 3fb0bbd25e..36d26c2ad6 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -351,25 +351,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { return outputShutdown || !isActive(); } - protected ChannelFuture shutdownOutput0(final ChannelPromise promise) { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - try { - Native.shutdown(fd().intValue(), false, true); - outputShutdown = true; - promise.setSuccess(); - } catch (Throwable t) { - promise.setFailure(t); - } - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownOutput0(promise); - } - }); + protected void shutdownOutput0(final ChannelPromise promise) { + try { + Native.shutdown(fd().intValue(), false, true); + outputShutdown = true; + promise.setSuccess(); + } catch (Throwable cause) { + promise.setFailure(cause); } - return promise; } /** diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 36d1e37e4e..769fdf9f73 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -18,11 +18,15 @@ package io.netty.channel.epoll; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.internal.OneTimeTask; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.concurrent.Executor; /** * {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for @@ -123,7 +127,28 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme @Override public ChannelFuture shutdownOutput(final ChannelPromise promise) { - return shutdownOutput0(promise); + Executor closeExecutor = ((EpollSocketChannelUnsafe) unsafe()).closeExecutor(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { + @Override + public void run() { + shutdownOutput0(promise); + } + }); + } else { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdownOutput0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdownOutput0(promise); + } + }); + } + } + return promise; } @Override @@ -131,6 +156,11 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme return (ServerSocketChannel) super.parent(); } + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollSocketChannelUnsafe(); + } + @Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { @@ -145,4 +175,14 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme } return false; } + + private final class EpollSocketChannelUnsafe extends EpollStreamUnsafe { + @Override + protected Executor closeExecutor() { + if (config().getSoLinger() > 0) { + return GlobalEventExecutor.INSTANCE; + } + return null; + } + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 38e7030080..1cf952378a 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -33,6 +33,7 @@ import java.net.SocketAddress; import java.net.SocketException; import java.nio.channels.ClosedChannelException; import java.nio.channels.NotYetConnectedException; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; /** @@ -544,16 +545,48 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return; } + if (outboundBuffer == null) { + // This means close() was called before so we just register a listener and return + closeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + promise.setSuccess(); + } + }); + return; + } + if (closeFuture.isDone()) { // Closed already. safeSetSuccess(promise); return; } - boolean wasActive = isActive(); - ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; - this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. + final boolean wasActive = isActive(); + final ChannelOutboundBuffer buffer = outboundBuffer; + outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. + Executor closeExecutor = closeExecutor(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { + @Override + public void run() { + doClose0(promise); + // Call invokeLater so closeAndDeregister is executed in the EventLoop again! + invokeLater(new OneTimeTask() { + @Override + public void run() { + closeAndDeregister(buffer, wasActive); + } + }); + } + }); + } else { + doClose0(promise); + closeAndDeregister(buffer, wasActive); + } + } + private void doClose0(ChannelPromise promise) { try { doClose(); closeFuture.setClosed(); @@ -562,13 +595,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha closeFuture.setClosed(); safeSetFailure(promise, t); } + } + private void closeAndDeregister(ChannelOutboundBuffer outboundBuffer, final boolean wasActive) { // Fail all the queued messages try { outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); } finally { - if (wasActive && !isActive()) { invokeLater(new OneTimeTask() { @Override @@ -802,6 +836,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return cause; } + + /** + * @return {@link Executor} to execute {@link #doClose()} or {@code null} if it should be done in the + * {@link EventLoop}. + + + */ + protected Executor closeExecutor() { + return null; + } } /** diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index ec91584793..8e58b51163 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -57,7 +57,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { return new NioByteUnsafe(); } - private final class NioByteUnsafe extends AbstractNioUnsafe { + protected class NioByteUnsafe extends AbstractNioUnsafe { private RecvByteBufAllocator.Handle allocHandle; private void closeOnRead(ChannelPipeline pipeline) { @@ -91,7 +91,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } @Override - public void read() { + public final void read() { final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 0c54e2f0fe..dce79344b1 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -28,6 +28,7 @@ import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannelConfig; +import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.internal.OneTimeTask; import java.io.IOException; @@ -38,6 +39,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; +import java.util.concurrent.Executor; /** * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. @@ -148,25 +150,39 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override public ChannelFuture shutdownOutput(final ChannelPromise promise) { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - try { - javaChannel().socket().shutdownOutput(); - promise.setSuccess(); - } catch (Throwable t) { - promise.setFailure(t); - } - } else { - loop.execute(new OneTimeTask() { + Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).closeExecutor(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { @Override public void run() { - shutdownOutput(promise); + shutdownOutput0(promise); } }); + } else { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdownOutput0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdownOutput0(promise); + } + }); + } } return promise; } + private void shutdownOutput0(final ChannelPromise promise) { + try { + javaChannel().socket().shutdownOutput(); + promise.setSuccess(); + } catch (Throwable t) { + promise.setFailure(t); + } + } + @Override protected SocketAddress localAddress0() { return javaChannel().socket().getLocalSocketAddress(); @@ -308,6 +324,21 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty } } + @Override + protected AbstractNioUnsafe newUnsafe() { + return new NioSocketChannelUnsafe(); + } + + private final class NioSocketChannelUnsafe extends NioByteUnsafe { + @Override + protected Executor closeExecutor() { + if (config().getSoLinger() > 0) { + return GlobalEventExecutor.INSTANCE; + } + return null; + } + } + private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket);