ChannelFlushPromiseNotifier should allow long value for pendingDataSize
Motivation: At the moment ChannelFlushPromiseNotifier.add(....) takes an int value for pendingDataSize, which may be too small as a user may need to use a long. This can for example be useful when a user writes a FileRegion etc. Beside this the notify* method names are kind of missleading as these should not contain *Future* because it is about ChannelPromises. Modification: Add a new add(...) method that takes a long for pendingDataSize and @deprecated the old method. Beside this also @deprecated all *Future* methods and add methods that have *Promise* in the method name to better reflect usage. Result: ChannelFlushPromiseNotifier can be used with bigger data.
This commit is contained in:
parent
c1d6ba0598
commit
a79b69adf5
@ -49,15 +49,23 @@ public final class ChannelFlushPromiseNotifier {
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
|
||||
* pendingDataSize was reached.
|
||||
* @deprecated use {@link #add(ChannelPromise, long)}
|
||||
*/
|
||||
@Deprecated
|
||||
public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) {
|
||||
return add(promise, (long) pendingDataSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
|
||||
* {@code pendingDataSize} was reached.
|
||||
*/
|
||||
public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
|
||||
if (promise == null) {
|
||||
throw new NullPointerException("promise");
|
||||
}
|
||||
if (pendingDataSize < 0) {
|
||||
throw new IllegalArgumentException("pendingDataSize must be >= 0 but was" + pendingDataSize);
|
||||
throw new IllegalArgumentException("pendingDataSize must be >= 0 but was " + pendingDataSize);
|
||||
}
|
||||
long checkpoint = writeCounter + pendingDataSize;
|
||||
if (promise instanceof FlushCheckpoint) {
|
||||
@ -69,13 +77,12 @@ public final class ChannelFlushPromiseNotifier {
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increase the current write counter by the given delta
|
||||
*/
|
||||
public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) {
|
||||
if (delta < 0) {
|
||||
throw new IllegalArgumentException("delta must be >= 0 but was" + delta);
|
||||
throw new IllegalArgumentException("delta must be >= 0 but was " + delta);
|
||||
}
|
||||
writeCounter += delta;
|
||||
return this;
|
||||
@ -95,11 +102,19 @@ public final class ChannelFlushPromiseNotifier {
|
||||
* After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
|
||||
* so not receive anymore notification.
|
||||
*/
|
||||
public ChannelFlushPromiseNotifier notifyFlushFutures() {
|
||||
notifyFlushFutures0(null);
|
||||
public ChannelFlushPromiseNotifier notifyPromises() {
|
||||
notifyPromises0(null);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link #notifyPromises()}
|
||||
*/
|
||||
@Deprecated
|
||||
public ChannelFlushPromiseNotifier notifyFlushFutures() {
|
||||
return notifyPromises();
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
|
||||
* their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}.
|
||||
@ -111,8 +126,8 @@ public final class ChannelFlushPromiseNotifier {
|
||||
*
|
||||
* So after this operation this {@link ChannelFutureListener} is empty.
|
||||
*/
|
||||
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
|
||||
notifyFlushFutures();
|
||||
public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) {
|
||||
notifyPromises();
|
||||
for (;;) {
|
||||
FlushCheckpoint cp = flushCheckpoints.poll();
|
||||
if (cp == null) {
|
||||
@ -127,6 +142,14 @@ public final class ChannelFlushPromiseNotifier {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use {@link #notifyPromises(Throwable)}
|
||||
*/
|
||||
@Deprecated
|
||||
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
|
||||
return notifyPromises(cause);
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
|
||||
* their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using
|
||||
@ -143,8 +166,8 @@ public final class ChannelFlushPromiseNotifier {
|
||||
* pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()}
|
||||
* @param cause2 the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s
|
||||
*/
|
||||
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
|
||||
notifyFlushFutures0(cause1);
|
||||
public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) {
|
||||
notifyPromises0(cause1);
|
||||
for (;;) {
|
||||
FlushCheckpoint cp = flushCheckpoints.poll();
|
||||
if (cp == null) {
|
||||
@ -159,7 +182,15 @@ public final class ChannelFlushPromiseNotifier {
|
||||
return this;
|
||||
}
|
||||
|
||||
private void notifyFlushFutures0(Throwable cause) {
|
||||
/**
|
||||
* @deprecated use {@link #notifyPromises(Throwable, Throwable)}
|
||||
*/
|
||||
@Deprecated
|
||||
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
|
||||
return notifyPromises(cause1, cause2);
|
||||
}
|
||||
|
||||
private void notifyPromises0(Throwable cause) {
|
||||
if (flushCheckpoints.isEmpty()) {
|
||||
writeCounter = 0;
|
||||
return;
|
||||
@ -183,17 +214,18 @@ public final class ChannelFlushPromiseNotifier {
|
||||
}
|
||||
|
||||
flushCheckpoints.remove();
|
||||
ChannelPromise promise = cp.promise();
|
||||
if (cause == null) {
|
||||
if (tryNotify) {
|
||||
cp.promise().trySuccess();
|
||||
promise.trySuccess();
|
||||
} else {
|
||||
cp.promise().setSuccess();
|
||||
promise.setSuccess();
|
||||
}
|
||||
} else {
|
||||
if (tryNotify) {
|
||||
cp.promise().tryFailure(cause);
|
||||
promise.tryFailure(cause);
|
||||
} else {
|
||||
cp.promise().setFailure(cause);
|
||||
promise.setFailure(cause);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user