Not execute shutdownOutput(...) and close(...) in the EventLoop if SO_LINGER is used.

Motivation:

If SO_LINGER is used shutdownOutput() and close() syscalls will block until either all data was send or until the timeout exceed. This is a problem when we try to execute them on the EventLoop as this means the EventLoop may be blocked and so can not process any other I/O.

Modifications:

- Add AbstractUnsafe.closeExecutor() which returns null by default and use this Executor for close if not null.
- Override the closeExecutor() in NioSocketChannel and EpollSocketChannel and return GlobalEventExecutor.INSTANCE if getSoLinger() > 0
- use closeExecutor() in shutdownInput(...) in NioSocketChannel and EpollSocketChannel

 Result:

No more blocking of the EventLoop if SO_LINGER is used and shutdownOutput() or close() is called.
This commit is contained in:
Norman Maurer 2015-02-06 12:14:59 +01:00
parent 6231113c38
commit 33f75d3740
5 changed files with 139 additions and 36 deletions

View File

@ -351,25 +351,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
return outputShutdown || !isActive(); return outputShutdown || !isActive();
} }
protected ChannelFuture shutdownOutput0(final ChannelPromise promise) { protected void shutdownOutput0(final ChannelPromise promise) {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
try { try {
Native.shutdown(fd().intValue(), false, true); Native.shutdown(fd().intValue(), false, true);
outputShutdown = true; outputShutdown = true;
promise.setSuccess(); promise.setSuccess();
} catch (Throwable t) { } catch (Throwable cause) {
promise.setFailure(t); promise.setFailure(cause);
} }
} else {
loop.execute(new Runnable() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
return promise;
} }
/** /**

View File

@ -18,11 +18,15 @@ package io.netty.channel.epoll;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.OneTimeTask;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.Executor;
/** /**
* {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for * {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
@ -123,7 +127,28 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
@Override @Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) { public ChannelFuture shutdownOutput(final ChannelPromise promise) {
return shutdownOutput0(promise); Executor closeExecutor = ((EpollSocketChannelUnsafe) unsafe()).closeExecutor();
if (closeExecutor != null) {
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop();
if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
}
return promise;
} }
@Override @Override
@ -131,6 +156,11 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
return (ServerSocketChannel) super.parent(); return (ServerSocketChannel) super.parent();
} }
@Override
protected AbstractEpollUnsafe newUnsafe() {
return new EpollSocketChannelUnsafe();
}
@Override @Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) { if (localAddress != null) {
@ -145,4 +175,14 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
} }
return false; return false;
} }
private final class EpollSocketChannelUnsafe extends EpollStreamUnsafe {
@Override
protected Executor closeExecutor() {
if (config().getSoLinger() > 0) {
return GlobalEventExecutor.INSTANCE;
}
return null;
}
}
} }

View File

@ -33,6 +33,7 @@ import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.NotYetConnectedException; import java.nio.channels.NotYetConnectedException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
/** /**
@ -544,16 +545,48 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return; return;
} }
if (outboundBuffer == null) {
// This means close() was called before so we just register a listener and return
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
return;
}
if (closeFuture.isDone()) { if (closeFuture.isDone()) {
// Closed already. // Closed already.
safeSetSuccess(promise); safeSetSuccess(promise);
return; return;
} }
boolean wasActive = isActive(); final boolean wasActive = isActive();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; final ChannelOutboundBuffer buffer = outboundBuffer;
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
Executor closeExecutor = closeExecutor();
if (closeExecutor != null) {
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
doClose0(promise);
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new OneTimeTask() {
@Override
public void run() {
closeAndDeregister(buffer, wasActive);
}
});
}
});
} else {
doClose0(promise);
closeAndDeregister(buffer, wasActive);
}
}
private void doClose0(ChannelPromise promise) {
try { try {
doClose(); doClose();
closeFuture.setClosed(); closeFuture.setClosed();
@ -562,13 +595,14 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
closeFuture.setClosed(); closeFuture.setClosed();
safeSetFailure(promise, t); safeSetFailure(promise, t);
} }
}
private void closeAndDeregister(ChannelOutboundBuffer outboundBuffer, final boolean wasActive) {
// Fail all the queued messages // Fail all the queued messages
try { try {
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
} finally { } finally {
if (wasActive && !isActive()) { if (wasActive && !isActive()) {
invokeLater(new OneTimeTask() { invokeLater(new OneTimeTask() {
@Override @Override
@ -802,6 +836,15 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return cause; return cause;
} }
/**
* @return {@link Executor} to execute {@link #doClose()} or {@code null} if it should be done in the
* {@link EventLoop}.
+
*/
protected Executor closeExecutor() {
return null;
}
} }
/** /**

View File

@ -57,7 +57,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
return new NioByteUnsafe(); return new NioByteUnsafe();
} }
private final class NioByteUnsafe extends AbstractNioUnsafe { protected class NioByteUnsafe extends AbstractNioUnsafe {
private RecvByteBufAllocator.Handle allocHandle; private RecvByteBufAllocator.Handle allocHandle;
private void closeOnRead(ChannelPipeline pipeline) { private void closeOnRead(ChannelPipeline pipeline) {
@ -91,7 +91,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
@Override @Override
public void read() { public final void read() {
final ChannelConfig config = config(); final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) { if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime // ChannelConfig.setAutoRead(false) was called in the meantime

View File

@ -28,6 +28,7 @@ import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannelConfig; import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.OneTimeTask;
import java.io.IOException; import java.io.IOException;
@ -38,6 +39,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.Executor;
/** /**
* {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
@ -148,23 +150,37 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
@Override @Override
public ChannelFuture shutdownOutput(final ChannelPromise promise) { public ChannelFuture shutdownOutput(final ChannelPromise promise) {
Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).closeExecutor();
if (closeExecutor != null) {
closeExecutor.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
} else {
EventLoop loop = eventLoop(); EventLoop loop = eventLoop();
if (loop.inEventLoop()) { if (loop.inEventLoop()) {
shutdownOutput0(promise);
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput0(promise);
}
});
}
}
return promise;
}
private void shutdownOutput0(final ChannelPromise promise) {
try { try {
javaChannel().socket().shutdownOutput(); javaChannel().socket().shutdownOutput();
promise.setSuccess(); promise.setSuccess();
} catch (Throwable t) { } catch (Throwable t) {
promise.setFailure(t); promise.setFailure(t);
} }
} else {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput(promise);
}
});
}
return promise;
} }
@Override @Override
@ -308,6 +324,21 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
} }
} }
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
@Override
protected Executor closeExecutor() {
if (config().getSoLinger() > 0) {
return GlobalEventExecutor.INSTANCE;
}
return null;
}
}
private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket); super(channel, javaSocket);