ChannelFlushPromiseNotifier should allow long value for pendingDataSize

Motivation:
At the moment ChannelFlushPromiseNotifier.add(....) takes an int value for pendingDataSize, which may be too small as a user may need to use a long. This can for example be useful when a user writes a FileRegion etc. Beside this the notify* method names are kind of missleading as these should not contain *Future* because it is about ChannelPromises.

Modification:
Add a new add(...) method that takes a long for pendingDataSize and @deprecated the old method. Beside this also @deprecated all *Future* methods and add methods that have *Promise* in the method name to better reflect usage.

Result:
ChannelFlushPromiseNotifier can be used with bigger data.
This commit is contained in:
Norman Maurer 2014-06-03 10:24:44 +02:00
parent 95b6ee80ca
commit a5b230c585

View File

@ -49,15 +49,23 @@ public final class ChannelFlushPromiseNotifier {
} }
/** /**
* Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given * @deprecated use {@link #add(ChannelPromise, long)}
* pendingDataSize was reached.
*/ */
@Deprecated
public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) { public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) {
return add(promise, (long) pendingDataSize);
}
/**
* Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
* {@code pendingDataSize} was reached.
*/
public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
if (promise == null) { if (promise == null) {
throw new NullPointerException("promise"); throw new NullPointerException("promise");
} }
if (pendingDataSize < 0) { if (pendingDataSize < 0) {
throw new IllegalArgumentException("pendingDataSize must be >= 0 but was" + pendingDataSize); throw new IllegalArgumentException("pendingDataSize must be >= 0 but was " + pendingDataSize);
} }
long checkpoint = writeCounter + pendingDataSize; long checkpoint = writeCounter + pendingDataSize;
if (promise instanceof FlushCheckpoint) { if (promise instanceof FlushCheckpoint) {
@ -69,13 +77,12 @@ public final class ChannelFlushPromiseNotifier {
} }
return this; return this;
} }
/** /**
* Increase the current write counter by the given delta * Increase the current write counter by the given delta
*/ */
public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) { public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) {
if (delta < 0) { if (delta < 0) {
throw new IllegalArgumentException("delta must be >= 0 but was" + delta); throw new IllegalArgumentException("delta must be >= 0 but was " + delta);
} }
writeCounter += delta; writeCounter += delta;
return this; return this;
@ -95,11 +102,19 @@ public final class ChannelFlushPromiseNotifier {
* After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
* so not receive anymore notification. * so not receive anymore notification.
*/ */
public ChannelFlushPromiseNotifier notifyFlushFutures() { public ChannelFlushPromiseNotifier notifyPromises() {
notifyFlushFutures0(null); notifyPromises0(null);
return this; return this;
} }
/**
* @deprecated use {@link #notifyPromises()}
*/
@Deprecated
public ChannelFlushPromiseNotifier notifyFlushFutures() {
return notifyPromises();
}
/** /**
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
* their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}. * their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}.
@ -111,8 +126,8 @@ public final class ChannelFlushPromiseNotifier {
* *
* So after this operation this {@link ChannelFutureListener} is empty. * So after this operation this {@link ChannelFutureListener} is empty.
*/ */
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) { public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) {
notifyFlushFutures(); notifyPromises();
for (;;) { for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll(); FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) { if (cp == null) {
@ -127,6 +142,14 @@ public final class ChannelFlushPromiseNotifier {
return this; return this;
} }
/**
* @deprecated use {@link #notifyPromises(Throwable)}
*/
@Deprecated
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
return notifyPromises(cause);
}
/** /**
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
* their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using * their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using
@ -143,8 +166,8 @@ public final class ChannelFlushPromiseNotifier {
* pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()} * pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()}
* @param cause2 the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s * @param cause2 the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s
*/ */
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) { public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) {
notifyFlushFutures0(cause1); notifyPromises0(cause1);
for (;;) { for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll(); FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) { if (cp == null) {
@ -159,7 +182,15 @@ public final class ChannelFlushPromiseNotifier {
return this; return this;
} }
private void notifyFlushFutures0(Throwable cause) { /**
* @deprecated use {@link #notifyPromises(Throwable, Throwable)}
*/
@Deprecated
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
return notifyPromises(cause1, cause2);
}
private void notifyPromises0(Throwable cause) {
if (flushCheckpoints.isEmpty()) { if (flushCheckpoints.isEmpty()) {
writeCounter = 0; writeCounter = 0;
return; return;
@ -183,17 +214,18 @@ public final class ChannelFlushPromiseNotifier {
} }
flushCheckpoints.remove(); flushCheckpoints.remove();
ChannelPromise promise = cp.promise();
if (cause == null) { if (cause == null) {
if (tryNotify) { if (tryNotify) {
cp.promise().trySuccess(); promise.trySuccess();
} else { } else {
cp.promise().setSuccess(); promise.setSuccess();
} }
} else { } else {
if (tryNotify) { if (tryNotify) {
cp.promise().tryFailure(cause); promise.tryFailure(cause);
} else { } else {
cp.promise().setFailure(cause); promise.setFailure(cause);
} }
} }
} }