diff --git a/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java b/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java index a78acd2..3fd30f5 100644 --- a/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java +++ b/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java @@ -30,13 +30,13 @@ import reactor.core.publisher.FluxSink; */ public abstract class AbstractChronicleStore implements FluxStore { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChronicleStore.class); - - private final Function serializer; protected final Function deserializer; + private final Function serializer; private final SingleChronicleQueue queue; private final RollCycle rollCycle; - protected AbstractChronicleStore(AbstractChronicleStoreBuilder builder) { + protected , B extends AbstractChronicleStoreBuilder> AbstractChronicleStore( + AbstractChronicleStoreBuilder builder) { serializer = builder.serializer; deserializer = builder.deserializer; rollCycle = builder.rollCycle; @@ -69,17 +69,28 @@ public abstract class AbstractChronicleStore implements FluxStore { return serializer.apply(v); } - protected abstract O deserializeValue(BytesIn rawData); - @Override public void store(I item) { ExcerptAppender appender = queue.acquireAppender(); storeValue(appender, item); } - private enum ReaderType { - ALL, - ONLY_HISTORY + @Override + public Flux retrieveAll(boolean deleteAfterRead) { + return Flux.create(sink -> launchTailer(sink, ReaderType.ALL, deleteAfterRead)); + } + + private void launchTailer(FluxSink sink, ReaderType readerType, boolean deleteAfterRead) { + launchTailer(sink, queue.createTailer(), readerType, deleteAfterRead); + } + + private void launchTailer(FluxSink sink, ExcerptTailer tailer, ReaderType readerType, boolean deleteAfterRead) { + String path = tailer.queue().file().getAbsolutePath(); + Thread t = new Thread( + () -> readTailer(tailer, sink, readerType, deleteAfterRead), + "ChronicleStoreRetrieve_" + path); + t.setDaemon(true); + t.start(); } private void readTailer(ExcerptTailer tailer, FluxSink sink, @@ -113,6 +124,8 @@ public abstract class AbstractChronicleStore implements FluxStore { } } + protected abstract O deserializeValue(BytesIn rawData); + private void waitMillis(long time) { try { MILLISECONDS.sleep(time); @@ -153,24 +166,6 @@ public abstract class AbstractChronicleStore implements FluxStore { } } - @Override - public Flux retrieveAll(boolean deleteAfterRead) { - return Flux.create(sink -> launchTailer(sink, ReaderType.ALL, deleteAfterRead)); - } - - private void launchTailer(FluxSink sink, ReaderType readerType, boolean deleteAfterRead) { - launchTailer(sink, queue.createTailer(), readerType, deleteAfterRead); - } - - private void launchTailer(FluxSink sink, ExcerptTailer tailer, ReaderType readerType, boolean deleteAfterRead) { - String path = tailer.queue().file().getAbsolutePath(); - Thread t = new Thread( - () -> readTailer(tailer, sink, readerType, deleteAfterRead), - "ChronicleStoreRetrieve_" + path); - t.setDaemon(true); - t.start(); - } - @Override public Flux retrieveHistory() { return Flux.create(sink -> launchTailer(sink, ReaderType.ONLY_HISTORY, false)); @@ -188,7 +183,12 @@ public abstract class AbstractChronicleStore implements FluxStore { return new ReplayFlux<>(historySource, timestampExtractor); } - public abstract static class AbstractChronicleStoreBuilder { + private enum ReaderType { + ALL, + ONLY_HISTORY + } + + public abstract static class AbstractChronicleStoreBuilder, R extends AbstractChronicleStore, T> { private String path; private Function serializer; private Function deserializer; @@ -202,36 +202,40 @@ public abstract class AbstractChronicleStore implements FluxStore { * This path should not be a network file system (see the Chronicle queue documentation for more detail * @return this builder */ - public AbstractChronicleStoreBuilder path(String path) { + public B path(String path) { this.path = path; - return this; + return getThis(); } + protected abstract B getThis(); + /** * @param serializer data serializer * @return this builder */ - public AbstractChronicleStoreBuilder serializer(Function serializer) { + public B serializer(Function serializer) { this.serializer = serializer; - return this; + return getThis(); } /** * @param deserializer data deserializer * @return this builder */ - public AbstractChronicleStoreBuilder deserializer(Function deserializer) { + public B deserializer(Function deserializer) { this.deserializer = deserializer; - return this; + return getThis(); } /** * @param rollCycle roll cycle for the files * @return this builder */ - public AbstractChronicleStoreBuilder rollCycle(RollCycle rollCycle) { + public B rollCycle(RollCycle rollCycle) { this.rollCycle = rollCycle; - return this; + return getThis(); } + + public abstract R build(); } } \ No newline at end of file diff --git a/src/main/java/ch/streamly/chronicle/flux/ChronicleJournal.java b/src/main/java/ch/streamly/chronicle/flux/ChronicleJournal.java index d9db1e3..455d142 100644 --- a/src/main/java/ch/streamly/chronicle/flux/ChronicleJournal.java +++ b/src/main/java/ch/streamly/chronicle/flux/ChronicleJournal.java @@ -9,7 +9,6 @@ import net.openhft.chronicle.bytes.BytesIn; /** * Implementation of a {@link FluxJournal} backed by a Chronicle Queue. * This journal respects the backpressure on the data streams it produces. - * * All values saved in the journal are timed with the current time. * * @author mgabriel. @@ -30,10 +29,6 @@ public class ChronicleJournal extends AbstractChronicleStore> imp .deserializer(deserializer)); } - private ChronicleJournal(ChronicleJournalBuilder builder) { - super(builder); - } - /** * @param data type. * @return a ChronicleStore builder. @@ -42,6 +37,10 @@ public class ChronicleJournal extends AbstractChronicleStore> imp return new ChronicleJournalBuilder<>(); } + private ChronicleJournal(ChronicleJournalBuilder builder) { + super(builder); + } + @Override protected byte[] serializeValue(T v) { byte[] val = super.serializeValue(v); @@ -52,11 +51,6 @@ public class ChronicleJournal extends AbstractChronicleStore> imp return result; } - //package private for testing - long getCurrentTime() { - return System.currentTimeMillis(); - } - @Override protected Timed deserializeValue(BytesIn rawData) { int size = rawData.readInt(); @@ -72,6 +66,17 @@ public class ChronicleJournal extends AbstractChronicleStore> imp } + private static long fromByteArray(byte[] bytes) { + return (bytes[0] & 0xFFL) << 56 + | (bytes[1] & 0xFFL) << 48 + | (bytes[2] & 0xFFL) << 40 + | (bytes[3] & 0xFFL) << 32 + | (bytes[4] & 0xFFL) << 24 + | (bytes[5] & 0xFFL) << 16 + | (bytes[6] & 0xFFL) << 8 + | (bytes[7] & 0xFFL); + } + private static byte[] toByteArray(long value) { return new byte[] { (byte) (value >> 56), @@ -85,21 +90,23 @@ public class ChronicleJournal extends AbstractChronicleStore> imp }; } - private static long fromByteArray(byte[] bytes){ - return (bytes[0] & 0xFFL) << 56 - | (bytes[1] & 0xFFL) << 48 - | (bytes[2] & 0xFFL) << 40 - | (bytes[3] & 0xFFL) << 32 - | (bytes[4] & 0xFFL) << 24 - | (bytes[5] & 0xFFL) << 16 - | (bytes[6] & 0xFFL) << 8 - | (bytes[7] & 0xFFL); + //package private for testing + long getCurrentTime() { + return System.currentTimeMillis(); } - public static final class ChronicleJournalBuilder extends AbstractChronicleStoreBuilder { + public static final class ChronicleJournalBuilder + extends AbstractChronicleStoreBuilder, ChronicleJournal, T> { private ChronicleJournalBuilder() { super(); } + + @Override + protected ChronicleJournalBuilder getThis() { + return this; + } + + @Override public ChronicleJournal build() { return new ChronicleJournal<>(this); } diff --git a/src/main/java/ch/streamly/chronicle/flux/ChronicleStore.java b/src/main/java/ch/streamly/chronicle/flux/ChronicleStore.java index 09fc459..76786ea 100644 --- a/src/main/java/ch/streamly/chronicle/flux/ChronicleStore.java +++ b/src/main/java/ch/streamly/chronicle/flux/ChronicleStore.java @@ -26,10 +26,6 @@ public class ChronicleStore extends AbstractChronicleStore { .deserializer(deserializer)); } - private ChronicleStore(ChronicleStoreBuilder builder) { - super(builder); - } - /** * @param data type. * @return a ChronicleStore builder. @@ -38,6 +34,11 @@ public class ChronicleStore extends AbstractChronicleStore { return new ChronicleStoreBuilder<>(); } + //package private for testing + ChronicleStore(ChronicleStoreBuilder builder) { + super(builder); + } + @Override protected T deserializeValue(BytesIn rawData) { int size = rawData.readInt(); @@ -46,14 +47,21 @@ public class ChronicleStore extends AbstractChronicleStore { return deserializer.apply(bytes); } - public static final class ChronicleStoreBuilder extends AbstractChronicleStoreBuilder { + public static final class ChronicleStoreBuilder + extends AbstractChronicleStoreBuilder, ChronicleStore, T> { private ChronicleStoreBuilder() { super(); } + @Override + protected ChronicleStoreBuilder getThis() { + return this; + } + + @Override public ChronicleStore build() { - return new ChronicleStore(this); + return new ChronicleStore<>(this); } } } \ No newline at end of file diff --git a/src/test/java/ch/streamly/chronicle/flux/AbstractChronicleStoreTest.java b/src/test/java/ch/streamly/chronicle/flux/AbstractChronicleStoreTest.java index b97f73c..86d2de5 100644 --- a/src/test/java/ch/streamly/chronicle/flux/AbstractChronicleStoreTest.java +++ b/src/test/java/ch/streamly/chronicle/flux/AbstractChronicleStoreTest.java @@ -17,7 +17,6 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; -import ch.streamly.chronicle.flux.AbstractChronicleStore.AbstractChronicleStoreBuilder; import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.BytesIn; import net.openhft.chronicle.bytes.ReadBytesMarshallable; @@ -59,11 +58,10 @@ class AbstractChronicleStoreTest { when(wireStore.file()).thenReturn(file); when(file.delete()).thenReturn(true); - AbstractChronicleStoreBuilder builder = new AbstractChronicleStoreBuilder() { - }; + ChronicleStore.ChronicleStoreBuilder builder = ChronicleStore.newBuilder(); builder.rollCycle(rollCycle); - store = new AbstractChronicleStore(builder) { + store = new ChronicleStore(builder) { @Override SingleChronicleQueue createQueue(String path) { @@ -86,7 +84,7 @@ class AbstractChronicleStoreTest { } @Test - @DisplayName("tests that the an exception on file deletion is swallowed") + @DisplayName("tests that an exception on file deletion is swallowed") void shouldSwallowExceptionOnFileDelete() { when(file.delete()).thenThrow(new RuntimeException("Simulated for unit test")); Disposable sub = store.retrieveAll(true).subscribe(); @@ -133,4 +131,11 @@ class AbstractChronicleStoreTest { when(wireStore.file()).thenReturn(null); subscribeToValues(); } + + @Test + @DisplayName("tests that an exception thrown when reading from Chronicle is swallowed") + void testTailerException() { + when(tailer.readBytes(any(ReadBytesMarshallable.class))).thenThrow(new RuntimeException("simulated")).thenReturn(true); + subscribeToValues(); + } } \ No newline at end of file diff --git a/src/test/java/ch/streamly/chronicle/flux/ChronicleJournalTest.java b/src/test/java/ch/streamly/chronicle/flux/ChronicleJournalTest.java index 85827f4..65eb421 100644 --- a/src/test/java/ch/streamly/chronicle/flux/ChronicleJournalTest.java +++ b/src/test/java/ch/streamly/chronicle/flux/ChronicleJournalTest.java @@ -60,6 +60,21 @@ class ChronicleJournalTest { @Test @DisplayName("tests that a timed data stream is stored in the Chronicle journal") void shouldStoreStreamWithTimeStamps() { + verifyBasicOperations(); + } + + @Test + @DisplayName("tests store creation with builder") + void testStoreCreationWithBuilder() { + journal = ChronicleJournal.newBuilder() + .path(path) + .serializer(DummyObject::toBinary) + .deserializer(DummyObject::fromBinary) + .build(); + + } + + private void verifyBasicOperations() { journal.store(source); StepVerifier.create(journal.retrieveAll()) .expectSubscription() diff --git a/src/test/java/ch/streamly/chronicle/flux/ChronicleStoreTest.java b/src/test/java/ch/streamly/chronicle/flux/ChronicleStoreTest.java index a9f0788..cea5e5b 100644 --- a/src/test/java/ch/streamly/chronicle/flux/ChronicleStoreTest.java +++ b/src/test/java/ch/streamly/chronicle/flux/ChronicleStoreTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import net.openhft.chronicle.queue.RollCycles; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; @@ -44,6 +45,22 @@ class ChronicleStoreTest { @Test @DisplayName("tests that a data stream is store in the Chronicle store") void shouldStoreStream() { + testBasicOperations(); + } + + @Test + @DisplayName("tests building the chronicle store with its builder") + void testWithBuilder() { + store = ChronicleStore.newBuilder() + .path(path) + .serializer(DummyObject::toBinary) + .deserializer(DummyObject::fromBinary) + .rollCycle(RollCycles.DAILY) + .build(); + testBasicOperations(); + } + + private void testBasicOperations() { store.store(source); StepVerifier.create(store.retrieveAll()) .expectSubscription()