diff --git a/build.gradle b/build.gradle index 08131f3..3b9527d 100644 --- a/build.gradle +++ b/build.gradle @@ -31,6 +31,7 @@ dependencies { testCompile "org.junit.jupiter:junit-jupiter-api:5.2.0" testCompile "org.junit.jupiter:junit-jupiter-engine:5.2.0" testCompile "org.junit.platform:junit-platform-launcher:1.2.0" + testCompile "commons-io:commons-io:2.6" } test { diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java index 22c3dce..4ac4ccb 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java @@ -6,6 +6,7 @@ import java.util.function.Function; import reactor.core.CoreSubscriber; import reactor.core.Scannable; import reactor.core.publisher.Flux; +import reactor.util.annotation.NonNull; /** * A flux that can be used to replay historical values with different strategies. @@ -26,12 +27,12 @@ public class ReplayFlux extends Flux implements Scannable { } @Override - public void subscribe(CoreSubscriber actual) { + public void subscribe(@NonNull CoreSubscriber actual) { source.subscribe(actual); } @Override - public Object scanUnsafe(Attr attr) { + public Object scanUnsafe(@NonNull Attr attr) { return getScannable().scanUnsafe(attr); } @@ -44,7 +45,7 @@ public class ReplayFlux extends Flux implements Scannable { * (e.g. if the values were received with a 1 second interval, the returned flux will emit at a 1 second interval). */ public ReplayFlux withOriginalTiming(){ - return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor)), timestampExtractor); + return new ReplayFlux<>(source.transform(new ReplayWithOriginalTiming<>(timestampExtractor)), timestampExtractor); } /** @@ -52,7 +53,7 @@ public class ReplayFlux extends Flux implements Scannable { * (e.g. if the values were received with a 2 second interval, and the time acceleration is 2, then the returned flux will emit at a 1 second interval). */ public ReplayFlux withTimeAcceleration(double acceleration){ - return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor, acceleration)), timestampExtractor); + return new ReplayFlux<>(source.transform(new ReplayWithOriginalTiming<>(timestampExtractor, acceleration)), timestampExtractor); } /** @@ -67,6 +68,6 @@ public class ReplayFlux extends Flux implements Scannable { * @return a flux that will replay the values in a loop. */ public Flux> inLoop(Duration delayBeforeLoopRestart){ - return source.compose(new ReplayInLoop<>(delayBeforeLoopRestart)); + return source.transform(new ReplayInLoop<>(delayBeforeLoopRestart)); } } diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java index 78309c3..eeee5f6 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java @@ -3,6 +3,7 @@ package com.mgabriel.chronicle.flux.replay; import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.stream.LongStream; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -11,7 +12,6 @@ import reactor.core.publisher.Flux; * A transformer that takes a source flux and replays it in a loop. * The values are wrapped in a {@link ReplayValue} object to indicate when the loop restarts. * This information can be used by the application to perform some actions when the loop restarts (clear caches, etc.) - * * It is possible to specify a delay before each loop restart. * Please note that if you add other operators in the reactive stream after this transformer, you might not see the * impact of the restart delay since it will be applied as soon as items are requested on the subscription. @@ -27,33 +27,22 @@ public class ReplayInLoop implements Function, Publisher> apply(Flux source) { - Flux>> fluxLoop = Flux.create(sink -> { - while (!sink.isCancelled()) { - long requested = sink.requestedFromDownstream(); - if (requested > 0) { - sink.next(wrapValues(source)); - } else { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - //interrupt can happen when the flux is cancelled - Thread.currentThread().interrupt(); - } - } - } - } + Flux>> fluxLoop = Flux.create( + sink -> sink.onRequest(req -> LongStream.range(0, req).forEach(i -> sink.next(wrapValues(source)))) ); - return Flux.concat(fluxLoop.limitRate(1)); // limit the rate to avoid creating too many source flux in advance. + return Flux.concat(fluxLoop.limitRate(1) + ); // limit the rate to avoid creating too many source flux in advance. } private Flux> wrapValues(Flux source) { AtomicBoolean firstValueSent = new AtomicBoolean(false); - return source.delaySubscription(delayBeforeRestart) + return source + .delaySubscription(delayBeforeRestart) .map(wrapAsReplayValue(firstValueSent)); - } private Function> wrapAsReplayValue(AtomicBoolean firstValueSent) { + return val -> { if (!firstValueSent.getAndSet(true)) { return new ReplayValueImpl<>(true, val); diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValue.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValue.java index 5c83b9e..ef2c5f3 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValue.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValue.java @@ -2,7 +2,7 @@ package com.mgabriel.chronicle.flux.replay; /** * A value wrapper that indicates if the current value is the first value replayed in the replay loop. - * @see {@link ReplayInLoop} + * @see ReplayInLoop * @param data type */ public interface ReplayValue extends WrappedValue { diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java index f5cd529..486179d 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java @@ -10,12 +10,12 @@ public class ReplayValueImpl implements ReplayValue{ private final boolean isLoopReset; private final T value; - public ReplayValueImpl(T value) { + ReplayValueImpl(T value) { this.isLoopReset = false; this.value = value; } - public ReplayValueImpl(boolean isLoopReset, T value) { + ReplayValueImpl(boolean isLoopReset, T value) { this.isLoopReset = isLoopReset; this.value = value; } diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java index b798320..33dd93d 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java @@ -69,9 +69,6 @@ public class ReplayWithOriginalTiming implements Function, Publisher< private final Timed second; private TimedValuePair(Timed first, Timed second) { - if (first == null || second == null) { - throw new IllegalArgumentException("values should not be null"); - } this.first = first; this.second = second; } @@ -79,5 +76,13 @@ public class ReplayWithOriginalTiming implements Function, Publisher< long timeDifference() { return second.time() - first.time(); } + + @Override + public String toString() { + return "TimedValuePair{" + + "first=" + first + + ", second=" + second + + '}'; + } } } diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java deleted file mode 100644 index fcccff8..0000000 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.mgabriel.chronicle.flux.replay; - -/** - * Wraps a {@link ReplayValue} with its timestamp. - * - * @param data type - */ -interface TimedReplayValue extends Timed, ReplayValue{ -} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java index a25fc00..588e2e0 100644 --- a/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java @@ -44,6 +44,9 @@ class TimedValue implements Timed { @Override public String toString() { - return super.toString(); + return "TimedValue{" + + "time=" + time + + ", value=" + value + + '}'; } } diff --git a/src/test/java/com/mgabriel/chronicle/flux/ChronicleStoreDemo.java b/src/test/java/com/mgabriel/chronicle/flux/ChronicleStoreDemo.java deleted file mode 100644 index 81a2039..0000000 --- a/src/test/java/com/mgabriel/chronicle/flux/ChronicleStoreDemo.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.mgabriel.chronicle.flux; - -import java.time.Duration; -import java.time.Instant; - -import reactor.core.Disposable; -import reactor.core.publisher.Flux; - -/** - * @author mgabriel. - */ -public class ChronicleStoreDemo { - - public static void main(String[] args) throws InterruptedException { - ChronicleStore store = new ChronicleStore<>("demoChronicleStore", v -> v.toBinary(), DummyObject::fromBinary); - - Flux source = Flux.just("one", "two", "three") - .concatWith(Flux.just("four").delayElements(Duration.ofSeconds(1))) - .concatWith(Flux.just("five").delayElements(Duration.ofSeconds(2))); - - //source.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast(); - - // Disposable handle = store.store(source.map(v -> new DummyObject(System.currentTimeMillis(), v))); - - Thread.sleep(4000); - - store.replayHistory(DummyObject::getTimestamp) - .withOriginalTiming() - .inLoop(Duration.ofSeconds(1)) - - .doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast(); - } -} diff --git a/src/test/java/com/mgabriel/chronicle/flux/DummyObject.java b/src/test/java/com/mgabriel/chronicle/flux/DummyObject.java index afc752b..d43f1df 100644 --- a/src/test/java/com/mgabriel/chronicle/flux/DummyObject.java +++ b/src/test/java/com/mgabriel/chronicle/flux/DummyObject.java @@ -6,6 +6,10 @@ import java.util.Objects; import com.google.common.primitives.Longs; /** + * A dummy test object that can be serialized / deserialized as binary. + * + * Please note that for production it makes more sense to rely on a binary protocol such as Protof, Avro, etc. + * * @author mgabriel. */ public class DummyObject { @@ -19,15 +23,23 @@ public class DummyObject { this.value = value; } - public String getValue() { + public static DummyObject fromBinary(byte[] bytes) { + byte[] time = new byte[8]; + byte[] val = new byte[bytes.length - 8]; + System.arraycopy(bytes, 0, time, 0, time.length); + System.arraycopy(bytes, 8, val, 0, val.length); + return new DummyObject(Longs.fromByteArray(time), new String(val, UTF_8)); + } + + public String value() { return value; } - public long getTimestamp() { + public long timestamp() { return timestamp; } - public byte[] toBinary(){ + public byte[] toBinary() { byte[] time = Longs.toByteArray(timestamp); byte[] val = value.getBytes(UTF_8); byte[] result = new byte[time.length + val.length]; @@ -36,15 +48,11 @@ public class DummyObject { return result; } - public static DummyObject fromBinary(byte[] bytes){ - byte[] time = new byte[8]; - byte[] val = new byte[bytes.length - 8]; - System.arraycopy(bytes, 0, time, 0, time.length); - System.arraycopy(bytes, 8, val, 0, val.length); - return new DummyObject(Longs.fromByteArray(time), new String(val, UTF_8)); + @Override + public int hashCode() { + return Objects.hash(value); } - @Override public boolean equals(Object o) { if (this == o) @@ -55,11 +63,6 @@ public class DummyObject { return Objects.equals(value, that.value); } - @Override - public int hashCode() { - return Objects.hash(value); - } - @Override public String toString() { return "DummyObject{" + diff --git a/src/test/java/com/mgabriel/chronicle/flux/demo/ChronicleStoreDemo.java b/src/test/java/com/mgabriel/chronicle/flux/demo/ChronicleStoreDemo.java new file mode 100644 index 0000000..f6ffe75 --- /dev/null +++ b/src/test/java/com/mgabriel/chronicle/flux/demo/ChronicleStoreDemo.java @@ -0,0 +1,61 @@ +package com.mgabriel.chronicle.flux.demo; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; + +import com.mgabriel.chronicle.flux.ChronicleStore; +import com.mgabriel.chronicle.flux.DummyObject; +import org.apache.commons.io.FileUtils; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; + +/** + * @author mgabriel. + */ +public class ChronicleStoreDemo { + + private static final String PATH = "demoChronicleStore"; + + public static void main(String[] args) throws InterruptedException { + + deleteStoreIfItExists(); + + ChronicleStore store = new ChronicleStore<>(PATH, DummyObject::toBinary, DummyObject::fromBinary); + + Flux source = Flux.just("one", "two", "three") + .concatWith(Flux.just("four").delayElements(Duration.ofSeconds(1))) + .concatWith(Flux.just("five").delayElements(Duration.ofSeconds(2))); + + System.out.println("Storing source flux..."); + Disposable handle = store.store(source.map(v -> new DummyObject(System.currentTimeMillis(), v))); + + SECONDS.sleep(4); //wait until all items are stored + handle.dispose(); + + System.out.println("Storage achieved, replaying from store"); + + store.replayHistory(DummyObject::timestamp) + .withOriginalTiming() + .inLoop(Duration.ofSeconds(1)) + .doOnNext(i -> System.out.println(Instant.now() + " " + i)) + .blockLast(); + } + + private static void deleteStoreIfItExists() { + File directory = new File("."); + try { + String FULL_PATH = directory.getCanonicalPath() + File.separator + PATH; + File storePath = new File(FULL_PATH); + if (storePath.exists()) { + FileUtils.deleteDirectory(storePath); + System.out.println("Deleted existing store"); + } + } catch (IOException e) { + System.err.println("Error while deleting store"); + } + } +} diff --git a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayFluxDemo.java b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayFluxDemo.java similarity index 84% rename from src/test/java/com/mgabriel/chronicle/flux/replay/ReplayFluxDemo.java rename to src/test/java/com/mgabriel/chronicle/flux/demo/ReplayFluxDemo.java index 0e9aa91..6380513 100644 --- a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayFluxDemo.java +++ b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayFluxDemo.java @@ -1,9 +1,10 @@ -package com.mgabriel.chronicle.flux.replay; +package com.mgabriel.chronicle.flux.demo; import static java.time.Duration.ofSeconds; import java.time.Instant; +import com.mgabriel.chronicle.flux.replay.ReplayFlux; import reactor.core.publisher.Flux; public class ReplayFluxDemo { diff --git a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayLoopDemo.java b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayLoopDemo.java similarity index 73% rename from src/test/java/com/mgabriel/chronicle/flux/replay/ReplayLoopDemo.java rename to src/test/java/com/mgabriel/chronicle/flux/demo/ReplayLoopDemo.java index 33c7016..787b5e6 100644 --- a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayLoopDemo.java +++ b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayLoopDemo.java @@ -1,8 +1,10 @@ -package com.mgabriel.chronicle.flux.replay; +package com.mgabriel.chronicle.flux.demo; import java.time.Duration; import java.time.Instant; +import com.mgabriel.chronicle.flux.replay.ReplayInLoop; +import com.mgabriel.chronicle.flux.replay.ReplayValue; import reactor.core.publisher.Flux; public class ReplayLoopDemo { diff --git a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayWithOriginalTimingDemo.java similarity index 80% rename from src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java rename to src/test/java/com/mgabriel/chronicle/flux/demo/ReplayWithOriginalTimingDemo.java index c87b4c5..ea2a591 100644 --- a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java +++ b/src/test/java/com/mgabriel/chronicle/flux/demo/ReplayWithOriginalTimingDemo.java @@ -1,8 +1,11 @@ -package com.mgabriel.chronicle.flux.replay; +package com.mgabriel.chronicle.flux.demo; import java.time.Duration; import java.time.Instant; +import com.mgabriel.chronicle.flux.replay.ReplayInLoop; +import com.mgabriel.chronicle.flux.replay.ReplayValue; +import com.mgabriel.chronicle.flux.replay.ReplayWithOriginalTiming; import reactor.core.publisher.Flux; public class ReplayWithOriginalTimingDemo { diff --git a/src/test/java/com/mgabriel/chronicle/flux/TryFluxRequest.java b/src/test/java/com/mgabriel/chronicle/flux/demo/TryFluxRequest.java similarity index 97% rename from src/test/java/com/mgabriel/chronicle/flux/TryFluxRequest.java rename to src/test/java/com/mgabriel/chronicle/flux/demo/TryFluxRequest.java index c9073d3..0032d4f 100644 --- a/src/test/java/com/mgabriel/chronicle/flux/TryFluxRequest.java +++ b/src/test/java/com/mgabriel/chronicle/flux/demo/TryFluxRequest.java @@ -1,4 +1,4 @@ -package com.mgabriel.chronicle.flux; +package com.mgabriel.chronicle.flux.demo; import reactor.core.Disposable; import reactor.core.publisher.Flux; diff --git a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayFluxTest.java b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayFluxTest.java new file mode 100644 index 0000000..8da3855 --- /dev/null +++ b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayFluxTest.java @@ -0,0 +1,146 @@ +package com.mgabriel.chronicle.flux.replay; + +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofSeconds; + +import java.time.Duration; +import java.util.function.Consumer; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import com.mgabriel.chronicle.flux.DummyObject; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +/** + * @author mgabriel. + */ +@SuppressWarnings("javadoc") +class ReplayFluxTest { + + private static final String ONE = "one"; + private static final String TWO = "two"; + private static final String THREE = "three"; + private static final String FOUR = "four"; + private static final Duration ONE_SECOND = ofSeconds(1); + private static final Duration TWO_SECONDS = ofSeconds(2); + private static final Duration THREE_SECONDS = ofSeconds(3); + private static final Duration MILLIS_500 = ofMillis(500); + + private static Flux source = Flux.just(new DummyObject(10000, ONE), + new DummyObject(11000, TWO), + new DummyObject(12000, THREE), + new DummyObject(15000, FOUR) + ); + + private static ReplayFlux replayFlux = new ReplayFlux<>(source, DummyObject::timestamp); + + @BeforeEach + void setUp() { + } + + @Test + @DisplayName("tests that the flux is replayed with the original timing") + void shouldRespectOriginalTiming() { + StepVerifier.withVirtualTime(() -> replayFlux.withOriginalTiming()) + .expectSubscription() + .assertNext(i -> Assertions.assertEquals(ONE, i.value())) + .expectNoEvent(ONE_SECOND) + .assertNext(i -> Assertions.assertEquals(TWO, i.value())) + .expectNoEvent(ONE_SECOND) + .assertNext(i -> Assertions.assertEquals(THREE, i.value())) + .expectNoEvent(THREE_SECONDS) + .assertNext(i -> Assertions.assertEquals(FOUR, i.value())) + .expectComplete() + .verify(ofMillis(500)); + } + + @Test + @DisplayName("tests that the flux is replayed with a time acceleration over the original timing") + void shouldReplayWithTimeAcceleration() { + StepVerifier.withVirtualTime(() -> replayFlux.withTimeAcceleration(2)) + .expectSubscription() + .assertNext(i -> Assertions.assertEquals(ONE, i.value())) + .expectNoEvent(MILLIS_500) + .assertNext(i -> Assertions.assertEquals(TWO, i.value())) + .expectNoEvent(MILLIS_500) + .assertNext(i -> Assertions.assertEquals(THREE, i.value())) + .expectNoEvent(ONE_SECOND.plus(MILLIS_500)) + .assertNext(i -> Assertions.assertEquals(FOUR, i.value())) + .expectComplete() + .verify(ofMillis(500)); + } + + @Test + @DisplayName("tests that the flux is replayed with a time deceleration over the original timing") + void shouldReplayWithTimeDeceleration() { + StepVerifier.withVirtualTime(() -> replayFlux.withTimeAcceleration(0.5)) + .expectSubscription() + .assertNext(i -> Assertions.assertEquals(ONE, i.value())) + .expectNoEvent(TWO_SECONDS) + .assertNext(i -> Assertions.assertEquals(TWO, i.value())) + .expectNoEvent(TWO_SECONDS) + .assertNext(i -> Assertions.assertEquals(THREE, i.value())) + .expectNoEvent(ofSeconds(6)) + .assertNext(i -> Assertions.assertEquals(FOUR, i.value())) + .expectComplete() + .verify(ofMillis(500)); + } + + @Test + @DisplayName("tests that the flux is replayed in a loop") + void shouldReplayInLoop() { + StepVerifier.create(replayFlux.inLoop()) + .expectSubscription() + .assertNext(i -> { + Assertions.assertEquals(ONE, i.value().value()); + Assertions.assertTrue(i.isLoopRestart()); + }) + .assertNext(assertValue(TWO)) + .assertNext(assertValue(THREE)) + .assertNext(assertValue(FOUR)) + .assertNext(i -> { + Assertions.assertEquals(ONE, i.value().value()); + Assertions.assertTrue(i.isLoopRestart()); + }) + .assertNext(assertValue(TWO)) + .assertNext(assertValue(THREE)) + .assertNext(assertValue(FOUR)) + .thenCancel() + .verify(ofMillis(500)); + } + + private static Consumer> assertValue(String expected) { + return i -> Assertions.assertEquals(expected, i.value().value()); + } + + @Test + @DisplayName("tests that the flux is replayed in a loop with a delay before each loop restart") + void shouldReplayInLoopWithRestartDelay() { + StepVerifier.withVirtualTime(() -> replayFlux.inLoop(TWO_SECONDS)) + .expectSubscription() + .expectNoEvent(TWO_SECONDS) + .assertNext(assertLoopRestart()) + .assertNext(assertValue(TWO)) + .assertNext(assertValue(THREE)) + .assertNext(assertValue(FOUR)) + .expectNoEvent(TWO_SECONDS) + .assertNext(assertLoopRestart()) + .assertNext(assertValue(TWO)) + .assertNext(assertValue(THREE)) + .assertNext(assertValue(FOUR)) + .expectNoEvent(TWO_SECONDS) + .thenCancel() + .verify(ofMillis(500)); + } + + private static Consumer> assertLoopRestart() { + return i -> { + Assertions.assertEquals(ONE, i.value().value()); + Assertions.assertTrue(i.isLoopRestart()); + }; + } +} \ No newline at end of file diff --git a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayValueImplTest.java b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayValueImplTest.java new file mode 100644 index 0000000..dc53479 --- /dev/null +++ b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayValueImplTest.java @@ -0,0 +1,36 @@ +package com.mgabriel.chronicle.flux.replay; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class ReplayValueImplTest { + + @Test + @DisplayName("test equals and hashcode") + void testDataClass() { + String value = "testValue"; + ReplayValue first = new ReplayValueImpl<>(true, value); + ReplayValue second = new ReplayValueImpl<>(true, value); + ReplayValue third = new ReplayValueImpl<>(false, value); + assertEquals(first, second); + assertEquals(first, first); + assertNotEquals(first, value); + assertNotEquals(first, third); + assertEquals(first.hashCode(), second.hashCode()); + } + + @Test + @DisplayName("test equals and hashcode with null value") + void testDataClassWithNullValue() { + ReplayValue first = new ReplayValueImpl<>(true, null); + ReplayValue second = new ReplayValueImpl<>(true, null); + ReplayValue third = new ReplayValueImpl<>(false, null); + assertEquals(first, second); + assertEquals(first, first); + assertNotEquals(first, third); + assertEquals(first.hashCode(), second.hashCode()); + } + +} \ No newline at end of file diff --git a/src/test/java/com/mgabriel/chronicle/flux/replay/TimedValueTest.java b/src/test/java/com/mgabriel/chronicle/flux/replay/TimedValueTest.java new file mode 100644 index 0000000..aabf35a --- /dev/null +++ b/src/test/java/com/mgabriel/chronicle/flux/replay/TimedValueTest.java @@ -0,0 +1,35 @@ +package com.mgabriel.chronicle.flux.replay; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class TimedValueTest { + + private static final long TIME = 42; + + @Test + @DisplayName("test equals and hashcode") + void testDataClass() { + String value = "testValue"; + TimedValue first = new TimedValue<>(TIME, value); + TimedValue second = new TimedValue<>(TIME, value); + assertEquals(first, second); + assertEquals(first, first); + assertNotEquals(first, TIME); + assertEquals(first.hashCode(), second.hashCode()); + } + + @Test + @DisplayName("test equals and hashcode with null value") + void testDataClassWithNullValue() { + TimedValue first = new TimedValue<>(TIME, null); + TimedValue second = new TimedValue<>(TIME, null); + assertEquals(first, second); + assertEquals(first, first); + assertEquals(first.hashCode(), second.hashCode()); + } + +} \ No newline at end of file