ChannelFlushPromiseNotifier should allow long value for pendingDataSize

Motivation:
At the moment ChannelFlushPromiseNotifier.add(....) takes an int value for pendingDataSize, which may be too small as a user may need to use a long. This can for example be useful when a user writes a FileRegion etc. Beside this the notify* method names are kind of missleading as these should not contain *Future* because it is about ChannelPromises.

Modification:
Alter add(...) method to take a long for pendingDataSize. Rename all *Future* methods to have *Promise* in the method name to better reflect usage.

Result:
ChannelFlushPromiseNotifier can be used with bigger data.
This commit is contained in:
Norman Maurer 2014-06-03 10:24:44 +02:00
parent ac4c1c88a7
commit 4666ac08d0

View File

@ -50,9 +50,9 @@ public final class ChannelFlushPromiseNotifier {
/**
* Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
* pendingDataSize was reached.
* {@code pendingDataSize} was reached.
*/
public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) {
public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) {
if (promise == null) {
throw new NullPointerException("promise");
}
@ -69,7 +69,6 @@ public final class ChannelFlushPromiseNotifier {
}
return this;
}
/**
* Increase the current write counter by the given delta
*/
@ -89,19 +88,19 @@ public final class ChannelFlushPromiseNotifier {
}
/**
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, long)} and
* their pendingDatasize is smaller after the the current writeCounter returned by {@link #writeCounter()}.
*
* After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
* so not receive anymore notification.
*/
public ChannelFlushPromiseNotifier notifyFlushFutures() {
notifyFlushFutures0(null);
public ChannelFlushPromiseNotifier notifyPromises() {
notifyPromises0(null);
return this;
}
/**
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, long)} and
* their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}.
*
* After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
@ -111,8 +110,8 @@ public final class ChannelFlushPromiseNotifier {
*
* So after this operation this {@link ChannelFutureListener} is empty.
*/
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
notifyFlushFutures();
public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) {
notifyPromises();
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
@ -128,7 +127,7 @@ public final class ChannelFlushPromiseNotifier {
}
/**
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and
* Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, long)} and
* their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using
* the given cause1.
*
@ -143,8 +142,8 @@ public final class ChannelFlushPromiseNotifier {
* pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()}
* @param cause2 the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s
*/
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
notifyFlushFutures0(cause1);
public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) {
notifyPromises0(cause1);
for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll();
if (cp == null) {
@ -159,7 +158,15 @@ public final class ChannelFlushPromiseNotifier {
return this;
}
private void notifyFlushFutures0(Throwable cause) {
/**
* @deprecated use {@link #notifyPromises(Throwable, Throwable)}
*/
@Deprecated
public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
return notifyPromises(cause1, cause2);
}
private void notifyPromises0(Throwable cause) {
if (flushCheckpoints.isEmpty()) {
writeCounter = 0;
return;
@ -183,17 +190,18 @@ public final class ChannelFlushPromiseNotifier {
}
flushCheckpoints.remove();
ChannelPromise promise = cp.promise();
if (cause == null) {
if (tryNotify) {
cp.promise().trySuccess();
promise.trySuccess();
} else {
cp.promise().setSuccess();
promise.setSuccess();
}
} else {
if (tryNotify) {
cp.promise().tryFailure(cause);
promise.tryFailure(cause);
} else {
cp.promise().setFailure(cause);
promise.setFailure(cause);
}
}
}