[#3662] Fail the connect future on close
Motivation: Because of a bug we missed to fail the connect future when doClose() is called. This can lead to a future which is never notified and so may lead to deadlocks in user-programs. Modifications: Correctly fail the connect future when doClose() is called and the connection was not established yet. Result: Connect future is always notified.
This commit is contained in:
parent
f67b14bf35
commit
1cce998bb0
@ -31,12 +31,14 @@ import io.netty.channel.DefaultFileRegion;
|
|||||||
import io.netty.channel.RecvByteBufAllocator;
|
import io.netty.channel.RecvByteBufAllocator;
|
||||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||||
import io.netty.channel.unix.FileDescriptor;
|
import io.netty.channel.unix.FileDescriptor;
|
||||||
|
import io.netty.util.internal.EmptyArrays;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
import io.netty.util.internal.StringUtil;
|
import io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@ -45,6 +47,19 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
private static final String EXPECTED_TYPES =
|
private static final String EXPECTED_TYPES =
|
||||||
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
|
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
|
||||||
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
|
StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
|
||||||
|
private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
|
||||||
|
|
||||||
|
static {
|
||||||
|
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The future of the current connection attempt. If not null, subsequent
|
||||||
|
* connection attempts will fail.
|
||||||
|
*/
|
||||||
|
private ChannelPromise connectPromise;
|
||||||
|
private ScheduledFuture<?> connectTimeoutFuture;
|
||||||
|
private SocketAddress requestedRemoteAddress;
|
||||||
|
|
||||||
private volatile boolean inputShutdown;
|
private volatile boolean inputShutdown;
|
||||||
private volatile boolean outputShutdown;
|
private volatile boolean outputShutdown;
|
||||||
@ -388,14 +403,24 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doClose() throws Exception {
|
||||||
|
ChannelPromise promise = connectPromise;
|
||||||
|
if (promise != null) {
|
||||||
|
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
|
||||||
|
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
|
||||||
|
connectPromise = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
ScheduledFuture<?> future = connectTimeoutFuture;
|
||||||
|
if (future != null) {
|
||||||
|
future.cancel(false);
|
||||||
|
connectTimeoutFuture = null;
|
||||||
|
}
|
||||||
|
super.doClose();
|
||||||
|
}
|
||||||
|
|
||||||
class EpollStreamUnsafe extends AbstractEpollUnsafe {
|
class EpollStreamUnsafe extends AbstractEpollUnsafe {
|
||||||
/**
|
|
||||||
* The future of the current connection attempt. If not null, subsequent
|
|
||||||
* connection attempts will fail.
|
|
||||||
*/
|
|
||||||
private ChannelPromise connectPromise;
|
|
||||||
private ScheduledFuture<?> connectTimeoutFuture;
|
|
||||||
private SocketAddress requestedRemoteAddress;
|
|
||||||
|
|
||||||
private RecvByteBufAllocator.Handle allocHandle;
|
private RecvByteBufAllocator.Handle allocHandle;
|
||||||
|
|
||||||
@ -454,7 +479,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
|
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
ChannelPromise connectPromise = EpollStreamUnsafe.this.connectPromise;
|
ChannelPromise connectPromise = AbstractEpollStreamChannel.this.connectPromise;
|
||||||
ConnectTimeoutException cause =
|
ConnectTimeoutException cause =
|
||||||
new ConnectTimeoutException("connection timed out: " + remoteAddress);
|
new ConnectTimeoutException("connection timed out: " + remoteAddress);
|
||||||
if (connectPromise != null && connectPromise.tryFailure(cause)) {
|
if (connectPromise != null && connectPromise.tryFailure(cause)) {
|
||||||
|
@ -29,6 +29,7 @@ import io.netty.channel.ConnectTimeoutException;
|
|||||||
import io.netty.channel.EventLoop;
|
import io.netty.channel.EventLoop;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import io.netty.util.ReferenceCounted;
|
import io.netty.util.ReferenceCounted;
|
||||||
|
import io.netty.util.internal.EmptyArrays;
|
||||||
import io.netty.util.internal.OneTimeTask;
|
import io.netty.util.internal.OneTimeTask;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
@ -36,6 +37,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.nio.channels.CancelledKeyException;
|
import java.nio.channels.CancelledKeyException;
|
||||||
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.nio.channels.SelectableChannel;
|
import java.nio.channels.SelectableChannel;
|
||||||
import java.nio.channels.SelectionKey;
|
import java.nio.channels.SelectionKey;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
@ -49,6 +51,12 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
|||||||
private static final InternalLogger logger =
|
private static final InternalLogger logger =
|
||||||
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
|
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
|
||||||
|
|
||||||
|
private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
|
||||||
|
|
||||||
|
static {
|
||||||
|
CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
|
||||||
|
}
|
||||||
|
|
||||||
private final SelectableChannel ch;
|
private final SelectableChannel ch;
|
||||||
protected final int readInterestOp;
|
protected final int readInterestOp;
|
||||||
volatile SelectionKey selectionKey;
|
volatile SelectionKey selectionKey;
|
||||||
@ -445,4 +453,20 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
|||||||
|
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doClose() throws Exception {
|
||||||
|
ChannelPromise promise = connectPromise;
|
||||||
|
if (promise != null) {
|
||||||
|
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
|
||||||
|
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
|
||||||
|
connectPromise = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
ScheduledFuture<?> future = connectTimeoutFuture;
|
||||||
|
if (future != null) {
|
||||||
|
future.cancel(false);
|
||||||
|
connectTimeoutFuture = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -233,6 +233,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doClose() throws Exception {
|
protected void doClose() throws Exception {
|
||||||
|
super.doClose();
|
||||||
javaChannel().close();
|
javaChannel().close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user