diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 3fb0bbd25e..36d26c2ad6 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -351,25 +351,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { return outputShutdown || !isActive(); } - protected ChannelFuture shutdownOutput0(final ChannelPromise promise) { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - try { - Native.shutdown(fd().intValue(), false, true); - outputShutdown = true; - promise.setSuccess(); - } catch (Throwable t) { - promise.setFailure(t); - } - } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownOutput0(promise); - } - }); + protected void shutdownOutput0(final ChannelPromise promise) { + try { + Native.shutdown(fd().intValue(), false, true); + outputShutdown = true; + promise.setSuccess(); + } catch (Throwable cause) { + promise.setFailure(cause); } - return promise; } /** diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 36d1e37e4e..769fdf9f73 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -18,11 +18,15 @@ package io.netty.channel.epoll; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.internal.OneTimeTask; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.concurrent.Executor; /** * {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for @@ -123,7 +127,28 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme @Override public ChannelFuture shutdownOutput(final ChannelPromise promise) { - return shutdownOutput0(promise); + Executor closeExecutor = ((EpollSocketChannelUnsafe) unsafe()).closeExecutor(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { + @Override + public void run() { + shutdownOutput0(promise); + } + }); + } else { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdownOutput0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdownOutput0(promise); + } + }); + } + } + return promise; } @Override @@ -131,6 +156,11 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme return (ServerSocketChannel) super.parent(); } + @Override + protected AbstractEpollUnsafe newUnsafe() { + return new EpollSocketChannelUnsafe(); + } + @Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { @@ -145,4 +175,14 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme } return false; } + + private final class EpollSocketChannelUnsafe extends EpollStreamUnsafe { + @Override + protected Executor closeExecutor() { + if (config().getSoLinger() > 0) { + return GlobalEventExecutor.INSTANCE; + } + return null; + } + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index e446776b57..e61bcb11c3 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -31,6 +31,7 @@ import java.net.SocketAddress; import java.net.SocketException; import java.nio.channels.ClosedChannelException; import java.nio.channels.NotYetConnectedException; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; /** @@ -613,23 +614,60 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return; } + if (outboundBuffer == null) { + // This means close() was called before so we just register a listener and return + closeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + promise.setSuccess(); + } + }); + return; + } + if (closeFuture.isDone()) { // Closed already. safeSetSuccess(promise); return; } - boolean wasActive = isActive(); - ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; - this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. - - Throwable error = null; - try { - doClose(); - } catch (Throwable t) { - error = t; + final boolean wasActive = isActive(); + final ChannelOutboundBuffer buffer = outboundBuffer; + outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. + Executor closeExecutor = closeExecutor(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { + @Override + public void run() { + Throwable cause = null; + try { + doClose(); + } catch (Throwable t) { + cause = t; + } + final Throwable error = cause; + // Call invokeLater so closeAndDeregister is executed in the EventLoop again! + invokeLater(new OneTimeTask() { + @Override + public void run() { + closeAndDeregister(buffer, wasActive, promise, error); + } + }); + } + }); + } else { + Throwable error = null; + try { + doClose(); + } catch (Throwable t) { + error = t; + } + closeAndDeregister(buffer, wasActive, promise, error); } + } + private void closeAndDeregister(ChannelOutboundBuffer outboundBuffer, final boolean wasActive, + ChannelPromise promise, Throwable error) { // Fail all the queued messages try { outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); @@ -882,6 +920,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return cause; } + + /** + * @return {@link Executor} to execute {@link #doClose()} or {@code null} if it should be done in the + * {@link EventLoop}. + + + */ + protected Executor closeExecutor() { + return null; + } } /** diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index b4e5b19049..80bf96cc65 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -57,7 +57,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { return new NioByteUnsafe(); } - private final class NioByteUnsafe extends AbstractNioUnsafe { + protected class NioByteUnsafe extends AbstractNioUnsafe { private void closeOnRead(ChannelPipeline pipeline) { SelectionKey key = selectionKey(); @@ -90,7 +90,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } @Override - public void read() { + public final void read() { final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 1798c512c1..df70f0e5dd 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -28,6 +28,7 @@ import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannelConfig; +import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.internal.OneTimeTask; import java.io.IOException; @@ -38,6 +39,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; +import java.util.concurrent.Executor; /** * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. @@ -147,25 +149,39 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty @Override public ChannelFuture shutdownOutput(final ChannelPromise promise) { - EventLoop loop = eventLoop(); - if (loop.inEventLoop()) { - try { - javaChannel().socket().shutdownOutput(); - promise.setSuccess(); - } catch (Throwable t) { - promise.setFailure(t); - } - } else { - loop.execute(new OneTimeTask() { + Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).closeExecutor(); + if (closeExecutor != null) { + closeExecutor.execute(new OneTimeTask() { @Override public void run() { - shutdownOutput(promise); + shutdownOutput0(promise); } }); + } else { + EventLoop loop = eventLoop(); + if (loop.inEventLoop()) { + shutdownOutput0(promise); + } else { + loop.execute(new OneTimeTask() { + @Override + public void run() { + shutdownOutput0(promise); + } + }); + } } return promise; } + private void shutdownOutput0(final ChannelPromise promise) { + try { + javaChannel().socket().shutdownOutput(); + promise.setSuccess(); + } catch (Throwable t) { + promise.setFailure(t); + } + } + @Override protected SocketAddress localAddress0() { return javaChannel().socket().getLocalSocketAddress(); @@ -307,6 +323,21 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty } } + @Override + protected AbstractNioUnsafe newUnsafe() { + return new NioSocketChannelUnsafe(); + } + + private final class NioSocketChannelUnsafe extends NioByteUnsafe { + @Override + protected Executor closeExecutor() { + if (config().getSoLinger() > 0) { + return GlobalEventExecutor.INSTANCE; + } + return null; + } + } + private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket);