diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index 7875dd6..86cc4fd 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -20,6 +20,7 @@ import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; @@ -44,10 +45,6 @@ public class TDLibRemoteClient implements AutoCloseable { private final int port; private final Set membersAddresses; private final One clusterManager = Sinks.one(); - /** - * Statistic about active deployments count - */ - private final AtomicInteger statsActiveDeployments = new AtomicInteger(); public static boolean runningFromIntelliJ() { return System.getProperty("java.class.path").contains("idea_rt.jar") @@ -68,6 +65,7 @@ public class TDLibRemoteClient implements AutoCloseable { this.membersAddresses = membersAddresses; if (enableAsyncStacktraces && !runningFromIntelliJ()) { + //noinspection ReactorAutomaticDebugger ReactorDebugAgent.init(); } if (enableAsyncStacktraces && enableFullAsyncStacktraces) { @@ -106,7 +104,10 @@ public class TDLibRemoteClient implements AutoCloseable { boolean enableFullAsyncStacktraces = Boolean.parseBoolean(args[8]); var loggerContext = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); - loggerContext.setConfigLocation(TDLibRemoteClient.class.getResource("/tdlib-session-container-log4j2.xml").toURI()); + loggerContext.setConfigLocation(Objects + .requireNonNull(TDLibRemoteClient.class.getResource("/tdlib-session-container-log4j2.xml"), + "tdlib-session-container-log4j2.xml doesn't exist") + .toURI()); var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath); @@ -124,7 +125,7 @@ public class TDLibRemoteClient implements AutoCloseable { .block(); // Close vert.x on shutdown - var vertx = client.clusterManager.asMono().block().getVertx(); + var vertx = client.clusterManager.asMono().blockOptional().orElseThrow().getVertx(); Runtime.getRuntime().addShutdownHook(new Thread(() -> MonoUtils.toMono(vertx.rxClose()).blockOptional())); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 56dab26..504355c 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -26,11 +26,14 @@ import it.tdlight.utils.MonoUtils; import java.net.ConnectException; import java.nio.file.Path; import java.time.Duration; +import java.util.concurrent.locks.LockSupport; +import org.warp.commonutils.locks.LockUtils; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitResult; import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Schedulers; @@ -130,7 +133,14 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { this.local = local; this.logger = LoggerFactory.getLogger(this.botId + " " + botAlias); return MonoUtils - .emitValue(this.binlog, binlog) + .fromBlockingEmpty(() -> { + EmitResult result; + while ((result = this.binlog.tryEmitValue(binlog)) == EmitResult.FAIL_NON_SERIALIZED) { + // 10ms + LockSupport.parkNanos(10000000); + } + result.orThrow(); + }) .then(binlog.getLastModifiedTime()) .zipWith(binlog.readFully().map(Buffer::getDelegate)) .single() @@ -173,11 +183,15 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { }) .flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) .repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true)) - .takeUntilOther(Mono.firstWithSignal(this.updatesStreamEnd.asMono().doOnTerminate(() -> { - logger.trace("About to kill pinger because updates stream ended"); - }), this.crash.asMono().onErrorResume(ex -> Mono.empty()).doOnTerminate(() -> { - logger.trace("About to kill pinger because it has seen a crash signal"); - }))) + .takeUntilOther(Mono.firstWithSignal( + this.updatesStreamEnd + .asMono() + .doOnTerminate(() -> logger.trace("About to kill pinger because updates stream ended")), + this.crash + .asMono() + .onErrorResume(ex -> Mono.empty()) + .doOnTerminate(() -> logger.trace("About to kill pinger because it has seen a crash signal")) + )) .doOnNext(s -> logger.trace("PING")) .then() .onErrorResume(ex -> { @@ -185,7 +199,12 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono.empty(); }) .doOnNext(s -> logger.debug("END PING")) - .then(MonoUtils.emitEmpty(this.pingFail)) + .then(MonoUtils.fromBlockingEmpty(() -> { + while (this.pingFail.tryEmitEmpty() == EmitResult.FAIL_NON_SERIALIZED) { + // 10ms + LockSupport.parkNanos(10000000); + } + })) .subscribeOn(Schedulers.parallel()) .subscribe(); } @@ -207,7 +226,14 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { // Return when the registration of all the consumers has been done across the cluster return Mono .fromRunnable(() -> logger.trace("Emitting updates flux to sink")) - .then(MonoUtils.emitValue(updates, updateConsumer)) + .then(MonoUtils.fromBlockingEmpty(() -> { + EmitResult result; + while ((result = this.updates.tryEmitValue(updateConsumer)) == EmitResult.FAIL_NON_SERIALIZED) { + // 10ms + LockSupport.parkNanos(10000000); + } + result.orThrow(); + })) .doOnSuccess(s -> logger.trace("Emitted updates flux to sink")) .doOnSuccess(s -> logger.trace("Waiting to register update consumer across the cluster")) .doOnSuccess(s -> logger.trace("Registered update consumer across the cluster")); @@ -250,7 +276,14 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { var ex = new ConnectException("Server did not respond to ping"); ex.setStackTrace(new StackTraceElement[0]); throw ex; - }).onErrorResume(ex -> MonoUtils.emitError(crash, ex))) + })).onErrorResume(ex -> MonoUtils.fromBlockingSingle(() -> { + EmitResult result; + while ((result = this.crash.tryEmitError(ex)) == EmitResult.FAIL_NON_SERIALIZED) { + // 10ms + LockSupport.parkNanos(10000000); + } + return result; + })) .takeUntilOther(Mono .firstWithSignal(crash.asMono(), authStateClosing.asMono()) .onErrorResume(e -> Mono.empty()) @@ -285,23 +318,21 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private Mono interceptUpdate(Object update) { logger.trace("Received update {}", update.getClass().getSimpleName()); - switch (update.getConstructor()) { - case TdApi.UpdateAuthorizationState.CONSTRUCTOR: - var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; - switch (updateAuthorizationState.authorizationState.getConstructor()) { - case TdApi.AuthorizationStateClosing.CONSTRUCTOR: - authStateClosing.tryEmitEmpty(); - break; - case TdApi.AuthorizationStateClosed.CONSTRUCTOR: - return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) - .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) - .flatMap(latestBinlogMsg -> Mono.fromCallable(latestBinlogMsg::body).subscribeOn(Schedulers.parallel())) - .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) - .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) - .doOnSuccess(s -> logger.info("Overwritten binlog from server")) - .thenReturn(update); - } - break; + if (update.getConstructor() == TdApi.UpdateAuthorizationState.CONSTRUCTOR) { + var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; + switch (updateAuthorizationState.authorizationState.getConstructor()) { + case TdApi.AuthorizationStateClosing.CONSTRUCTOR: + authStateClosing.tryEmitEmpty(); + break; + case TdApi.AuthorizationStateClosed.CONSTRUCTOR: + return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) + .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) + .flatMap(latestBinlogMsg -> Mono.fromCallable(latestBinlogMsg::body).subscribeOn(Schedulers.parallel())) + .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) + .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) + .doOnSuccess(s -> logger.info("Overwritten binlog from server")) + .thenReturn(update); + } } return Mono.just(update); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 7c76bd1..ea3f683 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -25,17 +25,16 @@ import it.tdlight.tdlibsession.td.middle.TdResultList; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultMessage; import it.tdlight.utils.BinlogUtils; -import it.tdlight.utils.BufferTimeOutPublisher; import it.tdlight.utils.MonoUtils; import java.net.ConnectException; import java.time.Duration; import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.One; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuples; @@ -51,18 +50,17 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private final TelegramClientFactory clientFactory; // Variables configured by the user at startup - private final One botId = Sinks.one(); - private final One botAddress = Sinks.one(); - private final One botAlias = Sinks.one(); - private final One local = Sinks.one(); + private final AtomicReference botId = new AtomicReference<>(); + private final AtomicReference botAddress = new AtomicReference<>(); + private final AtomicReference botAlias = new AtomicReference<>(); // Variables configured at startup - private final One td = Sinks.one(); - private final One> executeConsumer = Sinks.one(); - private final One> readBinlogConsumer = Sinks.one(); - private final One> readyToReceiveConsumer = Sinks.one(); - private final One> pingConsumer = Sinks.one(); - private final One> pipeFlux = Sinks.one(); + private final AtomicReference td = new AtomicReference<>(); + private final AtomicReference> executeConsumer = new AtomicReference<>(); + private final AtomicReference> readBinlogConsumer = new AtomicReference<>(); + private final AtomicReference> readyToReceiveConsumer = new AtomicReference<>(); + private final AtomicReference> pingConsumer = new AtomicReference<>(); + private final AtomicReference> pipeFlux = new AtomicReference<>(); public AsyncTdMiddleEventBusServer() { this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 100); @@ -79,20 +77,14 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { if (botId == null || botId <= 0) { throw new IllegalArgumentException("botId is not set!"); } - if (this.botId.tryEmitValue(botId).isFailure()) { - throw new IllegalStateException("Failed to set botId"); - } + this.botId.set(botId); var botAddress = "bots.bot." + botId; - if (this.botAddress.tryEmitValue(botAddress).isFailure()) { - throw new IllegalStateException("Failed to set botAddress"); - } + this.botAddress.set(botAddress); var botAlias = config().getString("botAlias"); if (botAlias == null || botAlias.isEmpty()) { throw new IllegalArgumentException("botAlias is not set!"); } - if (this.botAlias.tryEmitValue(botAlias).isFailure()) { - throw new IllegalStateException("Failed to set botAlias"); - } + this.botAlias.set(botAlias); var local = config().getBoolean("local"); if (local == null) { throw new IllegalArgumentException("local is not set!"); @@ -103,9 +95,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } var td = new AsyncTdDirectImpl(clientFactory, implementationDetails, botAlias); - if (this.td.tryEmitValue(td).isFailure()) { - throw new IllegalStateException("Failed to set td instance"); - } + this.td.set(td); return new OnSuccessfulStartRequestInfo(td, botAddress, botAlias, botId, local); }) .flatMap(r -> onSuccessfulStartRequest(r.td, r.botAddress, r.botAlias, r.botId, r.local)) @@ -135,25 +125,18 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { boolean local) { return td .initialize() - .then(this.pipe(td, botAddress, botAlias, botId, local)) - .then(this.listen(td, botAddress, botAlias, botId, local)) - .doOnSuccess(s -> { - logger.info("Deploy and start of bot \"" + botAlias + "\": ✅ Succeeded"); - }) - .doOnError(ex -> { - logger.error("Deploy and start of bot \"" + botAlias + "\": ❌ Failed", ex); - }); + .then(this.pipe(td, botAddress, local)) + .then(this.listen(td, botAddress, botId, local)) + .doOnSuccess(s -> logger.info("Deploy and start of bot \"" + botAlias + "\": ✅ Succeeded")) + .doOnError(ex -> logger.error("Deploy and start of bot \"" + botAlias + "\": ❌ Failed", ex)); } - private Mono listen(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { + private Mono listen(AsyncTdDirectImpl td, String botAddress, int botId, boolean local) { return Mono.create(registrationSink -> { logger.trace("Preparing listeners"); MessageConsumer executeConsumer = vertx.eventBus().consumer(botAddress + ".execute"); - if (this.executeConsumer.tryEmitValue(executeConsumer).isFailure()) { - registrationSink.error(new IllegalStateException("Failed to set executeConsumer")); - return; - } + this.executeConsumer.set(executeConsumer); Flux .>create(sink -> { executeConsumer.handler(sink::next); @@ -170,9 +153,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .single() .timeout(Duration.ofSeconds(60 + 30)) .doOnSuccess(s -> logger.trace("Executed successfully. Request was {}", request)) - .onErrorResume(ex -> Mono.fromRunnable(() -> { - msg.fail(500, ex.getLocalizedMessage()); - })) + .onErrorResume(ex -> Mono.fromRunnable(() -> msg.fail(500, ex.getLocalizedMessage()))) .flatMap(response -> Mono.fromCallable(() -> { var replyOpts = new DeliveryOptions().setLocalOnly(local); var replyValue = new TdResultMessage(response.result(), response.cause()); @@ -197,10 +178,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { ); MessageConsumer readBinlogConsumer = vertx.eventBus().consumer(botAddress + ".read-binlog"); - if (this.readBinlogConsumer.tryEmitValue(readBinlogConsumer).isFailure()) { - registrationSink.error(new IllegalStateException("Failed to set readBinlogConsumer")); - return; - } + this.readBinlogConsumer.set(readBinlogConsumer); BinlogUtils .readBinlogConsumer(vertx, readBinlogConsumer, botId, local) .subscribeOn(Schedulers.parallel()) @@ -208,13 +186,10 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { MessageConsumer readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive"); - if (this.readyToReceiveConsumer.tryEmitValue(readyToReceiveConsumer).isFailure()) { - registrationSink.error(new IllegalStateException("Failed to set readyToReceiveConsumer")); - return; - } + this.readyToReceiveConsumer.set(readyToReceiveConsumer); // Pipe the data - var pipeSubscription = Flux + Flux .>create(sink -> { readyToReceiveConsumer.handler(sink::next); readyToReceiveConsumer.endHandler(h -> sink.complete()); @@ -222,10 +197,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .take(1, true) .single() .doOnNext(s -> logger.trace("Received ready-to-receive request from client")) - .flatMap(msg -> this.pipeFlux - .asMono() - .timeout(Duration.ofSeconds(5)) - .map(pipeFlux -> Tuples.of(msg, pipeFlux))) + .map(msg -> Tuples.of(msg, Objects.requireNonNull(pipeFlux.get(), "PipeFlux is empty"))) .doOnError(ex -> logger.error("Error when processing a ready-to-receive request", ex)) .doOnNext(s -> logger.trace("Replying to ready-to-receive request")) .flatMapMany(tuple -> { @@ -238,9 +210,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { logger.trace("Start piping data"); // Start piping the data - return tuple.getT2().doOnSubscribe(s -> { - logger.trace("Subscribed to updates pipe"); - }); + return tuple.getT2().doOnSubscribe(s -> logger.trace("Subscribed to updates pipe")); }) .then() .doOnSuccess(s -> logger.trace("Finished handling ready-to-receive requests (updates pipe ended)")) @@ -249,10 +219,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .subscribe(v -> {}); MessageConsumer pingConsumer = vertx.eventBus().consumer(botAddress + ".ping"); - if (this.pingConsumer.tryEmitValue(pingConsumer).isFailure()) { - registrationSink.error(new IllegalStateException("Failed to set pingConsumer")); - return; - } + this.pingConsumer.set(pingConsumer); Flux .>create(sink -> { pingConsumer.handler(sink::next); @@ -288,61 +255,52 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { * Override some requests */ private Function overrideRequest(Function request, int botId) { - switch (request.getConstructor()) { - case SetTdlibParameters.CONSTRUCTOR: - // Fix session directory locations - var setTdlibParamsObj = (SetTdlibParameters) request; - setTdlibParamsObj.parameters.databaseDirectory = TDLibRemoteClient.getSessionDirectory(botId).toString(); - setTdlibParamsObj.parameters.filesDirectory = TDLibRemoteClient.getMediaDirectory(botId).toString(); - return request; - default: - return request; + if (request.getConstructor() == SetTdlibParameters.CONSTRUCTOR) { + // Fix session directory locations + var setTdlibParamsObj = (SetTdlibParameters) request; + setTdlibParamsObj.parameters.databaseDirectory = TDLibRemoteClient.getSessionDirectory(botId).toString(); + setTdlibParamsObj.parameters.filesDirectory = TDLibRemoteClient.getMediaDirectory(botId).toString(); } + return request; } @Override public Completable rxStop() { - return MonoUtils.toCompletable(botAlias - .asMono() - .timeout(Duration.ofSeconds(1), Mono.just("???")) - .flatMap(botAlias -> Mono - .fromRunnable(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopping")) - .then(executeConsumer - .asMono() - .timeout(Duration.ofSeconds(5), Mono.empty()) - .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)) - .doOnSuccess(s -> logger.trace("Unregistered execute consumer")) - ) - .then(readBinlogConsumer - .asMono() - .timeout(Duration.ofSeconds(10), Mono.empty()) - .flatMap(ec -> Mono.fromCallable(() -> { - Mono - // ReadBinLog will live for another 30 minutes. - // Since every consumer of ReadBinLog is identical, this should not pose a problem. - .delay(Duration.ofMinutes(30)) - .then(ec.rxUnregister().as(MonoUtils::toMono)) - .subscribe(); - return null; - }).subscribeOn(Schedulers.boundedElastic())) - ) - .then(readyToReceiveConsumer - .asMono() - .timeout(Duration.ofSeconds(5), Mono.empty()) - .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono))) - .then(pingConsumer - .asMono() - .timeout(Duration.ofSeconds(5), Mono.empty()) - .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono))) - .doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex)) - .doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped")) + return MonoUtils.toCompletable(Mono + .fromRunnable(() -> logger.info("Undeploy of bot \"" + botAlias.get() + "\": stopping")) + .then(Mono + .fromCallable(executeConsumer::get) + .flatMap(executeConsumer -> executeConsumer.rxUnregister().as(MonoUtils::toMono)) + .doOnSuccess(s -> logger.trace("Unregistered execute consumer")) ) + .then(MonoUtils.fromBlockingEmpty(() -> { + var readBinlogConsumer = this.readBinlogConsumer.get(); + if (readBinlogConsumer != null) { + Mono + // ReadBinLog will live for another 10 minutes. + // Since every consumer of ReadBinLog is identical, this should not pose a problem. + .delay(Duration.ofMinutes(10)) + .then(readBinlogConsumer.rxUnregister().as(MonoUtils::toMono)) + .subscribe(); + } + })) + .then(Mono + .fromCallable(readyToReceiveConsumer::get) + .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)) + ) + .then(Mono + .fromCallable(pingConsumer::get) + .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)) + ) + .doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias.get() + "\": stop failed", ex)) + .doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias.get() + "\": stopped")) ); } - private Mono pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { + private Mono pipe(AsyncTdDirectImpl td, String botAddress, boolean local) { logger.trace("Preparing to pipe requests"); - Flux updatesFlux = td.receive(tdOptions) + Flux updatesFlux = td + .receive(tdOptions) .takeUntil(item -> { if (item instanceof Update) { var tdUpdate = (Update) item; @@ -437,7 +395,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .then(rxStop().as(MonoUtils::toMono)); }); - return MonoUtils.emitValue(this.pipeFlux, pipeFlux) - .doOnSuccess(s -> logger.trace("Prepared piping requests successfully")); + return MonoUtils.fromBlockingEmpty(() -> { + this.pipeFlux.set(pipeFlux); + logger.trace("Prepared piping requests successfully"); + }); } } diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index e9ef967..d869e80 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -161,65 +161,6 @@ public class MonoUtils { return Completable.fromPublisher(s); } - public static Mono fromEmitResult(EmitResult emitResult) { - return Mono.fromCallable(() -> { - emitResult.orThrow(); - return null; - }); - } - - public static Future fromEmitResultFuture(EmitResult emitResult) { - if (emitResult.isSuccess()) { - return Future.succeededFuture(); - } else { - return Future.failedFuture(new EmissionException(emitResult)); - } - } - - public static Mono emitValue(One sink, T value) { - return Mono.defer(() -> fromEmitResult(sink.tryEmitValue(value))); - } - - public static Mono emitNext(Many sink, T value) { - return Mono.defer(() -> fromEmitResult(sink.tryEmitNext(value))); - } - - public static Mono emitComplete(Many sink) { - return Mono.defer(() -> fromEmitResult(sink.tryEmitComplete())); - } - - public static Mono emitEmpty(Empty sink) { - return Mono.defer(() -> fromEmitResult(sink.tryEmitEmpty())); - } - - public static Mono emitEmpty(One sink) { - return Mono.defer(() -> fromEmitResult(sink.tryEmitEmpty())); - } - - public static Mono emitError(Empty sink, Throwable value) { - return Mono.defer(() -> fromEmitResult(sink.tryEmitError(value))); - } - - public static Future emitValueFuture(One sink, T value) { - return fromEmitResultFuture(sink.tryEmitValue(value)); - } - - public static Future emitNextFuture(Many sink, T value) { - return fromEmitResultFuture(sink.tryEmitNext(value)); - } - - public static Future emitCompleteFuture(Many sink) { - return fromEmitResultFuture(sink.tryEmitComplete()); - } - - public static Future emitErrorFuture(Empty sink, Throwable value) { - return fromEmitResultFuture(sink.tryEmitError(value)); - } - - public static Future emitEmptyFuture(Empty sink) { - return fromEmitResultFuture(sink.tryEmitEmpty()); - } - @SuppressWarnings({"unchecked", "rawtypes"}) public static Mono castVoid(Mono mono) { return (Mono) mono;