From 13232b8c9992461a6ce94104900f99911fc7e20b Mon Sep 17 00:00:00 2001 From: mgabriel Date: Tue, 21 Aug 2018 16:31:25 +0200 Subject: [PATCH] Add ChronicleJournal --- build.gradle | 2 + .../flux/AbstractChronicleStore.java | 232 ++++++++++++++++++ .../chronicle/flux/ChronicleJournal.java | 107 ++++++++ .../chronicle/flux/ChronicleStore.java | 228 +---------------- .../streamly/chronicle/flux/FluxJournal.java | 17 ++ .../ch/streamly/chronicle/flux/FluxStore.java | 19 +- .../chronicle/flux/replay/ReplayFlux.java | 1 + .../chronicle/flux/replay/ReplayInLoop.java | 6 +- .../chronicle/flux/replay/ReplayValue.java | 14 -- .../flux/replay/ReplayValueImpl.java | 56 ----- .../flux/replay/ReplayWithOriginalTiming.java | 2 + .../streamly/chronicle/flux/replay/Timed.java | 10 - .../chronicle/flux/replay/TimedValue.java | 52 ---- .../chronicle/flux/replay/ValueToDelay.java | 1 + .../chronicle/flux/replay/WrappedValue.java | 5 - .../chronicle/flux/ChronicleJournalTest.java | 99 ++++++++ .../chronicle/flux/ChronicleStoreTest.java | 13 +- .../chronicle/flux/demo/ReplayLoopDemo.java | 2 +- .../chronicle/flux/replay/ReplayFluxTest.java | 1 + .../flux/replay/ReplayValueImplTest.java | 36 --- .../chronicle/flux/replay/TimedValueTest.java | 35 --- .../flux/util/ChronicleStoreCleanup.java | 25 +- 22 files changed, 514 insertions(+), 449 deletions(-) create mode 100644 src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java create mode 100644 src/main/java/ch/streamly/chronicle/flux/ChronicleJournal.java create mode 100644 src/main/java/ch/streamly/chronicle/flux/FluxJournal.java delete mode 100644 src/main/java/ch/streamly/chronicle/flux/replay/ReplayValue.java delete mode 100644 src/main/java/ch/streamly/chronicle/flux/replay/ReplayValueImpl.java delete mode 100644 src/main/java/ch/streamly/chronicle/flux/replay/Timed.java delete mode 100644 src/main/java/ch/streamly/chronicle/flux/replay/TimedValue.java delete mode 100644 src/main/java/ch/streamly/chronicle/flux/replay/WrappedValue.java create mode 100644 src/test/java/ch/streamly/chronicle/flux/ChronicleJournalTest.java delete mode 100644 src/test/java/ch/streamly/chronicle/flux/replay/ReplayValueImplTest.java delete mode 100644 src/test/java/ch/streamly/chronicle/flux/replay/TimedValueTest.java diff --git a/build.gradle b/build.gradle index 36e2ec6..af53678 100644 --- a/build.gradle +++ b/build.gradle @@ -50,6 +50,7 @@ def reactorVersion = '3.1.7.RELEASE' repositories { mavenCentral() + jcenter() } if (DEPLOYMENT) { @@ -62,6 +63,7 @@ if (DEPLOYMENT) { } dependencies { + compile "ch.streamly:streamly-domain:1.0.2" compile "org.slf4j:slf4j-api:1.7.25" compile "io.projectreactor:reactor-core:$reactorVersion" compile "net.openhft:chronicle-queue:$chronicleVersion" diff --git a/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java b/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java new file mode 100644 index 0000000..7f2d361 --- /dev/null +++ b/src/main/java/ch/streamly/chronicle/flux/AbstractChronicleStore.java @@ -0,0 +1,232 @@ +package ch.streamly.chronicle.flux; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.io.File; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ch.streamly.chronicle.flux.replay.ReplayFlux; +import net.openhft.chronicle.bytes.BytesIn; +import net.openhft.chronicle.queue.ExcerptAppender; +import net.openhft.chronicle.queue.ExcerptTailer; +import net.openhft.chronicle.queue.RollCycle; +import net.openhft.chronicle.queue.RollCycles; +import net.openhft.chronicle.queue.impl.WireStore; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; +import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; +import org.reactivestreams.Publisher; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +/** + * Implementation of a {@link FluxStore} backed by a Chronicle Queue. + * This store respects the backpressure on the data streams it produces. + * + * @author mgabriel. + */ +public abstract class AbstractChronicleStore implements FluxStore { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChronicleStore.class); + + private final Function serializer; + protected final Function deserializer; + private final SingleChronicleQueue queue; + private final RollCycle rollCycle; + + protected AbstractChronicleStore(AbstractChronicleStoreBuilder builder) { + String path = builder.path; + serializer = builder.serializer; + deserializer = builder.deserializer; + rollCycle = builder.rollCycle; + this.queue = SingleChronicleQueueBuilder.binary(path).rollCycle(rollCycle).build(); + } + + void close() { + queue.close(); + } + + @Override + public Disposable store(Publisher toStore) { + ExcerptAppender appender = queue.acquireAppender(); + return Flux.from(toStore) + .doOnError(err -> LOGGER.error("Error received", err)) + .subscribe(v -> storeValue(appender, v)); + } + + private void storeValue(ExcerptAppender appender, I v) { + byte[] bytesToStore = serializeValue(v); + appender.writeBytes(b -> b.writeInt(bytesToStore.length).write(bytesToStore)); + } + + protected byte[] serializeValue(I v) { + return serializer.apply(v); + } + + protected abstract O deserializeValue(BytesIn rawData); + + @Override + public void store(I item) { + ExcerptAppender appender = queue.acquireAppender(); + storeValue(appender, item); + } + + private enum ReaderType { + ALL, + ONLY_HISTORY + } + + private void readTailer(ExcerptTailer tailer, FluxSink sink, + ReaderType readerType, boolean deleteAfterRead) { + int previousCycle = 0; + try { + while (!sink.isCancelled()) { + if (sink.requestedFromDownstream() > 0) { + boolean present = tailer.readBytes(b -> sink.next(deserializeValue(b))); + if (!present) { + if (readerType == ReaderType.ONLY_HISTORY) { + sink.complete(); + } else { + waitMillis(10); // wait for values to appear on the queue + } + } + } else { + waitMillis(100); // wait for requests + } + int cycle = rollCycle.toCycle(tailer.index()); + if (cycle != previousCycle) { + if (deleteAfterRead) { + deleteFile(previousCycle); + } + previousCycle = cycle; + } + } + } catch (Exception e) { + LOGGER.error("Error while tailing on queue {}", tailer.queue().file().getAbsolutePath(), e); + } + } + + private void waitMillis(long time) { + try { + MILLISECONDS.sleep(time); + } catch (InterruptedException e) { + //interrupt can happen when the flux is cancelled + Thread.currentThread().interrupt(); + } + } + + private void deleteFile(int previousCycle) { + WireStore wireStore = queue.storeForCycle(previousCycle, 0, false); + if (wireStore != null) { + File file = wireStore.file(); + if (file != null) { + deleteWireStore(file); + } else { + LOGGER.error("Could not find file for cycle {}", previousCycle); + } + } else { + LOGGER.trace("wirestore is null for cycle {}", previousCycle); + } + } + + private void deleteWireStore(File file) { + try { + boolean deleted = file.delete(); + logDeletionResult(file, deleted); + } catch (Exception e) { + LOGGER.error("Could not delete file {}", file.getAbsolutePath(), e); + } + } + + private void logDeletionResult(File file, boolean deleted) { + if (deleted) { + LOGGER.trace("file {} deleted after read", file.getAbsolutePath()); + } else { + LOGGER.error("Could not delete file {}", file.getAbsolutePath()); + } + } + + @Override + public Flux retrieveAll(boolean deleteAfterRead) { + return Flux.create(sink -> launchTailer(sink, ReaderType.ALL, deleteAfterRead)); + } + + private void launchTailer(FluxSink sink, ReaderType readerType, boolean deleteAfterRead) { + launchTailer(sink, queue.createTailer(), readerType, deleteAfterRead); + } + + private void launchTailer(FluxSink sink, ExcerptTailer tailer, ReaderType readerType, boolean deleteAfterRead) { + String path = tailer.queue().file().getAbsolutePath(); + Thread t = new Thread( + () -> readTailer(tailer, sink, readerType, deleteAfterRead), + "ChronicleStoreRetrieve_" + path); + t.setDaemon(true); + t.start(); + } + + @Override + public Flux retrieveHistory() { + return Flux.create(sink -> launchTailer(sink, ReaderType.ONLY_HISTORY, false)); + } + + @Override + public Flux retrieveNewValues() { + ExcerptTailer tailer = queue.createTailer().toEnd(); + return Flux.create(sink -> launchTailer(sink, tailer, ReaderType.ALL, false)); + } + + @Override + public ReplayFlux replayHistory(Function timestampExtractor) { + Flux historySource = Flux.defer(this::retrieveHistory); + return new ReplayFlux<>(historySource, timestampExtractor); + } + + public abstract static class AbstractChronicleStoreBuilder { + private String path; + private Function serializer; + private Function deserializer; + private RollCycle rollCycle = RollCycles.DAILY; + + protected AbstractChronicleStoreBuilder() { + } + + /** + * @param path path were the Chronicle Queue will store the files. + * This path should not be a network file system (see the Chronicle queue documentation for more detail + * @return this builder + */ + public AbstractChronicleStoreBuilder path(String path) { + this.path = path; + return this; + } + + /** + * @param serializer data serializer + * @return this builder + */ + public AbstractChronicleStoreBuilder serializer(Function serializer) { + this.serializer = serializer; + return this; + } + + /** + * @param deserializer data deserializer + * @return this builder + */ + public AbstractChronicleStoreBuilder deserializer(Function deserializer) { + this.deserializer = deserializer; + return this; + } + + /** + * @param rollCycle roll cycle for the files + * @return this builder + */ + public AbstractChronicleStoreBuilder rollCycle(RollCycle rollCycle) { + this.rollCycle = rollCycle; + return this; + } + } +} \ No newline at end of file diff --git a/src/main/java/ch/streamly/chronicle/flux/ChronicleJournal.java b/src/main/java/ch/streamly/chronicle/flux/ChronicleJournal.java new file mode 100644 index 0000000..d9db1e3 --- /dev/null +++ b/src/main/java/ch/streamly/chronicle/flux/ChronicleJournal.java @@ -0,0 +1,107 @@ +package ch.streamly.chronicle.flux; + +import java.util.function.Function; + +import ch.streamly.domain.Timed; +import ch.streamly.domain.TimedValue; +import net.openhft.chronicle.bytes.BytesIn; + +/** + * Implementation of a {@link FluxJournal} backed by a Chronicle Queue. + * This journal respects the backpressure on the data streams it produces. + * + * All values saved in the journal are timed with the current time. + * + * @author mgabriel. + */ +public class ChronicleJournal extends AbstractChronicleStore> implements FluxJournal { + + /** + * @param path path were the Chronicle Queue will store the files. + * This path should not be a network file system (see the Chronicle queue documentation for more detail + * @param serializer data serializer + * @param deserializer data deserializer + */ + public ChronicleJournal(String path, Function serializer, + Function deserializer) { + super(ChronicleJournal.newBuilder() + .path(path) + .serializer(serializer) + .deserializer(deserializer)); + } + + private ChronicleJournal(ChronicleJournalBuilder builder) { + super(builder); + } + + /** + * @param data type. + * @return a ChronicleStore builder. + */ + public static ChronicleJournalBuilder newBuilder() { + return new ChronicleJournalBuilder<>(); + } + + @Override + protected byte[] serializeValue(T v) { + byte[] val = super.serializeValue(v); + byte[] time = toByteArray(getCurrentTime()); + byte[] result = new byte[val.length + time.length]; + System.arraycopy(time, 0, result, 0, time.length); + System.arraycopy(val, 0, result, time.length, val.length); + return result; + } + + //package private for testing + long getCurrentTime() { + return System.currentTimeMillis(); + } + + @Override + protected Timed deserializeValue(BytesIn rawData) { + int size = rawData.readInt(); + byte[] bytes = new byte[size]; + rawData.read(bytes); + byte[] time = new byte[8]; + byte[] val = new byte[bytes.length - 8]; + System.arraycopy(bytes, 0, time, 0, time.length); + System.arraycopy(bytes, 8, val, 0, val.length); + T value = deserializer.apply(val); + long receptionTime = fromByteArray(time); + return new TimedValue<>(receptionTime, value); + + } + + private static byte[] toByteArray(long value) { + return new byte[] { + (byte) (value >> 56), + (byte) (value >> 48), + (byte) (value >> 40), + (byte) (value >> 32), + (byte) (value >> 24), + (byte) (value >> 16), + (byte) (value >> 8), + (byte) value + }; + } + + private static long fromByteArray(byte[] bytes){ + return (bytes[0] & 0xFFL) << 56 + | (bytes[1] & 0xFFL) << 48 + | (bytes[2] & 0xFFL) << 40 + | (bytes[3] & 0xFFL) << 32 + | (bytes[4] & 0xFFL) << 24 + | (bytes[5] & 0xFFL) << 16 + | (bytes[6] & 0xFFL) << 8 + | (bytes[7] & 0xFFL); + } + + public static final class ChronicleJournalBuilder extends AbstractChronicleStoreBuilder { + private ChronicleJournalBuilder() { + super(); + } + public ChronicleJournal build() { + return new ChronicleJournal<>(this); + } + } +} \ No newline at end of file diff --git a/src/main/java/ch/streamly/chronicle/flux/ChronicleStore.java b/src/main/java/ch/streamly/chronicle/flux/ChronicleStore.java index f99f4ea..09fc459 100644 --- a/src/main/java/ch/streamly/chronicle/flux/ChronicleStore.java +++ b/src/main/java/ch/streamly/chronicle/flux/ChronicleStore.java @@ -1,69 +1,36 @@ package ch.streamly.chronicle.flux; -import java.io.File; import java.util.function.Function; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import ch.streamly.chronicle.flux.replay.ReplayFlux; import net.openhft.chronicle.bytes.BytesIn; -import net.openhft.chronicle.queue.ExcerptAppender; -import net.openhft.chronicle.queue.ExcerptTailer; -import net.openhft.chronicle.queue.RollCycle; -import net.openhft.chronicle.queue.RollCycles; -import net.openhft.chronicle.queue.impl.WireStore; -import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue; -import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder; -import org.reactivestreams.Publisher; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; /** * Implementation of a {@link FluxStore} backed by a Chronicle Queue. - * * This store respects the backpressure on the data streams it produces. * * @author mgabriel. */ -public class ChronicleStore implements FluxStore { - private static final Logger LOGGER = LoggerFactory.getLogger(ChronicleStore.class); - - private final Function serializer; - private final Function deserializer; - private final SingleChronicleQueue queue; - private final RollCycle rollCycle; +public class ChronicleStore extends AbstractChronicleStore { /** - * - * @param path path were the Chronicle Queue will store the files. - * This path should not be a network file system (see the Chronicle queue documentation for more detail - * @param serializer data serializer + * @param path path were the Chronicle Queue will store the files. + * This path should not be a network file system (see the Chronicle queue documentation for more detail + * @param serializer data serializer * @param deserializer data deserializer */ public ChronicleStore(String path, Function serializer, Function deserializer) { - this(ChronicleStore.newBuilder() + super(ChronicleStore.newBuilder() .path(path) .serializer(serializer) .deserializer(deserializer)); } private ChronicleStore(ChronicleStoreBuilder builder) { - String path = builder.path; - serializer = builder.serializer; - deserializer = builder.deserializer; - rollCycle = builder.rollCycle; - this.queue = SingleChronicleQueueBuilder.binary(path).rollCycle(rollCycle).build(); - } - - void close(){ - queue.close(); + super(builder); } /** - * * @param data type. * @return a ChronicleStore builder. */ @@ -72,190 +39,21 @@ public class ChronicleStore implements FluxStore { } @Override - public Disposable store(Publisher toStore) { - ExcerptAppender appender = queue.acquireAppender(); - return Flux.from(toStore) - .doOnError(err -> LOGGER.error("Error received", err)) - .subscribe(v -> { - byte[] bytesToStore = serializer.apply(v); - appender.writeBytes(b -> b.writeInt(bytesToStore.length).write(bytesToStore)); - }); - } - - @Override - public void store(T item) { - ExcerptAppender appender = queue.acquireAppender(); - byte[] bytesToStore = serializer.apply(item); - appender.writeBytes(b -> b.writeInt(bytesToStore.length).write(bytesToStore)); - } - - private enum ReaderType { - ALL, - ONLY_HISTORY - } - - private void readTailer(ExcerptTailer tailer, FluxSink sink, - ReaderType readerType, boolean deleteAfterRead) { - int previousCycle = 0; - try { - while (!sink.isCancelled()) { - if (sink.requestedFromDownstream() > 0) { - boolean present = tailer.readBytes(b -> readAndSendValue(sink, b)); - if (!present) { - if (readerType == ReaderType.ONLY_HISTORY) { - sink.complete(); - } else { - try { - Thread.sleep(10); //waiting for data to appear on the queue - } catch (InterruptedException e) { - //interrupt can happen when the flux is cancelled - Thread.currentThread().interrupt(); - } - } - } - } else { - try { - Thread.sleep(100); //waiting for requests on the flux - } catch (InterruptedException e) { - //interrupt can happen when the flux is cancelled - Thread.currentThread().interrupt(); - } - } - int cycle = rollCycle.toCycle(tailer.index()); - if (cycle != previousCycle) { - if (deleteAfterRead) { - deleteFile(previousCycle); - } - previousCycle = cycle; - } - } - } catch (Exception e) { - LOGGER.error("Error while tailing on queue {}", tailer.queue().file().getAbsolutePath(), e); - } - } - - private void deleteFile(int previousCycle) { - WireStore wireStore = queue.storeForCycle(previousCycle, 0, false); - if (wireStore != null) { - File file = wireStore.file(); - if (file != null) { - deleteWireStore(file); - } else { - LOGGER.error("Could not find file for cycle {}", previousCycle); - } - } else { - LOGGER.trace("wirestore is null for cycle {}", previousCycle); - } - } - - private void deleteWireStore(File file) { - try { - boolean deleted = file.delete(); - logDeletionResult(file, deleted); - } catch (Exception e) { - LOGGER.error("Could not delete file {}", file.getAbsolutePath(), e); - } - } - - private void logDeletionResult(File file, boolean deleted) { - if (deleted) { - LOGGER.trace("file {} deleted after read", file.getAbsolutePath()); - } else { - LOGGER.error("Could not delete file {}", file.getAbsolutePath()); - } - } - - private void readAndSendValue(FluxSink sink, BytesIn b) { - int size = b.readInt(); + protected T deserializeValue(BytesIn rawData) { + int size = rawData.readInt(); byte[] bytes = new byte[size]; - b.read(bytes); - T value = deserializer.apply(bytes); - sink.next(value); + rawData.read(bytes); + return deserializer.apply(bytes); } - @Override - public Flux retrieveAll(boolean deleteAfterRead) { - return Flux.create(sink -> launchTailer(sink, ReaderType.ALL, deleteAfterRead)); - } - - private void launchTailer(FluxSink sink, ReaderType readerType, boolean deleteAfterRead) { - launchTailer(sink, queue.createTailer(), readerType, deleteAfterRead); - } - - private void launchTailer(FluxSink sink, ExcerptTailer tailer, ReaderType readerType, boolean deleteAfterRead) { - String path = tailer.queue().file().getAbsolutePath(); - Thread t = new Thread( - () -> readTailer(tailer, sink, readerType, deleteAfterRead), - "ChronicleStoreRetrieve_" + path); - t.setDaemon(true); - t.start(); - } - - @Override - public Flux retrieveHistory() { - return Flux.create(sink -> launchTailer(sink, ReaderType.ONLY_HISTORY, false)); - } - - @Override - public Flux retrieveNewValues() { - ExcerptTailer tailer = queue.createTailer().toEnd(); - return Flux.create(sink -> launchTailer(sink, tailer, ReaderType.ALL, false)); - } - - @Override - public ReplayFlux replayHistory(Function timestampExtractor) { - Flux historySource = Flux.defer(this::retrieveHistory); - return new ReplayFlux<>(historySource, timestampExtractor); - } - - public static final class ChronicleStoreBuilder { - private String path; - private Function serializer; - private Function deserializer; - private RollCycle rollCycle = RollCycles.DAILY; + public static final class ChronicleStoreBuilder extends AbstractChronicleStoreBuilder { private ChronicleStoreBuilder() { - } - - /** - * @param path path were the Chronicle Queue will store the files. - * This path should not be a network file system (see the Chronicle queue documentation for more detail - * @return this builder - */ - public ChronicleStoreBuilder path(String path) { - this.path = path; - return this; - } - - /** - * @param serializer data serializer - * @return this builder - */ - public ChronicleStoreBuilder serializer(Function serializer) { - this.serializer = serializer; - return this; - } - - /** - * @param deserializer data deserializer - * @return this builder - */ - public ChronicleStoreBuilder deserializer(Function deserializer) { - this.deserializer = deserializer; - return this; - } - - /** - * @param rollCycle roll cycle for the files - * @return this builder - */ - public ChronicleStoreBuilder rollCycle(RollCycle rollCycle) { - this.rollCycle = rollCycle; - return this; + super(); } public ChronicleStore build() { - return new ChronicleStore<>(this); + return new ChronicleStore(this); } } } \ No newline at end of file diff --git a/src/main/java/ch/streamly/chronicle/flux/FluxJournal.java b/src/main/java/ch/streamly/chronicle/flux/FluxJournal.java new file mode 100644 index 0000000..263832c --- /dev/null +++ b/src/main/java/ch/streamly/chronicle/flux/FluxJournal.java @@ -0,0 +1,17 @@ +package ch.streamly.chronicle.flux; + +import ch.streamly.chronicle.flux.replay.ReplayFlux; +import ch.streamly.domain.Timed; + +/** + * @author mgabriel. + */ +public interface FluxJournal extends FluxStore> { + + /** + * @return a Flux that can be used to replay the history with multiple strategies. The history timestamps are the ones assigned by the journal. + */ + default ReplayFlux> replayHistory() { + return replayHistory(Timed::time); + } +} diff --git a/src/main/java/ch/streamly/chronicle/flux/FluxStore.java b/src/main/java/ch/streamly/chronicle/flux/FluxStore.java index 2eac6a2..e286044 100644 --- a/src/main/java/ch/streamly/chronicle/flux/FluxStore.java +++ b/src/main/java/ch/streamly/chronicle/flux/FluxStore.java @@ -10,9 +10,10 @@ import reactor.core.publisher.Flux; /** * Reactive store used to store and replay a Flux. * - * @param the data type + * @param input data type + * @param input data type */ -public interface FluxStore { +public interface FluxStore { /** * Stores all items of the given stream until the stream completes or the returned {@link Disposable} is disposed. @@ -21,19 +22,19 @@ public interface FluxStore { * @param toStore data stream to store. * @return a disposable that can be used to stop the storage process. */ - Disposable store(Publisher toStore); + Disposable store(Publisher toStore); /** * Stores one item. * * @param item item to store. */ - void store(T item); + void store(I item); /** * @return all values present in the store and new values being stored in this FluxStore. */ - default Flux retrieveAll() { + default Flux retrieveAll() { return retrieveAll(false); } @@ -41,22 +42,22 @@ public interface FluxStore { * @param deleteAfterRead if true, the file storing the data on disk will be deleted once it has been read. * @return all values present in the store and new values being stored in this FluxStore. */ - Flux retrieveAll(boolean deleteAfterRead); + Flux retrieveAll(boolean deleteAfterRead); /** * @return all values present in the store and completes the stream. */ - Flux retrieveHistory(); + Flux retrieveHistory(); /** * @return the stream of new values being stored in this FluxStore (history is ignored). */ - Flux retrieveNewValues(); + Flux retrieveNewValues(); /** * @param timestampExtractor a function to extract the epoch time from the values. * @return a Flux that can be used to replay the history with multiple strategies. */ - ReplayFlux replayHistory(Function timestampExtractor); + ReplayFlux replayHistory(Function timestampExtractor); } diff --git a/src/main/java/ch/streamly/chronicle/flux/replay/ReplayFlux.java b/src/main/java/ch/streamly/chronicle/flux/replay/ReplayFlux.java index 9344a6a..87cd87e 100644 --- a/src/main/java/ch/streamly/chronicle/flux/replay/ReplayFlux.java +++ b/src/main/java/ch/streamly/chronicle/flux/replay/ReplayFlux.java @@ -3,6 +3,7 @@ package ch.streamly.chronicle.flux.replay; import java.time.Duration; import java.util.function.Function; +import ch.streamly.domain.ReplayValue; import reactor.core.CoreSubscriber; import reactor.core.Scannable; import reactor.core.publisher.Flux; diff --git a/src/main/java/ch/streamly/chronicle/flux/replay/ReplayInLoop.java b/src/main/java/ch/streamly/chronicle/flux/replay/ReplayInLoop.java index 2e7cb29..7eacd8a 100644 --- a/src/main/java/ch/streamly/chronicle/flux/replay/ReplayInLoop.java +++ b/src/main/java/ch/streamly/chronicle/flux/replay/ReplayInLoop.java @@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.LongStream; +import ch.streamly.domain.ReplayValue; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -42,12 +43,11 @@ public class ReplayInLoop implements Function, Publisher> wrapAsReplayValue(AtomicBoolean firstValueSent) { - return val -> { if (!firstValueSent.getAndSet(true)) { - return new ReplayValueImpl<>(true, val); + return ReplayValue.newLoopRestartValue(val); } - return new ReplayValueImpl<>(val); + return ReplayValue.newValue(val); }; } } diff --git a/src/main/java/ch/streamly/chronicle/flux/replay/ReplayValue.java b/src/main/java/ch/streamly/chronicle/flux/replay/ReplayValue.java deleted file mode 100644 index 43827c8..0000000 --- a/src/main/java/ch/streamly/chronicle/flux/replay/ReplayValue.java +++ /dev/null @@ -1,14 +0,0 @@ -package ch.streamly.chronicle.flux.replay; - -/** - * A value wrapper that indicates if the current value is the first value replayed in the replay loop. - * @see ReplayInLoop - * @param data type - */ -public interface ReplayValue extends WrappedValue { - - /** - * @return true if this object is the loop restart signal (meaning that the replay loop has restarted from the beginning) - */ - boolean isLoopRestart(); -} diff --git a/src/main/java/ch/streamly/chronicle/flux/replay/ReplayValueImpl.java b/src/main/java/ch/streamly/chronicle/flux/replay/ReplayValueImpl.java deleted file mode 100644 index 8ad0e43..0000000 --- a/src/main/java/ch/streamly/chronicle/flux/replay/ReplayValueImpl.java +++ /dev/null @@ -1,56 +0,0 @@ -package ch.streamly.chronicle.flux.replay; - -import java.util.Objects; - -/** - * Default implementation of a {@link ReplayValue} - * @param data type - */ -public class ReplayValueImpl implements ReplayValue{ - private final boolean isLoopReset; - private final T value; - - ReplayValueImpl(T value) { - this.isLoopReset = false; - this.value = value; - } - - ReplayValueImpl(boolean isLoopReset, T value) { - this.isLoopReset = isLoopReset; - this.value = value; - } - - @Override - public boolean isLoopRestart() { - return isLoopReset; - } - - @Override - public T value() { - return value; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - ReplayValueImpl that = (ReplayValueImpl) o; - return isLoopReset == that.isLoopReset && - Objects.equals(value, that.value); - } - - @Override - public int hashCode() { - return Objects.hash(isLoopReset, value); - } - - @Override - public String toString() { - return "ReplayValueImpl{" + - "isLoopRestart=" + isLoopReset + - ", value=" + value + - '}'; - } -} diff --git a/src/main/java/ch/streamly/chronicle/flux/replay/ReplayWithOriginalTiming.java b/src/main/java/ch/streamly/chronicle/flux/replay/ReplayWithOriginalTiming.java index 8f161ac..905056f 100644 --- a/src/main/java/ch/streamly/chronicle/flux/replay/ReplayWithOriginalTiming.java +++ b/src/main/java/ch/streamly/chronicle/flux/replay/ReplayWithOriginalTiming.java @@ -5,6 +5,8 @@ import static java.time.Duration.ofMillis; import java.util.function.Function; import java.util.function.Predicate; +import ch.streamly.domain.Timed; +import ch.streamly.domain.TimedValue; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; diff --git a/src/main/java/ch/streamly/chronicle/flux/replay/Timed.java b/src/main/java/ch/streamly/chronicle/flux/replay/Timed.java deleted file mode 100644 index 25f1e71..0000000 --- a/src/main/java/ch/streamly/chronicle/flux/replay/Timed.java +++ /dev/null @@ -1,10 +0,0 @@ -package ch.streamly.chronicle.flux.replay; - -/** - * Wraps a value with its timestamp. - * - * @param data type - */ -interface Timed extends WrappedValue { - long time(); -} diff --git a/src/main/java/ch/streamly/chronicle/flux/replay/TimedValue.java b/src/main/java/ch/streamly/chronicle/flux/replay/TimedValue.java deleted file mode 100644 index b828d9c..0000000 --- a/src/main/java/ch/streamly/chronicle/flux/replay/TimedValue.java +++ /dev/null @@ -1,52 +0,0 @@ -package ch.streamly.chronicle.flux.replay; - -import java.util.Objects; - -/** - * Default implementation of a {@link Timed} value. - * - * @param - */ -class TimedValue implements Timed { - private final long time; - private final T value; - - TimedValue(long time, T value) { - this.time = time; - this.value = value; - } - - @Override - public T value() { - return value; - } - - @Override - public long time() { - return time; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - TimedValue that = (TimedValue) o; - return time == that.time && - Objects.equals(value, that.value); - } - - @Override - public int hashCode() { - return Objects.hash(time, value); - } - - @Override - public String toString() { - return "TimedValue{" + - "time=" + time + - ", value=" + value + - '}'; - } -} diff --git a/src/main/java/ch/streamly/chronicle/flux/replay/ValueToDelay.java b/src/main/java/ch/streamly/chronicle/flux/replay/ValueToDelay.java index 97e45dc..5c4ab97 100644 --- a/src/main/java/ch/streamly/chronicle/flux/replay/ValueToDelay.java +++ b/src/main/java/ch/streamly/chronicle/flux/replay/ValueToDelay.java @@ -1,5 +1,6 @@ package ch.streamly.chronicle.flux.replay; +import ch.streamly.domain.WrappedValue; class ValueToDelay implements WrappedValue { private final long delay; diff --git a/src/main/java/ch/streamly/chronicle/flux/replay/WrappedValue.java b/src/main/java/ch/streamly/chronicle/flux/replay/WrappedValue.java deleted file mode 100644 index bd51390..0000000 --- a/src/main/java/ch/streamly/chronicle/flux/replay/WrappedValue.java +++ /dev/null @@ -1,5 +0,0 @@ -package ch.streamly.chronicle.flux.replay; - -interface WrappedValue { - T value(); -} diff --git a/src/test/java/ch/streamly/chronicle/flux/ChronicleJournalTest.java b/src/test/java/ch/streamly/chronicle/flux/ChronicleJournalTest.java new file mode 100644 index 0000000..85827f4 --- /dev/null +++ b/src/test/java/ch/streamly/chronicle/flux/ChronicleJournalTest.java @@ -0,0 +1,99 @@ +package ch.streamly.chronicle.flux; + +import static ch.streamly.chronicle.flux.util.ChronicleStoreCleanup.deleteStoreIfItExists; +import static java.time.Duration.ofSeconds; + +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import ch.streamly.domain.WrappedValue; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +class ChronicleJournalTest { + private static final String PREFIX = "ChronicleJournalTest"; + private static final String ONE = "one"; + private static final String TWO = "two"; + private static final String THREE = "three"; + private static final String FOUR = "four"; + private static final DummyObject FIRST = new DummyObject(10000, ONE); + private static final DummyObject SECOND = new DummyObject(11000, TWO); + private static final DummyObject THIRD = new DummyObject(12000, THREE); + private static final DummyObject FOURTH = new DummyObject(15000, FOUR); + private static final Flux source = Flux.just(FIRST, SECOND, THIRD, FOURTH); + private static final long TIME_1 = 1000L; + private static final long TIME_2 = 2000L; + private static final long TIME_3 = 3000L; + private static final long TIME_4 = 7000L; + private ChronicleJournal journal; + private String path; + + @BeforeEach + void setUp() { + path = PREFIX + UUID.randomUUID().toString(); + ConcurrentLinkedQueue timeQueue = new ConcurrentLinkedQueue<>(); + timeQueue.add(TIME_1); + timeQueue.add(TIME_2); + timeQueue.add(TIME_3); + timeQueue.add(TIME_4); + journal = new ChronicleJournal(path, DummyObject::toBinary, DummyObject::fromBinary) { + @Override + long getCurrentTime() { + return timeQueue.poll(); + } + }; + } + + @AfterEach + void tearDown() { + journal.close(); + deleteStoreIfItExists(path); + } + + @Test + @DisplayName("tests that a timed data stream is stored in the Chronicle journal") + void shouldStoreStreamWithTimeStamps() { + journal.store(source); + StepVerifier.create(journal.retrieveAll()) + .expectSubscription() + .assertNext(i -> { + Assertions.assertEquals(FIRST, i.value()); + Assertions.assertEquals(TIME_1, i.time()); + }) + .assertNext(i -> { + Assertions.assertEquals(SECOND, i.value()); + Assertions.assertEquals(TIME_2, i.time()); + }) + .assertNext(i -> { + Assertions.assertEquals(THIRD, i.value()); + Assertions.assertEquals(TIME_3, i.time()); + }) + .thenCancel() + .verify(Duration.ofMillis(500)); + } + + @Test + @DisplayName("tests that the Chronicle journal can replay history with the timestamps assigned by the journal") + void replayHistoryWithJournalTime() { + journal.store(source); + StepVerifier.withVirtualTime(() -> journal.replayHistory().withOriginalTiming().map(WrappedValue::value)) + .expectSubscription() + .expectNext(FIRST) + .expectNoEvent(ofSeconds(1)) + .expectNext(SECOND) + .expectNoEvent(ofSeconds(1)) + .expectNext(THIRD) + .expectNoEvent(ofSeconds(4)) + .expectNext(FOURTH) + .thenCancel() + .verify(Duration.ofMillis(500)); + } + +} \ No newline at end of file diff --git a/src/test/java/ch/streamly/chronicle/flux/ChronicleStoreTest.java b/src/test/java/ch/streamly/chronicle/flux/ChronicleStoreTest.java index 4766943..a9f0788 100644 --- a/src/test/java/ch/streamly/chronicle/flux/ChronicleStoreTest.java +++ b/src/test/java/ch/streamly/chronicle/flux/ChronicleStoreTest.java @@ -4,6 +4,7 @@ import static ch.streamly.chronicle.flux.util.ChronicleStoreCleanup.deleteStoreI import static java.time.Duration.ofSeconds; import java.time.Duration; +import java.util.UUID; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -13,8 +14,9 @@ import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; + class ChronicleStoreTest { - private static final String PATH = "ChronicleStoreTest"; + private static final String PREFIX = "ChronicleStoreTest"; private static final String ONE = "one"; private static final String TWO = "two"; private static final String THREE = "three"; @@ -23,19 +25,20 @@ class ChronicleStoreTest { private static final DummyObject SECOND = new DummyObject(11000, TWO); private static final DummyObject THIRD = new DummyObject(12000, THREE); private static final DummyObject FOURTH = new DummyObject(15000, FOUR); - private static final Flux source = Flux.just(FIRST, SECOND, THIRD, FOURTH - ); + private static final Flux source = Flux.just(FIRST, SECOND, THIRD, FOURTH); private ChronicleStore store; + private String path; @BeforeEach void setUp() { - deleteStoreIfItExists(PATH); - store = new ChronicleStore<>(PATH, DummyObject::toBinary, DummyObject::fromBinary); + path = PREFIX + UUID.randomUUID().toString(); + store = new ChronicleStore<>(path, DummyObject::toBinary, DummyObject::fromBinary); } @AfterEach void tearDown() { store.close(); + deleteStoreIfItExists(path); } @Test diff --git a/src/test/java/ch/streamly/chronicle/flux/demo/ReplayLoopDemo.java b/src/test/java/ch/streamly/chronicle/flux/demo/ReplayLoopDemo.java index 73ca874..c847776 100644 --- a/src/test/java/ch/streamly/chronicle/flux/demo/ReplayLoopDemo.java +++ b/src/test/java/ch/streamly/chronicle/flux/demo/ReplayLoopDemo.java @@ -4,8 +4,8 @@ import static java.time.Duration.ofSeconds; import java.time.Instant; -import ch.streamly.chronicle.flux.replay.ReplayValue; import ch.streamly.chronicle.flux.replay.ReplayInLoop; +import ch.streamly.domain.ReplayValue; import reactor.core.publisher.Flux; class ReplayLoopDemo { diff --git a/src/test/java/ch/streamly/chronicle/flux/replay/ReplayFluxTest.java b/src/test/java/ch/streamly/chronicle/flux/replay/ReplayFluxTest.java index 727d99f..0a013e0 100644 --- a/src/test/java/ch/streamly/chronicle/flux/replay/ReplayFluxTest.java +++ b/src/test/java/ch/streamly/chronicle/flux/replay/ReplayFluxTest.java @@ -12,6 +12,7 @@ import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import ch.streamly.chronicle.flux.DummyObject; +import ch.streamly.domain.ReplayValue; import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; diff --git a/src/test/java/ch/streamly/chronicle/flux/replay/ReplayValueImplTest.java b/src/test/java/ch/streamly/chronicle/flux/replay/ReplayValueImplTest.java deleted file mode 100644 index 906b0d3..0000000 --- a/src/test/java/ch/streamly/chronicle/flux/replay/ReplayValueImplTest.java +++ /dev/null @@ -1,36 +0,0 @@ -package ch.streamly.chronicle.flux.replay; - -import static org.junit.jupiter.api.Assertions.*; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -class ReplayValueImplTest { - - @Test - @DisplayName("test equals and hashcode") - void testDataClass() { - String value = "testValue"; - ReplayValue first = new ReplayValueImpl<>(true, value); - ReplayValue second = new ReplayValueImpl<>(true, value); - ReplayValue third = new ReplayValueImpl<>(false, value); - assertEquals(first, second); - assertEquals(first, first); - assertNotEquals(first, value); - assertNotEquals(first, third); - assertEquals(first.hashCode(), second.hashCode()); - } - - @Test - @DisplayName("test equals and hashcode with null value") - void testDataClassWithNullValue() { - ReplayValue first = new ReplayValueImpl<>(true, null); - ReplayValue second = new ReplayValueImpl<>(true, null); - ReplayValue third = new ReplayValueImpl<>(false, null); - assertEquals(first, second); - assertEquals(first, first); - assertNotEquals(first, third); - assertEquals(first.hashCode(), second.hashCode()); - } - -} \ No newline at end of file diff --git a/src/test/java/ch/streamly/chronicle/flux/replay/TimedValueTest.java b/src/test/java/ch/streamly/chronicle/flux/replay/TimedValueTest.java deleted file mode 100644 index 7a90932..0000000 --- a/src/test/java/ch/streamly/chronicle/flux/replay/TimedValueTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package ch.streamly.chronicle.flux.replay; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -class TimedValueTest { - - private static final long TIME = 42; - - @Test - @DisplayName("test equals and hashcode") - void testDataClass() { - String value = "testValue"; - TimedValue first = new TimedValue<>(TIME, value); - TimedValue second = new TimedValue<>(TIME, value); - assertEquals(first, second); - assertEquals(first, first); - assertNotEquals(first, TIME); - assertEquals(first.hashCode(), second.hashCode()); - } - - @Test - @DisplayName("test equals and hashcode with null value") - void testDataClassWithNullValue() { - TimedValue first = new TimedValue<>(TIME, null); - TimedValue second = new TimedValue<>(TIME, null); - assertEquals(first, second); - assertEquals(first, first); - assertEquals(first.hashCode(), second.hashCode()); - } - -} \ No newline at end of file diff --git a/src/test/java/ch/streamly/chronicle/flux/util/ChronicleStoreCleanup.java b/src/test/java/ch/streamly/chronicle/flux/util/ChronicleStoreCleanup.java index 82be1ec..15d0757 100644 --- a/src/test/java/ch/streamly/chronicle/flux/util/ChronicleStoreCleanup.java +++ b/src/test/java/ch/streamly/chronicle/flux/util/ChronicleStoreCleanup.java @@ -9,17 +9,24 @@ public class ChronicleStoreCleanup { public static void deleteStoreIfItExists(String path) { File directory = new File("."); - try { - deleteStore(path, directory); - } catch (IOException e) { - //try again + boolean done = false; + long startTime = System.currentTimeMillis(); + Exception exception = null; + while(!done && System.currentTimeMillis() - startTime < 5000){ try { - Thread.sleep(500); deleteStore(path, directory); - } catch (Exception exception) { - System.err.println("Error while deleting store "+exception); + done = true; + } catch (IOException e) { + exception = e; + try { + Thread.sleep(100); + } catch (InterruptedException interrupted) { + System.out.println("store deletion interrupted "+interrupted); + } } - + } + if(!done){ + System.err.println("Error while deleting store "+exception); } } @@ -29,6 +36,8 @@ public class ChronicleStoreCleanup { if (storePath.exists()) { FileUtils.deleteDirectory(storePath); System.out.println("Deleted existing store"); + }else{ + System.out.println("Path does not exists "+path); } } }