package it.tdlight.reactiveapi; import it.cavallium.filequeue.IQueueToConsumer; import it.cavallium.filequeue.LMDBQueueToConsumer; import it.tdlight.reactiveapi.rsocket.FileQueueUtils; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.LongConsumer; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; import reactor.core.scheduler.Schedulers; import reactor.util.context.Context; public class ReactorUtils { @SuppressWarnings("rawtypes") private static final WaitingSink WAITING_SINK = new WaitingSink<>(); public static Flux subscribeOnce(Flux f) { AtomicBoolean subscribed = new AtomicBoolean(); return f.doOnSubscribe(s -> { if (!subscribed.compareAndSet(false, true)) { throw new UnsupportedOperationException("Can't subscribe more than once!"); } }); } public static Flux subscribeOnceUntilUnsubscribe(Flux f) { AtomicBoolean subscribed = new AtomicBoolean(); return f.doOnSubscribe(s -> { if (!subscribed.compareAndSet(false, true)) { throw new UnsupportedOperationException("Can't subscribe more than once!"); } }).doFinally(s -> subscribed.set(false)); } public static Mono subscribeOnce(Mono f) { AtomicBoolean subscribed = new AtomicBoolean(); return f.doOnSubscribe(s -> { if (!subscribed.compareAndSet(false, true)) { throw new UnsupportedOperationException("Can't subscribe more than once!"); } }); } public static Mono subscribeOnceUntilUnsubscribe(Mono f) { AtomicBoolean subscribed = new AtomicBoolean(); return f.doOnSubscribe(s -> { if (!subscribed.compareAndSet(false, true)) { throw new UnsupportedOperationException("Can't subscribe more than once!"); } }).doFinally(s -> subscribed.set(false)); } public static Flux createLastestSubscriptionFlux(Flux upstream, int maxBufferSize) { return upstream.transform(parent -> { AtomicReference subscriptionAtomicReference = new AtomicReference<>(); AtomicReference> prevEmitterRef = new AtomicReference<>(); Deque> queue = new ArrayDeque<>(maxBufferSize); return Flux.create(emitter -> { var prevEmitter = prevEmitterRef.getAndSet(emitter); if (prevEmitter != null) { if (prevEmitter != WAITING_SINK) { prevEmitter.error(new CancellationException()); } synchronized (queue) { Signal next; while (!emitter.isCancelled() && (next = queue.peek()) != null) { if (next.isOnNext()) { queue.poll(); var nextVal = next.get(); assert nextVal != null; emitter.next(nextVal); } else if (next.isOnError()) { var throwable = next.getThrowable(); assert throwable != null; emitter.error(throwable); break; } else if (next.isOnComplete()) { emitter.complete(); break; } else { throw new UnsupportedOperationException(); } } } } else { parent.subscribe(new CoreSubscriber<>() { @Override public void onSubscribe(@NotNull Subscription s) { subscriptionAtomicReference.set(s); } @Override public void onNext(K payload) { FluxSink prevEmitter = prevEmitterRef.get(); if (prevEmitter != WAITING_SINK) { prevEmitter.next(payload); } else { synchronized (queue) { queue.add(Signal.next(payload)); } } } @Override public void onError(Throwable throwable) { FluxSink prevEmitter = prevEmitterRef.get(); synchronized (queue) { queue.add(Signal.error(throwable)); } if (prevEmitter != WAITING_SINK) { prevEmitter.error(throwable); } } @Override public void onComplete() { FluxSink prevEmitter = prevEmitterRef.get(); synchronized (queue) { queue.add(Signal.complete()); } if (prevEmitter != WAITING_SINK) { prevEmitter.complete(); } } }); } var s = subscriptionAtomicReference.get(); emitter.onRequest(n -> { if (n > maxBufferSize) { emitter.error(new UnsupportedOperationException( "Requests count is bigger than max buffer size! " + n + " > " + maxBufferSize)); } else { s.request(n); } }); //noinspection unchecked emitter.onCancel(() -> prevEmitterRef.compareAndSet(emitter, WAITING_SINK)); //noinspection unchecked emitter.onDispose(() -> prevEmitterRef.compareAndSet(emitter, WAITING_SINK)); }, OverflowStrategy.BUFFER); }); } public static Function, Flux> onBackpressureBufferSubscribe(Path path, String name, boolean persistent, Serializer serializer, Deserializer deserializer) { return flux -> { AtomicReference> ref = new AtomicReference<>(); var queuePath = path.resolve(".tdlib-queue"); IQueueToConsumer queue = new LMDBQueueToConsumer<>(queuePath, name, !persistent, FileQueueUtils.convert(serializer), FileQueueUtils.convert(deserializer), signal -> { var sink = ref.get(); if (sink != null && !sink.isCancelled() && sink.requestedFromDownstream() > 0) { if (signal != null) { sink.next(signal); } return true; } else { return false; } } ); AtomicReference startEx = new AtomicReference<>(); var disposable = flux .subscribeOn(Schedulers.parallel()) .publishOn(Schedulers.boundedElastic()) .subscribe(queue::add, ex -> { startEx.set(ex); var refVal = ref.get(); if (refVal != null) { refVal.error(ex); } }); queue.startQueue(); return Flux.create(sink -> { sink.onDispose(() -> { disposable.dispose(); queue.close(); }); var startExVal = startEx.get(); if (startExVal != null) { sink.error(startExVal); return; } ref.set(sink); sink.onCancel(() -> ref.set(null)); }); }; } public static Function, Flux> onBackpressureBuffer(Path path, String name, boolean persistent, Serializer serializer, Deserializer deserializer) { return flux -> Flux.create(sink -> { var queuePath = path.resolve(".tdlib-queue"); var queue = new LMDBQueueToConsumer<>(queuePath, name, !persistent, FileQueueUtils.convert(serializer), FileQueueUtils.convert(deserializer), signal -> { if (sink.requestedFromDownstream() > 0 && !sink.isCancelled()) { if (signal != null) { sink.next(signal); } return true; } else { return false; } } ); sink.onDispose(queue::close); flux .subscribeOn(Schedulers.parallel()) .publishOn(Schedulers.boundedElastic()) .subscribe(new CoreSubscriber<>() { @Override public void onSubscribe(@NotNull Subscription s) { sink.onCancel(s::cancel); s.request(Long.MAX_VALUE); } @Override public void onNext(T element) { if (!sink.isCancelled()) { queue.add(element); } } @Override public void onError(Throwable throwable) { sink.error(throwable); } @Override public void onComplete() { } }); queue.startQueue(); }, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic()); } private static class WaitingSink implements FluxSink { @Override public @NotNull FluxSink next(@NotNull T t) { throw new UnsupportedOperationException(); } @Override public void complete() { throw new UnsupportedOperationException(); } @Override public void error(@NotNull Throwable e) { throw new UnsupportedOperationException(); } @Override public @NotNull Context currentContext() { throw new UnsupportedOperationException(); } @Override public long requestedFromDownstream() { throw new UnsupportedOperationException(); } @Override public boolean isCancelled() { throw new UnsupportedOperationException(); } @Override public @NotNull FluxSink onRequest(@NotNull LongConsumer consumer) { throw new UnsupportedOperationException(); } @Override public @NotNull FluxSink onCancel(@NotNull Disposable d) { throw new UnsupportedOperationException(); } @Override public @NotNull FluxSink onDispose(@NotNull Disposable d) { throw new UnsupportedOperationException(); } } }