Add ChronicleJournal

This commit is contained in:
mgabriel 2018-08-21 16:31:25 +02:00
parent 55e45be644
commit 13232b8c99
22 changed files with 514 additions and 449 deletions

View File

@ -50,6 +50,7 @@ def reactorVersion = '3.1.7.RELEASE'
repositories {
mavenCentral()
jcenter()
}
if (DEPLOYMENT) {
@ -62,6 +63,7 @@ if (DEPLOYMENT) {
}
dependencies {
compile "ch.streamly:streamly-domain:1.0.2"
compile "org.slf4j:slf4j-api:1.7.25"
compile "io.projectreactor:reactor-core:$reactorVersion"
compile "net.openhft:chronicle-queue:$chronicleVersion"

View File

@ -0,0 +1,232 @@
package ch.streamly.chronicle.flux;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.io.File;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.streamly.chronicle.flux.replay.ReplayFlux;
import net.openhft.chronicle.bytes.BytesIn;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* Implementation of a {@link FluxStore} backed by a Chronicle Queue.
* This store respects the backpressure on the data streams it produces.
*
* @author mgabriel.
*/
public abstract class AbstractChronicleStore<I, O> implements FluxStore<I, O> {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractChronicleStore.class);
private final Function<I, byte[]> serializer;
protected final Function<byte[], I> deserializer;
private final SingleChronicleQueue queue;
private final RollCycle rollCycle;
protected AbstractChronicleStore(AbstractChronicleStoreBuilder<I> builder) {
String path = builder.path;
serializer = builder.serializer;
deserializer = builder.deserializer;
rollCycle = builder.rollCycle;
this.queue = SingleChronicleQueueBuilder.binary(path).rollCycle(rollCycle).build();
}
void close() {
queue.close();
}
@Override
public Disposable store(Publisher<I> toStore) {
ExcerptAppender appender = queue.acquireAppender();
return Flux.from(toStore)
.doOnError(err -> LOGGER.error("Error received", err))
.subscribe(v -> storeValue(appender, v));
}
private void storeValue(ExcerptAppender appender, I v) {
byte[] bytesToStore = serializeValue(v);
appender.writeBytes(b -> b.writeInt(bytesToStore.length).write(bytesToStore));
}
protected byte[] serializeValue(I v) {
return serializer.apply(v);
}
protected abstract O deserializeValue(BytesIn rawData);
@Override
public void store(I item) {
ExcerptAppender appender = queue.acquireAppender();
storeValue(appender, item);
}
private enum ReaderType {
ALL,
ONLY_HISTORY
}
private void readTailer(ExcerptTailer tailer, FluxSink<O> sink,
ReaderType readerType, boolean deleteAfterRead) {
int previousCycle = 0;
try {
while (!sink.isCancelled()) {
if (sink.requestedFromDownstream() > 0) {
boolean present = tailer.readBytes(b -> sink.next(deserializeValue(b)));
if (!present) {
if (readerType == ReaderType.ONLY_HISTORY) {
sink.complete();
} else {
waitMillis(10); // wait for values to appear on the queue
}
}
} else {
waitMillis(100); // wait for requests
}
int cycle = rollCycle.toCycle(tailer.index());
if (cycle != previousCycle) {
if (deleteAfterRead) {
deleteFile(previousCycle);
}
previousCycle = cycle;
}
}
} catch (Exception e) {
LOGGER.error("Error while tailing on queue {}", tailer.queue().file().getAbsolutePath(), e);
}
}
private void waitMillis(long time) {
try {
MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
//interrupt can happen when the flux is cancelled
Thread.currentThread().interrupt();
}
}
private void deleteFile(int previousCycle) {
WireStore wireStore = queue.storeForCycle(previousCycle, 0, false);
if (wireStore != null) {
File file = wireStore.file();
if (file != null) {
deleteWireStore(file);
} else {
LOGGER.error("Could not find file for cycle {}", previousCycle);
}
} else {
LOGGER.trace("wirestore is null for cycle {}", previousCycle);
}
}
private void deleteWireStore(File file) {
try {
boolean deleted = file.delete();
logDeletionResult(file, deleted);
} catch (Exception e) {
LOGGER.error("Could not delete file {}", file.getAbsolutePath(), e);
}
}
private void logDeletionResult(File file, boolean deleted) {
if (deleted) {
LOGGER.trace("file {} deleted after read", file.getAbsolutePath());
} else {
LOGGER.error("Could not delete file {}", file.getAbsolutePath());
}
}
@Override
public Flux<O> retrieveAll(boolean deleteAfterRead) {
return Flux.create(sink -> launchTailer(sink, ReaderType.ALL, deleteAfterRead));
}
private void launchTailer(FluxSink<O> sink, ReaderType readerType, boolean deleteAfterRead) {
launchTailer(sink, queue.createTailer(), readerType, deleteAfterRead);
}
private void launchTailer(FluxSink<O> sink, ExcerptTailer tailer, ReaderType readerType, boolean deleteAfterRead) {
String path = tailer.queue().file().getAbsolutePath();
Thread t = new Thread(
() -> readTailer(tailer, sink, readerType, deleteAfterRead),
"ChronicleStoreRetrieve_" + path);
t.setDaemon(true);
t.start();
}
@Override
public Flux<O> retrieveHistory() {
return Flux.create(sink -> launchTailer(sink, ReaderType.ONLY_HISTORY, false));
}
@Override
public Flux<O> retrieveNewValues() {
ExcerptTailer tailer = queue.createTailer().toEnd();
return Flux.create(sink -> launchTailer(sink, tailer, ReaderType.ALL, false));
}
@Override
public ReplayFlux<O> replayHistory(Function<O, Long> timestampExtractor) {
Flux<O> historySource = Flux.defer(this::retrieveHistory);
return new ReplayFlux<>(historySource, timestampExtractor);
}
public abstract static class AbstractChronicleStoreBuilder<T> {
private String path;
private Function<T, byte[]> serializer;
private Function<byte[], T> deserializer;
private RollCycle rollCycle = RollCycles.DAILY;
protected AbstractChronicleStoreBuilder() {
}
/**
* @param path path were the Chronicle Queue will store the files.
* This path should not be a network file system (see <a href="https://github.com/OpenHFT/Chronicle-Queue">the Chronicle queue documentation for more detail</a>
* @return this builder
*/
public AbstractChronicleStoreBuilder<T> path(String path) {
this.path = path;
return this;
}
/**
* @param serializer data serializer
* @return this builder
*/
public AbstractChronicleStoreBuilder<T> serializer(Function<T, byte[]> serializer) {
this.serializer = serializer;
return this;
}
/**
* @param deserializer data deserializer
* @return this builder
*/
public AbstractChronicleStoreBuilder<T> deserializer(Function<byte[], T> deserializer) {
this.deserializer = deserializer;
return this;
}
/**
* @param rollCycle roll cycle for the files
* @return this builder
*/
public AbstractChronicleStoreBuilder<T> rollCycle(RollCycle rollCycle) {
this.rollCycle = rollCycle;
return this;
}
}
}

View File

@ -0,0 +1,107 @@
package ch.streamly.chronicle.flux;
import java.util.function.Function;
import ch.streamly.domain.Timed;
import ch.streamly.domain.TimedValue;
import net.openhft.chronicle.bytes.BytesIn;
/**
* Implementation of a {@link FluxJournal} backed by a Chronicle Queue.
* This journal respects the backpressure on the data streams it produces.
*
* All values saved in the journal are timed with the current time.
*
* @author mgabriel.
*/
public class ChronicleJournal<T> extends AbstractChronicleStore<T, Timed<T>> implements FluxJournal<T> {
/**
* @param path path were the Chronicle Queue will store the files.
* This path should not be a network file system (see <a href="https://github.com/OpenHFT/Chronicle-Queue">the Chronicle queue documentation for more detail</a>
* @param serializer data serializer
* @param deserializer data deserializer
*/
public ChronicleJournal(String path, Function<T, byte[]> serializer,
Function<byte[], T> deserializer) {
super(ChronicleJournal.<T>newBuilder()
.path(path)
.serializer(serializer)
.deserializer(deserializer));
}
private ChronicleJournal(ChronicleJournalBuilder<T> builder) {
super(builder);
}
/**
* @param <BT> data type.
* @return a ChronicleStore builder.
*/
public static <BT> ChronicleJournalBuilder<BT> newBuilder() {
return new ChronicleJournalBuilder<>();
}
@Override
protected byte[] serializeValue(T v) {
byte[] val = super.serializeValue(v);
byte[] time = toByteArray(getCurrentTime());
byte[] result = new byte[val.length + time.length];
System.arraycopy(time, 0, result, 0, time.length);
System.arraycopy(val, 0, result, time.length, val.length);
return result;
}
//package private for testing
long getCurrentTime() {
return System.currentTimeMillis();
}
@Override
protected Timed<T> deserializeValue(BytesIn rawData) {
int size = rawData.readInt();
byte[] bytes = new byte[size];
rawData.read(bytes);
byte[] time = new byte[8];
byte[] val = new byte[bytes.length - 8];
System.arraycopy(bytes, 0, time, 0, time.length);
System.arraycopy(bytes, 8, val, 0, val.length);
T value = deserializer.apply(val);
long receptionTime = fromByteArray(time);
return new TimedValue<>(receptionTime, value);
}
private static byte[] toByteArray(long value) {
return new byte[] {
(byte) (value >> 56),
(byte) (value >> 48),
(byte) (value >> 40),
(byte) (value >> 32),
(byte) (value >> 24),
(byte) (value >> 16),
(byte) (value >> 8),
(byte) value
};
}
private static long fromByteArray(byte[] bytes){
return (bytes[0] & 0xFFL) << 56
| (bytes[1] & 0xFFL) << 48
| (bytes[2] & 0xFFL) << 40
| (bytes[3] & 0xFFL) << 32
| (bytes[4] & 0xFFL) << 24
| (bytes[5] & 0xFFL) << 16
| (bytes[6] & 0xFFL) << 8
| (bytes[7] & 0xFFL);
}
public static final class ChronicleJournalBuilder<T> extends AbstractChronicleStoreBuilder<T> {
private ChronicleJournalBuilder() {
super();
}
public ChronicleJournal<T> build() {
return new ChronicleJournal<>(this);
}
}
}

View File

@ -1,42 +1,18 @@
package ch.streamly.chronicle.flux;
import java.io.File;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ch.streamly.chronicle.flux.replay.ReplayFlux;
import net.openhft.chronicle.bytes.BytesIn;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* Implementation of a {@link FluxStore} backed by a Chronicle Queue.
*
* This store respects the backpressure on the data streams it produces.
*
* @author mgabriel.
*/
public class ChronicleStore<T> implements FluxStore<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(ChronicleStore.class);
private final Function<T, byte[]> serializer;
private final Function<byte[], T> deserializer;
private final SingleChronicleQueue queue;
private final RollCycle rollCycle;
public class ChronicleStore<T> extends AbstractChronicleStore<T, T> {
/**
*
* @param path path were the Chronicle Queue will store the files.
* This path should not be a network file system (see <a href="https://github.com/OpenHFT/Chronicle-Queue">the Chronicle queue documentation for more detail</a>
* @param serializer data serializer
@ -44,26 +20,17 @@ public class ChronicleStore<T> implements FluxStore<T> {
*/
public ChronicleStore(String path, Function<T, byte[]> serializer,
Function<byte[], T> deserializer) {
this(ChronicleStore.<T>newBuilder()
super(ChronicleStore.<T>newBuilder()
.path(path)
.serializer(serializer)
.deserializer(deserializer));
}
private ChronicleStore(ChronicleStoreBuilder<T> builder) {
String path = builder.path;
serializer = builder.serializer;
deserializer = builder.deserializer;
rollCycle = builder.rollCycle;
this.queue = SingleChronicleQueueBuilder.binary(path).rollCycle(rollCycle).build();
}
void close(){
queue.close();
super(builder);
}
/**
*
* @param <BT> data type.
* @return a ChronicleStore builder.
*/
@ -72,190 +39,21 @@ public class ChronicleStore<T> implements FluxStore<T> {
}
@Override
public Disposable store(Publisher<T> toStore) {
ExcerptAppender appender = queue.acquireAppender();
return Flux.from(toStore)
.doOnError(err -> LOGGER.error("Error received", err))
.subscribe(v -> {
byte[] bytesToStore = serializer.apply(v);
appender.writeBytes(b -> b.writeInt(bytesToStore.length).write(bytesToStore));
});
}
@Override
public void store(T item) {
ExcerptAppender appender = queue.acquireAppender();
byte[] bytesToStore = serializer.apply(item);
appender.writeBytes(b -> b.writeInt(bytesToStore.length).write(bytesToStore));
}
private enum ReaderType {
ALL,
ONLY_HISTORY
}
private void readTailer(ExcerptTailer tailer, FluxSink<T> sink,
ReaderType readerType, boolean deleteAfterRead) {
int previousCycle = 0;
try {
while (!sink.isCancelled()) {
if (sink.requestedFromDownstream() > 0) {
boolean present = tailer.readBytes(b -> readAndSendValue(sink, b));
if (!present) {
if (readerType == ReaderType.ONLY_HISTORY) {
sink.complete();
} else {
try {
Thread.sleep(10); //waiting for data to appear on the queue
} catch (InterruptedException e) {
//interrupt can happen when the flux is cancelled
Thread.currentThread().interrupt();
}
}
}
} else {
try {
Thread.sleep(100); //waiting for requests on the flux
} catch (InterruptedException e) {
//interrupt can happen when the flux is cancelled
Thread.currentThread().interrupt();
}
}
int cycle = rollCycle.toCycle(tailer.index());
if (cycle != previousCycle) {
if (deleteAfterRead) {
deleteFile(previousCycle);
}
previousCycle = cycle;
}
}
} catch (Exception e) {
LOGGER.error("Error while tailing on queue {}", tailer.queue().file().getAbsolutePath(), e);
}
}
private void deleteFile(int previousCycle) {
WireStore wireStore = queue.storeForCycle(previousCycle, 0, false);
if (wireStore != null) {
File file = wireStore.file();
if (file != null) {
deleteWireStore(file);
} else {
LOGGER.error("Could not find file for cycle {}", previousCycle);
}
} else {
LOGGER.trace("wirestore is null for cycle {}", previousCycle);
}
}
private void deleteWireStore(File file) {
try {
boolean deleted = file.delete();
logDeletionResult(file, deleted);
} catch (Exception e) {
LOGGER.error("Could not delete file {}", file.getAbsolutePath(), e);
}
}
private void logDeletionResult(File file, boolean deleted) {
if (deleted) {
LOGGER.trace("file {} deleted after read", file.getAbsolutePath());
} else {
LOGGER.error("Could not delete file {}", file.getAbsolutePath());
}
}
private void readAndSendValue(FluxSink<T> sink, BytesIn b) {
int size = b.readInt();
protected T deserializeValue(BytesIn rawData) {
int size = rawData.readInt();
byte[] bytes = new byte[size];
b.read(bytes);
T value = deserializer.apply(bytes);
sink.next(value);
rawData.read(bytes);
return deserializer.apply(bytes);
}
@Override
public Flux<T> retrieveAll(boolean deleteAfterRead) {
return Flux.create(sink -> launchTailer(sink, ReaderType.ALL, deleteAfterRead));
}
private void launchTailer(FluxSink<T> sink, ReaderType readerType, boolean deleteAfterRead) {
launchTailer(sink, queue.createTailer(), readerType, deleteAfterRead);
}
private void launchTailer(FluxSink<T> sink, ExcerptTailer tailer, ReaderType readerType, boolean deleteAfterRead) {
String path = tailer.queue().file().getAbsolutePath();
Thread t = new Thread(
() -> readTailer(tailer, sink, readerType, deleteAfterRead),
"ChronicleStoreRetrieve_" + path);
t.setDaemon(true);
t.start();
}
@Override
public Flux<T> retrieveHistory() {
return Flux.create(sink -> launchTailer(sink, ReaderType.ONLY_HISTORY, false));
}
@Override
public Flux<T> retrieveNewValues() {
ExcerptTailer tailer = queue.createTailer().toEnd();
return Flux.create(sink -> launchTailer(sink, tailer, ReaderType.ALL, false));
}
@Override
public ReplayFlux<T> replayHistory(Function<T, Long> timestampExtractor) {
Flux<T> historySource = Flux.defer(this::retrieveHistory);
return new ReplayFlux<>(historySource, timestampExtractor);
}
public static final class ChronicleStoreBuilder<T> {
private String path;
private Function<T, byte[]> serializer;
private Function<byte[], T> deserializer;
private RollCycle rollCycle = RollCycles.DAILY;
public static final class ChronicleStoreBuilder<T> extends AbstractChronicleStoreBuilder<T> {
private ChronicleStoreBuilder() {
}
/**
* @param path path were the Chronicle Queue will store the files.
* This path should not be a network file system (see <a href="https://github.com/OpenHFT/Chronicle-Queue">the Chronicle queue documentation for more detail</a>
* @return this builder
*/
public ChronicleStoreBuilder<T> path(String path) {
this.path = path;
return this;
}
/**
* @param serializer data serializer
* @return this builder
*/
public ChronicleStoreBuilder<T> serializer(Function<T, byte[]> serializer) {
this.serializer = serializer;
return this;
}
/**
* @param deserializer data deserializer
* @return this builder
*/
public ChronicleStoreBuilder<T> deserializer(Function<byte[], T> deserializer) {
this.deserializer = deserializer;
return this;
}
/**
* @param rollCycle roll cycle for the files
* @return this builder
*/
public ChronicleStoreBuilder<T> rollCycle(RollCycle rollCycle) {
this.rollCycle = rollCycle;
return this;
super();
}
public ChronicleStore<T> build() {
return new ChronicleStore<>(this);
return new ChronicleStore<T>(this);
}
}
}

View File

@ -0,0 +1,17 @@
package ch.streamly.chronicle.flux;
import ch.streamly.chronicle.flux.replay.ReplayFlux;
import ch.streamly.domain.Timed;
/**
* @author mgabriel.
*/
public interface FluxJournal<T> extends FluxStore<T, Timed<T>> {
/**
* @return a Flux that can be used to replay the history with multiple strategies. The history timestamps are the ones assigned by the journal.
*/
default ReplayFlux<Timed<T>> replayHistory() {
return replayHistory(Timed::time);
}
}

View File

@ -10,9 +10,10 @@ import reactor.core.publisher.Flux;
/**
* Reactive store used to store and replay a Flux.
*
* @param <T> the data type
* @param <I> input data type
* @param <O> input data type
*/
public interface FluxStore<T> {
public interface FluxStore<I, O> {
/**
* Stores all items of the given stream until the stream completes or the returned {@link Disposable} is disposed.
@ -21,19 +22,19 @@ public interface FluxStore<T> {
* @param toStore data stream to store.
* @return a disposable that can be used to stop the storage process.
*/
Disposable store(Publisher<T> toStore);
Disposable store(Publisher<I> toStore);
/**
* Stores one item.
*
* @param item item to store.
*/
void store(T item);
void store(I item);
/**
* @return all values present in the store and new values being stored in this FluxStore.
*/
default Flux<T> retrieveAll() {
default Flux<O> retrieveAll() {
return retrieveAll(false);
}
@ -41,22 +42,22 @@ public interface FluxStore<T> {
* @param deleteAfterRead if true, the file storing the data on disk will be deleted once it has been read.
* @return all values present in the store and new values being stored in this FluxStore.
*/
Flux<T> retrieveAll(boolean deleteAfterRead);
Flux<O> retrieveAll(boolean deleteAfterRead);
/**
* @return all values present in the store and completes the stream.
*/
Flux<T> retrieveHistory();
Flux<O> retrieveHistory();
/**
* @return the stream of new values being stored in this FluxStore (history is ignored).
*/
Flux<T> retrieveNewValues();
Flux<O> retrieveNewValues();
/**
* @param timestampExtractor a function to extract the epoch time from the values.
* @return a Flux that can be used to replay the history with multiple strategies.
*/
ReplayFlux<T> replayHistory(Function<T, Long> timestampExtractor);
ReplayFlux<O> replayHistory(Function<O, Long> timestampExtractor);
}

View File

@ -3,6 +3,7 @@ package ch.streamly.chronicle.flux.replay;
import java.time.Duration;
import java.util.function.Function;
import ch.streamly.domain.ReplayValue;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;

View File

@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.LongStream;
import ch.streamly.domain.ReplayValue;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@ -42,12 +43,11 @@ public class ReplayInLoop<T> implements Function<Flux<T>, Publisher<ReplayValue<
}
private Function<T, ReplayValue<T>> wrapAsReplayValue(AtomicBoolean firstValueSent) {
return val -> {
if (!firstValueSent.getAndSet(true)) {
return new ReplayValueImpl<>(true, val);
return ReplayValue.newLoopRestartValue(val);
}
return new ReplayValueImpl<>(val);
return ReplayValue.newValue(val);
};
}
}

View File

@ -1,14 +0,0 @@
package ch.streamly.chronicle.flux.replay;
/**
* A value wrapper that indicates if the current value is the first value replayed in the replay loop.
* @see ReplayInLoop
* @param <T> data type
*/
public interface ReplayValue<T> extends WrappedValue<T> {
/**
* @return true if this object is the loop restart signal (meaning that the replay loop has restarted from the beginning)
*/
boolean isLoopRestart();
}

View File

@ -1,56 +0,0 @@
package ch.streamly.chronicle.flux.replay;
import java.util.Objects;
/**
* Default implementation of a {@link ReplayValue}
* @param <T> data type
*/
public class ReplayValueImpl<T> implements ReplayValue<T>{
private final boolean isLoopReset;
private final T value;
ReplayValueImpl(T value) {
this.isLoopReset = false;
this.value = value;
}
ReplayValueImpl(boolean isLoopReset, T value) {
this.isLoopReset = isLoopReset;
this.value = value;
}
@Override
public boolean isLoopRestart() {
return isLoopReset;
}
@Override
public T value() {
return value;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
ReplayValueImpl<?> that = (ReplayValueImpl<?>) o;
return isLoopReset == that.isLoopReset &&
Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(isLoopReset, value);
}
@Override
public String toString() {
return "ReplayValueImpl{" +
"isLoopRestart=" + isLoopReset +
", value=" + value +
'}';
}
}

View File

@ -5,6 +5,8 @@ import static java.time.Duration.ofMillis;
import java.util.function.Function;
import java.util.function.Predicate;
import ch.streamly.domain.Timed;
import ch.streamly.domain.TimedValue;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

View File

@ -1,10 +0,0 @@
package ch.streamly.chronicle.flux.replay;
/**
* Wraps a value with its timestamp.
*
* @param <T> data type
*/
interface Timed<T> extends WrappedValue<T> {
long time();
}

View File

@ -1,52 +0,0 @@
package ch.streamly.chronicle.flux.replay;
import java.util.Objects;
/**
* Default implementation of a {@link Timed} value.
*
* @param <T>
*/
class TimedValue<T> implements Timed<T> {
private final long time;
private final T value;
TimedValue(long time, T value) {
this.time = time;
this.value = value;
}
@Override
public T value() {
return value;
}
@Override
public long time() {
return time;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TimedValue<?> that = (TimedValue<?>) o;
return time == that.time &&
Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(time, value);
}
@Override
public String toString() {
return "TimedValue{" +
"time=" + time +
", value=" + value +
'}';
}
}

View File

@ -1,5 +1,6 @@
package ch.streamly.chronicle.flux.replay;
import ch.streamly.domain.WrappedValue;
class ValueToDelay<T> implements WrappedValue<T> {
private final long delay;

View File

@ -1,5 +0,0 @@
package ch.streamly.chronicle.flux.replay;
interface WrappedValue<T> {
T value();
}

View File

@ -0,0 +1,99 @@
package ch.streamly.chronicle.flux;
import static ch.streamly.chronicle.flux.util.ChronicleStoreCleanup.deleteStoreIfItExists;
import static java.time.Duration.ofSeconds;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import ch.streamly.domain.WrappedValue;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
class ChronicleJournalTest {
private static final String PREFIX = "ChronicleJournalTest";
private static final String ONE = "one";
private static final String TWO = "two";
private static final String THREE = "three";
private static final String FOUR = "four";
private static final DummyObject FIRST = new DummyObject(10000, ONE);
private static final DummyObject SECOND = new DummyObject(11000, TWO);
private static final DummyObject THIRD = new DummyObject(12000, THREE);
private static final DummyObject FOURTH = new DummyObject(15000, FOUR);
private static final Flux<DummyObject> source = Flux.just(FIRST, SECOND, THIRD, FOURTH);
private static final long TIME_1 = 1000L;
private static final long TIME_2 = 2000L;
private static final long TIME_3 = 3000L;
private static final long TIME_4 = 7000L;
private ChronicleJournal<DummyObject> journal;
private String path;
@BeforeEach
void setUp() {
path = PREFIX + UUID.randomUUID().toString();
ConcurrentLinkedQueue<Long> timeQueue = new ConcurrentLinkedQueue<>();
timeQueue.add(TIME_1);
timeQueue.add(TIME_2);
timeQueue.add(TIME_3);
timeQueue.add(TIME_4);
journal = new ChronicleJournal<DummyObject>(path, DummyObject::toBinary, DummyObject::fromBinary) {
@Override
long getCurrentTime() {
return timeQueue.poll();
}
};
}
@AfterEach
void tearDown() {
journal.close();
deleteStoreIfItExists(path);
}
@Test
@DisplayName("tests that a timed data stream is stored in the Chronicle journal")
void shouldStoreStreamWithTimeStamps() {
journal.store(source);
StepVerifier.create(journal.retrieveAll())
.expectSubscription()
.assertNext(i -> {
Assertions.assertEquals(FIRST, i.value());
Assertions.assertEquals(TIME_1, i.time());
})
.assertNext(i -> {
Assertions.assertEquals(SECOND, i.value());
Assertions.assertEquals(TIME_2, i.time());
})
.assertNext(i -> {
Assertions.assertEquals(THIRD, i.value());
Assertions.assertEquals(TIME_3, i.time());
})
.thenCancel()
.verify(Duration.ofMillis(500));
}
@Test
@DisplayName("tests that the Chronicle journal can replay history with the timestamps assigned by the journal")
void replayHistoryWithJournalTime() {
journal.store(source);
StepVerifier.withVirtualTime(() -> journal.replayHistory().withOriginalTiming().map(WrappedValue::value))
.expectSubscription()
.expectNext(FIRST)
.expectNoEvent(ofSeconds(1))
.expectNext(SECOND)
.expectNoEvent(ofSeconds(1))
.expectNext(THIRD)
.expectNoEvent(ofSeconds(4))
.expectNext(FOURTH)
.thenCancel()
.verify(Duration.ofMillis(500));
}
}

View File

@ -4,6 +4,7 @@ import static ch.streamly.chronicle.flux.util.ChronicleStoreCleanup.deleteStoreI
import static java.time.Duration.ofSeconds;
import java.time.Duration;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -13,8 +14,9 @@ import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
class ChronicleStoreTest {
private static final String PATH = "ChronicleStoreTest";
private static final String PREFIX = "ChronicleStoreTest";
private static final String ONE = "one";
private static final String TWO = "two";
private static final String THREE = "three";
@ -23,19 +25,20 @@ class ChronicleStoreTest {
private static final DummyObject SECOND = new DummyObject(11000, TWO);
private static final DummyObject THIRD = new DummyObject(12000, THREE);
private static final DummyObject FOURTH = new DummyObject(15000, FOUR);
private static final Flux<DummyObject> source = Flux.just(FIRST, SECOND, THIRD, FOURTH
);
private static final Flux<DummyObject> source = Flux.just(FIRST, SECOND, THIRD, FOURTH);
private ChronicleStore<DummyObject> store;
private String path;
@BeforeEach
void setUp() {
deleteStoreIfItExists(PATH);
store = new ChronicleStore<>(PATH, DummyObject::toBinary, DummyObject::fromBinary);
path = PREFIX + UUID.randomUUID().toString();
store = new ChronicleStore<>(path, DummyObject::toBinary, DummyObject::fromBinary);
}
@AfterEach
void tearDown() {
store.close();
deleteStoreIfItExists(path);
}
@Test

View File

@ -4,8 +4,8 @@ import static java.time.Duration.ofSeconds;
import java.time.Instant;
import ch.streamly.chronicle.flux.replay.ReplayValue;
import ch.streamly.chronicle.flux.replay.ReplayInLoop;
import ch.streamly.domain.ReplayValue;
import reactor.core.publisher.Flux;
class ReplayLoopDemo {

View File

@ -12,6 +12,7 @@ import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import ch.streamly.chronicle.flux.DummyObject;
import ch.streamly.domain.ReplayValue;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

View File

@ -1,36 +0,0 @@
package ch.streamly.chronicle.flux.replay;
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
class ReplayValueImplTest {
@Test
@DisplayName("test equals and hashcode")
void testDataClass() {
String value = "testValue";
ReplayValue<String> first = new ReplayValueImpl<>(true, value);
ReplayValue<String> second = new ReplayValueImpl<>(true, value);
ReplayValue<String> third = new ReplayValueImpl<>(false, value);
assertEquals(first, second);
assertEquals(first, first);
assertNotEquals(first, value);
assertNotEquals(first, third);
assertEquals(first.hashCode(), second.hashCode());
}
@Test
@DisplayName("test equals and hashcode with null value")
void testDataClassWithNullValue() {
ReplayValue<String> first = new ReplayValueImpl<>(true, null);
ReplayValue<String> second = new ReplayValueImpl<>(true, null);
ReplayValue<String> third = new ReplayValueImpl<>(false, null);
assertEquals(first, second);
assertEquals(first, first);
assertNotEquals(first, third);
assertEquals(first.hashCode(), second.hashCode());
}
}

View File

@ -1,35 +0,0 @@
package ch.streamly.chronicle.flux.replay;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
class TimedValueTest {
private static final long TIME = 42;
@Test
@DisplayName("test equals and hashcode")
void testDataClass() {
String value = "testValue";
TimedValue<String> first = new TimedValue<>(TIME, value);
TimedValue<String> second = new TimedValue<>(TIME, value);
assertEquals(first, second);
assertEquals(first, first);
assertNotEquals(first, TIME);
assertEquals(first.hashCode(), second.hashCode());
}
@Test
@DisplayName("test equals and hashcode with null value")
void testDataClassWithNullValue() {
TimedValue<?> first = new TimedValue<>(TIME, null);
TimedValue<?> second = new TimedValue<>(TIME, null);
assertEquals(first, second);
assertEquals(first, first);
assertEquals(first.hashCode(), second.hashCode());
}
}

View File

@ -9,17 +9,24 @@ public class ChronicleStoreCleanup {
public static void deleteStoreIfItExists(String path) {
File directory = new File(".");
boolean done = false;
long startTime = System.currentTimeMillis();
Exception exception = null;
while(!done && System.currentTimeMillis() - startTime < 5000){
try {
deleteStore(path, directory);
done = true;
} catch (IOException e) {
//try again
exception = e;
try {
Thread.sleep(500);
deleteStore(path, directory);
} catch (Exception exception) {
System.err.println("Error while deleting store "+exception);
Thread.sleep(100);
} catch (InterruptedException interrupted) {
System.out.println("store deletion interrupted "+interrupted);
}
}
}
if(!done){
System.err.println("Error while deleting store "+exception);
}
}
@ -29,6 +36,8 @@ public class ChronicleStoreCleanup {
if (storePath.exists()) {
FileUtils.deleteDirectory(storePath);
System.out.println("Deleted existing store");
}else{
System.out.println("Path does not exists "+path);
}
}
}