documentation

This commit is contained in:
mgabriel 2018-08-14 09:09:00 +02:00
parent 5dee0624c2
commit f64991bd70
5 changed files with 71 additions and 31 deletions

View File

@ -21,6 +21,10 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* Implementation of a {@link FluxStore} backed by a Chronicle Queue.
*
* This store respects the backpressure on the data streams it produces.
*
* @author mgabriel.
*/
public class ChronicleStore<T> implements FluxStore<T> {
@ -31,6 +35,13 @@ public class ChronicleStore<T> implements FluxStore<T> {
private final SingleChronicleQueue queue;
private final RollCycle rollCycle;
/**
*
* @param path path were the Chronicle Queue will store the files.
* This path should not be a network file system (see https://github.com/OpenHFT/Chronicle-Queue for more details)
* @param serializer data serializer
* @param deserializer data deserializer
*/
public ChronicleStore(String path, Function<T, byte[]> serializer,
Function<byte[], T> deserializer) {
this(ChronicleStore.<T>newBuilder()
@ -48,6 +59,11 @@ public class ChronicleStore<T> implements FluxStore<T> {
}
/**
*
* @param <BT> data type.
* @return a ChronicleStore builder.
*/
public static <BT> ChronicleStoreBuilder<BT> newBuilder() {
return new ChronicleStoreBuilder<>();
}
@ -89,7 +105,8 @@ public class ChronicleStore<T> implements FluxStore<T> {
try {
Thread.sleep(10); //waiting for data to appear on the queue
} catch (InterruptedException e) {
traceInterrupt(e);
//interrupt can happen when the flux is cancelled
Thread.currentThread().interrupt();
}
}
}
@ -97,7 +114,8 @@ public class ChronicleStore<T> implements FluxStore<T> {
try {
Thread.sleep(100); //waiting for requests on the flux
} catch (InterruptedException e) {
traceInterrupt(e);
//interrupt can happen when the flux is cancelled
Thread.currentThread().interrupt();
}
}
int cycle = rollCycle.toCycle(tailer.index());
@ -113,11 +131,6 @@ public class ChronicleStore<T> implements FluxStore<T> {
}
}
private void traceInterrupt(InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.trace("interrupted " + e);
}
private void deleteFile(int previousCycle) {
WireStore wireStore = queue.storeForCycle(previousCycle, 0, false);
if (wireStore != null) {
@ -193,21 +206,38 @@ public class ChronicleStore<T> implements FluxStore<T> {
private ChronicleStoreBuilder() {
}
/**
* @param path path were the Chronicle Queue will store the files.
* This path should not be a network file system (see https://github.com/OpenHFT/Chronicle-Queue for more details)
* @return this builder
*/
public ChronicleStoreBuilder<T> path(String path) {
this.path = path;
return this;
}
/**
* @param serializer data serializer
* @return this builder
*/
public ChronicleStoreBuilder<T> serializer(Function<T, byte[]> serializer) {
this.serializer = serializer;
return this;
}
/**
* @param deserializer data deserializer
* @return this builder
*/
public ChronicleStoreBuilder<T> deserializer(Function<byte[], T> deserializer) {
this.deserializer = deserializer;
return this;
}
/**
* @param rollCycle roll cycle for the files
* @return this builder
*/
public ChronicleStoreBuilder<T> rollCycle(RollCycle rollCycle) {
this.rollCycle = rollCycle;
return this;

View File

@ -15,14 +15,16 @@ import reactor.core.publisher.Flux;
public interface FluxStore<T> {
/**
* Stores all items of the given stream in the chronicle store.
* Stores all items of the given stream until the stream completes or the returned {@link Disposable} is disposed.
* Any error received on the stream will stop the storage.
*
* @param toStore data stream to store.
* @return a disposable that can be used to stop the storage process.
*/
Disposable store(Publisher<T> toStore);
/**
* Stores one item in the chronicle store.
* Stores one item.
*
* @param item item to store.
*/

View File

@ -4,19 +4,22 @@ import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* A transformer that takes a source flux and replays it in a loop.
* The values are wrapped in a {@link ReplayValue} object to indicate when the loop restarts.
* This information can be used by the application to perform some action when the loop restarts (clear caches, etc.)
*
* It is possible to specify a delay before each loop restart.
* Please note that if you chain
*
* @param <T> data type
*/
public class ReplayInLoop<T> implements Function<Flux<T>, Publisher<ReplayValue<T>>> {
private static final Logger LOGGER = LoggerFactory.getLogger(ReplayInLoop.class);
private final Duration delayBeforeRestart;
private final Boolean TOKEN = Boolean.FALSE;
public ReplayInLoop(Duration delayBeforeRestart) {
this.delayBeforeRestart = delayBeforeRestart;
@ -24,39 +27,39 @@ public class ReplayInLoop<T> implements Function<Flux<T>, Publisher<ReplayValue<
@Override
public Publisher<ReplayValue<T>> apply(Flux<T> source) {
Flux<Flux<ReplayValue<T>>> generate = Flux.create(sink -> {
Flux<Flux<ReplayValue<T>>> fluxLoop = Flux.create(sink -> {
while (!sink.isCancelled()) {
long requested = sink.requestedFromDownstream();
if (requested > 0) {
wrapValues(source, sink);
sink.next(wrapValues(source));
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
//interrupt can happen when the flux is cancelled
Thread.currentThread().interrupt();
LOGGER.info("interrupted " + e);
}
}
}
}
);
Flux<Flux<ReplayValue<T>>> limited = generate.limitRate(1);
return Flux.concat(limited);
return Flux.concat(fluxLoop.limitRate(1)); // limit the rate to avoid creating too many source flux in advance.
}
private void wrapValues(Flux<T> source, FluxSink<Flux<ReplayValue<T>>> sink) {
private Flux<ReplayValue<T>> wrapValues(Flux<T> source) {
AtomicBoolean firstValueSent = new AtomicBoolean(false);
Flux<ReplayValue<T>> nextFlux = source.delaySubscription(delayBeforeRestart).map(wrapAsReplayValue(firstValueSent));
sink.next(nextFlux);
return source.delaySubscription(delayBeforeRestart)
.map(wrapAsReplayValue(firstValueSent));
}
@NotNull
private Function<T, ReplayValue<T>> wrapAsReplayValue(AtomicBoolean firstValueSent) {
return v -> {
return val -> {
if (!firstValueSent.getAndSet(true)) {
return new ReplayValueImpl<>(true, v);
return new ReplayValueImpl<>(true, val);
}
return new ReplayValueImpl<>(v);
return new ReplayValueImpl<>(val);
};
}
}

View File

@ -1,9 +1,14 @@
package com.mgabriel.chronicle.flux.replay;
/**
* A value wrapper that indicates if the current value is the first value replayed in the replay loop.
* @see {@link ReplayInLoop}
* @param <T> data type
*/
public interface ReplayValue<T> extends WrappedValue<T> {
/**
* @return true if this object is the loop reset signal (meaning that the replay loop has restarted from the beginning)
* @return true if this object is the loop restart signal (meaning that the replay loop has restarted from the beginning)
*/
boolean isLoopReset();
boolean isLoopRestart();
}

View File

@ -17,7 +17,7 @@ public class ReplayValueImpl<T> implements ReplayValue<T>{
}
@Override
public boolean isLoopReset() {
public boolean isLoopRestart() {
return isLoopReset;
}
@ -45,7 +45,7 @@ public class ReplayValueImpl<T> implements ReplayValue<T>{
@Override
public String toString() {
return "ReplayValueImpl{" +
"isLoopReset=" + isLoopReset +
"isLoopRestart=" + isLoopReset +
", value=" + value +
'}';
}