first tests with operators

This commit is contained in:
mgabriel 2018-08-12 22:09:47 +02:00
parent 8dbb6984fd
commit 96a7120f50
6 changed files with 75 additions and 18 deletions

View File

@ -4,9 +4,10 @@ import java.time.Duration;
import java.util.function.Function; import java.util.function.Function;
import reactor.core.CoreSubscriber; import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public class ReplayFlux<T> extends Flux<T> { public class ReplayFlux<T> extends Flux<T> implements Scannable {
private final Flux<T> source; private final Flux<T> source;
private final Function<T, Long> timestampExtractor; private final Function<T, Long> timestampExtractor;
@ -18,14 +19,31 @@ public class ReplayFlux<T> extends Flux<T> {
@Override @Override
public void subscribe(CoreSubscriber<? super T> actual) { public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(actual);
} }
public ReplayLoopFlux<T> inLoop(){ @Override
public Object scanUnsafe(Attr attr) {
return getScannable().scanUnsafe(attr);
}
private Scannable getScannable() {
return (Scannable) source;
}
public ReplayFlux<T> withOriginalTiming(){
return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor)), timestampExtractor);
}
public ReplayFlux<T> withTimeAcceleration(double acceleration){
return new ReplayFlux<>(source.compose(new ReplayWithOriginalTiming<>(timestampExtractor, acceleration)), timestampExtractor);
}
public Flux<ReplayValue<T>> inLoop(){
return inLoop(Duration.ofMillis(0)); return inLoop(Duration.ofMillis(0));
} }
public ReplayLoopFlux<T> inLoop(Duration delayBeforeLoopRestart){ public Flux<ReplayValue<T>> inLoop(Duration delayBeforeLoopRestart){
return new ReplayLoopFlux<>(source, timestampExtractor, delayBeforeLoopRestart); return source.compose(new ReplayInLoop<>(delayBeforeLoopRestart));
} }
} }

View File

@ -1,7 +1,6 @@
package com.mgabriel.chronicle.flux.replay; package com.mgabriel.chronicle.flux.replay;
import java.time.Duration; import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function; import java.util.function.Function;
@ -10,12 +9,14 @@ import org.slf4j.LoggerFactory;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.Scannable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink;
public class ReplayInLoop<T> implements Function<Flux<T>, Publisher<ReplayValue<T>>> { public class ReplayInLoop<T> implements Function<Flux<T>, Publisher<ReplayValue<T>>> {
private static final Logger LOGGER = LoggerFactory.getLogger(ReplayInLoop.class); private static final Logger LOGGER = LoggerFactory.getLogger(ReplayInLoop.class);
private final Duration delayBeforeRestart; private final Duration delayBeforeRestart;
private final Boolean TOKEN = Boolean.FALSE;
public ReplayInLoop(Duration delayBeforeRestart) { public ReplayInLoop(Duration delayBeforeRestart) {
this.delayBeforeRestart = delayBeforeRestart; this.delayBeforeRestart = delayBeforeRestart;
@ -45,9 +46,8 @@ public class ReplayInLoop<T> implements Function<Flux<T>, Publisher<ReplayValue<
private void wrapValues(Flux<T> source, FluxSink<Flux<ReplayValue<T>>> sink) { private void wrapValues(Flux<T> source, FluxSink<Flux<ReplayValue<T>>> sink) {
AtomicBoolean firstValueSent = new AtomicBoolean(false); AtomicBoolean firstValueSent = new AtomicBoolean(false);
Flux<ReplayValue<T>> nextFlux = source.delaySequence(delayBeforeRestart).map(wrapAsReplayValue(firstValueSent)).doOnNext(v -> System.out Flux<ReplayValue<T>> nextFlux = source.delaySubscription(delayBeforeRestart).map(wrapAsReplayValue(firstValueSent));
.println("\t\t\taaaa "+ Instant.now()+" " +v)); sink.next(nextFlux);
sink.next(Flux.defer(() -> nextFlux));
} }
@NotNull @NotNull

View File

@ -4,22 +4,30 @@ import java.time.Duration;
import java.util.function.Function; import java.util.function.Function;
import reactor.core.CoreSubscriber; import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public class ReplayLoopFlux<T> extends Flux<ReplayValue<T>> { public class ReplayLoopFlux<T> extends Flux<ReplayValue<T>> implements Scannable{
private final Flux<T> source; private final Flux<T> source;
private final Function<T, Long> timestampExtractor;
private final Duration delayBeforeLoopRestart; private final Duration delayBeforeLoopRestart;
public ReplayLoopFlux(Flux<T> source, Function<T, Long> timestampExtractor, Duration delayBeforeLoopRestart) { public ReplayLoopFlux(Flux<T> source, Duration delayBeforeLoopRestart) {
this.source = source; this.source = source;
this.timestampExtractor = timestampExtractor;
this.delayBeforeLoopRestart = delayBeforeLoopRestart; this.delayBeforeLoopRestart = delayBeforeLoopRestart;
} }
@Override @Override
public void subscribe(CoreSubscriber<? super ReplayValue<T>> actual) { public void subscribe(CoreSubscriber<? super ReplayValue<T>> actual) {
source.compose(new ReplayInLoop<>(delayBeforeLoopRestart)).subscribe(actual);
}
@Override
public Object scanUnsafe(Scannable.Attr attr) {
return getScannable().scanUnsafe(attr);
}
private Scannable getScannable() {
return (Scannable) source;
} }
} }

View File

@ -10,10 +10,16 @@ import reactor.core.publisher.Flux;
public class ReplayWithOriginalTiming<T> implements Function<Flux<T>, Publisher<T>> { public class ReplayWithOriginalTiming<T> implements Function<Flux<T>, Publisher<T>> {
private final Function<T, Long> timestampExtractor; private final Function<T, Long> timestampExtractor;
private final double timeAcceleration;
private final Timed<T> TOKEN = new TimedValue<>(0, null); private final Timed<T> TOKEN = new TimedValue<>(0, null);
public ReplayWithOriginalTiming(Function<T, Long> timestampExtractor) { public ReplayWithOriginalTiming(Function<T, Long> timestampExtractor) {
this(timestampExtractor, 1);
}
public ReplayWithOriginalTiming(Function<T, Long> timestampExtractor, double timeAcceleration) {
this.timestampExtractor = timestampExtractor; this.timestampExtractor = timestampExtractor;
this.timeAcceleration = timeAcceleration;
} }
@Override @Override
@ -33,7 +39,7 @@ public class ReplayWithOriginalTiming<T> implements Function<Flux<T>, Publisher<
private Function<TimedValuePair<T>, ValueToDelay<T>> calculateDelay() { private Function<TimedValuePair<T>, ValueToDelay<T>> calculateDelay() {
return tvp -> { return tvp -> {
long timeDifference = tvp.timeDifference(); long timeDifference = Double.valueOf(tvp.timeDifference() / timeAcceleration).longValue();
if (timeDifference < 0 || tvp.first == TOKEN) { if (timeDifference < 0 || tvp.first == TOKEN) {
timeDifference = 0; timeDifference = 0;
} }
@ -42,8 +48,7 @@ public class ReplayWithOriginalTiming<T> implements Function<Flux<T>, Publisher<
} }
private Function<ValueToDelay<T>, Publisher<?>> applyDelay() { private Function<ValueToDelay<T>, Publisher<?>> applyDelay() {
return vtd -> { return vtd -> Flux.just(TOKEN).delayElements(ofMillis(vtd.delay()));
return Flux.just(TOKEN).delayElements(ofMillis(vtd.delay()));};
} }
private static class TimedValuePair<T> { private static class TimedValuePair<T> {

View File

@ -0,0 +1,22 @@
package com.mgabriel.chronicle.flux.replay;
import static java.time.Duration.ofSeconds;
import java.time.Instant;
import reactor.core.publisher.Flux;
public class ReplayFluxDemo {
public static void main(String[] args) {
Flux<Long> source = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L);
ReplayFlux<Long> replayFlux = new ReplayFlux<>(source, v -> v);
replayFlux.withTimeAcceleration(2).inLoop(ofSeconds(1))
.doOnNext(i -> System.out.println(Instant.now() + " " + i))
.blockLast();
}
}

View File

@ -3,6 +3,7 @@ package com.mgabriel.chronicle.flux.replay;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import com.mgabriel.chronicle.flux.WrappedValue;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public class ReplayWithOriginalTimingDemo { public class ReplayWithOriginalTimingDemo {
@ -11,8 +12,11 @@ public class ReplayWithOriginalTimingDemo {
Flux<Long> just = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L); Flux<Long> just = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L);
// Flux<ReplayValue<Long>> result = just.compose(new ReplayWithOriginalTiming<>(l -> l)).compose(new ReplayInLoop<>(Duration.ofSeconds(1))); Flux<ReplayValue<Long>> result = just.compose(new ReplayWithOriginalTiming<>(l -> l)).compose(new ReplayInLoop<>(Duration.ofSeconds(1)));
Flux<ReplayValue<Long>> result = just.compose(new ReplayInLoop<>(Duration.ofSeconds(1))).compose(new ReplayWithOriginalTiming<>(l -> l.value()));
// In this order the delay of the ReplayInLoop operator is not working because the elements in the replayInLoop
// are emitted while the delay is being applied in the ReplayWithOriginalTiming operator, making it looks as if there was no initial delay
// Flux<ReplayValue<Long>> result = just.compose(new ReplayInLoop<>(Duration.ofSeconds(1))).compose(new ReplayWithOriginalTiming<>(WrappedValue::value));
result.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast(); result.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast();