From 4666ac08d023b29d500a8522a155c4f273007b33 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 3 Jun 2014 10:24:44 +0200 Subject: [PATCH] ChannelFlushPromiseNotifier should allow long value for pendingDataSize Motivation: At the moment ChannelFlushPromiseNotifier.add(....) takes an int value for pendingDataSize, which may be too small as a user may need to use a long. This can for example be useful when a user writes a FileRegion etc. Beside this the notify* method names are kind of missleading as these should not contain *Future* because it is about ChannelPromises. Modification: Alter add(...) method to take a long for pendingDataSize. Rename all *Future* methods to have *Promise* in the method name to better reflect usage. Result: ChannelFlushPromiseNotifier can be used with bigger data. --- .../channel/ChannelFlushPromiseNotifier.java | 46 +++++++++++-------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/ChannelFlushPromiseNotifier.java b/transport/src/main/java/io/netty/channel/ChannelFlushPromiseNotifier.java index 949806d48e..d9cfddd726 100644 --- a/transport/src/main/java/io/netty/channel/ChannelFlushPromiseNotifier.java +++ b/transport/src/main/java/io/netty/channel/ChannelFlushPromiseNotifier.java @@ -50,14 +50,14 @@ public final class ChannelFlushPromiseNotifier { /** * Add a {@link ChannelPromise} to this {@link ChannelFlushPromiseNotifier} which will be notified after the given - * pendingDataSize was reached. + * {@code pendingDataSize} was reached. */ - public ChannelFlushPromiseNotifier add(ChannelPromise promise, int pendingDataSize) { + public ChannelFlushPromiseNotifier add(ChannelPromise promise, long pendingDataSize) { if (promise == null) { throw new NullPointerException("promise"); } if (pendingDataSize < 0) { - throw new IllegalArgumentException("pendingDataSize must be >= 0 but was" + pendingDataSize); + throw new IllegalArgumentException("pendingDataSize must be >= 0 but was " + pendingDataSize); } long checkpoint = writeCounter + pendingDataSize; if (promise instanceof FlushCheckpoint) { @@ -69,13 +69,12 @@ public final class ChannelFlushPromiseNotifier { } return this; } - /** * Increase the current write counter by the given delta */ public ChannelFlushPromiseNotifier increaseWriteCounter(long delta) { if (delta < 0) { - throw new IllegalArgumentException("delta must be >= 0 but was" + delta); + throw new IllegalArgumentException("delta must be >= 0 but was " + delta); } writeCounter += delta; return this; @@ -89,19 +88,19 @@ public final class ChannelFlushPromiseNotifier { } /** - * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and + * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, long)} and * their pendingDatasize is smaller after the the current writeCounter returned by {@link #writeCounter()}. * * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and * so not receive anymore notification. */ - public ChannelFlushPromiseNotifier notifyFlushFutures() { - notifyFlushFutures0(null); + public ChannelFlushPromiseNotifier notifyPromises() { + notifyPromises0(null); return this; } /** - * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and + * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, long)} and * their pendingDatasize isis smaller then the current writeCounter returned by {@link #writeCounter()}. * * After a {@link ChannelFuture} was notified it will be removed from this {@link ChannelFlushPromiseNotifier} and @@ -111,8 +110,8 @@ public final class ChannelFlushPromiseNotifier { * * So after this operation this {@link ChannelFutureListener} is empty. */ - public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause) { - notifyFlushFutures(); + public ChannelFlushPromiseNotifier notifyPromises(Throwable cause) { + notifyPromises(); for (;;) { FlushCheckpoint cp = flushCheckpoints.poll(); if (cp == null) { @@ -128,7 +127,7 @@ public final class ChannelFlushPromiseNotifier { } /** - * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, int)} and + * Notify all {@link ChannelFuture}s that were registered with {@link #add(ChannelPromise, long)} and * their pendingDatasize is smaller then the current writeCounter returned by {@link #writeCounter()} using * the given cause1. * @@ -143,8 +142,8 @@ public final class ChannelFlushPromiseNotifier { * pendingDataSize is smaller then the current writeCounter returned by {@link #writeCounter()} * @param cause2 the {@link Throwable} which will be used to fail the remaining {@link ChannelFuture}s */ - public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) { - notifyFlushFutures0(cause1); + public ChannelFlushPromiseNotifier notifyPromises(Throwable cause1, Throwable cause2) { + notifyPromises0(cause1); for (;;) { FlushCheckpoint cp = flushCheckpoints.poll(); if (cp == null) { @@ -159,7 +158,15 @@ public final class ChannelFlushPromiseNotifier { return this; } - private void notifyFlushFutures0(Throwable cause) { + /** + * @deprecated use {@link #notifyPromises(Throwable, Throwable)} + */ + @Deprecated + public ChannelFlushPromiseNotifier notifyFlushFutures(Throwable cause1, Throwable cause2) { + return notifyPromises(cause1, cause2); + } + + private void notifyPromises0(Throwable cause) { if (flushCheckpoints.isEmpty()) { writeCounter = 0; return; @@ -183,17 +190,18 @@ public final class ChannelFlushPromiseNotifier { } flushCheckpoints.remove(); + ChannelPromise promise = cp.promise(); if (cause == null) { if (tryNotify) { - cp.promise().trySuccess(); + promise.trySuccess(); } else { - cp.promise().setSuccess(); + promise.setSuccess(); } } else { if (tryNotify) { - cp.promise().tryFailure(cause); + promise.tryFailure(cause); } else { - cp.promise().setFailure(cause); + promise.setFailure(cause); } } }