diff --git a/build.gradle b/build.gradle index af53678..42674f5 100644 --- a/build.gradle +++ b/build.gradle @@ -74,6 +74,7 @@ dependencies { testCompile "org.junit.jupiter:junit-jupiter-engine:5.2.0" testCompile "org.junit.platform:junit-platform-launcher:1.2.0" testCompile "commons-io:commons-io:2.6" + testCompile "org.mockito:mockito-core:2.21.0" } test { diff --git a/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java b/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java index 7f2d361..a78acd2 100644 --- a/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java +++ b/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java @@ -37,11 +37,15 @@ public abstract class AbstractChronicleStore implements FluxStore { private final RollCycle rollCycle; protected AbstractChronicleStore(AbstractChronicleStoreBuilder builder) { - String path = builder.path; serializer = builder.serializer; deserializer = builder.deserializer; rollCycle = builder.rollCycle; - this.queue = SingleChronicleQueueBuilder.binary(path).rollCycle(rollCycle).build(); + this.queue = createQueue(builder.path); + } + + //package private for testing + SingleChronicleQueue createQueue(String path) { + return SingleChronicleQueueBuilder.binary(path).rollCycle(rollCycle).build(); } void close() { @@ -84,7 +88,8 @@ public abstract class AbstractChronicleStore implements FluxStore { try { while (!sink.isCancelled()) { if (sink.requestedFromDownstream() > 0) { - boolean present = tailer.readBytes(b -> sink.next(deserializeValue(b))); + boolean present = tailer.readBytes(b -> + sink.next(deserializeValue(b))); if (!present) { if (readerType == ReaderType.ONLY_HISTORY) { sink.complete(); diff --git a/src/test/java/ch/streamly/chronicle/flux/AbstractChronicleStoreTest.java b/src/test/java/ch/streamly/chronicle/flux/AbstractChronicleStoreTest.java new file mode 100644 index 0000000..b97f73c --- /dev/null +++ b/src/test/java/ch/streamly/chronicle/flux/AbstractChronicleStoreTest.java @@ -0,0 +1,136 @@ +package ch.streamly.chronicle.flux; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.time.Duration; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import ch.streamly.chronicle.flux.AbstractChronicleStore.AbstractChronicleStoreBuilder; +import net.openhft.chronicle.bytes.Bytes; +import net.openhft.chronicle.bytes.BytesIn; +import net.openhft.chronicle.bytes.ReadBytesMarshallable; +import net.openhft.chronicle.queue.ExcerptTailer; +import net.openhft.chronicle.queue.RollCycle; +import net.openhft.chronicle.queue.impl.WireStore; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; +import reactor.core.Disposable; +import reactor.test.StepVerifier; + +class AbstractChronicleStoreTest { + + private static final String TEST_VALUE = "testValue"; + private AbstractChronicleStore store; + + private File file; + private SingleChronicleQueue queue; + private ArgumentCaptor reader; + private ExcerptTailer tailer; + private WireStore wireStore; + + @BeforeEach + void setUp() { + + RollCycle rollCycle = Mockito.mock(RollCycle.class); + file = Mockito.mock(File.class); + queue = Mockito.mock(SingleChronicleQueue.class); + reader = ArgumentCaptor.forClass(ReadBytesMarshallable.class); + tailer = Mockito.mock(ExcerptTailer.class); + wireStore = Mockito.mock(WireStore.class); + + when(queue.createTailer()).thenReturn(tailer); + when(tailer.readBytes(any(ReadBytesMarshallable.class))).thenReturn(true); + when(tailer.queue()).thenReturn(queue); + when(queue.file()).thenReturn(file); + when(file.getAbsolutePath()).thenReturn(""); + when(rollCycle.toCycle(anyLong())).thenReturn(0).thenReturn(1); + when(queue.storeForCycle(anyInt(), anyLong(), anyBoolean())).thenReturn(wireStore); + when(wireStore.file()).thenReturn(file); + when(file.delete()).thenReturn(true); + + AbstractChronicleStoreBuilder builder = new AbstractChronicleStoreBuilder() { + }; + builder.rollCycle(rollCycle); + + store = new AbstractChronicleStore(builder) { + + @Override + SingleChronicleQueue createQueue(String path) { + return queue; + } + + @Override + protected String deserializeValue(BytesIn rawData) { + return TEST_VALUE; + } + }; + } + + @Test + @DisplayName("tests that the file is deleted once it has rolled") + void shouldDeleteAfterRead() { + Disposable sub = store.retrieveAll(true).subscribe(); + verify(file, timeout(100)).delete(); + sub.dispose(); + } + + @Test + @DisplayName("tests that the an exception on file deletion is swallowed") + void shouldSwallowExceptionOnFileDelete() { + when(file.delete()).thenThrow(new RuntimeException("Simulated for unit test")); + Disposable sub = store.retrieveAll(true).subscribe(); + verify(file, timeout(100)).delete(); + sub.dispose(); + } + + @Test + @DisplayName("tests that failure of file deletion does not send an exception") + void shouldNotFailIfFileNotDeleted() { + when(file.delete()).thenReturn(false); + Disposable sub = store.retrieveAll(true).subscribe(); + verify(file, timeout(100)).delete(); + sub.dispose(); + } + + @Test + @DisplayName("tests that a null wirestore throws no exception") + void testNullWirestore() { + when(queue.storeForCycle(anyInt(), anyLong(), anyBoolean())).thenReturn(null); + subscribeToValues(); + } + + private void subscribeToValues() { + StepVerifier.create(store.retrieveAll(true)) + .expectSubscription() + .then(() -> { + verify(tailer, timeout(100).atLeastOnce()).readBytes(reader.capture()); + reader.getValue().readMarshallable(Bytes.elasticByteBuffer()); + reader.getValue().readMarshallable(Bytes.elasticByteBuffer()); + reader.getValue().readMarshallable(Bytes.elasticByteBuffer()); + }) + .expectNext(TEST_VALUE) + .expectNext(TEST_VALUE) + .expectNext(TEST_VALUE) + .thenCancel() + .verify(Duration.ofMillis(500)); + } + + @Test + @DisplayName("tests that a null file throws no exception") + void testNullFile() { + when(queue.storeForCycle(anyInt(), anyLong(), anyBoolean())).thenReturn(wireStore); + when(wireStore.file()).thenReturn(null); + subscribeToValues(); + } +} \ No newline at end of file diff --git a/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000..ca6ee9c --- /dev/null +++ b/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline \ No newline at end of file