documentation

This commit is contained in:
mgabriel 2018-08-14 09:35:56 +02:00
parent f64991bd70
commit 403dc84f2d
11 changed files with 68 additions and 45 deletions

View File

@ -38,7 +38,7 @@ public class ChronicleStore<T> implements FluxStore<T> {
/** /**
* *
* @param path path were the Chronicle Queue will store the files. * @param path path were the Chronicle Queue will store the files.
* This path should not be a network file system (see https://github.com/OpenHFT/Chronicle-Queue for more details) * This path should not be a network file system (see <a href="https://github.com/OpenHFT/Chronicle-Queue">the Chronicle queue documentation for more detail</a>
* @param serializer data serializer * @param serializer data serializer
* @param deserializer data deserializer * @param deserializer data deserializer
*/ */
@ -208,7 +208,7 @@ public class ChronicleStore<T> implements FluxStore<T> {
/** /**
* @param path path were the Chronicle Queue will store the files. * @param path path were the Chronicle Queue will store the files.
* This path should not be a network file system (see https://github.com/OpenHFT/Chronicle-Queue for more details) * This path should not be a network file system (see <a href="https://github.com/OpenHFT/Chronicle-Queue">the Chronicle queue documentation for more detail</a>
* @return this builder * @return this builder
*/ */
public ChronicleStoreBuilder<T> path(String path) { public ChronicleStoreBuilder<T> path(String path) {

View File

@ -10,7 +10,7 @@ import reactor.core.publisher.Flux;
/** /**
* Reactive store used to store and replay a Flux. * Reactive store used to store and replay a Flux.
* *
* @param <T> the value type * @param <T> the data type
*/ */
public interface FluxStore<T> { public interface FluxStore<T> {

View File

@ -7,10 +7,19 @@ import reactor.core.CoreSubscriber;
import reactor.core.Scannable; import reactor.core.Scannable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
/**
* A flux that can be used to replay historical values with different strategies.
*
* @param <T> data type
*/
public class ReplayFlux<T> extends Flux<T> implements Scannable { public class ReplayFlux<T> extends Flux<T> implements Scannable {
private final Flux<T> source; private final Flux<T> source;
private final Function<T, Long> timestampExtractor; private final Function<T, Long> timestampExtractor;
/**
* @param source the source flux.
* @param timestampExtractor extracts the epoch time in ms from the values.
*/
public ReplayFlux(Flux<T> source, Function<T, Long> timestampExtractor) { public ReplayFlux(Flux<T> source, Function<T, Long> timestampExtractor) {
this.source = source; this.source = source;
this.timestampExtractor = timestampExtractor; this.timestampExtractor = timestampExtractor;
@ -30,18 +39,33 @@ public class ReplayFlux<T> extends Flux<T> implements Scannable {
return (Scannable) source; return (Scannable) source;
} }
/**
* @return a flux that will replay the values with their original timing
* (e.g. if the values were received with a 1 second interval, the returned flux will emit at a 1 second interval).
*/
public ReplayFlux<T> withOriginalTiming(){ public ReplayFlux<T> withOriginalTiming(){
return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor)), timestampExtractor); return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor)), timestampExtractor);
} }
/**
* @return a flux that will replay the values with a time acceleration applied to their original timing
* (e.g. if the values were received with a 2 second interval, and the time acceleration is 2, then the returned flux will emit at a 1 second interval).
*/
public ReplayFlux<T> withTimeAcceleration(double acceleration){ public ReplayFlux<T> withTimeAcceleration(double acceleration){
return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor, acceleration)), timestampExtractor); return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor, acceleration)), timestampExtractor);
} }
/**
* @return a flux that will replay the values in a loop.
*/
public Flux<ReplayValue<T>> inLoop(){ public Flux<ReplayValue<T>> inLoop(){
return inLoop(Duration.ofMillis(0)); return inLoop(Duration.ofMillis(0));
} }
/**
* @param delayBeforeLoopRestart duration to wait before each loop.
* @return a flux that will replay the values in a loop.
*/
public Flux<ReplayValue<T>> inLoop(Duration delayBeforeLoopRestart){ public Flux<ReplayValue<T>> inLoop(Duration delayBeforeLoopRestart){
return source.compose(new ReplayInLoop<>(delayBeforeLoopRestart)); return source.compose(new ReplayInLoop<>(delayBeforeLoopRestart));
} }

View File

@ -4,17 +4,17 @@ import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function; import java.util.function.Function;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
/** /**
* A transformer that takes a source flux and replays it in a loop. * A transformer that takes a source flux and replays it in a loop.
* The values are wrapped in a {@link ReplayValue} object to indicate when the loop restarts. * The values are wrapped in a {@link ReplayValue} object to indicate when the loop restarts.
* This information can be used by the application to perform some action when the loop restarts (clear caches, etc.) * This information can be used by the application to perform some actions when the loop restarts (clear caches, etc.)
* *
* It is possible to specify a delay before each loop restart. * It is possible to specify a delay before each loop restart.
* Please note that if you chain * Please note that if you add other operators in the reactive stream after this transformer, you might not see the
* impact of the restart delay since it will be applied as soon as items are requested on the subscription.
* *
* @param <T> data type * @param <T> data type
*/ */
@ -53,7 +53,6 @@ public class ReplayInLoop<T> implements Function<Flux<T>, Publisher<ReplayValue<
} }
@NotNull
private Function<T, ReplayValue<T>> wrapAsReplayValue(AtomicBoolean firstValueSent) { private Function<T, ReplayValue<T>> wrapAsReplayValue(AtomicBoolean firstValueSent) {
return val -> { return val -> {
if (!firstValueSent.getAndSet(true)) { if (!firstValueSent.getAndSet(true)) {

View File

@ -1,33 +0,0 @@
package com.mgabriel.chronicle.flux.replay;
import java.time.Duration;
import java.util.function.Function;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
public class ReplayLoopFlux<T> extends Flux<ReplayValue<T>> implements Scannable{
private final Flux<T> source;
private final Duration delayBeforeLoopRestart;
public ReplayLoopFlux(Flux<T> source, Duration delayBeforeLoopRestart) {
this.source = source;
this.delayBeforeLoopRestart = delayBeforeLoopRestart;
}
@Override
public void subscribe(CoreSubscriber<? super ReplayValue<T>> actual) {
source.compose(new ReplayInLoop<>(delayBeforeLoopRestart)).subscribe(actual);
}
@Override
public Object scanUnsafe(Scannable.Attr attr) {
return getScannable().scanUnsafe(attr);
}
private Scannable getScannable() {
return (Scannable) source;
}
}

View File

@ -2,6 +2,10 @@ package com.mgabriel.chronicle.flux.replay;
import java.util.Objects; import java.util.Objects;
/**
* Default implementation of a {@link ReplayValue}
* @param <T> data type
*/
public class ReplayValueImpl<T> implements ReplayValue<T>{ public class ReplayValueImpl<T> implements ReplayValue<T>{
private final boolean isLoopReset; private final boolean isLoopReset;
private final T value; private final T value;

View File

@ -8,15 +8,28 @@ import java.util.function.Predicate;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
/**
* A transformer that takes a source flux and replays the values with their original timing.
* It is also possible to specify a time acceleration factor to increase or decrease the replay speed.
*
* @param <T> data type
*/
public class ReplayWithOriginalTiming<T> implements Function<Flux<T>, Publisher<T>> { public class ReplayWithOriginalTiming<T> implements Function<Flux<T>, Publisher<T>> {
private final Function<T, Long> timestampExtractor; private final Function<T, Long> timestampExtractor;
private final double timeAcceleration; private final double timeAcceleration;
private final Timed<T> TOKEN = new TimedValue<>(0, null); private final Timed<T> TOKEN = new TimedValue<>(0, null);
/**
* @param timestampExtractor extracts the epoch time in ms from the values.
*/
public ReplayWithOriginalTiming(Function<T, Long> timestampExtractor) { public ReplayWithOriginalTiming(Function<T, Long> timestampExtractor) {
this(timestampExtractor, 1); this(timestampExtractor, 1);
} }
/**
* @param timestampExtractor extracts the epoch time in ms from the values.
* @param timeAcceleration time acceleration factor.
*/
public ReplayWithOriginalTiming(Function<T, Long> timestampExtractor, double timeAcceleration) { public ReplayWithOriginalTiming(Function<T, Long> timestampExtractor, double timeAcceleration) {
this.timestampExtractor = timestampExtractor; this.timestampExtractor = timestampExtractor;
this.timeAcceleration = timeAcceleration; this.timeAcceleration = timeAcceleration;

View File

@ -1,5 +1,10 @@
package com.mgabriel.chronicle.flux.replay; package com.mgabriel.chronicle.flux.replay;
public interface Timed<T> extends WrappedValue<T> { /**
* Wraps a value with its timestamp.
*
* @param <T> data type
*/
interface Timed<T> extends WrappedValue<T> {
long time(); long time();
} }

View File

@ -1,4 +1,9 @@
package com.mgabriel.chronicle.flux.replay; package com.mgabriel.chronicle.flux.replay;
public interface TimedReplayValue<T> extends Timed<T>, ReplayValue<T>{ /**
* Wraps a {@link ReplayValue} with its timestamp.
*
* @param <T> data type
*/
interface TimedReplayValue<T> extends Timed<T>, ReplayValue<T>{
} }

View File

@ -2,11 +2,16 @@ package com.mgabriel.chronicle.flux.replay;
import java.util.Objects; import java.util.Objects;
public class TimedValue<T> implements Timed<T> { /**
* Default implementation of a {@link Timed} value.
*
* @param <T>
*/
class TimedValue<T> implements Timed<T> {
private final long time; private final long time;
private final T value; private final T value;
public TimedValue(long time, T value) { TimedValue(long time, T value) {
this.time = time; this.time = time;
this.value = value; this.value = value;
} }

View File

@ -1,5 +1,6 @@
package com.mgabriel.chronicle.flux.replay; package com.mgabriel.chronicle.flux.replay;
class ValueToDelay<T> implements WrappedValue<T> { class ValueToDelay<T> implements WrappedValue<T> {
private final long delay; private final long delay;
private final T value; private final T value;
@ -14,7 +15,7 @@ class ValueToDelay<T> implements WrappedValue<T> {
return value; return value;
} }
public long delay() { long delay() {
return delay; return delay;
} }
} }