[#1344] Fix race condition in DefaultChannelPromise / DefaultChannelProgressivePromise which could lead to listeners that are not notified
This commit is contained in:
parent
5dd867ee23
commit
e48bc9c086
@ -43,12 +43,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
private volatile Object result;
|
private volatile Object result;
|
||||||
private Object listeners; // Can be ChannelFutureListener or DefaultFutureListeners
|
private Object listeners; // Can be ChannelFutureListener or DefaultFutureListeners
|
||||||
|
|
||||||
/**
|
private long waiters;
|
||||||
* The the most significant 24 bits of this field represents the number of waiter threads waiting for this promise
|
|
||||||
* with await*() and sync*(). Subclasses can use the other 40 bits of this field to represents its own state, and
|
|
||||||
* are responsible for retaining the most significant 24 bits as is when modifying this field.
|
|
||||||
*/
|
|
||||||
protected long state;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance.
|
* Creates a new instance.
|
||||||
@ -446,19 +441,18 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean hasWaiters() {
|
private boolean hasWaiters() {
|
||||||
return (state & 0xFFFFFF0000000000L) != 0;
|
return waiters > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void incWaiters() {
|
private void incWaiters() {
|
||||||
long newState = state + 0x10000000000L;
|
if (waiters == Long.MAX_VALUE) {
|
||||||
if ((newState & 0xFFFFFF0000000000L) == 0) {
|
|
||||||
throw new IllegalStateException("too many waiters");
|
throw new IllegalStateException("too many waiters");
|
||||||
}
|
}
|
||||||
state = newState;
|
waiters++;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void decWaiters() {
|
private void decWaiters() {
|
||||||
state -= 0x10000000000L;
|
waiters--;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void notifyListeners() {
|
private void notifyListeners() {
|
||||||
|
@ -30,6 +30,7 @@ public class DefaultChannelProgressivePromise
|
|||||||
extends DefaultProgressivePromise<Void> implements ChannelProgressivePromise, FlushCheckpoint {
|
extends DefaultProgressivePromise<Void> implements ChannelProgressivePromise, FlushCheckpoint {
|
||||||
|
|
||||||
private final Channel channel;
|
private final Channel channel;
|
||||||
|
private long checkpoint;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance.
|
* Creates a new instance.
|
||||||
@ -145,15 +146,12 @@ public class DefaultChannelProgressivePromise
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long flushCheckpoint() {
|
public long flushCheckpoint() {
|
||||||
return state & 0x000000FFFFFFFFFFL;
|
return checkpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flushCheckpoint(long checkpoint) {
|
public void flushCheckpoint(long checkpoint) {
|
||||||
if ((checkpoint & 0xFFFFFF0000000000L) != 0) {
|
this.checkpoint = checkpoint;
|
||||||
throw new IllegalStateException("flushCheckpoint overflow");
|
|
||||||
}
|
|
||||||
state = state & 0xFFFFFF0000000000L | checkpoint;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -28,6 +28,7 @@ import io.netty.util.concurrent.GenericFutureListener;
|
|||||||
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
|
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
|
||||||
|
|
||||||
private final Channel channel;
|
private final Channel channel;
|
||||||
|
private long checkpoint;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance.
|
* Creates a new instance.
|
||||||
@ -137,15 +138,12 @@ public class DefaultChannelPromise extends DefaultPromise<Void> implements Chann
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long flushCheckpoint() {
|
public long flushCheckpoint() {
|
||||||
return state & 0x000000FFFFFFFFFFL;
|
return checkpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flushCheckpoint(long checkpoint) {
|
public void flushCheckpoint(long checkpoint) {
|
||||||
if ((checkpoint & 0xFFFFFF0000000000L) != 0) {
|
this.checkpoint = checkpoint;
|
||||||
throw new IllegalStateException("flushCheckpoint overflow");
|
|
||||||
}
|
|
||||||
state = state & 0xFFFFFF0000000000L | checkpoint;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
Reference in New Issue
Block a user