Not execute shutdownOutput(...) and close(...) in the EventLoop if SO_LINGER is used.

Motivation:

If SO_LINGER is used shutdownOutput() and close() syscalls will block until either all data was send or until the timeout exceed. This is a problem when we try to execute them on the EventLoop as this means the EventLoop may be blocked and so can not process any other I/O.

Modifications:

- Add AbstractUnsafe.closeExecutor() which returns null by default and use this Executor for close if not null.
- Override the closeExecutor() in NioSocketChannel and EpollSocketChannel and return GlobalEventExecutor.INSTANCE if getSoLinger() > 0
- use closeExecutor() in shutdownInput(...) in NioSocketChannel and EpollSocketChannel

 Result:

No more blocking of the EventLoop if SO_LINGER is used and shutdownOutput() or close() is called.
This commit is contained in:
Norman Maurer 2015-02-06 12:14:59 +01:00
parent 4e34d2ce6d
commit ad7deb8dc1
5 changed files with 148 additions and 41 deletions

View File

@ -351,25 +351,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
return outputShutdown || !isActive(); return outputShutdown || !isActive();
} }
protected ChannelFuture shutdownOutput0(final ChannelPromise promise) { protected void shutdownOutput0(final ChannelPromise promise) {
EventLoop loop = eventLoop(); try {
if (loop.inEventLoop()) { Native.shutdown(fd().intValue(), false, true);
try { outputShutdown = true;
Native.shutdown(fd().intValue(), false, true); promise.setSuccess();
outputShutdown = true; } catch (Throwable cause) {
promise.setSuccess(); promise.setFailure(cause);
} catch (Throwable t) {
promise.setFailure(t);
}
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} }
return promise;
} }
/** /**

View File

@ -18,11 +18,15 @@ package io.netty.channel.epoll;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.OneTimeTask;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.Executor;
/** /**
* {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for * {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
@ -123,7 +127,28 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
@Override @Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) { public ChannelFuture shutdownOutput(final ChannelPromise promise) {
return shutdownOutput0(promise); Executor closeExecutor = ((EpollSocketChannelUnsafe) unsafe()).closeExecutor();
if (closeExecutor != null) {
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
}
return promise;
} }
@Override @Override
@ -131,6 +156,11 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
return (ServerSocketChannel) super.parent(); return (ServerSocketChannel) super.parent();
} }
@Override
protected AbstractEpollUnsafe newUnsafe() {
return new EpollSocketChannelUnsafe();
}
@Override @Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) { if (localAddress != null) {
@ -145,4 +175,14 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
} }
return false; return false;
} }
private final class EpollSocketChannelUnsafe extends EpollStreamUnsafe {
@Override
protected Executor closeExecutor() {
if (config().getSoLinger() > 0) {
return GlobalEventExecutor.INSTANCE;
}
return null;
}
}
} }

View File

@ -31,6 +31,7 @@ import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
/** /**
@ -613,23 +614,60 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return; return;
} }
if (outboundBuffer == null) {
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
return;
}
if (closeFuture.isDone()) { if (closeFuture.isDone()) {
// Closed already. // Closed already.
safeSetSuccess(promise); safeSetSuccess(promise);
return; return;
} }
boolean wasActive = isActive(); final boolean wasActive = isActive();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; final ChannelOutboundBuffer buffer = outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor = closeExecutor();
Throwable error = null; if (closeExecutor != null) {
try { closeExecutor.execute(new OneTimeTask() {
doClose(); @Override
} catch (Throwable t) { public void run() {
error = t; Throwable cause = null;
try {
doClose();
} catch (Throwable t) {
cause = t;
}
final Throwable error = cause;
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new OneTimeTask() {
@Override
public void run() {
closeAndDeregister(buffer, wasActive, promise, error);
}
});
}
});
} else {
Throwable error = null;
try {
doClose();
} catch (Throwable t) {
error = t;
}
closeAndDeregister(buffer, wasActive, promise, error);
} }
}
private void closeAndDeregister(ChannelOutboundBuffer outboundBuffer, final boolean wasActive,
ChannelPromise promise, Throwable error) {
// Fail all the queued messages // Fail all the queued messages
try { try {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
@ -882,6 +920,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return cause; return cause;
} }
/**
* @return {@link Executor} to execute {@link #doClose()} or {@code null} if it should be done in the
* {@link EventLoop}.
+
*/
protected Executor closeExecutor() {
return null;
}
} }
/** /**

View File

@ -57,7 +57,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
return new NioByteUnsafe(); return new NioByteUnsafe();
} }
private final class NioByteUnsafe extends AbstractNioUnsafe { protected class NioByteUnsafe extends AbstractNioUnsafe {
private void closeOnRead(ChannelPipeline pipeline) { private void closeOnRead(ChannelPipeline pipeline) {
SelectionKey key = selectionKey(); SelectionKey key = selectionKey();
@ -90,7 +90,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
@Override @Override
public void read() { public final void read() {
final ChannelConfig config = config(); final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) { if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime // ChannelConfig.setAutoRead(false) was called in the meantime

View File

@ -28,6 +28,7 @@ import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.OneTimeTask;
import java.io.IOException; import java.io.IOException;
@ -38,6 +39,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.Executor;
/** /**
* {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
@ -147,25 +149,39 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
@Override @Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) { public ChannelFuture shutdownOutput(final ChannelPromise promise) {
EventLoop loop = eventLoop(); Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).closeExecutor();
if (loop.inEventLoop()) { if (closeExecutor != null) {
try { closeExecutor.execute(new OneTimeTask() {
javaChannel().socket().shutdownOutput();
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
} else {
loop.execute(new OneTimeTask() {
@Override @Override
public void run() { public void run() {
shutdownOutput(promise); shutdownOutput0(promise);
} }
}); });
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
} }
return promise; return promise;
} }
private void shutdownOutput0(final ChannelPromise promise) {
try {
javaChannel().socket().shutdownOutput();
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
}
@Override @Override
protected SocketAddress localAddress0() { protected SocketAddress localAddress0() {
return javaChannel().socket().getLocalSocketAddress(); return javaChannel().socket().getLocalSocketAddress();
@ -307,6 +323,21 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
} }
} }
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
@Override
protected Executor closeExecutor() {
if (config().getSoLinger() > 0) {
return GlobalEventExecutor.INSTANCE;
}
return null;
}
}
private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket); super(channel, javaSocket);