From 8dbb6984fd447df06d898dc310f067998d266232 Mon Sep 17 00:00:00 2001 From: mgabriel Date: Sun, 12 Aug 2018 21:36:25 +0200 Subject: [PATCH] first tests with operators --- .gitignore | 12 ++ build.gradle | 19 +- gradle/wrapper/gradle-wrapper.properties | 5 + gradlew | 172 ++++++++++++++++++ gradlew.bat | 84 +++++++++ .../mgabriel/chronicle/flux/FluxStore.java | 60 ++++++ .../mgabriel/chronicle/flux/WrappedValue.java | 5 + .../chronicle/flux/replay/ReplayFlux.java | 31 ++++ .../chronicle/flux/replay/ReplayInLoop.java | 62 +++++++ .../chronicle/flux/replay/ReplayLoopFlux.java | 25 +++ .../chronicle/flux/replay/ReplayValue.java | 11 ++ .../flux/replay/ReplayValueImpl.java | 52 ++++++ .../flux/replay/ReplayWithOriginalTiming.java | 65 +++++++ .../mgabriel/chronicle/flux/replay/Timed.java | 7 + .../flux/replay/TimedReplayValue.java | 4 + .../chronicle/flux/replay/TimedValue.java | 44 +++++ .../chronicle/flux/replay/ValueToDelay.java | 22 +++ .../chronicle/flux/replay/ReplayLoopDemo.java | 20 ++ .../replay/ReplayWithOriginalTimingDemo.java | 21 +++ 19 files changed, 715 insertions(+), 6 deletions(-) create mode 100644 gradle/wrapper/gradle-wrapper.properties create mode 100644 gradlew create mode 100644 gradlew.bat create mode 100644 src/main/java/com/mgabriel/chronicle/flux/FluxStore.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/WrappedValue.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValue.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/replay/Timed.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java create mode 100644 src/main/java/com/mgabriel/chronicle/flux/replay/ValueToDelay.java create mode 100644 src/test/java/com/mgabriel/chronicle/flux/replay/ReplayLoopDemo.java create mode 100644 src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java diff --git a/.gitignore b/.gitignore index a1c2a23..947cce7 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,15 @@ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* + +# IntelliJ project files +*.iml +*.ipr +*.iws + + +build/ +out/ +.idea/ +.gradle/ + diff --git a/build.gradle b/build.gradle index 0dadc53..e647706 100644 --- a/build.gradle +++ b/build.gradle @@ -1,9 +1,3 @@ -apply plugin: 'java' -apply plugin: 'eclipse' -apply plugin: 'idea' - -sourceCompatibility = 1.8 - buildscript { repositories { mavenCentral() @@ -13,10 +7,23 @@ buildscript { group 'com.mgabriel' + + +apply plugin: 'java' +apply plugin: 'eclipse' +apply plugin: 'idea' + +sourceCompatibility = 1.8 + def chronicleVersion = '4.6.77' def reactorVersion = '3.1.7.RELEASE' +repositories { + mavenCentral() +} + dependencies { + compile "org.slf4j:slf4j-api:1.7.25" compile "io.projectreactor:reactor-core:$reactorVersion" compile "net.openhft:chronicle-queue:$chronicleVersion" } diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..d2c45a4 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-4.8-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100644 index 0000000..cccdd3d --- /dev/null +++ b/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..f955316 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/src/main/java/com/mgabriel/chronicle/flux/FluxStore.java b/src/main/java/com/mgabriel/chronicle/flux/FluxStore.java new file mode 100644 index 0000000..be0ebdf --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/FluxStore.java @@ -0,0 +1,60 @@ +package com.mgabriel.chronicle.flux; + +import java.util.function.Function; + +import com.mgabriel.chronicle.flux.replay.ReplayFlux; +import org.reactivestreams.Publisher; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; + +/** + * Reactive store used to store and replay a Flux. + * + * @param the value type + */ +public interface FluxStore { + + /** + * Stores all items of the given stream in the chronicle store. + * + * @param toStore data stream to store. + */ + Disposable store(Publisher toStore); + + /** + * Stores one item in the chronicle store. + * + * @param item item to store. + */ + void store(T item); + + /** + * @return all values present in the store and new values being stored in this FluxStore. + */ + default Flux retrieveAll() { + return retrieveAll(false); + } + + /** + * @param deleteAfterRead if true, the file storing the data on disk will be deleted once it has been read. + * @return all values present in the store and new values being stored in this FluxStore. + */ + Flux retrieveAll(boolean deleteAfterRead); + + /** + * @return all values present in the store and completes the stream. + */ + Flux retrieveHistory(); + + /** + * @return the stream of new values being stored in this FluxStore (history is ignored). + */ + Flux retrieveNewValues(); + + /** + * @param timestampExtractor a function to extract the epoch time from the values. + * @return a Flux that can be used to replay the history with multiple strategies. + */ + ReplayFlux replayHistory(Function timestampExtractor); + +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/WrappedValue.java b/src/main/java/com/mgabriel/chronicle/flux/WrappedValue.java new file mode 100644 index 0000000..837575d --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/WrappedValue.java @@ -0,0 +1,5 @@ +package com.mgabriel.chronicle.flux; + +public interface WrappedValue { + T value(); +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java new file mode 100644 index 0000000..aff4a1b --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayFlux.java @@ -0,0 +1,31 @@ +package com.mgabriel.chronicle.flux.replay; + +import java.time.Duration; +import java.util.function.Function; + +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; + +public class ReplayFlux extends Flux { + + private final Flux source; + private final Function timestampExtractor; + + public ReplayFlux(Flux source, Function timestampExtractor) { + this.source = source; + this.timestampExtractor = timestampExtractor; + } + + @Override + public void subscribe(CoreSubscriber actual) { + + } + + public ReplayLoopFlux inLoop(){ + return inLoop(Duration.ofMillis(0)); + } + + public ReplayLoopFlux inLoop(Duration delayBeforeLoopRestart){ + return new ReplayLoopFlux<>(source, timestampExtractor, delayBeforeLoopRestart); + } +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java new file mode 100644 index 0000000..abbff29 --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayInLoop.java @@ -0,0 +1,62 @@ +package com.mgabriel.chronicle.flux.replay; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.jetbrains.annotations.NotNull; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; + +public class ReplayInLoop implements Function, Publisher>> { + private static final Logger LOGGER = LoggerFactory.getLogger(ReplayInLoop.class); + private final Duration delayBeforeRestart; + + public ReplayInLoop(Duration delayBeforeRestart) { + this.delayBeforeRestart = delayBeforeRestart; + } + + @Override + public Publisher> apply(Flux source) { + Flux>> generate = Flux.create(sink -> { + while (!sink.isCancelled()) { + long requested = sink.requestedFromDownstream(); + if (requested > 0) { + wrapValues(source, sink); + } else { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.info("interrupted " + e); + } + } + } + } + ); + Flux>> limited = generate.limitRate(1); + return Flux.concat(limited); + } + + private void wrapValues(Flux source, FluxSink>> sink) { + AtomicBoolean firstValueSent = new AtomicBoolean(false); + Flux> nextFlux = source.delaySequence(delayBeforeRestart).map(wrapAsReplayValue(firstValueSent)).doOnNext(v -> System.out + .println("\t\t\taaaa "+ Instant.now()+" " +v)); + sink.next(Flux.defer(() -> nextFlux)); + } + + @NotNull + private Function> wrapAsReplayValue(AtomicBoolean firstValueSent) { + return v -> { + if (!firstValueSent.getAndSet(true)) { + return new ReplayValueImpl<>(true, v); + } + return new ReplayValueImpl<>(v); + }; + } +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java new file mode 100644 index 0000000..8d5ec43 --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayLoopFlux.java @@ -0,0 +1,25 @@ +package com.mgabriel.chronicle.flux.replay; + +import java.time.Duration; +import java.util.function.Function; + +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; + +public class ReplayLoopFlux extends Flux> { + + private final Flux source; + private final Function timestampExtractor; + private final Duration delayBeforeLoopRestart; + + public ReplayLoopFlux(Flux source, Function timestampExtractor, Duration delayBeforeLoopRestart) { + this.source = source; + this.timestampExtractor = timestampExtractor; + this.delayBeforeLoopRestart = delayBeforeLoopRestart; + } + + @Override + public void subscribe(CoreSubscriber> actual) { + + } +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValue.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValue.java new file mode 100644 index 0000000..1c5de20 --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValue.java @@ -0,0 +1,11 @@ +package com.mgabriel.chronicle.flux.replay; + +import com.mgabriel.chronicle.flux.WrappedValue; + +public interface ReplayValue extends WrappedValue { + + /** + * @return true if this object is the loop reset signal (meaning that the replay loop has restarted from the beginning) + */ + boolean isLoopReset(); +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java new file mode 100644 index 0000000..791ad77 --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayValueImpl.java @@ -0,0 +1,52 @@ +package com.mgabriel.chronicle.flux.replay; + +import java.util.Objects; + +public class ReplayValueImpl implements ReplayValue{ + private final boolean isLoopReset; + private final T value; + + public ReplayValueImpl(T value) { + this.isLoopReset = false; + this.value = value; + } + + public ReplayValueImpl(boolean isLoopReset, T value) { + this.isLoopReset = isLoopReset; + this.value = value; + } + + @Override + public boolean isLoopReset() { + return isLoopReset; + } + + @Override + public T value() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + ReplayValueImpl that = (ReplayValueImpl) o; + return isLoopReset == that.isLoopReset && + Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(isLoopReset, value); + } + + @Override + public String toString() { + return "ReplayValueImpl{" + + "isLoopReset=" + isLoopReset + + ", value=" + value + + '}'; + } +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java new file mode 100644 index 0000000..385dc3a --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTiming.java @@ -0,0 +1,65 @@ +package com.mgabriel.chronicle.flux.replay; + +import static java.time.Duration.ofMillis; + +import java.util.function.Function; +import java.util.function.Predicate; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +public class ReplayWithOriginalTiming implements Function, Publisher> { + private final Function timestampExtractor; + private final Timed TOKEN = new TimedValue<>(0, null); + + public ReplayWithOriginalTiming(Function timestampExtractor) { + this.timestampExtractor = timestampExtractor; + } + + @Override + public Publisher apply(Flux source) { + Flux> timedFlux = source.map(v -> new TimedValue<>(timestampExtractor.apply(v), v)); + return timedFlux.scan(new TimedValuePair<>(TOKEN, TOKEN), + (acc, val) -> new TimedValuePair<>(acc.second, val)) + .filter(filterFirstValue()) + .map(calculateDelay()) + .delayUntil(applyDelay()) + .map(ValueToDelay::value); + } + + private Predicate> filterFirstValue() { + return tvp -> tvp.second != TOKEN; + } + + private Function, ValueToDelay> calculateDelay() { + return tvp -> { + long timeDifference = tvp.timeDifference(); + if (timeDifference < 0 || tvp.first == TOKEN) { + timeDifference = 0; + } + return new ValueToDelay<>(timeDifference, tvp.second.value()); + }; + } + + private Function, Publisher> applyDelay() { + return vtd -> { + return Flux.just(TOKEN).delayElements(ofMillis(vtd.delay()));}; + } + + private static class TimedValuePair { + private final Timed first; + private final Timed second; + + private TimedValuePair(Timed first, Timed second) { + if (first == null || second == null) { + throw new IllegalArgumentException("values should not be null"); + } + this.first = first; + this.second = second; + } + + long timeDifference() { + return second.time() - first.time(); + } + } +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/Timed.java b/src/main/java/com/mgabriel/chronicle/flux/replay/Timed.java new file mode 100644 index 0000000..527bf84 --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/Timed.java @@ -0,0 +1,7 @@ +package com.mgabriel.chronicle.flux.replay; + +import com.mgabriel.chronicle.flux.WrappedValue; + +public interface Timed extends WrappedValue { + long time(); +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java new file mode 100644 index 0000000..2eb0b8b --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedReplayValue.java @@ -0,0 +1,4 @@ +package com.mgabriel.chronicle.flux.replay; + +public interface TimedReplayValue extends Timed, ReplayValue{ +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java new file mode 100644 index 0000000..226988a --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/TimedValue.java @@ -0,0 +1,44 @@ +package com.mgabriel.chronicle.flux.replay; + +import java.util.Objects; + +public class TimedValue implements Timed { + private final long time; + private final T value; + + public TimedValue(long time, T value) { + this.time = time; + this.value = value; + } + + @Override + public T value() { + return value; + } + + @Override + public long time() { + return time; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TimedValue that = (TimedValue) o; + return time == that.time && + Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(time, value); + } + + @Override + public String toString() { + return super.toString(); + } +} diff --git a/src/main/java/com/mgabriel/chronicle/flux/replay/ValueToDelay.java b/src/main/java/com/mgabriel/chronicle/flux/replay/ValueToDelay.java new file mode 100644 index 0000000..18ab8c4 --- /dev/null +++ b/src/main/java/com/mgabriel/chronicle/flux/replay/ValueToDelay.java @@ -0,0 +1,22 @@ +package com.mgabriel.chronicle.flux.replay; + +import com.mgabriel.chronicle.flux.WrappedValue; + +class ValueToDelay implements WrappedValue { + private final long delay; + private final T value; + + ValueToDelay(long delay, T value) { + this.delay = delay; + this.value = value; + } + + @Override + public T value() { + return value; + } + + public long delay() { + return delay; + } +} diff --git a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayLoopDemo.java b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayLoopDemo.java new file mode 100644 index 0000000..33c7016 --- /dev/null +++ b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayLoopDemo.java @@ -0,0 +1,20 @@ +package com.mgabriel.chronicle.flux.replay; + +import java.time.Duration; +import java.time.Instant; + +import reactor.core.publisher.Flux; + +public class ReplayLoopDemo { + + public static void main(String[] args) { + + Flux just = Flux.just(0L, 1L, 2L, 3L, 4L, 5L); + + Flux> result = just.compose(new ReplayInLoop<>(Duration.ofSeconds(2))); + + result.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast(); + + } + +} diff --git a/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java new file mode 100644 index 0000000..bc31551 --- /dev/null +++ b/src/test/java/com/mgabriel/chronicle/flux/replay/ReplayWithOriginalTimingDemo.java @@ -0,0 +1,21 @@ +package com.mgabriel.chronicle.flux.replay; + +import java.time.Duration; +import java.time.Instant; + +import reactor.core.publisher.Flux; + +public class ReplayWithOriginalTimingDemo { + + public static void main(String[] args) { + + Flux just = Flux.just(0L, 1000L, 2000L, 3000L, 4000L, 7000L); + +// Flux> result = just.compose(new ReplayWithOriginalTiming<>(l -> l)).compose(new ReplayInLoop<>(Duration.ofSeconds(1))); + Flux> result = just.compose(new ReplayInLoop<>(Duration.ofSeconds(1))).compose(new ReplayWithOriginalTiming<>(l -> l.value())); + + result.doOnNext(i -> System.out.println(Instant.now() + " " + i)).blockLast(); + + } + +}