Make most outbound operations cancellable / More robust promise update

- Inspired by #2214 by @normanmaurer
- Call setUncancellable() before performing an outbound operation
- Add safeSetSuccess/Failure() and use them wherever
This commit is contained in:
Trustin Lee 2014-02-10 14:52:24 -08:00
parent 5395e263ad
commit fee1d9e75c
8 changed files with 55 additions and 35 deletions

View File

@ -149,25 +149,25 @@ public class RxtxChannel extends OioByteStreamChannel {
public void run() { public void run() {
try { try {
doInit(); doInit();
promise.setSuccess(); safeSetSuccess(promise);
if (!wasActive && isActive()) { if (!wasActive && isActive()) {
pipeline().fireChannelActive(); pipeline().fireChannelActive();
} }
} catch (Throwable t) { } catch (Throwable t) {
promise.setFailure(t); safeSetFailure(promise, t);
closeIfClosed(); closeIfClosed();
} }
} }
}, waitTime, TimeUnit.MILLISECONDS); }, waitTime, TimeUnit.MILLISECONDS);
} else { } else {
doInit(); doInit();
promise.setSuccess(); safeSetSuccess(promise);
if (!wasActive && isActive()) { if (!wasActive && isActive()) {
pipeline().fireChannelActive(); pipeline().fireChannelActive();
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
promise.setFailure(t); safeSetFailure(promise, t);
closeIfClosed(); closeIfClosed();
} }
} }

View File

@ -409,7 +409,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
AbstractChannel.this, t); AbstractChannel.this, t);
closeForcibly(); closeForcibly();
closeFuture.setClosed(); closeFuture.setClosed();
promise.setFailure(t); safeSetFailure(promise, t);
} }
} }
} }
@ -423,7 +423,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
doRegister(); doRegister();
registered = true; registered = true;
promise.setSuccess(); safeSetSuccess(promise);
pipeline.fireChannelRegistered(); pipeline.fireChannelRegistered();
if (isActive()) { if (isActive()) {
pipeline.fireChannelActive(); pipeline.fireChannelActive();
@ -432,11 +432,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
// Close the channel directly to avoid FD leak. // Close the channel directly to avoid FD leak.
closeForcibly(); closeForcibly();
closeFuture.setClosed(); closeFuture.setClosed();
if (!promise.tryFailure(t)) { safeSetFailure(promise, t);
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +
"Swallowing the cause of the registration failure:", t);
}
} }
} }
@ -463,7 +459,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
try { try {
doBind(localAddress); doBind(localAddress);
} catch (Throwable t) { } catch (Throwable t) {
promise.setFailure(t); safeSetFailure(promise, t);
closeIfClosed(); closeIfClosed();
return; return;
} }
@ -475,7 +471,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
}); });
} }
promise.setSuccess(); safeSetSuccess(promise);
} }
@Override @Override
@ -488,7 +484,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
try { try {
doDisconnect(); doDisconnect();
} catch (Throwable t) { } catch (Throwable t) {
promise.setFailure(t); safeSetFailure(promise, t);
closeIfClosed(); closeIfClosed();
return; return;
} }
@ -500,7 +496,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
}); });
} }
promise.setSuccess(); safeSetSuccess(promise);
closeIfClosed(); // doDisconnect() might have closed the channel closeIfClosed(); // doDisconnect() might have closed the channel
} }
@ -522,7 +518,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (closeFuture.isDone()) { if (closeFuture.isDone()) {
// Closed already. // Closed already.
promise.setSuccess(); safeSetSuccess(promise);
return; return;
} }
@ -533,10 +529,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
try { try {
doClose(); doClose();
closeFuture.setClosed(); closeFuture.setClosed();
promise.setSuccess(); safeSetSuccess(promise);
} catch (Throwable t) { } catch (Throwable t) {
closeFuture.setClosed(); closeFuture.setClosed();
promise.setFailure(t); safeSetFailure(promise, t);
} }
// Fail all the queued messages // Fail all the queued messages
@ -607,9 +603,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (!isActive()) { if (!isActive()) {
// Mark the write request as failure if the channel is inactive. // Mark the write request as failure if the channel is inactive.
if (isOpen()) { if (isOpen()) {
promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION); safeSetFailure(promise, NOT_YET_CONNECTED_EXCEPTION);
} else { } else {
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
} }
// release message now to prevent resource-leak // release message now to prevent resource-leak
ReferenceCountUtil.release(msg); ReferenceCountUtil.release(msg);
@ -675,10 +671,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return true; return true;
} }
promise.setFailure(CLOSED_CHANNEL_EXCEPTION); safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
return false; return false;
} }
/**
* Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message.
*/
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
/**
* Marks the specified {@code promise} as failure. If the {@code promise} is done already, log a message.
*/
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
}
}
protected final void closeIfClosed() { protected final void closeIfClosed() {
if (isOpen()) { if (isOpen()) {
return; return;

View File

@ -97,7 +97,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
} }
private void reject(ChannelPromise promise) { private void reject(ChannelPromise promise) {
promise.setFailure(new UnsupportedOperationException()); safeSetFailure(promise, new UnsupportedOperationException());
} }
} }
} }

View File

@ -28,8 +28,8 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle; import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -325,8 +325,7 @@ public final class ChannelOutboundBuffer {
flushed = flushed + 1 & buffer.length - 1; flushed = flushed + 1 & buffer.length - 1;
safeRelease(msg); safeRelease(msg);
safeSuccess(promise);
promise.trySuccess();
decrementPendingOutboundBytes(size); decrementPendingOutboundBytes(size);
return true; return true;
@ -393,6 +392,7 @@ public final class ChannelOutboundBuffer {
nioBufferSize += readableBytes; nioBufferSize += readableBytes;
int count = entry.count; int count = entry.count;
if (count == -1) { if (count == -1) {
//noinspection ConstantValueVariableUse
entry.count = count = buf.nioBufferCount(); entry.count = count = buf.nioBufferCount();
} }
int neededSpace = nioBufferCount + count; int neededSpace = nioBufferCount + count;
@ -577,9 +577,15 @@ public final class ChannelOutboundBuffer {
} }
} }
private static void safeSuccess(ChannelPromise promise) {
if (!promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}
private static void safeFail(ChannelPromise promise, Throwable cause) { private static void safeFail(ChannelPromise promise, Throwable cause) {
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) { if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
logger.warn("Promise done already: {} - new exception is:", promise, cause); logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
} }
} }
@ -611,7 +617,7 @@ public final class ChannelOutboundBuffer {
} }
public long totalPendingWriteBytes() { public long totalPendingWriteBytes() {
return this.totalPendingSize; return totalPendingSize;
} }
private static final class Entry { private static final class Entry {

View File

@ -343,7 +343,7 @@ public class EmbeddedChannel extends AbstractChannel {
private class DefaultUnsafe extends AbstractUnsafe { private class DefaultUnsafe extends AbstractUnsafe {
@Override @Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
promise.setSuccess(); safeSetSuccess(promise);
} }
} }

View File

@ -320,13 +320,13 @@ public class LocalChannel extends AbstractChannel {
@Override @Override
public void connect(final SocketAddress remoteAddress, public void connect(final SocketAddress remoteAddress,
SocketAddress localAddress, final ChannelPromise promise) { SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) { if (!promise.setUncancellable() || !ensureOpen(promise)) {
return; return;
} }
if (state == State.CONNECTED) { if (state == State.CONNECTED) {
Exception cause = new AlreadyConnectedException(); Exception cause = new AlreadyConnectedException();
promise.setFailure(cause); safeSetFailure(promise, cause);
pipeline().fireExceptionCaught(cause); pipeline().fireExceptionCaught(cause);
return; return;
} }
@ -348,7 +348,7 @@ public class LocalChannel extends AbstractChannel {
try { try {
doBind(localAddress); doBind(localAddress);
} catch (Throwable t) { } catch (Throwable t) {
promise.setFailure(t); safeSetFailure(promise, t);
close(voidPromise()); close(voidPromise());
return; return;
} }
@ -357,7 +357,7 @@ public class LocalChannel extends AbstractChannel {
Channel boundChannel = LocalChannelRegistry.get(remoteAddress); Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
if (!(boundChannel instanceof LocalServerChannel)) { if (!(boundChannel instanceof LocalServerChannel)) {
Exception cause = new ChannelException("connection refused"); Exception cause = new ChannelException("connection refused");
promise.setFailure(cause); safeSetFailure(promise, cause);
close(voidPromise()); close(voidPromise());
return; return;
} }

View File

@ -156,7 +156,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override @Override
public void connect( public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!ensureOpen(promise)) { if (!promise.setUncancellable() || !ensureOpen(promise)) {
return; return;
} }

View File

@ -62,7 +62,7 @@ public abstract class AbstractOioChannel extends AbstractChannel {
try { try {
boolean wasActive = isActive(); boolean wasActive = isActive();
doConnect(remoteAddress, localAddress); doConnect(remoteAddress, localAddress);
promise.setSuccess(); safeSetSuccess(promise);
if (!wasActive && isActive()) { if (!wasActive && isActive()) {
pipeline().fireChannelActive(); pipeline().fireChannelActive();
} }
@ -72,7 +72,7 @@ public abstract class AbstractOioChannel extends AbstractChannel {
newT.setStackTrace(t.getStackTrace()); newT.setStackTrace(t.getStackTrace());
t = newT; t = newT;
} }
promise.setFailure(t); safeSetFailure(promise, t);
closeIfClosed(); closeIfClosed();
} }
} }