diff --git a/src/main/java/com/mgabriel/chronicle/flux/ChronicleStore.java b/src/main/java/com/mgabriel/chronicle/flux/ChronicleStore.java index 8a67d42..2575454 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/ChronicleStore.java +++ b/src/main/java/com/mgabriel/chronicle/flux/ChronicleStore.java @@ -38,7 +38,7 @@ public class ChronicleStore implements FluxStore { /** * * @param path path were the Chronicle Queue will store the files. - * This path should not be a network file system (see https://github.com/OpenHFT/Chronicle-Queue for more details) + * This path should not be a network file system (see the Chronicle queue documentation for more detail * @param serializer data serializer * @param deserializer data deserializer */ @@ -208,7 +208,7 @@ public class ChronicleStore implements FluxStore { /** * @param path path were the Chronicle Queue will store the files. - * This path should not be a network file system (see https://github.com/OpenHFT/Chronicle-Queue for more details) + * This path should not be a network file system (see the Chronicle queue documentation for more detail * @return this builder */ public ChronicleStoreBuilder path(String path) { diff --git a/src/main/java/com/mgabriel/chronicle/flux/FluxStore.java b/src/main/java/com/mgabriel/chronicle/flux/FluxStore.java index b00ef0f..a708cf3 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/FluxStore.java +++ b/src/main/java/com/mgabriel/chronicle/flux/FluxStore.java @@ -10,7 +10,7 @@ import reactor.core.publisher.Flux; /** * Reactive store used to store and replay a Flux. * - * @param the value type + * @param the data type */ public interface FluxStore { diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java index 7fb5527..22c3dce 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java @@ -7,10 +7,19 @@ import reactor.core.CoreSubscriber; import reactor.core.Scannable; import reactor.core.publisher.Flux; +/** + * A flux that can be used to replay historical values with different strategies. + * + * @param data type + */ public class ReplayFlux extends Flux implements Scannable { private final Flux source; private final Function timestampExtractor; + /** + * @param source the source flux. + * @param timestampExtractor extracts the epoch time in ms from the values. + */ public ReplayFlux(Flux source, Function timestampExtractor) { this.source = source; this.timestampExtractor = timestampExtractor; @@ -30,18 +39,33 @@ public class ReplayFlux extends Flux implements Scannable { return (Scannable) source; } + /** + * @return a flux that will replay the values with their original timing + * (e.g. if the values were received with a 1 second interval, the returned flux will emit at a 1 second interval). + */ public ReplayFlux withOriginalTiming(){ return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor)), timestampExtractor); } + /** + * @return a flux that will replay the values with a time acceleration applied to their original timing + * (e.g. if the values were received with a 2 second interval, and the time acceleration is 2, then the returned flux will emit at a 1 second interval). + */ public ReplayFlux withTimeAcceleration(double acceleration){ return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor, acceleration)), timestampExtractor); } + /** + * @return a flux that will replay the values in a loop. + */ public Flux> inLoop(){ return inLoop(Duration.ofMillis(0)); } + /** + * @param delayBeforeLoopRestart duration to wait before each loop. + * @return a flux that will replay the values in a loop. + */ public Flux> inLoop(Duration delayBeforeLoopRestart){ return source.compose(new ReplayInLoop<>(delayBeforeLoopRestart)); } diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java index ead4340..78309c3 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java @@ -4,17 +4,17 @@ import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; -import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; /** * A transformer that takes a source flux and replays it in a loop. * The values are wrapped in a {@link ReplayValue} object to indicate when the loop restarts. - * This information can be used by the application to perform some action when the loop restarts (clear caches, etc.) + * This information can be used by the application to perform some actions when the loop restarts (clear caches, etc.) * * It is possible to specify a delay before each loop restart. - * Please note that if you chain + * Please note that if you add other operators in the reactive stream after this transformer, you might not see the + * impact of the restart delay since it will be applied as soon as items are requested on the subscription. * * @param data type */ @@ -53,7 +53,6 @@ public class ReplayInLoop implements Function, Publisher> wrapAsReplayValue(AtomicBoolean firstValueSent) { return val -> { if (!firstValueSent.getAndSet(true)) { diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java deleted file mode 100644 index 278616f..0000000 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.mgabriel.chronicle.flux.replay; - -import java.time.Duration; -import java.util.function.Function; - -import reactor.core.CoreSubscriber; -import reactor.core.Scannable; -import reactor.core.publisher.Flux; - -public class ReplayLoopFlux extends Flux> implements Scannable{ - - private final Flux source; - private final Duration delayBeforeLoopRestart; - - public ReplayLoopFlux(Flux source, Duration delayBeforeLoopRestart) { - this.source = source; - this.delayBeforeLoopRestart = delayBeforeLoopRestart; - } - - @Override - public void subscribe(CoreSubscriber> actual) { - source.compose(new ReplayInLoop<>(delayBeforeLoopRestart)).subscribe(actual); - } - - @Override - public Object scanUnsafe(Scannable.Attr attr) { - return getScannable().scanUnsafe(attr); - } - - private Scannable getScannable() { - return (Scannable) source; - } -} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java index c05bdaf..f5cd529 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java @@ -2,6 +2,10 @@ package com.mgabriel.chronicle.flux.replay; import java.util.Objects; +/** + * Default implementation of a {@link ReplayValue} + * @param data type + */ public class ReplayValueImpl implements ReplayValue{ private final boolean isLoopReset; private final T value; diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java index c031eea..b798320 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java @@ -8,15 +8,28 @@ import java.util.function.Predicate; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +/** + * A transformer that takes a source flux and replays the values with their original timing. + * It is also possible to specify a time acceleration factor to increase or decrease the replay speed. + * + * @param data type + */ public class ReplayWithOriginalTiming implements Function, Publisher> { private final Function timestampExtractor; private final double timeAcceleration; private final Timed TOKEN = new TimedValue<>(0, null); + /** + * @param timestampExtractor extracts the epoch time in ms from the values. + */ public ReplayWithOriginalTiming(Function timestampExtractor) { this(timestampExtractor, 1); } + /** + * @param timestampExtractor extracts the epoch time in ms from the values. + * @param timeAcceleration time acceleration factor. + */ public ReplayWithOriginalTiming(Function timestampExtractor, double timeAcceleration) { this.timestampExtractor = timestampExtractor; this.timeAcceleration = timeAcceleration; diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/Timed.java b/src/main/java/com/mgabriel/chronicle/flux/replay/Timed.java index d535cf9..0c748f9 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/Timed.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/Timed.java @@ -1,5 +1,10 @@ package com.mgabriel.chronicle.flux.replay; -public interface Timed extends WrappedValue { +/** + * Wraps a value with its timestamp. + * + * @param data type + */ +interface Timed extends WrappedValue { long time(); } diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java index 2eb0b8b..fcccff8 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java @@ -1,4 +1,9 @@ package com.mgabriel.chronicle.flux.replay; -public interface TimedReplayValue extends Timed, ReplayValue{ +/** + * Wraps a {@link ReplayValue} with its timestamp. + * + * @param data type + */ +interface TimedReplayValue extends Timed, ReplayValue{ } diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java index 226988a..a25fc00 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java @@ -2,11 +2,16 @@ package com.mgabriel.chronicle.flux.replay; import java.util.Objects; -public class TimedValue implements Timed { +/** + * Default implementation of a {@link Timed} value. + * + * @param + */ +class TimedValue implements Timed { private final long time; private final T value; - public TimedValue(long time, T value) { + TimedValue(long time, T value) { this.time = time; this.value = value; } diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ValueToDelay.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ValueToDelay.java index 4572dfb..2a95fc0 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ValueToDelay.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ValueToDelay.java @@ -1,5 +1,6 @@ package com.mgabriel.chronicle.flux.replay; + class ValueToDelay implements WrappedValue { private final long delay; private final T value; @@ -14,7 +15,7 @@ class ValueToDelay implements WrappedValue { return value; } - public long delay() { + long delay() { return delay; } }