Add ChronicleStore and some demo class

This commit is contained in:
mgabriel 2018-08-13 17:31:45 +02:00
parent 96a7120f50
commit 5dee0624c2
11 changed files with 376 additions and 9 deletions

View File

@ -26,6 +26,7 @@ dependencies {
compile "org.slf4j:slf4j-api:1.7.25"
compile "io.projectreactor:reactor-core:$reactorVersion"
compile "net.openhft:chronicle-queue:$chronicleVersion"
testCompile "com.google.guava:guava:26.0-jre"
}

View File

@ -0,0 +1,220 @@
package com.mgabriel.chronicle.flux;
import java.io.File;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mgabriel.chronicle.flux.replay.ReplayFlux;
import net.openhft.chronicle.bytes.BytesIn;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
/**
* @author mgabriel.
*/
public class ChronicleStore<T> implements FluxStore<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(ChronicleStore.class);
private final Function<T, byte[]> serializer;
private final Function<byte[], T> deserializer;
private final SingleChronicleQueue queue;
private final RollCycle rollCycle;
public ChronicleStore(String path, Function<T, byte[]> serializer,
Function<byte[], T> deserializer) {
this(ChronicleStore.<T>newBuilder()
.path(path)
.serializer(serializer)
.deserializer(deserializer));
}
private ChronicleStore(ChronicleStoreBuilder<T> builder) {
String path = builder.path;
serializer = builder.serializer;
deserializer = builder.deserializer;
rollCycle = builder.rollCycle;
this.queue = SingleChronicleQueueBuilder.binary(path).rollCycle(rollCycle).build();
}
public static <BT> ChronicleStoreBuilder<BT> newBuilder() {
return new ChronicleStoreBuilder<>();
}
@Override
public Disposable store(Publisher<T> toStore) {
ExcerptAppender appender = queue.acquireAppender();
return Flux.from(toStore)
.doOnError(err -> LOGGER.error("Error received", err))
.subscribe(v -> {
byte[] bytesToStore = serializer.apply(v);
appender.writeBytes(b -> b.writeInt(bytesToStore.length).write(bytesToStore));
});
}
@Override
public void store(T item) {
ExcerptAppender appender = queue.acquireAppender();
byte[] bytesToStore = serializer.apply(item);
appender.writeBytes(b -> b.writeInt(bytesToStore.length).write(bytesToStore));
}
private enum ReaderType {
ALL,
ONLY_HISTORY
}
private void readTailer(ExcerptTailer tailer, FluxSink<T> sink,
ReaderType readerType, boolean deleteAfterRead) {
int previousCycle = 0;
try {
while (!sink.isCancelled()) {
if (sink.requestedFromDownstream() > 0) {
boolean present = tailer.readBytes(b -> readAndSendValue(sink, b));
if (!present) {
if (readerType == ReaderType.ONLY_HISTORY) {
sink.complete();
} else {
try {
Thread.sleep(10); //waiting for data to appear on the queue
} catch (InterruptedException e) {
traceInterrupt(e);
}
}
}
} else {
try {
Thread.sleep(100); //waiting for requests on the flux
} catch (InterruptedException e) {
traceInterrupt(e);
}
}
int cycle = rollCycle.toCycle(tailer.index());
if (cycle != previousCycle) {
if (deleteAfterRead) {
deleteFile(previousCycle);
}
previousCycle = cycle;
}
}
} catch (Exception e) {
LOGGER.error("Error while tailing on queue {}", tailer.queue().file().getAbsolutePath(), e);
}
}
private void traceInterrupt(InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.trace("interrupted " + e);
}
private void deleteFile(int previousCycle) {
WireStore wireStore = queue.storeForCycle(previousCycle, 0, false);
if (wireStore != null) {
File file = wireStore.file();
if (file != null) {
try {
boolean deleted = file.delete();
if (deleted) {
LOGGER.trace("file {} deleted after read", file.getAbsolutePath());
} else {
LOGGER.error("Could not delete file {}", file.getAbsolutePath());
}
} catch (Exception e) {
LOGGER.error("Could not delete file {}", file.getAbsolutePath(), e);
}
} else {
LOGGER.error("Could not find file for cycle {}", previousCycle);
}
} else {
LOGGER.trace("wirestore is null for cycle {}", previousCycle);
}
}
private void readAndSendValue(FluxSink<T> sink, BytesIn b) {
int size = b.readInt();
byte[] bytes = new byte[size];
b.read(bytes);
T value = deserializer.apply(bytes);
sink.next(value);
}
@Override
public Flux<T> retrieveAll(boolean deleteAfterRead) {
return Flux.create(sink -> launchTailer(sink, ReaderType.ALL, deleteAfterRead));
}
private void launchTailer(FluxSink<T> sink, ReaderType readerType, boolean deleteAfterRead) {
launchTailer(sink, queue.createTailer(), readerType, deleteAfterRead);
}
private void launchTailer(FluxSink<T> sink, ExcerptTailer tailer, ReaderType readerType, boolean deleteAfterRead) {
String path = tailer.queue().file().getAbsolutePath();
Thread t = new Thread(
() -> readTailer(tailer, sink, readerType, deleteAfterRead),
"ChronicleStoreRetrieve_" + path);
t.setDaemon(true);
t.run();
}
@Override
public Flux<T> retrieveHistory() {
return Flux.create(sink -> launchTailer(sink, ReaderType.ONLY_HISTORY, false));
}
@Override
public Flux<T> retrieveNewValues() {
ExcerptTailer tailer = queue.createTailer().toEnd();
return Flux.create(sink -> launchTailer(sink, tailer, ReaderType.ALL, false));
}
@Override
public ReplayFlux<T> replayHistory(Function<T, Long> timestampExtractor) {
Flux<T> historySource = Flux.defer(this::retrieveHistory);
return new ReplayFlux<>(historySource, timestampExtractor);
}
public static final class ChronicleStoreBuilder<T> {
private String path;
private Function<T, byte[]> serializer;
private Function<byte[], T> deserializer;
private RollCycle rollCycle = RollCycles.DAILY;
private ChronicleStoreBuilder() {
}
public ChronicleStoreBuilder<T> path(String path) {
this.path = path;
return this;
}
public ChronicleStoreBuilder<T> serializer(Function<T, byte[]> serializer) {
this.serializer = serializer;
return this;
}
public ChronicleStoreBuilder<T> deserializer(Function<byte[], T> deserializer) {
this.deserializer = deserializer;
return this;
}
public ChronicleStoreBuilder<T> rollCycle(RollCycle rollCycle) {
this.rollCycle = rollCycle;
return this;
}
public ChronicleStore<T> build() {
return new ChronicleStore<>(this);
}
}
}

View File

@ -8,7 +8,6 @@ import reactor.core.Scannable;
import reactor.core.publisher.Flux;
public class ReplayFlux<T> extends Flux<T> implements Scannable {
private final Flux<T> source;
private final Function<T, Long> timestampExtractor;

View File

@ -1,7 +1,5 @@
package com.mgabriel.chronicle.flux.replay;
import com.mgabriel.chronicle.flux.WrappedValue;
public interface ReplayValue<T> extends WrappedValue<T> {
/**

View File

@ -1,7 +1,5 @@
package com.mgabriel.chronicle.flux.replay;
import com.mgabriel.chronicle.flux.WrappedValue;
public interface Timed<T> extends WrappedValue<T> {
long time();
}

View File

@ -1,7 +1,5 @@
package com.mgabriel.chronicle.flux.replay;
import com.mgabriel.chronicle.flux.WrappedValue;
class ValueToDelay<T> implements WrappedValue<T> {
private final long delay;
private final T value;

View File

@ -1,4 +1,4 @@
package com.mgabriel.chronicle.flux;
package com.mgabriel.chronicle.flux.replay;
public interface WrappedValue<T> {
T value();

View File

@ -0,0 +1,33 @@
package com.mgabriel.chronicle.flux;
import java.time.Duration;
import java.time.Instant;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
/**
* @author mgabriel.
*/
public class ChronicleStoreDemo {
public static void main(String[] args) throws InterruptedException {
ChronicleStore<DummyObject> store = new ChronicleStore<>("demoChronicleStore", v -> v.toBinary(), DummyObject::fromBinary);
Flux<String> source = Flux.just("one", "two", "three")
.concatWith(Flux.just("four").delayElements(Duration.ofSeconds(1)))
.concatWith(Flux.just("five").delayElements(Duration.ofSeconds(2)));
//source.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast();
// Disposable handle = store.store(source.map(v -> new DummyObject(System.currentTimeMillis(), v)));
Thread.sleep(4000);
store.replayHistory(DummyObject::getTimestamp)
.withOriginalTiming()
.inLoop(Duration.ofSeconds(1))
.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast();
}
}

View File

@ -0,0 +1,70 @@
package com.mgabriel.chronicle.flux;
import java.nio.charset.Charset;
import java.util.Objects;
import com.google.common.primitives.Longs;
/**
* @author mgabriel.
*/
public class DummyObject {
private static final Charset UTF_8 = Charset.forName("UTF-8");
private final long timestamp;
private final String value;
public DummyObject(long timestamp, String value) {
this.timestamp = timestamp;
this.value = value;
}
public String getValue() {
return value;
}
public long getTimestamp() {
return timestamp;
}
public byte[] toBinary(){
byte[] time = Longs.toByteArray(timestamp);
byte[] val = value.getBytes(UTF_8);
byte[] result = new byte[time.length + val.length];
System.arraycopy(time, 0, result, 0, time.length);
System.arraycopy(val, 0, result, time.length, val.length);
return result;
}
public static DummyObject fromBinary(byte[] bytes){
byte[] time = new byte[8];
byte[] val = new byte[bytes.length - 8];
System.arraycopy(bytes, 0, time, 0, time.length);
System.arraycopy(bytes, 8, val, 0, val.length);
return new DummyObject(Longs.fromByteArray(time), new String(val, UTF_8));
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DummyObject that = (DummyObject) o;
return Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(value);
}
@Override
public String toString() {
return "DummyObject{" +
"timestamp=" + timestamp +
", value='" + value + '\'' +
'}';
}
}

View File

@ -0,0 +1,51 @@
package com.mgabriel.chronicle.flux;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
/**
* @author mgabriel.
*/
public class TryFluxRequest {
public static void main(String[] args) throws Exception {
Flux<String> source = Flux.<String>create(sink -> {
int i = 0;
while (!sink.isCancelled()) {
if (sink.requestedFromDownstream() > 0) {
sink.next("" + i++);
if(i == 7){
sink.complete();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println("interrupted");
}
}
}
System.out.println("CANCELLED!!!!!!!!!!!!!");
}
);
System.out.println("max val =" + Long.MAX_VALUE);
Disposable sub = source.doOnRequest(r -> System.out.println("~~~~~ requested " + r))
//.take(20)
.doOnNext(i -> System.out.println(i))
.subscribeOn(Schedulers.newSingle("lpop"))
.subscribe();
Thread.sleep(10_000);
sub.dispose();
System.out.println("disposed");
}
}

View File

@ -3,7 +3,6 @@ package com.mgabriel.chronicle.flux.replay;
import java.time.Duration;
import java.time.Instant;
import com.mgabriel.chronicle.flux.WrappedValue;
import reactor.core.publisher.Flux;
public class ReplayWithOriginalTimingDemo {