diff --git a/src/test/java/com/mgabriel/chronicle/flux/demo/ChronicleStoreDemo.java b/src/test/java/com/mgabriel/chronicle/flux/demo/ChronicleStoreDemo.java index f6ffe75..e7e4cab 100644 --- a/src/test/java/com/mgabriel/chronicle/flux/demo/ChronicleStoreDemo.java +++ b/src/test/java/com/mgabriel/chronicle/flux/demo/ChronicleStoreDemo.java @@ -14,6 +14,9 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; /** + * Simple demo showing how to store values in a Chronicle store and replay them in a loop with the same timing as the + * original values + * * @author mgabriel. */ public class ChronicleStoreDemo { @@ -21,7 +24,6 @@ public class ChronicleStoreDemo { private static final String PATH = "demoChronicleStore"; public static void main(String[] args) throws InterruptedException { - deleteStoreIfItExists(); ChronicleStore store = new ChronicleStore<>(PATH, DummyObject::toBinary, DummyObject::fromBinary); diff --git a/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayFluxDemo.java b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayFluxDemo.java index 6380513..89f1a8a 100644 --- a/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayFluxDemo.java +++ b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayFluxDemo.java @@ -12,10 +12,9 @@ public class ReplayFluxDemo { public static void main(String[] args) { Flux source = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L); - ReplayFlux replayFlux = new ReplayFlux<>(source, v -> v); - - replayFlux.withTimeAcceleration(2).inLoop(ofSeconds(1)) + replayFlux.withTimeAcceleration(2) + .inLoop(ofSeconds(1)) .doOnNext(i -> System.out.println(Instant.now() + " " + i)) .blockLast(); diff --git a/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayLoopDemo.java b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayLoopDemo.java index 787b5e6..d81089f 100644 --- a/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayLoopDemo.java +++ b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayLoopDemo.java @@ -1,6 +1,7 @@ package com.mgabriel.chronicle.flux.demo; -import java.time.Duration; +import static java.time.Duration.ofSeconds; + import java.time.Instant; import com.mgabriel.chronicle.flux.replay.ReplayInLoop; @@ -10,12 +11,10 @@ import reactor.core.publisher.Flux; public class ReplayLoopDemo { public static void main(String[] args) { - - Flux just = Flux.just(0L, 1L, 2L, 3L, 4L, 5L); - - Flux> result = just.compose(new ReplayInLoop<>(Duration.ofSeconds(2))); - - result.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast(); + Flux source = Flux.just(0L, 1L, 2L, 3L, 4L, 5L); + Flux> result = source.transform(new ReplayInLoop<>(ofSeconds(2))); + result.doOnNext(i -> System.out.println(Instant.now() + " " + i)) + .blockLast(); } diff --git a/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayWithOriginalTimingDemo.java b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayWithOriginalTimingDemo.java index ea2a591..ae53912 100644 --- a/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayWithOriginalTimingDemo.java +++ b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayWithOriginalTimingDemo.java @@ -1,27 +1,16 @@ package com.mgabriel.chronicle.flux.demo; -import java.time.Duration; import java.time.Instant; -import com.mgabriel.chronicle.flux.replay.ReplayInLoop; -import com.mgabriel.chronicle.flux.replay.ReplayValue; import com.mgabriel.chronicle.flux.replay.ReplayWithOriginalTiming; import reactor.core.publisher.Flux; public class ReplayWithOriginalTimingDemo { public static void main(String[] args) { - - Flux just = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L); - - Flux> result = just.compose(new ReplayWithOriginalTiming<>(l -> l)).compose(new ReplayInLoop<>(Duration.ofSeconds(1))); - - // In this order the delay of the ReplayInLoop operator is not working because the elements in the replayInLoop - // are emitted while the delay is being applied in the ReplayWithOriginalTiming operator, making it looks as if there was no initial delay -// Flux> result = just.compose(new ReplayInLoop<>(Duration.ofSeconds(1))).compose(new ReplayWithOriginalTiming<>(WrappedValue::value)); - + Flux source = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L); + Flux result = source.transform(new ReplayWithOriginalTiming<>(l -> l)); result.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast(); - } } diff --git a/src/test/java/com/mgabriel/chronicle/flux/demo/TryFluxRequest.java b/src/test/java/com/mgabriel/chronicle/flux/demo/TryFluxRequest.java deleted file mode 100644 index 0032d4f..0000000 --- a/src/test/java/com/mgabriel/chronicle/flux/demo/TryFluxRequest.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.mgabriel.chronicle.flux.demo; - -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; - -/** - * @author mgabriel. - */ -public class TryFluxRequest { - - public static void main(String[] args) throws Exception { - - Flux source = Flux.create(sink -> { - int i = 0; - while (!sink.isCancelled()) { - if (sink.requestedFromDownstream() > 0) { - sink.next("" + i++); - if(i == 7){ - sink.complete(); - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } else { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - System.out.println("interrupted"); - } - } - } - System.out.println("CANCELLED!!!!!!!!!!!!!"); - } - ); - - System.out.println("max val =" + Long.MAX_VALUE); - Disposable sub = source.doOnRequest(r -> System.out.println("~~~~~ requested " + r)) - //.take(20) - .doOnNext(i -> System.out.println(i)) - .subscribeOn(Schedulers.newSingle("lpop")) - .subscribe(); - - Thread.sleep(10_000); - sub.dispose(); - System.out.println("disposed"); - } -}