EPOLL Shutdown and Half Closed

Motivation:
The EPOLL module was not completly respecting the half closed state. It may have missed events, or procssed events when it should not have due to checking isOpen instead of the appropriate shutdown state.

Modifications:
- use FileDescriptor's isShutdown* methods instead of isOpen to check for processing events.

Result:
Half closed code in EPOLL module is more correct.
This commit is contained in:
Scott Mitchell 2015-10-07 14:12:38 -07:00
parent 94ef424e1c
commit a60c472fa4
8 changed files with 36 additions and 34 deletions

View File

@ -25,7 +25,6 @@ import io.netty.channel.ChannelMetadata;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.unix.FileDescriptor;
import io.netty.channel.unix.Socket; import io.netty.channel.unix.Socket;
import io.netty.channel.unix.UnixChannel; import io.netty.channel.unix.UnixChannel;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
@ -45,7 +44,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
protected int flags = Native.EPOLLET; protected int flags = Native.EPOLLET;
protected volatile boolean active; protected volatile boolean active;
private volatile boolean inputShutdown;
AbstractEpollChannel(Socket fd, int flag) { AbstractEpollChannel(Socket fd, int flag) {
this(null, fd, flag, false); this(null, fd, flag, false);
@ -97,15 +95,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
@Override @Override
protected void doClose() throws Exception { protected void doClose() throws Exception {
boolean active = this.active;
this.active = false; this.active = false;
FileDescriptor fd = fileDescriptor; Socket fd = fileDescriptor;
try { try {
// deregister from epoll now and shutdown the socket. // deregister from epoll now and shutdown the socket.
doDeregister(); doDeregister();
if (active) { if (!fd.isShutdown()) {
try { try {
fd().shutdown(true, true); fd().shutdown();
} catch (IOException ignored) { } catch (IOException ignored) {
// The FD will be closed, so if shutdown fails there is nothing we can do. // The FD will be closed, so if shutdown fails there is nothing we can do.
} }
@ -182,10 +179,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
loop.add(this); loop.add(this);
} }
protected final boolean isInputShutdown0() {
return inputShutdown;
}
@Override @Override
protected abstract AbstractEpollUnsafe newUnsafe(); protected abstract AbstractEpollUnsafe newUnsafe();
@ -352,21 +345,18 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
* Shutdown the input side of the channel. * Shutdown the input side of the channel.
*/ */
void shutdownInput() { void shutdownInput() {
if (!inputShutdown) { // Best effort check on volatile variable to prevent multiple shutdowns if (!fd().isInputShutdown()) {
inputShutdown = true; if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
if (isOpen()) { try {
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) { fd().shutdown(true, false);
try { clearEpollIn0();
fd().shutdown(true, false); pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
clearEpollIn0(); } catch (IOException e) {
pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE); pipeline().fireExceptionCaught(e);
} catch (IOException e) {
pipeline().fireExceptionCaught(e);
close(voidPromise());
}
} else {
close(voidPromise()); close(voidPromise());
} }
} else {
close(voidPromise());
} }
} }
} }
@ -386,6 +376,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
* Called once a EPOLLOUT event is ready to be processed * Called once a EPOLLOUT event is ready to be processed
*/ */
void epollOutReady() { void epollOutReady() {
if (fd().isOutputShutdown()) {
return;
}
// directly call super.flush0() to force a flush now // directly call super.flush0() to force a flush now
super.flush0(); super.flush0();
} }

View File

@ -100,6 +100,9 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
@Override @Override
void epollInReady() { void epollInReady() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
if (fd().isInputShutdown()) {
return;
}
boolean edgeTriggered = isFlagSet(Native.EPOLLET); boolean edgeTriggered = isFlagSet(Native.EPOLLET);
final ChannelConfig config = config(); final ChannelConfig config = config();

View File

@ -71,8 +71,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
private SocketAddress requestedRemoteAddress; private SocketAddress requestedRemoteAddress;
private final Queue<SpliceInTask> spliceQueue = PlatformDependent.newMpscQueue(); private final Queue<SpliceInTask> spliceQueue = PlatformDependent.newMpscQueue();
private volatile boolean outputShutdown;
// Lazy init these if we need to splice(...) // Lazy init these if we need to splice(...)
private FileDescriptor pipeIn; private FileDescriptor pipeIn;
private FileDescriptor pipeOut; private FileDescriptor pipeOut;
@ -528,14 +526,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES); "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
} }
protected boolean isOutputShutdown0() {
return outputShutdown || !isActive();
}
protected void shutdownOutput0(final ChannelPromise promise) { protected void shutdownOutput0(final ChannelPromise promise) {
try { try {
fd().shutdown(false, true); fd().shutdown(false, true);
outputShutdown = true;
promise.setSuccess(); promise.setSuccess();
} catch (Throwable cause) { } catch (Throwable cause) {
promise.setFailure(cause); promise.setFailure(cause);
@ -773,6 +766,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
@Override @Override
void epollInReady() { void epollInReady() {
if (fd().isInputShutdown()) {
return;
}
final ChannelConfig config = config(); final ChannelConfig config = config();
boolean edgeTriggered = isFlagSet(Native.EPOLLET); boolean edgeTriggered = isFlagSet(Native.EPOLLET);

View File

@ -522,6 +522,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
@Override @Override
void epollInReady() { void epollInReady() {
assert eventLoop().inEventLoop(); assert eventLoop().inEventLoop();
if (fd().isInputShutdown()) {
return;
}
DatagramChannelConfig config = config(); DatagramChannelConfig config = config();
boolean edgeTriggered = isFlagSet(Native.EPOLLET); boolean edgeTriggered = isFlagSet(Native.EPOLLET);

View File

@ -148,6 +148,9 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
} }
private void epollInReadFd() { private void epollInReadFd() {
if (fd().isInputShutdown()) {
return;
}
boolean edgeTriggered = isFlagSet(Native.EPOLLET); boolean edgeTriggered = isFlagSet(Native.EPOLLET);
final ChannelConfig config = config(); final ChannelConfig config = config();
if (!readPending && !edgeTriggered && !config.isAutoRead()) { if (!readPending && !edgeTriggered && !config.isAutoRead()) {

View File

@ -324,7 +324,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
// In either case epollOutReady() will do the correct thing (finish connecting, or fail // In either case epollOutReady() will do the correct thing (finish connecting, or fail
// the connection). // the connection).
// See https://github.com/netty/netty/issues/3848 // See https://github.com/netty/netty/issues/3848
if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0 && ch.isOpen()) { if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
// Force flush of data as the epoll is writable again // Force flush of data as the epoll is writable again
unsafe.epollOutReady(); unsafe.epollOutReady();
} }
@ -334,7 +334,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
// //
// If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
// try to read from the underlying file descriptor and so notify the user about the error. // try to read from the underlying file descriptor and so notify the user about the error.
if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0 && ch.isOpen()) { if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
// The Channel is still open and there is something to read. Do it now. // The Channel is still open and there is something to read. Do it now.
unsafe.epollInReady(); unsafe.epollInReady();
} }

View File

@ -149,12 +149,12 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
@Override @Override
public boolean isInputShutdown() { public boolean isInputShutdown() {
return isInputShutdown0(); return fd().isInputShutdown();
} }
@Override @Override
public boolean isOutputShutdown() { public boolean isOutputShutdown() {
return isOutputShutdown0(); return fd().isOutputShutdown();
} }
@Override @Override

View File

@ -58,6 +58,10 @@ public final class Socket extends FileDescriptor {
} }
} }
public void shutdown() throws IOException {
shutdown(true, true);
}
public boolean isShutdown() { public boolean isShutdown() {
return isInputShutdown() && isOutputShutdown(); return isInputShutdown() && isOutputShutdown();
} }