Add documentation
This commit is contained in:
parent
4a047df314
commit
6e64c9e54b
76
README.md
76
README.md
@ -9,3 +9,79 @@
|
||||
|
||||
## A reactive driver for chronicle-queue
|
||||
|
||||
This project builds on [Chronicle Queue](https://github.com/OpenHFT/Chronicle-Queue) (see also their [product](https://chronicle.software/products/queue/) page) to offer a simple reactive data store.
|
||||
|
||||
Chronicle Queue is a low latency library that persists everything on disk with memory mapped files (off-heap).
|
||||
|
||||
### Functionalities
|
||||
With the provided ChronicleStore, you can:
|
||||
- Store a reactive stream.
|
||||
- Subscribe to the history + live updates of the store with a reactive stream (with a [Flux](http://projectreactor.io/docs/core/release/reference/#flux) from project [reactor-core](https://github.com/reactor/reactor-core)).
|
||||
- Replay the history with the original timing, with a time acceleration (e.g. replay at twice the speed).
|
||||
- Replay the history in a loop.
|
||||
|
||||
### Use cases
|
||||
This project can be useful for different use cases:
|
||||
- event sourcing, to store all events entering an application.
|
||||
- you can use it as an off-heap buffer in a reactive stream if you need to handle data bursts.
|
||||
- testing, to replay the history of an application.
|
||||
- performance testing, to replay the events at higher speeds.
|
||||
- a simple alternative to Kafka for basic uses cases: no need to install anything, it's all persisted on disk :smiley:
|
||||
|
||||
|
||||
### Example usage
|
||||
|
||||
|
||||
####Create a Chronicle Store:
|
||||
|
||||
Please note that you must provide a way to serialize the data as binary.
|
||||
The easiest solution is to use a binary serialization protocol such as [Protobuf](https://developers.google.com/protocol-buffers/), [Avro](https://avro.apache.org/docs/current/), etc. In this case the binary serialization will be supported out of the box.
|
||||
|
||||
The first argument is the directory path used to persist the on disk.
|
||||
Since Chronicle is using memory mapped file, **you should not use a path mounted on a network file system**, for more details, have a look at the [Chronicle documentation](https://github.com/OpenHFT/Chronicle-Queue#chronicle-queue)
|
||||
The 2nd and 3rd arguments are the serializer and deserializer for your data.
|
||||
|
||||
```java
|
||||
ChronicleStore<DummyObject> chronicleStore = new ChronicleStore<>(PATH, DummyObject::toBinary, DummyObject::fromBinary);
|
||||
```
|
||||
|
||||
####Store a stream of data
|
||||
|
||||
The store method will return a handle that can be used to stop the storage.
|
||||
Otherwise the data stream will be store until it completes or an error is received on the stream.
|
||||
|
||||
```java
|
||||
Flux<DummyObject> source = ...
|
||||
Disposable handle = chronicleStore.store(source);
|
||||
```
|
||||
|
||||
####Subscribe to the store
|
||||
|
||||
We can subscribe to the store and print old values, as well as new values being persisted in the store.
|
||||
|
||||
```java
|
||||
Flux<DummyObject> allValues = chronicleStore.retrieveAll();
|
||||
allValues.doOnNext(System.out::println)
|
||||
.blockLast();
|
||||
```
|
||||
|
||||
We can also replay the history with the same timing as the original stream:
|
||||
|
||||
```java
|
||||
chronicleStore.replayHistory(DummyObject::timestamp)
|
||||
.withOriginalTiming()
|
||||
.inLoop()
|
||||
.doOnNext(i -> System.out.println(Instant.now() + " " + i))
|
||||
.blockLast();
|
||||
```
|
||||
|
||||
In order to replay data with the original timing, we need to provide a function to extract the epoch time (in milliseconds) from the data.
|
||||
Chronicle-flux could potentially add a timestamp to the values when they are persisted, but this would be a poor design choice since the data would be timestamped at the very end of the chain, possibly after passing through several queues. It is in general a better idea to add a timestamp on the data as soon as they enter the system in order to have an accurate time.
|
||||
|
||||
|
||||
####Runnable demo
|
||||
|
||||
if you want to run some code samples, have a look at the demo folder in the test directory that contains several runnable classes.
|
||||
|
||||
|
||||
|
||||
|
@ -24,21 +24,22 @@ class ChronicleStoreDemo {
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
deleteStoreIfItExists(PATH);
|
||||
|
||||
ChronicleStore<DummyObject> store = new ChronicleStore<>(PATH, DummyObject::toBinary, DummyObject::fromBinary);
|
||||
ChronicleStore<DummyObject> chronicleStore = new ChronicleStore<>(PATH, DummyObject::toBinary,
|
||||
DummyObject::fromBinary);
|
||||
|
||||
Flux<String> source = Flux.just("one", "two", "three")
|
||||
.concatWith(Flux.just("four").delayElements(Duration.ofSeconds(1)))
|
||||
.concatWith(Flux.just("five").delayElements(Duration.ofSeconds(2)));
|
||||
|
||||
System.out.println("Storing source flux...");
|
||||
Disposable handle = store.store(source.map(v -> new DummyObject(System.currentTimeMillis(), v)));
|
||||
Disposable handle = chronicleStore.store(source.map(v -> new DummyObject(System.currentTimeMillis(), v)));
|
||||
|
||||
SECONDS.sleep(4); //wait until all items are stored
|
||||
handle.dispose();
|
||||
|
||||
System.out.println("Storage achieved, replaying from store");
|
||||
System.out.println("Storage achieved, replaying from chronicleStore");
|
||||
|
||||
store.replayHistory(DummyObject::timestamp)
|
||||
chronicleStore.replayHistory(DummyObject::timestamp)
|
||||
.withOriginalTiming()
|
||||
.inLoop(Duration.ofSeconds(1))
|
||||
.doOnNext(i -> System.out.println(Instant.now() + " " + i))
|
||||
|
@ -0,0 +1,36 @@
|
||||
package com.mgabriel.chronicle.flux.demo;
|
||||
|
||||
import static com.mgabriel.chronicle.flux.util.ChronicleStoreCleanup.deleteStoreIfItExists;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import com.mgabriel.chronicle.flux.ChronicleStore;
|
||||
import com.mgabriel.chronicle.flux.DummyObject;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
/**
|
||||
* This is a demo usage of a Chronicle store as an off-heap buffer for a reactive stream.
|
||||
* The persisted files will be automatically deleted after the rollover (daily by default).
|
||||
*/
|
||||
public class ReactiveBuffer {
|
||||
|
||||
private static final String PATH = "demoReactiveBuffer";
|
||||
|
||||
public static void main(String[] args) {
|
||||
deleteStoreIfItExists(PATH);
|
||||
|
||||
Flux<DummyObject> source = Flux.interval(Duration.ofSeconds(1)).map(i -> new DummyObject(i, String.valueOf(i)));
|
||||
|
||||
ChronicleStore<DummyObject> chronicleStore = new ChronicleStore<>(PATH, DummyObject::toBinary,
|
||||
DummyObject::fromBinary);
|
||||
Disposable storage = chronicleStore.store(source);
|
||||
|
||||
chronicleStore.retrieveAll(true)
|
||||
.doOnNext(System.out::println)
|
||||
.take(100)
|
||||
.blockLast();
|
||||
|
||||
storage.dispose();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user