package it.tdlight.utils; import io.reactivex.Completable; import io.reactivex.Flowable; import io.reactivex.Maybe; import io.reactivex.Observable; import io.reactivex.Single; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; import io.vertx.reactivex.core.eventbus.Message; import io.vertx.reactivex.core.eventbus.MessageConsumer; import io.vertx.reactivex.core.streams.Pipe; import io.vertx.reactivex.core.streams.ReadStream; import io.vertx.reactivex.core.streams.WriteStream; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Object; import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; import java.time.Duration; import java.util.Objects; import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Supplier; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.reactivestreams.Subscription; import org.warp.commonutils.concurrency.future.CompletableFutureUtils; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmissionException; import reactor.core.publisher.Sinks.EmitResult; import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.One; import reactor.core.publisher.SynchronousSink; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.concurrent.Queues; import reactor.util.context.Context; public class MonoUtils { private static final Logger logger = LoggerFactory.getLogger(MonoUtils.class); public static Mono notImplemented() { return Mono.fromCallable(() -> { throw new UnsupportedOperationException("Method not implemented"); }); } public static Handler> toHandler(SynchronousSink sink) { return event -> { if (event.succeeded()) { if (event.result() == null) { sink.complete(); } else { sink.next(Objects.requireNonNull(event.result())); } } else { sink.error(event.cause()); } }; } public static Handler> toHandler(MonoSink sink) { return event -> { if (event.succeeded()) { if (event.result() == null) { sink.success(); } else { sink.success(Objects.requireNonNull(event.result())); } } else { sink.error(event.cause()); } }; } public static SynchronousSink toSink(Context context, Promise promise) { return PromiseSink.of(context, promise); } public static Mono executeAsFuture(Consumer>> action) { return Mono.fromFuture(() -> { return CompletableFutureUtils.getCompletableFuture(() -> { var resultFuture = new CompletableFuture(); action.accept(handler -> { if (handler.failed()) { resultFuture.completeExceptionally(handler.cause()); } else { resultFuture.complete(handler.result()); } }); return resultFuture; }); }); } public static Mono fromBlockingMaybe(Callable callable) { return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic()); } public static Mono fromBlockingEmpty(EmptyCallable callable) { return Mono.fromCallable(() -> { callable.call(); return null; }).subscribeOn(Schedulers.boundedElastic()); } public static Mono fromBlockingSingle(Callable callable) { return fromBlockingMaybe(callable).single(); } public static CoreSubscriber toSubscriber(Promise promise) { return new CoreSubscriber() { @Override public void onSubscribe(Subscription s) { s.request(1); } @Override public void onNext(T t) { promise.complete(t); } @Override public void onError(Throwable t) { promise.fail(t); } @Override public void onComplete() { promise.tryComplete(); } }; } public static void orElseThrowFuture(TdResult value, SynchronousSink> sink) { if (value.succeeded()) { sink.next(CompletableFuture.completedFuture(value.result())); } else { sink.next(CompletableFuture.failedFuture(new TdError(value.cause().code, value.cause().message))); } } public static Mono orElseThrow(TdResult value) { if (value.succeeded()) { return Mono.just(value.result()); } else { return Mono.error(new TdError(value.cause().code, value.cause().message)); } } public static Mono thenOrError(Mono> optionalMono) { return optionalMono.handle((optional, sink) -> { if (optional.succeeded()) { sink.complete(); } else { sink.error(new TdError(optional.cause().code, optional.cause().message)); } }); } public static Mono thenOrLogSkipError(Mono> optionalMono) { return optionalMono.handle((optional, sink) -> { if (optional.failed()) { logger.error("Received TDLib error: {}", optional.cause()); } sink.complete(); }); } public static Mono orElseLogSkipError(TdResult optional) { if (optional.failed()) { logger.error("Received TDLib error: {}", optional.cause()); return Mono.empty(); } return Mono.just(optional.result()); } public static Mono thenOrLogRepeatError(Supplier>> optionalMono) { return Mono.defer(() -> optionalMono.get().handle((TdResult optional, SynchronousSink sink) -> { if (optional.succeeded()) { sink.complete(); } else { logger.error("Received TDLib error: {}", optional.cause()); sink.error(new TdError(optional.cause().code, optional.cause().message)); } })).retry(); } public static Mono fromFuture(CompletableFuture future) { return Mono.create(sink -> { future.whenComplete((result, error) -> { if (error != null) { sink.error(error); } else if (result != null) { sink.success(result); } else { sink.success(); } }); }); } public static Mono fromFuture(Supplier> future) { return Mono.create(sink -> { CompletableFutureUtils.getCompletableFuture(future).whenComplete((result, error) -> { if (error != null) { sink.error(error.getCause()); } else if (result != null) { sink.success(result); } else { sink.success(); } }); }); } public static CompletableFuture toFuture(Mono mono) { var cf = new CompletableFuture(); mono.subscribe(cf::complete, cf::completeExceptionally, () -> cf.complete(null)); return cf; } public static Mono toMono(Future future) { return Mono.create(sink -> future.onComplete(result -> { if (result.succeeded()) { sink.success(result.result()); } else { sink.error(result.cause()); } })); } @NotNull public static Mono toMono(Single single) { return Mono.from(single.toFlowable()); } @NotNull public static Mono toMono(Maybe single) { return Mono.from(single.toFlowable()); } @NotNull public static Mono toMono(Completable completable) { return Mono.from(completable.toFlowable()); } public static Completable toCompletable(Mono s) { return Completable.fromPublisher(s); } public static Mono fromEmitResult(EmitResult emitResult) { return Mono.fromCallable(() -> { emitResult.orThrow(); return null; }); } public static Future fromEmitResultFuture(EmitResult emitResult) { if (emitResult.isSuccess()) { return Future.succeededFuture(); } else { return Future.failedFuture(new EmissionException(emitResult)); } } public static Mono emitValue(One sink, T value) { return Mono.defer(() -> fromEmitResult(sink.tryEmitValue(value))); } public static Mono emitNext(Many sink, T value) { return Mono.defer(() -> fromEmitResult(sink.tryEmitNext(value))); } public static Mono emitComplete(Many sink) { return Mono.defer(() -> fromEmitResult(sink.tryEmitComplete())); } public static Mono emitEmpty(Empty sink) { return Mono.defer(() -> fromEmitResult(sink.tryEmitEmpty())); } public static Mono emitError(Empty sink, Throwable value) { return Mono.defer(() -> fromEmitResult(sink.tryEmitError(value))); } public static Future emitValueFuture(One sink, T value) { return fromEmitResultFuture(sink.tryEmitValue(value)); } public static Future emitNextFuture(Many sink, T value) { return fromEmitResultFuture(sink.tryEmitNext(value)); } public static Future emitCompleteFuture(Many sink) { return fromEmitResultFuture(sink.tryEmitComplete()); } public static Future emitErrorFuture(Empty sink, Throwable value) { return fromEmitResultFuture(sink.tryEmitError(value)); } public static Future emitEmptyFuture(Empty sink) { return fromEmitResultFuture(sink.tryEmitEmpty()); } public static Mono> unicastBackpressureSinkStream(Scheduler scheduler) { Many sink = Sinks.many().unicast().onBackpressureBuffer(); return asStream(sink, scheduler, null, null, 1); } /** * Create a sink that can be written from a writeStream */ public static Mono> unicastBackpressureStream(Scheduler scheduler, int maxBackpressureQueueSize) { Queue boundedQueue = Queues.get(maxBackpressureQueueSize).get(); var queueSize = Flux .interval(Duration.ZERO, Duration.ofMillis(500)) .map(n -> boundedQueue.size()); Empty termination = Sinks.empty(); Many sink = Sinks.many().unicast().onBackpressureBuffer(boundedQueue, termination::tryEmitEmpty); return asStream(sink, scheduler, queueSize, termination, maxBackpressureQueueSize); } public static Mono> unicastBackpressureErrorStream(Scheduler scheduler) { Many sink = Sinks.many().unicast().onBackpressureError(); return asStream(sink, scheduler, null, null, 1); } public static Mono> asStream(Many sink, Scheduler scheduler, @Nullable Flux backpressureSize, @Nullable Empty termination, int maxBackpressureQueueSize) { return SinkRWStream.create(sink, scheduler, backpressureSize, termination, maxBackpressureQueueSize); } private static Future toVertxFuture(Mono toTransform) { var promise = Promise.promise(); toTransform.subscribeOn(Schedulers.parallel()).subscribe(next -> {}, promise::fail, promise::complete); return promise.future(); } @SuppressWarnings({"unchecked", "rawtypes"}) public static Mono castVoid(Mono mono) { return (Mono) mono; } /** * This method fails to guarantee that the consumer gets registered on all clusters before returning. * Use fromConsumerAdvanced if you want better stability. */ @Deprecated public static Flux fromConsumerUnsafe(MessageConsumer messageConsumer) { return Flux.>create(sink -> { messageConsumer.endHandler(e -> sink.complete()); sink.onDispose(messageConsumer::unregister); }) //.startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono))) .flatMapSequential(msg -> Mono .fromCallable(msg::body) .subscribeOn(Schedulers.parallel()) ); } public static Flux fromMessageConsumer(Mono onRegistered, MessageConsumer messageConsumer) { return fromReplyableMessageConsumer(onRegistered, messageConsumer).map(Message::body); } public static Flux> fromReplyableMessageConsumer(Mono onRegistered, MessageConsumer messageConsumer) { Mono endMono = Mono.create(sink -> { AtomicBoolean alreadyRequested = new AtomicBoolean(); sink.onRequest(n -> { if (n > 0 && alreadyRequested.compareAndSet(false, true)) { messageConsumer.endHandler(e -> sink.success()); } }); }); Mono> registrationCompletionMono = Mono .fromRunnable(() -> logger.trace("Waiting for consumer registration completion...")) .then(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)) .doOnSuccess(s -> logger.trace("Consumer registered")) .then(onRegistered) .thenReturn(messageConsumer); messageConsumer.handler(s -> { throw new IllegalStateException("Subscriber still didn't request any value!"); }); Flux> dataFlux = Flux .push(sink -> sink.onRequest(n -> messageConsumer.handler(sink::next)), OverflowStrategy.ERROR); Mono disposeMono = messageConsumer .rxUnregister() .as(MonoUtils::>toMono) .doOnSuccess(s -> logger.trace("Unregistered message consumer")) .then(); return Flux .usingWhen(registrationCompletionMono, msgCons -> dataFlux, msgCons -> disposeMono) .takeUntilOther(endMono); } public static Scheduler newBoundedSingle(String name) { return newBoundedSingle(name, false); } public static Scheduler newBoundedSingle(String name, boolean daemon) { return Schedulers.newBoundedElastic(1, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, name, Integer.MAX_VALUE, daemon ); } public static class SinkRWStream implements io.vertx.core.streams.WriteStream, io.vertx.core.streams.ReadStream { private final Many sink; private final Scheduler scheduler; private final Flux backpressureSize; private final Empty termination; private Handler exceptionHandler = e -> {}; private Handler drainHandler = h -> {}; private final int maxBackpressureQueueSize; private volatile int writeQueueMaxSize; private volatile boolean writeQueueFull = false; private SinkRWStream(Many sink, Scheduler scheduler, @Nullable Flux backpressureSize, @Nullable Empty termination, int maxBackpressureQueueSize) { this.maxBackpressureQueueSize = maxBackpressureQueueSize; this.writeQueueMaxSize = this.maxBackpressureQueueSize; this.backpressureSize = backpressureSize; this.termination = termination; this.sink = sink; this.scheduler = scheduler; } public Mono> initialize() { return Mono.fromCallable(() -> { if (backpressureSize != null) { AtomicBoolean drained = new AtomicBoolean(true); var drainSubscription = backpressureSize .subscribeOn(Schedulers.boundedElastic()) .subscribe(size -> { writeQueueFull = size >= this.writeQueueMaxSize; boolean newDrained = size <= this.writeQueueMaxSize / 2; boolean oldDrained = drained.getAndSet(newDrained); if (newDrained && !oldDrained) { drainHandler.handle(null); } }, ex -> { exceptionHandler.handle(ex); }, () -> { if (!drained.get()) { drainHandler.handle(null); } }); if (termination != null) { termination .asMono() .doOnTerminate(drainSubscription::dispose) .subscribeOn(Schedulers.boundedElastic()) .subscribe(); } } return this; }).subscribeOn(Schedulers.boundedElastic()); } public static Mono> create(Many sink, Scheduler scheduler, @Nullable Flux backpressureSize, @Nullable Empty termination, int maxBackpressureQueueSize) { return new SinkRWStream(sink, scheduler, backpressureSize, termination, maxBackpressureQueueSize).initialize(); } public Flux readAsFlux() { return sink.asFlux(); } public ReactiveReactorReadStream readAsStream() { return new ReactiveReactorReadStream<>(this); } public Many writeAsSink() { return sink; } public ReactiveSinkWriteStream writeAsStream() { return new ReactiveSinkWriteStream<>(this); } @Override public SinkRWStream exceptionHandler(Handler handler) { exceptionHandler = handler; return this; } // // Read stream section // private Handler readEndHandler = v -> {}; private Subscription readCoreSubscription; private final AtomicBoolean fetchMode = new AtomicBoolean(false); @Override public io.vertx.core.streams.ReadStream handler(@io.vertx.codegen.annotations.Nullable Handler handler) { sink.asFlux().subscribeOn(scheduler).subscribe(new CoreSubscriber() { @Override public void onSubscribe(@NotNull Subscription s) { readCoreSubscription = s; if (!fetchMode.get()) { readCoreSubscription.request(1); } } @Override public void onNext(T t) { handler.handle(t); if (!fetchMode.get()) { readCoreSubscription.request(1); } } @Override public void onError(Throwable t) { exceptionHandler.handle(t); } @Override public void onComplete() { readEndHandler.handle(null); } }); return this; } @Override public io.vertx.core.streams.ReadStream pause() { fetchMode.set(true); return this; } @Override public io.vertx.core.streams.ReadStream resume() { if (fetchMode.compareAndSet(true, false)) { readCoreSubscription.request(1); } return this; } @Override public io.vertx.core.streams.ReadStream fetch(long amount) { if (fetchMode.get()) { if (amount > 0) { readCoreSubscription.request(amount); } } return this; } @Override public io.vertx.core.streams.ReadStream endHandler(@io.vertx.codegen.annotations.Nullable Handler endHandler) { this.readEndHandler = endHandler; return this; } // // Write stream section // @Override public Future write(T data) { return MonoUtils.emitNextFuture(sink, data); } @Override public void write(T data, Handler> handler) { write(data).onComplete(handler); } @Override public void end(Handler> handler) { MonoUtils.emitCompleteFuture(sink).onComplete(handler); } @Override public io.vertx.core.streams.WriteStream setWriteQueueMaxSize(int maxSize) { if (maxSize <= maxBackpressureQueueSize) { this.writeQueueMaxSize = maxSize; } else { logger.error("Failed to set writeQueueMaxSize to " + maxSize + ", because it's bigger than the max backpressure queue size " + maxBackpressureQueueSize); } return this; } @Override public boolean writeQueueFull() { return writeQueueFull; } @Override public io.vertx.core.streams.WriteStream drainHandler(@Nullable Handler handler) { this.drainHandler = handler; return this; } } public static class FluxReadStream implements io.vertx.core.streams.ReadStream { private final Flux flux; private final Scheduler scheduler; private Handler exceptionHandler = e -> {}; public FluxReadStream(Flux flux, Scheduler scheduler) { this.flux = flux; this.scheduler = scheduler; } public Flux readAsFlux() { return flux; } public ReactiveReactorReadStream readAsStream() { return new ReactiveReactorReadStream<>(this); } @Override public FluxReadStream exceptionHandler(Handler handler) { exceptionHandler = handler; return this; } // // Read stream section // private Handler readEndHandler = v -> {}; private Subscription readCoreSubscription; private final AtomicBoolean fetchMode = new AtomicBoolean(false); @SuppressWarnings("DuplicatedCode") @Override public io.vertx.core.streams.ReadStream handler(@io.vertx.codegen.annotations.Nullable Handler handler) { flux.subscribeOn(scheduler).subscribe(new CoreSubscriber() { @Override public void onSubscribe(@NotNull Subscription s) { readCoreSubscription = s; if (!fetchMode.get()) { readCoreSubscription.request(1); } } @Override public void onNext(T t) { handler.handle(t); if (!fetchMode.get()) { readCoreSubscription.request(1); } } @Override public void onError(Throwable t) { exceptionHandler.handle(t); } @Override public void onComplete() { readEndHandler.handle(null); } }); return this; } @Override public io.vertx.core.streams.ReadStream pause() { fetchMode.set(true); return this; } @Override public io.vertx.core.streams.ReadStream resume() { if (fetchMode.compareAndSet(true, false)) { readCoreSubscription.request(1); } return this; } @Override public io.vertx.core.streams.ReadStream fetch(long amount) { if (fetchMode.get()) { if (amount > 0) { readCoreSubscription.request(amount); } } return this; } @Override public io.vertx.core.streams.ReadStream endHandler(@io.vertx.codegen.annotations.Nullable Handler endHandler) { this.readEndHandler = endHandler; return this; } } public static class ReactiveSinkWriteStream implements WriteStream { private final WriteStream ws; public ReactiveSinkWriteStream(SinkRWStream ws) { this.ws = WriteStream.newInstance(ws); } public io.vertx.core.streams.WriteStream getDelegate() { //noinspection unchecked return ws.getDelegate(); } @Override public WriteStream exceptionHandler(Handler handler) { return ws.exceptionHandler(handler); } @Override public void write(T data, Handler> handler) { ws.write(data, handler); } @Override public void write(T data) { ws.write(data); } @Override public Completable rxWrite(T data) { return ws.rxWrite(data); } @Override public void end(Handler> handler) { ws.end(handler); } @Override public void end() { ws.end(); } @Override public Completable rxEnd() { return ws.rxEnd(); } @Override public void end(T data, Handler> handler) { ws.end(data, handler); } @Override public void end(T data) { ws.end(data); } @Override public Completable rxEnd(T data) { return ws.rxEnd(data); } @Override public WriteStream setWriteQueueMaxSize(int maxSize) { return ws.setWriteQueueMaxSize(maxSize); } @Override public boolean writeQueueFull() { return ws.writeQueueFull(); } @Override public WriteStream drainHandler(Handler handler) { return ws.drainHandler(handler); } } public static class ReactiveReactorReadStream implements ReadStream { private final ReadStream rs; public ReactiveReactorReadStream(SinkRWStream rws) { this.rs = ReadStream.newInstance(rws); } public ReactiveReactorReadStream(FluxReadStream rs) { this.rs = ReadStream.newInstance(rs); } public ReactiveReactorReadStream(Flux s, Scheduler scheduler) { this.rs = ReadStream.newInstance(new FluxReadStream<>(s, scheduler)); } @Override public io.vertx.core.streams.ReadStream getDelegate() { //noinspection unchecked return rs.getDelegate(); } @Override public ReadStream exceptionHandler(Handler handler) { return rs.exceptionHandler(handler); } @Override public ReadStream handler(Handler handler) { return rs.handler(handler); } @Override public ReadStream pause() { return rs.pause(); } @Override public ReadStream resume() { return rs.resume(); } @Override public ReadStream fetch(long amount) { return rs.fetch(amount); } @Override public ReadStream endHandler(Handler endHandler) { return rs.endHandler(endHandler); } @Override public Pipe pipe() { return rs.pipe(); } @Override public void pipeTo(WriteStream dst, Handler> handler) { rs.pipeTo(dst, handler); } @Override public void pipeTo(WriteStream dst) { rs.pipeTo(dst); } @Override public Completable rxPipeTo(WriteStream dst) { return rs.rxPipeTo(dst); } @Override public Observable toObservable() { return rs.toObservable(); } @Override public Flowable toFlowable() { return rs.toFlowable(); } } }