EPOLL Shutdown and Half Closed
Motivation: The EPOLL module was not completly respecting the half closed state. It may have missed events, or procssed events when it should not have due to checking isOpen instead of the appropriate shutdown state. Modifications: - use FileDescriptor's isShutdown* methods instead of isOpen to check for processing events. Result: Half closed code in EPOLL module is more correct.
This commit is contained in:
parent
dbbdbe11a6
commit
c7cb104dc4
@ -26,7 +26,6 @@ import io.netty.channel.ChannelOption;
|
|||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.channel.RecvByteBufAllocator;
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||||
import io.netty.channel.unix.FileDescriptor;
|
|
||||||
import io.netty.channel.unix.Socket;
|
import io.netty.channel.unix.Socket;
|
||||||
import io.netty.channel.unix.UnixChannel;
|
import io.netty.channel.unix.UnixChannel;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
@ -46,7 +45,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
protected int flags = Native.EPOLLET;
|
protected int flags = Native.EPOLLET;
|
||||||
|
|
||||||
protected volatile boolean active;
|
protected volatile boolean active;
|
||||||
private volatile boolean inputShutdown;
|
|
||||||
|
|
||||||
AbstractEpollChannel(Socket fd, int flag) {
|
AbstractEpollChannel(Socket fd, int flag) {
|
||||||
this(null, fd, flag, false);
|
this(null, fd, flag, false);
|
||||||
@ -98,15 +96,14 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doClose() throws Exception {
|
protected void doClose() throws Exception {
|
||||||
boolean active = this.active;
|
|
||||||
this.active = false;
|
this.active = false;
|
||||||
FileDescriptor fd = fileDescriptor;
|
Socket fd = fileDescriptor;
|
||||||
try {
|
try {
|
||||||
// deregister from epoll now and shutdown the socket.
|
// deregister from epoll now and shutdown the socket.
|
||||||
doDeregister();
|
doDeregister();
|
||||||
if (active) {
|
if (!fd.isShutdown()) {
|
||||||
try {
|
try {
|
||||||
fd().shutdown(true, true);
|
fd().shutdown();
|
||||||
} catch (IOException ignored) {
|
} catch (IOException ignored) {
|
||||||
// The FD will be closed, so if shutdown fails there is nothing we can do.
|
// The FD will be closed, so if shutdown fails there is nothing we can do.
|
||||||
}
|
}
|
||||||
@ -183,10 +180,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
loop.add(this);
|
loop.add(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final boolean isInputShutdown0() {
|
|
||||||
return inputShutdown;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected abstract AbstractEpollUnsafe newUnsafe();
|
protected abstract AbstractEpollUnsafe newUnsafe();
|
||||||
|
|
||||||
@ -318,7 +311,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
* @param edgeTriggered {@code true} if the channel is using ET mode. {@code false} otherwise.
|
* @param edgeTriggered {@code true} if the channel is using ET mode. {@code false} otherwise.
|
||||||
*/
|
*/
|
||||||
final void checkResetEpollIn(boolean edgeTriggered) {
|
final void checkResetEpollIn(boolean edgeTriggered) {
|
||||||
if (edgeTriggered && !isInputShutdown0()) {
|
if (edgeTriggered && !fd().isInputShutdown()) {
|
||||||
// trigger a read again as there may be something left to read and because of epoll ET we
|
// trigger a read again as there may be something left to read and because of epoll ET we
|
||||||
// will not get notified again until we read everything from the socket
|
// will not get notified again until we read everything from the socket
|
||||||
eventLoop().execute(new OneTimeTask() {
|
eventLoop().execute(new OneTimeTask() {
|
||||||
@ -367,9 +360,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
* Shutdown the input side of the channel.
|
* Shutdown the input side of the channel.
|
||||||
*/
|
*/
|
||||||
void shutdownInput() {
|
void shutdownInput() {
|
||||||
if (!inputShutdown) { // Best effort check on volatile variable to prevent multiple shutdowns
|
if (!fd().isInputShutdown()) {
|
||||||
inputShutdown = true;
|
|
||||||
if (isOpen()) {
|
|
||||||
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
|
if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
|
||||||
try {
|
try {
|
||||||
fd().shutdown(true, false);
|
fd().shutdown(true, false);
|
||||||
@ -384,7 +375,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
|
public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
|
||||||
@ -415,6 +405,9 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
|
|||||||
* Called once a EPOLLOUT event is ready to be processed
|
* Called once a EPOLLOUT event is ready to be processed
|
||||||
*/
|
*/
|
||||||
void epollOutReady() {
|
void epollOutReady() {
|
||||||
|
if (fd().isOutputShutdown()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
// directly call super.flush0() to force a flush now
|
// directly call super.flush0() to force a flush now
|
||||||
super.flush0();
|
super.flush0();
|
||||||
}
|
}
|
||||||
|
@ -112,6 +112,9 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
|
|||||||
@Override
|
@Override
|
||||||
void epollInReady() {
|
void epollInReady() {
|
||||||
assert eventLoop().inEventLoop();
|
assert eventLoop().inEventLoop();
|
||||||
|
if (fd().isInputShutdown()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
||||||
|
|
||||||
final ChannelConfig config = config();
|
final ChannelConfig config = config();
|
||||||
|
@ -71,8 +71,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
private SocketAddress requestedRemoteAddress;
|
private SocketAddress requestedRemoteAddress;
|
||||||
private final Queue<SpliceInTask> spliceQueue = PlatformDependent.newMpscQueue();
|
private final Queue<SpliceInTask> spliceQueue = PlatformDependent.newMpscQueue();
|
||||||
|
|
||||||
private volatile boolean outputShutdown;
|
|
||||||
|
|
||||||
// Lazy init these if we need to splice(...)
|
// Lazy init these if we need to splice(...)
|
||||||
private FileDescriptor pipeIn;
|
private FileDescriptor pipeIn;
|
||||||
private FileDescriptor pipeOut;
|
private FileDescriptor pipeOut;
|
||||||
@ -528,14 +526,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
|
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean isOutputShutdown0() {
|
|
||||||
return outputShutdown || !isActive();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void shutdownOutput0(final ChannelPromise promise) {
|
protected void shutdownOutput0(final ChannelPromise promise) {
|
||||||
try {
|
try {
|
||||||
fd().shutdown(false, true);
|
fd().shutdown(false, true);
|
||||||
outputShutdown = true;
|
|
||||||
promise.setSuccess();
|
promise.setSuccess();
|
||||||
} catch (Throwable cause) {
|
} catch (Throwable cause) {
|
||||||
promise.setFailure(cause);
|
promise.setFailure(cause);
|
||||||
@ -774,6 +767,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
void epollInReady() {
|
void epollInReady() {
|
||||||
|
if (fd().isInputShutdown()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
final ChannelConfig config = config();
|
final ChannelConfig config = config();
|
||||||
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
||||||
|
|
||||||
|
@ -526,6 +526,9 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
|
|||||||
@Override
|
@Override
|
||||||
void epollInReady() {
|
void epollInReady() {
|
||||||
assert eventLoop().inEventLoop();
|
assert eventLoop().inEventLoop();
|
||||||
|
if (fd().isInputShutdown()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
DatagramChannelConfig config = config();
|
DatagramChannelConfig config = config();
|
||||||
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
||||||
|
|
||||||
|
@ -148,6 +148,9 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void epollInReadFd() {
|
private void epollInReadFd() {
|
||||||
|
if (fd().isInputShutdown()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
boolean edgeTriggered = isFlagSet(Native.EPOLLET);
|
||||||
final ChannelConfig config = config();
|
final ChannelConfig config = config();
|
||||||
if (!readPending && !edgeTriggered && !config.isAutoRead()) {
|
if (!readPending && !edgeTriggered && !config.isAutoRead()) {
|
||||||
|
@ -323,7 +323,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
|
|||||||
// In either case epollOutReady() will do the correct thing (finish connecting, or fail
|
// In either case epollOutReady() will do the correct thing (finish connecting, or fail
|
||||||
// the connection).
|
// the connection).
|
||||||
// See https://github.com/netty/netty/issues/3848
|
// See https://github.com/netty/netty/issues/3848
|
||||||
if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0 && ch.isOpen()) {
|
if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
|
||||||
// Force flush of data as the epoll is writable again
|
// Force flush of data as the epoll is writable again
|
||||||
unsafe.epollOutReady();
|
unsafe.epollOutReady();
|
||||||
}
|
}
|
||||||
@ -333,7 +333,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
|
|||||||
//
|
//
|
||||||
// If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
|
// If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
|
||||||
// try to read from the underlying file descriptor and so notify the user about the error.
|
// try to read from the underlying file descriptor and so notify the user about the error.
|
||||||
if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0 && ch.isOpen()) {
|
if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
|
||||||
// The Channel is still open and there is something to read. Do it now.
|
// The Channel is still open and there is something to read. Do it now.
|
||||||
unsafe.epollInReady();
|
unsafe.epollInReady();
|
||||||
}
|
}
|
||||||
|
@ -149,12 +149,12 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isInputShutdown() {
|
public boolean isInputShutdown() {
|
||||||
return isInputShutdown0();
|
return fd().isInputShutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isOutputShutdown() {
|
public boolean isOutputShutdown() {
|
||||||
return isOutputShutdown0();
|
return fd().isOutputShutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -58,6 +58,10 @@ public final class Socket extends FileDescriptor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void shutdown() throws IOException {
|
||||||
|
shutdown(true, true);
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isShutdown() {
|
public boolean isShutdown() {
|
||||||
return isInputShutdown() && isOutputShutdown();
|
return isInputShutdown() && isOutputShutdown();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user