Make most outbound operations cancellable / More robust promise update
- Inspired by #2214 by @normanmaurer - Call setUncancellable() before performing an outbound operation - Add safeSetSuccess/Failure() and use them wherever
This commit is contained in:
parent
852a7cec77
commit
96b0a949e1
|
@ -148,25 +148,25 @@ public class RxtxChannel extends OioByteStreamChannel {
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
doInit();
|
doInit();
|
||||||
promise.setSuccess();
|
safeSetSuccess(promise);
|
||||||
if (!wasActive && isActive()) {
|
if (!wasActive && isActive()) {
|
||||||
pipeline().fireChannelActive();
|
pipeline().fireChannelActive();
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
promise.setFailure(t);
|
safeSetFailure(promise, t);
|
||||||
closeIfClosed();
|
closeIfClosed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, waitTime, TimeUnit.MILLISECONDS);
|
}, waitTime, TimeUnit.MILLISECONDS);
|
||||||
} else {
|
} else {
|
||||||
doInit();
|
doInit();
|
||||||
promise.setSuccess();
|
safeSetSuccess(promise);
|
||||||
if (!wasActive && isActive()) {
|
if (!wasActive && isActive()) {
|
||||||
pipeline().fireChannelActive();
|
pipeline().fireChannelActive();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
promise.setFailure(t);
|
safeSetFailure(promise, t);
|
||||||
closeIfClosed();
|
closeIfClosed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -424,7 +424,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
AbstractChannel.this, t);
|
AbstractChannel.this, t);
|
||||||
closeForcibly();
|
closeForcibly();
|
||||||
closeFuture.setClosed();
|
closeFuture.setClosed();
|
||||||
promise.setFailure(t);
|
safeSetFailure(promise, t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -438,7 +438,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
}
|
}
|
||||||
doRegister();
|
doRegister();
|
||||||
registered = true;
|
registered = true;
|
||||||
promise.setSuccess();
|
safeSetSuccess(promise);
|
||||||
pipeline.fireChannelRegistered();
|
pipeline.fireChannelRegistered();
|
||||||
if (isActive()) {
|
if (isActive()) {
|
||||||
pipeline.fireChannelActive();
|
pipeline.fireChannelActive();
|
||||||
|
@ -447,11 +447,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
// Close the channel directly to avoid FD leak.
|
// Close the channel directly to avoid FD leak.
|
||||||
closeForcibly();
|
closeForcibly();
|
||||||
closeFuture.setClosed();
|
closeFuture.setClosed();
|
||||||
if (!promise.tryFailure(t)) {
|
safeSetFailure(promise, t);
|
||||||
logger.warn(
|
|
||||||
"Tried to fail the registration promise, but it is complete already. " +
|
|
||||||
"Swallowing the cause of the registration failure:", t);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -478,11 +474,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
try {
|
try {
|
||||||
doBind(localAddress);
|
doBind(localAddress);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
promise.setFailure(t);
|
safeSetFailure(promise, t);
|
||||||
closeIfClosed();
|
closeIfClosed();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
promise.setSuccess();
|
|
||||||
|
|
||||||
if (!wasActive && isActive()) {
|
if (!wasActive && isActive()) {
|
||||||
invokeLater(new Runnable() {
|
invokeLater(new Runnable() {
|
||||||
|
@ -492,6 +487,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
safeSetSuccess(promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -504,11 +501,11 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
try {
|
try {
|
||||||
doDisconnect();
|
doDisconnect();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
promise.setFailure(t);
|
safeSetFailure(promise, t);
|
||||||
closeIfClosed();
|
closeIfClosed();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
promise.setSuccess();
|
|
||||||
if (wasActive && !isActive()) {
|
if (wasActive && !isActive()) {
|
||||||
invokeLater(new Runnable() {
|
invokeLater(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -517,6 +514,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
safeSetSuccess(promise);
|
||||||
closeIfClosed(); // doDisconnect() might have closed the channel
|
closeIfClosed(); // doDisconnect() might have closed the channel
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -538,7 +537,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
|
|
||||||
if (closeFuture.isDone()) {
|
if (closeFuture.isDone()) {
|
||||||
// Closed already.
|
// Closed already.
|
||||||
promise.setSuccess();
|
safeSetSuccess(promise);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -549,10 +548,10 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
try {
|
try {
|
||||||
doClose();
|
doClose();
|
||||||
closeFuture.setClosed();
|
closeFuture.setClosed();
|
||||||
promise.setSuccess();
|
safeSetSuccess(promise);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
closeFuture.setClosed();
|
closeFuture.setClosed();
|
||||||
promise.setFailure(t);
|
safeSetFailure(promise, t);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fail all the queued messages
|
// Fail all the queued messages
|
||||||
|
@ -590,7 +589,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!registered) {
|
if (!registered) {
|
||||||
promise.setSuccess();
|
safeSetSuccess(promise);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -601,18 +600,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
} finally {
|
} finally {
|
||||||
if (registered) {
|
if (registered) {
|
||||||
registered = false;
|
registered = false;
|
||||||
promise.setSuccess();
|
|
||||||
invokeLater(new Runnable() {
|
invokeLater(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
pipeline.fireChannelUnregistered();
|
pipeline.fireChannelUnregistered();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
safeSetSuccess(promise);
|
||||||
} else {
|
} else {
|
||||||
// Some transports like local and AIO does not allow the deregistration of
|
// Some transports like local and AIO does not allow the deregistration of
|
||||||
// an open channel. Their doDeregister() calls close(). Consequently,
|
// an open channel. Their doDeregister() calls close(). Consequently,
|
||||||
// close() calls deregister() again - no need to fire channelUnregistered.
|
// close() calls deregister() again - no need to fire channelUnregistered.
|
||||||
promise.setSuccess();
|
safeSetSuccess(promise);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -641,9 +640,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
if (!isActive()) {
|
if (!isActive()) {
|
||||||
// Mark the write request as failure if the channel is inactive.
|
// Mark the write request as failure if the channel is inactive.
|
||||||
if (isOpen()) {
|
if (isOpen()) {
|
||||||
promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
|
safeSetFailure(promise, NOT_YET_CONNECTED_EXCEPTION);
|
||||||
} else {
|
} else {
|
||||||
promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
|
safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
|
||||||
}
|
}
|
||||||
// release message now to prevent resource-leak
|
// release message now to prevent resource-leak
|
||||||
ReferenceCountUtil.release(msg);
|
ReferenceCountUtil.release(msg);
|
||||||
|
@ -712,10 +711,28 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
promise.setFailure(CLOSED_CHANNEL_EXCEPTION);
|
safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks the specified {@code promise} as success. If the {@code promise} is done already, log a message.
|
||||||
|
*/
|
||||||
|
protected final void safeSetSuccess(ChannelPromise promise) {
|
||||||
|
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
|
||||||
|
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks the specified {@code promise} as failure. If the {@code promise} is done already, log a message.
|
||||||
|
*/
|
||||||
|
protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
|
||||||
|
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
|
||||||
|
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected final void closeIfClosed() {
|
protected final void closeIfClosed() {
|
||||||
if (isOpen()) {
|
if (isOpen()) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -89,7 +89,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
|
||||||
}
|
}
|
||||||
|
|
||||||
private void reject(ChannelPromise promise) {
|
private void reject(ChannelPromise promise) {
|
||||||
promise.setFailure(new UnsupportedOperationException());
|
safeSetFailure(promise, new UnsupportedOperationException());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -281,8 +281,7 @@ public final class ChannelOutboundBuffer {
|
||||||
flushed = flushed + 1 & buffer.length - 1;
|
flushed = flushed + 1 & buffer.length - 1;
|
||||||
|
|
||||||
safeRelease(msg);
|
safeRelease(msg);
|
||||||
|
safeSuccess(promise);
|
||||||
promise.trySuccess();
|
|
||||||
decrementPendingOutboundBytes(size);
|
decrementPendingOutboundBytes(size);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -349,7 +348,8 @@ public final class ChannelOutboundBuffer {
|
||||||
nioBufferSize += readableBytes;
|
nioBufferSize += readableBytes;
|
||||||
int count = entry.count;
|
int count = entry.count;
|
||||||
if (count == -1) {
|
if (count == -1) {
|
||||||
entry.count = count = buf.nioBufferCount();
|
//noinspection ConstantValueVariableUse
|
||||||
|
entry.count = count = buf.nioBufferCount();
|
||||||
}
|
}
|
||||||
int neededSpace = nioBufferCount + count;
|
int neededSpace = nioBufferCount + count;
|
||||||
if (neededSpace > nioBuffers.length) {
|
if (neededSpace > nioBuffers.length) {
|
||||||
|
@ -527,9 +527,15 @@ public final class ChannelOutboundBuffer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void safeSuccess(ChannelPromise promise) {
|
||||||
|
if (!promise.trySuccess()) {
|
||||||
|
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void safeFail(ChannelPromise promise, Throwable cause) {
|
private static void safeFail(ChannelPromise promise, Throwable cause) {
|
||||||
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
|
if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
|
||||||
logger.warn("Promise done already: {} - new exception is:", promise, cause);
|
logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -561,7 +567,7 @@ public final class ChannelOutboundBuffer {
|
||||||
}
|
}
|
||||||
|
|
||||||
public long totalPendingWriteBytes() {
|
public long totalPendingWriteBytes() {
|
||||||
return this.totalPendingSize;
|
return totalPendingSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class Entry {
|
private static final class Entry {
|
||||||
|
|
|
@ -334,7 +334,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
||||||
private class DefaultUnsafe extends AbstractUnsafe {
|
private class DefaultUnsafe extends AbstractUnsafe {
|
||||||
@Override
|
@Override
|
||||||
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||||
promise.setSuccess();
|
safeSetSuccess(promise);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -320,13 +320,13 @@ public class LocalChannel extends AbstractChannel {
|
||||||
@Override
|
@Override
|
||||||
public void connect(final SocketAddress remoteAddress,
|
public void connect(final SocketAddress remoteAddress,
|
||||||
SocketAddress localAddress, final ChannelPromise promise) {
|
SocketAddress localAddress, final ChannelPromise promise) {
|
||||||
if (!ensureOpen(promise)) {
|
if (!promise.setUncancellable() || !ensureOpen(promise)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state == 2) {
|
if (state == 2) {
|
||||||
Exception cause = new AlreadyConnectedException();
|
Exception cause = new AlreadyConnectedException();
|
||||||
promise.setFailure(cause);
|
safeSetFailure(promise, cause);
|
||||||
pipeline().fireExceptionCaught(cause);
|
pipeline().fireExceptionCaught(cause);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -348,7 +348,7 @@ public class LocalChannel extends AbstractChannel {
|
||||||
try {
|
try {
|
||||||
doBind(localAddress);
|
doBind(localAddress);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
promise.setFailure(t);
|
safeSetFailure(promise, t);
|
||||||
close(voidPromise());
|
close(voidPromise());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -357,7 +357,7 @@ public class LocalChannel extends AbstractChannel {
|
||||||
Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
|
Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
|
||||||
if (!(boundChannel instanceof LocalServerChannel)) {
|
if (!(boundChannel instanceof LocalServerChannel)) {
|
||||||
Exception cause = new ChannelException("connection refused");
|
Exception cause = new ChannelException("connection refused");
|
||||||
promise.setFailure(cause);
|
safeSetFailure(promise, cause);
|
||||||
close(voidPromise());
|
close(voidPromise());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -156,7 +156,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
||||||
@Override
|
@Override
|
||||||
public void connect(
|
public void connect(
|
||||||
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
|
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
|
||||||
if (!ensureOpen(promise)) {
|
if (!promise.setUncancellable() || !ensureOpen(promise)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -65,7 +65,7 @@ public abstract class AbstractOioChannel extends AbstractChannel {
|
||||||
try {
|
try {
|
||||||
boolean wasActive = isActive();
|
boolean wasActive = isActive();
|
||||||
doConnect(remoteAddress, localAddress);
|
doConnect(remoteAddress, localAddress);
|
||||||
promise.setSuccess();
|
safeSetSuccess(promise);
|
||||||
if (!wasActive && isActive()) {
|
if (!wasActive && isActive()) {
|
||||||
pipeline().fireChannelActive();
|
pipeline().fireChannelActive();
|
||||||
}
|
}
|
||||||
|
@ -75,7 +75,7 @@ public abstract class AbstractOioChannel extends AbstractChannel {
|
||||||
newT.setStackTrace(t.getStackTrace());
|
newT.setStackTrace(t.getStackTrace());
|
||||||
t = newT;
|
t = newT;
|
||||||
}
|
}
|
||||||
promise.setFailure(t);
|
safeSetFailure(promise, t);
|
||||||
closeIfClosed();
|
closeIfClosed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user