diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java index aff4a1b..1acd355 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java @@ -4,9 +4,10 @@ import java.time.Duration; import java.util.function.Function; import reactor.core.CoreSubscriber; +import reactor.core.Scannable; import reactor.core.publisher.Flux; -public class ReplayFlux extends Flux { +public class ReplayFlux extends Flux implements Scannable { private final Flux source; private final Function timestampExtractor; @@ -18,14 +19,31 @@ public class ReplayFlux extends Flux { @Override public void subscribe(CoreSubscriber actual) { - + source.subscribe(actual); } - public ReplayLoopFlux inLoop(){ + @Override + public Object scanUnsafe(Attr attr) { + return getScannable().scanUnsafe(attr); + } + + private Scannable getScannable() { + return (Scannable) source; + } + + public ReplayFlux withOriginalTiming(){ + return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor)), timestampExtractor); + } + + public ReplayFlux withTimeAcceleration(double acceleration){ + return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor, acceleration)), timestampExtractor); + } + + public Flux> inLoop(){ return inLoop(Duration.ofMillis(0)); } - public ReplayLoopFlux inLoop(Duration delayBeforeLoopRestart){ - return new ReplayLoopFlux<>(source, timestampExtractor, delayBeforeLoopRestart); + public Flux> inLoop(Duration delayBeforeLoopRestart){ + return source.compose(new ReplayInLoop<>(delayBeforeLoopRestart)); } } diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java index abbff29..1678e0f 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java @@ -1,7 +1,6 @@ package com.mgabriel.chronicle.flux.replay; import java.time.Duration; -import java.time.Instant; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -10,12 +9,14 @@ import org.slf4j.LoggerFactory; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Publisher; +import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; public class ReplayInLoop implements Function, Publisher>> { private static final Logger LOGGER = LoggerFactory.getLogger(ReplayInLoop.class); private final Duration delayBeforeRestart; + private final Boolean TOKEN = Boolean.FALSE; public ReplayInLoop(Duration delayBeforeRestart) { this.delayBeforeRestart = delayBeforeRestart; @@ -45,9 +46,8 @@ public class ReplayInLoop implements Function, Publisher source, FluxSink>> sink) { AtomicBoolean firstValueSent = new AtomicBoolean(false); - Flux> nextFlux = source.delaySequence(delayBeforeRestart).map(wrapAsReplayValue(firstValueSent)).doOnNext(v -> System.out - .println("\t\t\taaaa "+ Instant.now()+" " +v)); - sink.next(Flux.defer(() -> nextFlux)); + Flux> nextFlux = source.delaySubscription(delayBeforeRestart).map(wrapAsReplayValue(firstValueSent)); + sink.next(nextFlux); } @NotNull diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java index 8d5ec43..278616f 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java @@ -4,22 +4,30 @@ import java.time.Duration; import java.util.function.Function; import reactor.core.CoreSubscriber; +import reactor.core.Scannable; import reactor.core.publisher.Flux; -public class ReplayLoopFlux extends Flux> { +public class ReplayLoopFlux extends Flux> implements Scannable{ private final Flux source; - private final Function timestampExtractor; private final Duration delayBeforeLoopRestart; - public ReplayLoopFlux(Flux source, Function timestampExtractor, Duration delayBeforeLoopRestart) { + public ReplayLoopFlux(Flux source, Duration delayBeforeLoopRestart) { this.source = source; - this.timestampExtractor = timestampExtractor; this.delayBeforeLoopRestart = delayBeforeLoopRestart; } @Override public void subscribe(CoreSubscriber> actual) { + source.compose(new ReplayInLoop<>(delayBeforeLoopRestart)).subscribe(actual); + } + @Override + public Object scanUnsafe(Scannable.Attr attr) { + return getScannable().scanUnsafe(attr); + } + + private Scannable getScannable() { + return (Scannable) source; } } diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java index 385dc3a..c031eea 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java @@ -10,10 +10,16 @@ import reactor.core.publisher.Flux; public class ReplayWithOriginalTiming implements Function, Publisher> { private final Function timestampExtractor; + private final double timeAcceleration; private final Timed TOKEN = new TimedValue<>(0, null); public ReplayWithOriginalTiming(Function timestampExtractor) { + this(timestampExtractor, 1); + } + + public ReplayWithOriginalTiming(Function timestampExtractor, double timeAcceleration) { this.timestampExtractor = timestampExtractor; + this.timeAcceleration = timeAcceleration; } @Override @@ -33,7 +39,7 @@ public class ReplayWithOriginalTiming implements Function, Publisher< private Function, ValueToDelay> calculateDelay() { return tvp -> { - long timeDifference = tvp.timeDifference(); + long timeDifference = Double.valueOf(tvp.timeDifference() / timeAcceleration).longValue(); if (timeDifference < 0 || tvp.first == TOKEN) { timeDifference = 0; } @@ -42,8 +48,7 @@ public class ReplayWithOriginalTiming implements Function, Publisher< } private Function, Publisher> applyDelay() { - return vtd -> { - return Flux.just(TOKEN).delayElements(ofMillis(vtd.delay()));}; + return vtd -> Flux.just(TOKEN).delayElements(ofMillis(vtd.delay())); } private static class TimedValuePair { diff --git a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayFluxDemo.java b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayFluxDemo.java new file mode 100644 index 0000000..0e9aa91 --- /dev/null +++ b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayFluxDemo.java @@ -0,0 +1,22 @@ +package com.mgabriel.chronicle.flux.replay; + +import static java.time.Duration.ofSeconds; + +import java.time.Instant; + +import reactor.core.publisher.Flux; + +public class ReplayFluxDemo { + + public static void main(String[] args) { + + Flux source = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L); + + ReplayFlux replayFlux = new ReplayFlux<>(source, v -> v); + + replayFlux.withTimeAcceleration(2).inLoop(ofSeconds(1)) + .doOnNext(i -> System.out.println(Instant.now() + " " + i)) + .blockLast(); + + } +} diff --git a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java index bc31551..71b596e 100644 --- a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java +++ b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java @@ -3,6 +3,7 @@ package com.mgabriel.chronicle.flux.replay; import java.time.Duration; import java.time.Instant; +import com.mgabriel.chronicle.flux.WrappedValue; import reactor.core.publisher.Flux; public class ReplayWithOriginalTimingDemo { @@ -11,8 +12,11 @@ public class ReplayWithOriginalTimingDemo { Flux just = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L); -// Flux> result = just.compose(new ReplayWithOriginalTiming<>(l -> l)).compose(new ReplayInLoop<>(Duration.ofSeconds(1))); - Flux> result = just.compose(new ReplayInLoop<>(Duration.ofSeconds(1))).compose(new ReplayWithOriginalTiming<>(l -> l.value())); + Flux> result = just.compose(new ReplayWithOriginalTiming<>(l -> l)).compose(new ReplayInLoop<>(Duration.ofSeconds(1))); + + // In this order the delay of the ReplayInLoop operator is not working because the elements in the replayInLoop + // are emitted while the delay is being applied in the ReplayWithOriginalTiming operator, making it looks as if there was no initial delay +// Flux> result = just.compose(new ReplayInLoop<>(Duration.ofSeconds(1))).compose(new ReplayWithOriginalTiming<>(WrappedValue::value)); result.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast();