Add unit tests

This commit is contained in:
mgabriel 2018-08-15 11:32:59 +02:00
parent 09de60e24c
commit e504f03e31
18 changed files with 336 additions and 92 deletions

View File

@ -31,6 +31,7 @@ dependencies {
testCompile "org.junit.jupiter:junit-jupiter-api:5.2.0"
testCompile "org.junit.jupiter:junit-jupiter-engine:5.2.0"
testCompile "org.junit.platform:junit-platform-launcher:1.2.0"
testCompile "commons-io:commons-io:2.6"
}
test {

View File

@ -6,6 +6,7 @@ import java.util.function.Function;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.util.annotation.NonNull;
/**
* A flux that can be used to replay historical values with different strategies.
@ -26,12 +27,12 @@ public class ReplayFlux<T> extends Flux<T> implements Scannable {
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
public void subscribe(@NonNull CoreSubscriber<? super T> actual) {
source.subscribe(actual);
}
@Override
public Object scanUnsafe(Attr attr) {
public Object scanUnsafe(@NonNull Attr attr) {
return getScannable().scanUnsafe(attr);
}
@ -44,7 +45,7 @@ public class ReplayFlux<T> extends Flux<T> implements Scannable {
* (e.g. if the values were received with a 1 second interval, the returned flux will emit at a 1 second interval).
*/
public ReplayFlux<T> withOriginalTiming(){
return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor)), timestampExtractor);
return new ReplayFlux<>(source.transform(new ReplayWithOriginalTiming<>(timestampExtractor)), timestampExtractor);
}
/**
@ -52,7 +53,7 @@ public class ReplayFlux<T> extends Flux<T> implements Scannable {
* (e.g. if the values were received with a 2 second interval, and the time acceleration is 2, then the returned flux will emit at a 1 second interval).
*/
public ReplayFlux<T> withTimeAcceleration(double acceleration){
return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor, acceleration)), timestampExtractor);
return new ReplayFlux<>(source.transform(new ReplayWithOriginalTiming<>(timestampExtractor, acceleration)), timestampExtractor);
}
/**
@ -67,6 +68,6 @@ public class ReplayFlux<T> extends Flux<T> implements Scannable {
* @return a flux that will replay the values in a loop.
*/
public Flux<ReplayValue<T>> inLoop(Duration delayBeforeLoopRestart){
return source.compose(new ReplayInLoop<>(delayBeforeLoopRestart));
return source.transform(new ReplayInLoop<>(delayBeforeLoopRestart));
}
}

View File

@ -3,6 +3,7 @@ package com.mgabriel.chronicle.flux.replay;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.LongStream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@ -11,7 +12,6 @@ import reactor.core.publisher.Flux;
* A transformer that takes a source flux and replays it in a loop.
* The values are wrapped in a {@link ReplayValue} object to indicate when the loop restarts.
* This information can be used by the application to perform some actions when the loop restarts (clear caches, etc.)
*
* It is possible to specify a delay before each loop restart.
* Please note that if you add other operators in the reactive stream after this transformer, you might not see the
* impact of the restart delay since it will be applied as soon as items are requested on the subscription.
@ -27,33 +27,22 @@ public class ReplayInLoop<T> implements Function<Flux<T>, Publisher<ReplayValue<
@Override
public Publisher<ReplayValue<T>> apply(Flux<T> source) {
Flux<Flux<ReplayValue<T>>> fluxLoop = Flux.create(sink -> {
while (!sink.isCancelled()) {
long requested = sink.requestedFromDownstream();
if (requested > 0) {
sink.next(wrapValues(source));
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
//interrupt can happen when the flux is cancelled
Thread.currentThread().interrupt();
}
}
}
}
Flux<Flux<ReplayValue<T>>> fluxLoop = Flux.create(
sink -> sink.onRequest(req -> LongStream.range(0, req).forEach(i -> sink.next(wrapValues(source))))
);
return Flux.concat(fluxLoop.limitRate(1)); // limit the rate to avoid creating too many source flux in advance.
return Flux.concat(fluxLoop.limitRate(1)
); // limit the rate to avoid creating too many source flux in advance.
}
private Flux<ReplayValue<T>> wrapValues(Flux<T> source) {
AtomicBoolean firstValueSent = new AtomicBoolean(false);
return source.delaySubscription(delayBeforeRestart)
return source
.delaySubscription(delayBeforeRestart)
.map(wrapAsReplayValue(firstValueSent));
}
private Function<T, ReplayValue<T>> wrapAsReplayValue(AtomicBoolean firstValueSent) {
return val -> {
if (!firstValueSent.getAndSet(true)) {
return new ReplayValueImpl<>(true, val);

View File

@ -2,7 +2,7 @@ package com.mgabriel.chronicle.flux.replay;
/**
* A value wrapper that indicates if the current value is the first value replayed in the replay loop.
* @see {@link ReplayInLoop}
* @see ReplayInLoop
* @param <T> data type
*/
public interface ReplayValue<T> extends WrappedValue<T> {

View File

@ -10,12 +10,12 @@ public class ReplayValueImpl<T> implements ReplayValue<T>{
private final boolean isLoopReset;
private final T value;
public ReplayValueImpl(T value) {
ReplayValueImpl(T value) {
this.isLoopReset = false;
this.value = value;
}
public ReplayValueImpl(boolean isLoopReset, T value) {
ReplayValueImpl(boolean isLoopReset, T value) {
this.isLoopReset = isLoopReset;
this.value = value;
}

View File

@ -69,9 +69,6 @@ public class ReplayWithOriginalTiming<T> implements Function<Flux<T>, Publisher<
private final Timed<T> second;
private TimedValuePair(Timed<T> first, Timed<T> second) {
if (first == null || second == null) {
throw new IllegalArgumentException("values should not be null");
}
this.first = first;
this.second = second;
}
@ -79,5 +76,13 @@ public class ReplayWithOriginalTiming<T> implements Function<Flux<T>, Publisher<
long timeDifference() {
return second.time() - first.time();
}
@Override
public String toString() {
return "TimedValuePair{" +
"first=" + first +
", second=" + second +
'}';
}
}
}

View File

@ -1,9 +0,0 @@
package com.mgabriel.chronicle.flux.replay;
/**
* Wraps a {@link ReplayValue} with its timestamp.
*
* @param <T> data type
*/
interface TimedReplayValue<T> extends Timed<T>, ReplayValue<T>{
}

View File

@ -44,6 +44,9 @@ class TimedValue<T> implements Timed<T> {
@Override
public String toString() {
return super.toString();
return "TimedValue{" +
"time=" + time +
", value=" + value +
'}';
}
}

View File

@ -1,33 +0,0 @@
package com.mgabriel.chronicle.flux;
import java.time.Duration;
import java.time.Instant;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
/**
* @author mgabriel.
*/
public class ChronicleStoreDemo {
public static void main(String[] args) throws InterruptedException {
ChronicleStore<DummyObject> store = new ChronicleStore<>("demoChronicleStore", v -> v.toBinary(), DummyObject::fromBinary);
Flux<String> source = Flux.just("one", "two", "three")
.concatWith(Flux.just("four").delayElements(Duration.ofSeconds(1)))
.concatWith(Flux.just("five").delayElements(Duration.ofSeconds(2)));
//source.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast();
// Disposable handle = store.store(source.map(v -> new DummyObject(System.currentTimeMillis(), v)));
Thread.sleep(4000);
store.replayHistory(DummyObject::getTimestamp)
.withOriginalTiming()
.inLoop(Duration.ofSeconds(1))
.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast();
}
}

View File

@ -6,6 +6,10 @@ import java.util.Objects;
import com.google.common.primitives.Longs;
/**
* A dummy test object that can be serialized / deserialized as binary.
*
* Please note that for production it makes more sense to rely on a binary protocol such as Protof, Avro, etc.
*
* @author mgabriel.
*/
public class DummyObject {
@ -19,15 +23,23 @@ public class DummyObject {
this.value = value;
}
public String getValue() {
public static DummyObject fromBinary(byte[] bytes) {
byte[] time = new byte[8];
byte[] val = new byte[bytes.length - 8];
System.arraycopy(bytes, 0, time, 0, time.length);
System.arraycopy(bytes, 8, val, 0, val.length);
return new DummyObject(Longs.fromByteArray(time), new String(val, UTF_8));
}
public String value() {
return value;
}
public long getTimestamp() {
public long timestamp() {
return timestamp;
}
public byte[] toBinary(){
public byte[] toBinary() {
byte[] time = Longs.toByteArray(timestamp);
byte[] val = value.getBytes(UTF_8);
byte[] result = new byte[time.length + val.length];
@ -36,15 +48,11 @@ public class DummyObject {
return result;
}
public static DummyObject fromBinary(byte[] bytes){
byte[] time = new byte[8];
byte[] val = new byte[bytes.length - 8];
System.arraycopy(bytes, 0, time, 0, time.length);
System.arraycopy(bytes, 8, val, 0, val.length);
return new DummyObject(Longs.fromByteArray(time), new String(val, UTF_8));
@Override
public int hashCode() {
return Objects.hash(value);
}
@Override
public boolean equals(Object o) {
if (this == o)
@ -55,11 +63,6 @@ public class DummyObject {
return Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(value);
}
@Override
public String toString() {
return "DummyObject{" +

View File

@ -0,0 +1,61 @@
package com.mgabriel.chronicle.flux.demo;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import com.mgabriel.chronicle.flux.ChronicleStore;
import com.mgabriel.chronicle.flux.DummyObject;
import org.apache.commons.io.FileUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
/**
* @author mgabriel.
*/
public class ChronicleStoreDemo {
private static final String PATH = "demoChronicleStore";
public static void main(String[] args) throws InterruptedException {
deleteStoreIfItExists();
ChronicleStore<DummyObject> store = new ChronicleStore<>(PATH, DummyObject::toBinary, DummyObject::fromBinary);
Flux<String> source = Flux.just("one", "two", "three")
.concatWith(Flux.just("four").delayElements(Duration.ofSeconds(1)))
.concatWith(Flux.just("five").delayElements(Duration.ofSeconds(2)));
System.out.println("Storing source flux...");
Disposable handle = store.store(source.map(v -> new DummyObject(System.currentTimeMillis(), v)));
SECONDS.sleep(4); //wait until all items are stored
handle.dispose();
System.out.println("Storage achieved, replaying from store");
store.replayHistory(DummyObject::timestamp)
.withOriginalTiming()
.inLoop(Duration.ofSeconds(1))
.doOnNext(i -> System.out.println(Instant.now() + " " + i))
.blockLast();
}
private static void deleteStoreIfItExists() {
File directory = new File(".");
try {
String FULL_PATH = directory.getCanonicalPath() + File.separator + PATH;
File storePath = new File(FULL_PATH);
if (storePath.exists()) {
FileUtils.deleteDirectory(storePath);
System.out.println("Deleted existing store");
}
} catch (IOException e) {
System.err.println("Error while deleting store");
}
}
}

View File

@ -1,9 +1,10 @@
package com.mgabriel.chronicle.flux.replay;
package com.mgabriel.chronicle.flux.demo;
import static java.time.Duration.ofSeconds;
import java.time.Instant;
import com.mgabriel.chronicle.flux.replay.ReplayFlux;
import reactor.core.publisher.Flux;
public class ReplayFluxDemo {

View File

@ -1,8 +1,10 @@
package com.mgabriel.chronicle.flux.replay;
package com.mgabriel.chronicle.flux.demo;
import java.time.Duration;
import java.time.Instant;
import com.mgabriel.chronicle.flux.replay.ReplayInLoop;
import com.mgabriel.chronicle.flux.replay.ReplayValue;
import reactor.core.publisher.Flux;
public class ReplayLoopDemo {

View File

@ -1,8 +1,11 @@
package com.mgabriel.chronicle.flux.replay;
package com.mgabriel.chronicle.flux.demo;
import java.time.Duration;
import java.time.Instant;
import com.mgabriel.chronicle.flux.replay.ReplayInLoop;
import com.mgabriel.chronicle.flux.replay.ReplayValue;
import com.mgabriel.chronicle.flux.replay.ReplayWithOriginalTiming;
import reactor.core.publisher.Flux;
public class ReplayWithOriginalTimingDemo {

View File

@ -1,4 +1,4 @@
package com.mgabriel.chronicle.flux;
package com.mgabriel.chronicle.flux.demo;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

View File

@ -0,0 +1,146 @@
package com.mgabriel.chronicle.flux.replay;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import java.time.Duration;
import java.util.function.Consumer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import com.mgabriel.chronicle.flux.DummyObject;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
/**
* @author mgabriel.
*/
@SuppressWarnings("javadoc")
class ReplayFluxTest {
private static final String ONE = "one";
private static final String TWO = "two";
private static final String THREE = "three";
private static final String FOUR = "four";
private static final Duration ONE_SECOND = ofSeconds(1);
private static final Duration TWO_SECONDS = ofSeconds(2);
private static final Duration THREE_SECONDS = ofSeconds(3);
private static final Duration MILLIS_500 = ofMillis(500);
private static Flux<DummyObject> source = Flux.just(new DummyObject(10000, ONE),
new DummyObject(11000, TWO),
new DummyObject(12000, THREE),
new DummyObject(15000, FOUR)
);
private static ReplayFlux<DummyObject> replayFlux = new ReplayFlux<>(source, DummyObject::timestamp);
@BeforeEach
void setUp() {
}
@Test
@DisplayName("tests that the flux is replayed with the original timing")
void shouldRespectOriginalTiming() {
StepVerifier.withVirtualTime(() -> replayFlux.withOriginalTiming())
.expectSubscription()
.assertNext(i -> Assertions.assertEquals(ONE, i.value()))
.expectNoEvent(ONE_SECOND)
.assertNext(i -> Assertions.assertEquals(TWO, i.value()))
.expectNoEvent(ONE_SECOND)
.assertNext(i -> Assertions.assertEquals(THREE, i.value()))
.expectNoEvent(THREE_SECONDS)
.assertNext(i -> Assertions.assertEquals(FOUR, i.value()))
.expectComplete()
.verify(ofMillis(500));
}
@Test
@DisplayName("tests that the flux is replayed with a time acceleration over the original timing")
void shouldReplayWithTimeAcceleration() {
StepVerifier.withVirtualTime(() -> replayFlux.withTimeAcceleration(2))
.expectSubscription()
.assertNext(i -> Assertions.assertEquals(ONE, i.value()))
.expectNoEvent(MILLIS_500)
.assertNext(i -> Assertions.assertEquals(TWO, i.value()))
.expectNoEvent(MILLIS_500)
.assertNext(i -> Assertions.assertEquals(THREE, i.value()))
.expectNoEvent(ONE_SECOND.plus(MILLIS_500))
.assertNext(i -> Assertions.assertEquals(FOUR, i.value()))
.expectComplete()
.verify(ofMillis(500));
}
@Test
@DisplayName("tests that the flux is replayed with a time deceleration over the original timing")
void shouldReplayWithTimeDeceleration() {
StepVerifier.withVirtualTime(() -> replayFlux.withTimeAcceleration(0.5))
.expectSubscription()
.assertNext(i -> Assertions.assertEquals(ONE, i.value()))
.expectNoEvent(TWO_SECONDS)
.assertNext(i -> Assertions.assertEquals(TWO, i.value()))
.expectNoEvent(TWO_SECONDS)
.assertNext(i -> Assertions.assertEquals(THREE, i.value()))
.expectNoEvent(ofSeconds(6))
.assertNext(i -> Assertions.assertEquals(FOUR, i.value()))
.expectComplete()
.verify(ofMillis(500));
}
@Test
@DisplayName("tests that the flux is replayed in a loop")
void shouldReplayInLoop() {
StepVerifier.create(replayFlux.inLoop())
.expectSubscription()
.assertNext(i -> {
Assertions.assertEquals(ONE, i.value().value());
Assertions.assertTrue(i.isLoopRestart());
})
.assertNext(assertValue(TWO))
.assertNext(assertValue(THREE))
.assertNext(assertValue(FOUR))
.assertNext(i -> {
Assertions.assertEquals(ONE, i.value().value());
Assertions.assertTrue(i.isLoopRestart());
})
.assertNext(assertValue(TWO))
.assertNext(assertValue(THREE))
.assertNext(assertValue(FOUR))
.thenCancel()
.verify(ofMillis(500));
}
private static Consumer<ReplayValue<DummyObject>> assertValue(String expected) {
return i -> Assertions.assertEquals(expected, i.value().value());
}
@Test
@DisplayName("tests that the flux is replayed in a loop with a delay before each loop restart")
void shouldReplayInLoopWithRestartDelay() {
StepVerifier.withVirtualTime(() -> replayFlux.inLoop(TWO_SECONDS))
.expectSubscription()
.expectNoEvent(TWO_SECONDS)
.assertNext(assertLoopRestart())
.assertNext(assertValue(TWO))
.assertNext(assertValue(THREE))
.assertNext(assertValue(FOUR))
.expectNoEvent(TWO_SECONDS)
.assertNext(assertLoopRestart())
.assertNext(assertValue(TWO))
.assertNext(assertValue(THREE))
.assertNext(assertValue(FOUR))
.expectNoEvent(TWO_SECONDS)
.thenCancel()
.verify(ofMillis(500));
}
private static Consumer<ReplayValue<DummyObject>> assertLoopRestart() {
return i -> {
Assertions.assertEquals(ONE, i.value().value());
Assertions.assertTrue(i.isLoopRestart());
};
}
}

View File

@ -0,0 +1,36 @@
package com.mgabriel.chronicle.flux.replay;
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
class ReplayValueImplTest {
@Test
@DisplayName("test equals and hashcode")
void testDataClass() {
String value = "testValue";
ReplayValue<String> first = new ReplayValueImpl<>(true, value);
ReplayValue<String> second = new ReplayValueImpl<>(true, value);
ReplayValue<String> third = new ReplayValueImpl<>(false, value);
assertEquals(first, second);
assertEquals(first, first);
assertNotEquals(first, value);
assertNotEquals(first, third);
assertEquals(first.hashCode(), second.hashCode());
}
@Test
@DisplayName("test equals and hashcode with null value")
void testDataClassWithNullValue() {
ReplayValue<String> first = new ReplayValueImpl<>(true, null);
ReplayValue<String> second = new ReplayValueImpl<>(true, null);
ReplayValue<String> third = new ReplayValueImpl<>(false, null);
assertEquals(first, second);
assertEquals(first, first);
assertNotEquals(first, third);
assertEquals(first.hashCode(), second.hashCode());
}
}

View File

@ -0,0 +1,35 @@
package com.mgabriel.chronicle.flux.replay;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
class TimedValueTest {
private static final long TIME = 42;
@Test
@DisplayName("test equals and hashcode")
void testDataClass() {
String value = "testValue";
TimedValue<String> first = new TimedValue<>(TIME, value);
TimedValue<String> second = new TimedValue<>(TIME, value);
assertEquals(first, second);
assertEquals(first, first);
assertNotEquals(first, TIME);
assertEquals(first.hashCode(), second.hashCode());
}
@Test
@DisplayName("test equals and hashcode with null value")
void testDataClassWithNullValue() {
TimedValue<?> first = new TimedValue<>(TIME, null);
TimedValue<?> second = new TimedValue<>(TIME, null);
assertEquals(first, second);
assertEquals(first, first);
assertEquals(first.hashCode(), second.hashCode());
}
}