add unit tests
This commit is contained in:
parent
f9b4c563fc
commit
82de6cf410
@ -56,7 +56,10 @@ public class ChronicleStore<T> implements FluxStore<T> {
|
||||
deserializer = builder.deserializer;
|
||||
rollCycle = builder.rollCycle;
|
||||
this.queue = SingleChronicleQueueBuilder.binary(path).rollCycle(rollCycle).build();
|
||||
}
|
||||
|
||||
void close(){
|
||||
queue.close();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -177,7 +180,7 @@ public class ChronicleStore<T> implements FluxStore<T> {
|
||||
() -> readTailer(tailer, sink, readerType, deleteAfterRead),
|
||||
"ChronicleStoreRetrieve_" + path);
|
||||
t.setDaemon(true);
|
||||
t.run();
|
||||
t.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1,5 +1,5 @@
|
||||
package com.mgabriel.chronicle.flux.replay;
|
||||
|
||||
public interface WrappedValue<T> {
|
||||
interface WrappedValue<T> {
|
||||
T value();
|
||||
}
|
||||
|
@ -0,0 +1,117 @@
|
||||
package com.mgabriel.chronicle.flux;
|
||||
|
||||
import static com.mgabriel.chronicle.flux.util.ChronicleStoreCleanup.deleteStoreIfItExists;
|
||||
import static java.time.Duration.ofSeconds;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
class ChronicleStoreTest {
|
||||
private static final String PATH = "ChronicleStoreTest";
|
||||
private static final String ONE = "one";
|
||||
private static final String TWO = "two";
|
||||
private static final String THREE = "three";
|
||||
private static final String FOUR = "four";
|
||||
private static final DummyObject FIRST = new DummyObject(10000, ONE);
|
||||
private static final DummyObject SECOND = new DummyObject(11000, TWO);
|
||||
private static final DummyObject THIRD = new DummyObject(12000, THREE);
|
||||
private static final DummyObject FOURTH = new DummyObject(15000, FOUR);
|
||||
private static final Flux<DummyObject> source = Flux.just(FIRST, SECOND, THIRD, FOURTH
|
||||
);
|
||||
private ChronicleStore<DummyObject> store;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
deleteStoreIfItExists(PATH);
|
||||
store = new ChronicleStore<>(PATH, DummyObject::toBinary, DummyObject::fromBinary);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() {
|
||||
store.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("tests that a data stream is store in the Chronicle store")
|
||||
void shouldStoreStream() {
|
||||
store.store(source);
|
||||
StepVerifier.create(store.retrieveAll())
|
||||
.expectSubscription()
|
||||
.expectNext(FIRST)
|
||||
.expectNext(SECOND)
|
||||
.expectNext(THIRD)
|
||||
.expectNext(FOURTH)
|
||||
.thenCancel()
|
||||
.verify(Duration.ofMillis(500));
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("tests that individual items are stored in the Chronicle store")
|
||||
void shouldStoreIndividualItems() {
|
||||
store.store(FIRST);
|
||||
store.store(THIRD);
|
||||
|
||||
StepVerifier.create(store.retrieveAll())
|
||||
.expectSubscription()
|
||||
.expectNext(FIRST)
|
||||
.expectNext(THIRD)
|
||||
.thenCancel()
|
||||
.verify(Duration.ofMillis(500));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@DisplayName("tests that the history is retrieved from the Chronicle store")
|
||||
void shouldRetrieveHistory() {
|
||||
store.store(FIRST);
|
||||
store.store(SECOND);
|
||||
store.store(THIRD);
|
||||
|
||||
StepVerifier.create(store.retrieveHistory())
|
||||
.expectSubscription()
|
||||
.expectNext(FIRST)
|
||||
.expectNext(SECOND)
|
||||
.expectNext(THIRD)
|
||||
.expectComplete()
|
||||
.verify(Duration.ofMillis(500));
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("tests that new values are retrieved from the Chronicle store")
|
||||
void retrieveNewValues() {
|
||||
store.store(FIRST);
|
||||
StepVerifier.withVirtualTime(() -> store.retrieveNewValues())
|
||||
.expectSubscription()
|
||||
.expectNoEvent(ofSeconds(1))
|
||||
.then( () -> store.store(SECOND))
|
||||
.expectNext(SECOND)
|
||||
.then( () -> store.store(THIRD))
|
||||
.expectNext(THIRD)
|
||||
.thenCancel()
|
||||
.verify(Duration.ofMillis(500));
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("tests that the Chronicle store can replay history")
|
||||
void replayHistory() {
|
||||
store.store(source);
|
||||
StepVerifier.withVirtualTime(() -> store.replayHistory(DummyObject::timestamp).withOriginalTiming())
|
||||
.expectSubscription()
|
||||
.expectNext(FIRST)
|
||||
.expectNoEvent(ofSeconds(1))
|
||||
.expectNext(SECOND)
|
||||
.expectNoEvent(ofSeconds(1))
|
||||
.expectNext(THIRD)
|
||||
.expectNoEvent(ofSeconds(3))
|
||||
.expectNext(FOURTH)
|
||||
.thenCancel()
|
||||
.verify(Duration.ofMillis(500));
|
||||
}
|
||||
}
|
@ -8,7 +8,7 @@ import com.google.common.primitives.Longs;
|
||||
/**
|
||||
* A dummy test object that can be serialized / deserialized as binary.
|
||||
*
|
||||
* Please note that for production it makes more sense to rely on a binary protocol such as Protof, Avro, etc.
|
||||
* Please note that for production it makes more sense to rely on a binary protocol such as Protobuf, Avro, etc.
|
||||
*
|
||||
* @author mgabriel.
|
||||
*/
|
||||
|
@ -1,15 +1,13 @@
|
||||
package com.mgabriel.chronicle.flux.demo;
|
||||
|
||||
import static com.mgabriel.chronicle.flux.util.ChronicleStoreCleanup.deleteStoreIfItExists;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
import com.mgabriel.chronicle.flux.ChronicleStore;
|
||||
import com.mgabriel.chronicle.flux.DummyObject;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@ -19,12 +17,12 @@ import reactor.core.publisher.Flux;
|
||||
*
|
||||
* @author mgabriel.
|
||||
*/
|
||||
public class ChronicleStoreDemo {
|
||||
class ChronicleStoreDemo {
|
||||
|
||||
private static final String PATH = "demoChronicleStore";
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
deleteStoreIfItExists();
|
||||
deleteStoreIfItExists(PATH);
|
||||
|
||||
ChronicleStore<DummyObject> store = new ChronicleStore<>(PATH, DummyObject::toBinary, DummyObject::fromBinary);
|
||||
|
||||
@ -47,17 +45,4 @@ public class ChronicleStoreDemo {
|
||||
.blockLast();
|
||||
}
|
||||
|
||||
private static void deleteStoreIfItExists() {
|
||||
File directory = new File(".");
|
||||
try {
|
||||
String FULL_PATH = directory.getCanonicalPath() + File.separator + PATH;
|
||||
File storePath = new File(FULL_PATH);
|
||||
if (storePath.exists()) {
|
||||
FileUtils.deleteDirectory(storePath);
|
||||
System.out.println("Deleted existing store");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
System.err.println("Error while deleting store");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ import java.time.Instant;
|
||||
import com.mgabriel.chronicle.flux.replay.ReplayFlux;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class ReplayFluxDemo {
|
||||
class ReplayFluxDemo {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
|
@ -8,7 +8,7 @@ import com.mgabriel.chronicle.flux.replay.ReplayInLoop;
|
||||
import com.mgabriel.chronicle.flux.replay.ReplayValue;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class ReplayLoopDemo {
|
||||
class ReplayLoopDemo {
|
||||
|
||||
public static void main(String[] args) {
|
||||
Flux<Long> source = Flux.just(0L, 1L, 2L, 3L, 4L, 5L);
|
||||
|
@ -5,7 +5,7 @@ import java.time.Instant;
|
||||
import com.mgabriel.chronicle.flux.replay.ReplayWithOriginalTiming;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class ReplayWithOriginalTimingDemo {
|
||||
class ReplayWithOriginalTimingDemo {
|
||||
|
||||
public static void main(String[] args) {
|
||||
Flux<Long> source = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L);
|
||||
|
@ -2,16 +2,17 @@ package com.mgabriel.chronicle.flux.replay;
|
||||
|
||||
import static java.time.Duration.ofMillis;
|
||||
import static java.time.Duration.ofSeconds;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import com.mgabriel.chronicle.flux.DummyObject;
|
||||
import reactor.core.Scannable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.test.StepVerifier;
|
||||
|
||||
@ -30,32 +31,28 @@ class ReplayFluxTest {
|
||||
private static final Duration THREE_SECONDS = ofSeconds(3);
|
||||
private static final Duration MILLIS_500 = ofMillis(500);
|
||||
|
||||
private static Flux<DummyObject> source = Flux.just(new DummyObject(10000, ONE),
|
||||
private static final Flux<DummyObject> source = Flux.just(new DummyObject(10000, ONE),
|
||||
new DummyObject(11000, TWO),
|
||||
new DummyObject(12000, THREE),
|
||||
new DummyObject(15000, FOUR)
|
||||
);
|
||||
|
||||
private static ReplayFlux<DummyObject> replayFlux = new ReplayFlux<>(source, DummyObject::timestamp);
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
}
|
||||
private static final ReplayFlux<DummyObject> replayFlux = new ReplayFlux<>(source, DummyObject::timestamp);
|
||||
|
||||
@Test
|
||||
@DisplayName("tests that the flux is replayed with the original timing")
|
||||
void shouldRespectOriginalTiming() {
|
||||
StepVerifier.withVirtualTime(() -> replayFlux.withOriginalTiming())
|
||||
StepVerifier.withVirtualTime(replayFlux::withOriginalTiming)
|
||||
.expectSubscription()
|
||||
.assertNext(i -> Assertions.assertEquals(ONE, i.value()))
|
||||
.assertNext(i -> assertEquals(ONE, i.value()))
|
||||
.expectNoEvent(ONE_SECOND)
|
||||
.assertNext(i -> Assertions.assertEquals(TWO, i.value()))
|
||||
.assertNext(i -> assertEquals(TWO, i.value()))
|
||||
.expectNoEvent(ONE_SECOND)
|
||||
.assertNext(i -> Assertions.assertEquals(THREE, i.value()))
|
||||
.assertNext(i -> assertEquals(THREE, i.value()))
|
||||
.expectNoEvent(THREE_SECONDS)
|
||||
.assertNext(i -> Assertions.assertEquals(FOUR, i.value()))
|
||||
.assertNext(i -> assertEquals(FOUR, i.value()))
|
||||
.expectComplete()
|
||||
.verify(ofMillis(500));
|
||||
.verify(MILLIS_500);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -63,15 +60,15 @@ class ReplayFluxTest {
|
||||
void shouldReplayWithTimeAcceleration() {
|
||||
StepVerifier.withVirtualTime(() -> replayFlux.withTimeAcceleration(2))
|
||||
.expectSubscription()
|
||||
.assertNext(i -> Assertions.assertEquals(ONE, i.value()))
|
||||
.assertNext(i -> assertEquals(ONE, i.value()))
|
||||
.expectNoEvent(MILLIS_500)
|
||||
.assertNext(i -> Assertions.assertEquals(TWO, i.value()))
|
||||
.assertNext(i -> assertEquals(TWO, i.value()))
|
||||
.expectNoEvent(MILLIS_500)
|
||||
.assertNext(i -> Assertions.assertEquals(THREE, i.value()))
|
||||
.assertNext(i -> assertEquals(THREE, i.value()))
|
||||
.expectNoEvent(ONE_SECOND.plus(MILLIS_500))
|
||||
.assertNext(i -> Assertions.assertEquals(FOUR, i.value()))
|
||||
.assertNext(i -> assertEquals(FOUR, i.value()))
|
||||
.expectComplete()
|
||||
.verify(ofMillis(500));
|
||||
.verify(MILLIS_500);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -79,15 +76,15 @@ class ReplayFluxTest {
|
||||
void shouldReplayWithTimeDeceleration() {
|
||||
StepVerifier.withVirtualTime(() -> replayFlux.withTimeAcceleration(0.5))
|
||||
.expectSubscription()
|
||||
.assertNext(i -> Assertions.assertEquals(ONE, i.value()))
|
||||
.assertNext(i -> assertEquals(ONE, i.value()))
|
||||
.expectNoEvent(TWO_SECONDS)
|
||||
.assertNext(i -> Assertions.assertEquals(TWO, i.value()))
|
||||
.assertNext(i -> assertEquals(TWO, i.value()))
|
||||
.expectNoEvent(TWO_SECONDS)
|
||||
.assertNext(i -> Assertions.assertEquals(THREE, i.value()))
|
||||
.assertNext(i -> assertEquals(THREE, i.value()))
|
||||
.expectNoEvent(ofSeconds(6))
|
||||
.assertNext(i -> Assertions.assertEquals(FOUR, i.value()))
|
||||
.assertNext(i -> assertEquals(FOUR, i.value()))
|
||||
.expectComplete()
|
||||
.verify(ofMillis(500));
|
||||
.verify(MILLIS_500);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -96,25 +93,25 @@ class ReplayFluxTest {
|
||||
StepVerifier.create(replayFlux.inLoop())
|
||||
.expectSubscription()
|
||||
.assertNext(i -> {
|
||||
Assertions.assertEquals(ONE, i.value().value());
|
||||
assertEquals(ONE, i.value().value());
|
||||
Assertions.assertTrue(i.isLoopRestart());
|
||||
})
|
||||
.assertNext(assertValue(TWO))
|
||||
.assertNext(assertValue(THREE))
|
||||
.assertNext(assertValue(FOUR))
|
||||
.assertNext(i -> {
|
||||
Assertions.assertEquals(ONE, i.value().value());
|
||||
assertEquals(ONE, i.value().value());
|
||||
Assertions.assertTrue(i.isLoopRestart());
|
||||
})
|
||||
.assertNext(assertValue(TWO))
|
||||
.assertNext(assertValue(THREE))
|
||||
.assertNext(assertValue(FOUR))
|
||||
.thenCancel()
|
||||
.verify(ofMillis(500));
|
||||
.verify(MILLIS_500);
|
||||
}
|
||||
|
||||
private static Consumer<ReplayValue<DummyObject>> assertValue(String expected) {
|
||||
return i -> Assertions.assertEquals(expected, i.value().value());
|
||||
return i -> assertEquals(expected, i.value().value());
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -134,13 +131,22 @@ class ReplayFluxTest {
|
||||
.assertNext(assertValue(FOUR))
|
||||
.expectNoEvent(TWO_SECONDS)
|
||||
.thenCancel()
|
||||
.verify(ofMillis(500));
|
||||
.verify(MILLIS_500);
|
||||
}
|
||||
|
||||
private static Consumer<ReplayValue<DummyObject>> assertLoopRestart() {
|
||||
return i -> {
|
||||
Assertions.assertEquals(ONE, i.value().value());
|
||||
assertEquals(ONE, i.value().value());
|
||||
Assertions.assertTrue(i.isLoopRestart());
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("tests that the replay flux forwards the context of its source")
|
||||
void shouldForwardSourceContext() {
|
||||
String testName = "testName";
|
||||
ReplayFlux<DummyObject> replayFlux = new ReplayFlux<>(source.name(testName), i -> 1L);
|
||||
String replayFluxName = Scannable.from(replayFlux).name();
|
||||
assertEquals(testName, replayFluxName);
|
||||
}
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
package com.mgabriel.chronicle.flux.util;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
||||
public class ChronicleStoreCleanup {
|
||||
|
||||
public static void deleteStoreIfItExists(String path) {
|
||||
File directory = new File(".");
|
||||
try {
|
||||
deleteStore(path, directory);
|
||||
} catch (IOException e) {
|
||||
//try again
|
||||
try {
|
||||
Thread.sleep(500);
|
||||
deleteStore(path, directory);
|
||||
} catch (Exception exception) {
|
||||
System.err.println("Error while deleting store "+exception);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private static void deleteStore(String path, File directory) throws IOException {
|
||||
String FULL_PATH = directory.getCanonicalPath() + File.separator + path;
|
||||
File storePath = new File(FULL_PATH);
|
||||
if (storePath.exists()) {
|
||||
FileUtils.deleteDirectory(storePath);
|
||||
System.out.println("Deleted existing store");
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user