Add extra utils

This commit is contained in:
Andrea Cavalli 2021-06-06 02:24:46 +02:00
parent d832f1d55c
commit ef321f4be1
1 changed files with 12 additions and 0 deletions

View File

@ -15,6 +15,7 @@ import io.vertx.reactivex.core.streams.Pipe;
import io.vertx.reactivex.core.streams.ReadStream;
import io.vertx.reactivex.core.streams.WriteStream;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Chat;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult;
@ -29,6 +30,7 @@ import java.util.function.Consumer;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.warp.commonutils.concurrency.future.CompletableFutureUtils;
import org.warp.commonutils.log.Logger;
@ -295,6 +297,10 @@ public class MonoUtils {
return Mono.defer(() -> fromEmitResult(sink.tryEmitEmpty()));
}
public static <T> Mono<Void> emitEmpty(One<T> sink) {
return Mono.defer(() -> fromEmitResult(sink.tryEmitEmpty()));
}
public static <T> Mono<Void> emitError(Empty<T> sink, Throwable value) {
return Mono.defer(() -> fromEmitResult(sink.tryEmitError(value)));
}
@ -435,6 +441,12 @@ public class MonoUtils {
return mono.map(Optional::of).defaultIfEmpty(Optional.empty());
}
public static <T> Mono<Boolean> isSet(Mono<T> mono) {
return mono
.map(res -> true)
.defaultIfEmpty(false);
}
public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {
private final Many<T> sink;