Support method chaining in ChannelFlushPromiseNotifier

This commit is contained in:
Norman Maurer 2013-02-11 07:22:12 +01:00
parent ba71e3dcd0
commit 7e95be0295

View File

@ -31,7 +31,7 @@ public final class ChannelFlushPromiseNotifier {
* Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given * Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given
* pendingDataSize was reached. * pendingDataSize was reached.
*/ */
public void addFlushFuture(ChannelPromise future, int pendingDataSize) { public ChannelFlushPromiseNotifier addFlushFuture(ChannelPromise future, int pendingDataSize) {
if (future == null) { if (future == null) {
throw new NullPointerException("future"); throw new NullPointerException("future");
} }
@ -46,16 +46,18 @@ public final class ChannelFlushPromiseNotifier {
} else { } else {
flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, future)); flushCheckpoints.add(new DefaultFlushCheckpoint(checkpoint, future));
} }
return this;
} }
/** /**
* Increase the current write counter by the given delta * Increase the current write counter by the given delta
*/ */
public void increaseWriteCounter(long delta) { public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) {
if (delta < 0) { if (delta < 0) {
throw new IllegalArgumentException("delta must be >= 0 but was" + delta); throw new IllegalArgumentException("delta must be >= 0 but was" + delta);
} }
writeCounter += delta; writeCounter += delta;
return this;
} }
/** /**
@ -72,8 +74,9 @@ public final class ChannelFlushPromiseNotifier {
* After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and
* so not receive anymore notificiation. * so not receive anymore notificiation.
*/ */
public void notifyFlushFutures() { public ChannelFlushPromiseNotifier notifyFlushFutures() {
notifyFlushFutures0(null); notifyFlushFutures0(null);
return this;
} }
/** /**
@ -87,7 +90,7 @@ public final class ChannelFlushPromiseNotifier {
* *
* So after this operation this {@link ChannelFutureListener} is empty. * So after this operation this {@link ChannelFutureListener} is empty.
*/ */
public void notifyFlushFutures(Throwable cause) { public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) {
notifyFlushFutures(); notifyFlushFutures();
for (;;) { for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll(); FlushCheckpoint cp = flushCheckpoints.poll();
@ -96,6 +99,7 @@ public final class ChannelFlushPromiseNotifier {
} }
cp.future().setFailure(cause); cp.future().setFailure(cause);
} }
return this;
} }
/** /**
@ -114,7 +118,7 @@ public final class ChannelFlushPromiseNotifier {
* pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()} * pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()}
* @param cause2 the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s * @param cause2 the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s
*/ */
public void notifyFlushFutures(Throwable cause1, Throwable cause2) { public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) {
notifyFlushFutures0(cause1); notifyFlushFutures0(cause1);
for (;;) { for (;;) {
FlushCheckpoint cp = flushCheckpoints.poll(); FlushCheckpoint cp = flushCheckpoints.poll();
@ -123,6 +127,7 @@ public final class ChannelFlushPromiseNotifier {
} }
cp.future().setFailure(cause2); cp.future().setFailure(cause2);
} }
return this;
} }
private void notifyFlushFutures0(Throwable cause) { private void notifyFlushFutures0(Throwable cause) {