cleanup demo folder
This commit is contained in:
parent
e504f03e31
commit
f9b4c563fc
@ -14,6 +14,9 @@ import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
/**
|
||||
* Simple demo showing how to store values in a Chronicle store and replay them in a loop with the same timing as the
|
||||
* original values
|
||||
*
|
||||
* @author mgabriel.
|
||||
*/
|
||||
public class ChronicleStoreDemo {
|
||||
@ -21,7 +24,6 @@ public class ChronicleStoreDemo {
|
||||
private static final String PATH = "demoChronicleStore";
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
|
||||
deleteStoreIfItExists();
|
||||
|
||||
ChronicleStore<DummyObject> store = new ChronicleStore<>(PATH, DummyObject::toBinary, DummyObject::fromBinary);
|
||||
|
@ -12,10 +12,9 @@ public class ReplayFluxDemo {
|
||||
public static void main(String[] args) {
|
||||
|
||||
Flux<Long> source = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L);
|
||||
|
||||
ReplayFlux<Long> replayFlux = new ReplayFlux<>(source, v -> v);
|
||||
|
||||
replayFlux.withTimeAcceleration(2).inLoop(ofSeconds(1))
|
||||
replayFlux.withTimeAcceleration(2)
|
||||
.inLoop(ofSeconds(1))
|
||||
.doOnNext(i -> System.out.println(Instant.now() + " " + i))
|
||||
.blockLast();
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package com.mgabriel.chronicle.flux.demo;
|
||||
|
||||
import java.time.Duration;
|
||||
import static java.time.Duration.ofSeconds;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
import com.mgabriel.chronicle.flux.replay.ReplayInLoop;
|
||||
@ -10,12 +11,10 @@ import reactor.core.publisher.Flux;
|
||||
public class ReplayLoopDemo {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
Flux<Long> just = Flux.just(0L, 1L, 2L, 3L, 4L, 5L);
|
||||
|
||||
Flux<ReplayValue<Long>> result = just.compose(new ReplayInLoop<>(Duration.ofSeconds(2)));
|
||||
|
||||
result.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast();
|
||||
Flux<Long> source = Flux.just(0L, 1L, 2L, 3L, 4L, 5L);
|
||||
Flux<ReplayValue<Long>> result = source.transform(new ReplayInLoop<>(ofSeconds(2)));
|
||||
result.doOnNext(i -> System.out.println(Instant.now() + " " + i))
|
||||
.blockLast();
|
||||
|
||||
}
|
||||
|
||||
|
@ -1,27 +1,16 @@
|
||||
package com.mgabriel.chronicle.flux.demo;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
|
||||
import com.mgabriel.chronicle.flux.replay.ReplayInLoop;
|
||||
import com.mgabriel.chronicle.flux.replay.ReplayValue;
|
||||
import com.mgabriel.chronicle.flux.replay.ReplayWithOriginalTiming;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
public class ReplayWithOriginalTimingDemo {
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
Flux<Long> just = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L);
|
||||
|
||||
Flux<ReplayValue<Long>> result = just.compose(new ReplayWithOriginalTiming<>(l -> l)).compose(new ReplayInLoop<>(Duration.ofSeconds(1)));
|
||||
|
||||
// In this order the delay of the ReplayInLoop operator is not working because the elements in the replayInLoop
|
||||
// are emitted while the delay is being applied in the ReplayWithOriginalTiming operator, making it looks as if there was no initial delay
|
||||
// Flux<ReplayValue<Long>> result = just.compose(new ReplayInLoop<>(Duration.ofSeconds(1))).compose(new ReplayWithOriginalTiming<>(WrappedValue::value));
|
||||
|
||||
Flux<Long> source = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L);
|
||||
Flux<Long> result = source.transform(new ReplayWithOriginalTiming<>(l -> l));
|
||||
result.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,51 +0,0 @@
|
||||
package com.mgabriel.chronicle.flux.demo;
|
||||
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
/**
|
||||
* @author mgabriel.
|
||||
*/
|
||||
public class TryFluxRequest {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
Flux<String> source = Flux.<String>create(sink -> {
|
||||
int i = 0;
|
||||
while (!sink.isCancelled()) {
|
||||
if (sink.requestedFromDownstream() > 0) {
|
||||
sink.next("" + i++);
|
||||
if(i == 7){
|
||||
sink.complete();
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
System.out.println("interrupted");
|
||||
}
|
||||
}
|
||||
}
|
||||
System.out.println("CANCELLED!!!!!!!!!!!!!");
|
||||
}
|
||||
);
|
||||
|
||||
System.out.println("max val =" + Long.MAX_VALUE);
|
||||
Disposable sub = source.doOnRequest(r -> System.out.println("~~~~~ requested " + r))
|
||||
//.take(20)
|
||||
.doOnNext(i -> System.out.println(i))
|
||||
.subscribeOn(Schedulers.newSingle("lpop"))
|
||||
.subscribe();
|
||||
|
||||
Thread.sleep(10_000);
|
||||
sub.dispose();
|
||||
System.out.println("disposed");
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user