diff --git a/pom.xml b/pom.xml
index d09073b..6cb199a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -129,7 +129,7 @@
it.tdlight
tdlight-java
- [3.171.31,)
+ [3.171.36,)
it.cavallium
diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java
index 805f586..d5153e3 100644
--- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java
+++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java
@@ -2,14 +2,24 @@ package it.tdlight.tdlibsession;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
+import io.vertx.core.eventbus.ReplyException;
+import it.tdlight.utils.MonoUtils;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.One;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuples;
public class EventBusFlux {
private static final Logger logger = LoggerFactory.getLogger(EventBusFlux.class);
@@ -27,7 +37,11 @@ public class EventBusFlux {
}
}
- public static Mono serve(Flux flux,
+ /**
+ *
+ * @return tuple. T1 = flux served, T2 = error that caused cancelling of the subscription
+ */
+ public static Tuple2, Mono> serve(Flux flux,
EventBus eventBus,
String fluxAddress,
DeliveryOptions baseDeliveryOptions,
@@ -40,7 +54,8 @@ public class EventBusFlux {
var signalDeliveryOptions = new DeliveryOptions(deliveryOptions)
.setCodecName(signalsCodec.name());
AtomicInteger subscriptionsCount = new AtomicInteger();
- return Mono.create(sink -> {
+ One fatalErrorSink = Sinks.one();
+ var servedMono = Mono.create(sink -> {
MessageConsumer subscribe = eventBus.consumer(fluxAddress + ".subscribe");
subscribe.handler(msg -> {
@@ -62,47 +77,52 @@ public class EventBusFlux {
subscriptionReady.handler(subscriptionReadyMsg -> {
subscriptionReady.unregister(subscriptionReadyUnregistered -> {
if (subscriptionReadyUnregistered.succeeded()) {
- var subscription = flux.subscribe(item -> {
- eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, msg2 -> {
- if (msg2.failed()) {
- logger.error("Failed to send onNext signal", msg2.cause());
- }
- });
- }, error -> {
- eventBus.request(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions, msg2 -> {
- logger.info("Errored flux \"" + fluxAddress + "\"");
- if (msg2.failed()) {
- logger.error("Failed to send onError signal", msg2.cause());
- }
- });
- }, () -> {
- eventBus.request(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions, msg2 -> {
- logger.info("Completed flux \"" + fluxAddress + "\"");
- if (msg2.failed()) {
- logger.error("Failed to send onComplete signal", msg2.cause());
- }
- });
- });
+ AtomicReference atomicSubscription = new AtomicReference<>(null);
+ var subscription = flux
+ .onErrorResume(error -> Mono
+ .>create(errorSink -> {
+ var responseHandler = MonoUtils.toHandler(errorSink);
+ eventBus.request(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions, responseHandler);
+ })
+ .then(Mono.empty())
+ )
+ .flatMap(item -> Mono.>create(itemSink -> {
+ var responseHandler = MonoUtils.toHandler(itemSink);
+ eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, responseHandler);
+ })).subscribe(response -> {}, error -> {
+ if (error instanceof ReplyException) {
+ var errorMessage = error.getMessage();
+ if (errorMessage != null && errorMessage.contains("NO_HANDLERS")) {
+ logger.error("Can't send a signal of flux \"" + fluxAddress + "\" because the connection was lost");
+ } else {
+ logger.error("Error when sending a signal of flux \"" + fluxAddress + "\": {}", error.getLocalizedMessage());
+ }
+ } else {
+ logger.error("Error when sending a signal of flux \"" + fluxAddress + "\"", error);
+ }
+ fatalErrorSink.tryEmitValue(error);
+ disposeFlux(atomicSubscription.get(), fatalErrorSink, cancel, dispose, fluxAddress, () -> {
+ logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error");
+ });
+ }, () -> {
+ eventBus.request(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions, msg2 -> {
+ logger.info("Completed flux \"" + fluxAddress + "\"");
+ if (msg2.failed()) {
+ logger.error("Failed to send onComplete signal", msg2.cause());
+ fatalErrorSink.tryEmitValue(msg2.cause());
+ }
+ });
+ });
+ atomicSubscription.set(subscription);
cancel.handler(msg3 -> {
- logger.warn("Cancelling flux \"" + fluxAddress + "\"");
+ logger.trace("Cancelling flux \"" + fluxAddress + "\"");
subscription.dispose();
+ logger.debug("Cancelled flux \"" + fluxAddress + "\"");
msg3.reply(EMPTY, deliveryOptions);
});
dispose.handler(msg2 -> {
- logger.warn("Disposing flux \"" + fluxAddress + "\"");
- subscription.dispose();
- cancel.unregister(v -> {
- if (v.failed()) {
- logger.error("Failed to unregister cancel", v.cause());
- }
- dispose.unregister(v2 -> {
- if (v.failed()) {
- logger.error("Failed to unregister dispose", v2.cause());
- }
- msg2.reply(EMPTY);
- });
- });
+ disposeFlux(subscription, fatalErrorSink, cancel, dispose, fluxAddress, () -> msg2.reply(EMPTY));
});
cancel.completionHandler(h -> {
@@ -148,6 +168,33 @@ public class EventBusFlux {
}
});
});
+
+ return Tuples.of(servedMono, fatalErrorSink.asMono());
+ }
+
+ private static void disposeFlux(@Nullable Disposable subscription,
+ One fatalErrorSink,
+ MessageConsumer cancel,
+ MessageConsumer dispose,
+ String fluxAddress,
+ Runnable after) {
+ logger.trace("Disposing flux \"" + fluxAddress + "\"");
+ fatalErrorSink.tryEmitEmpty();
+ if (subscription != null) {
+ subscription.dispose();
+ }
+ cancel.unregister(v -> {
+ if (v.failed()) {
+ logger.error("Failed to unregister cancel", v.cause());
+ }
+ dispose.unregister(v2 -> {
+ if (v.failed()) {
+ logger.error("Failed to unregister dispose", v2.cause());
+ }
+ logger.debug("Disposed flux \"" + fluxAddress + "\"");
+ after.run();
+ });
+ });
}
public static Flux connect(EventBus eventBus,
diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java
index 0cf71e8..9836664 100644
--- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java
+++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java
@@ -3,6 +3,7 @@ package it.tdlight.tdlibsession.remoteclient;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
+import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.net.JksOptions;
import io.vertx.core.shareddata.AsyncMap;
@@ -17,6 +18,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +39,7 @@ public class TDLibRemoteClient implements AutoCloseable {
private final Set membersAddresses;
private final Many clusterManager = Sinks.many().replay().latest();
private final Scheduler deploymentScheduler = Schedulers.newSingle("TDLib", false);
+ private final AtomicInteger statsActiveDeployments = new AtomicInteger();
public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set membersAddresses) {
this.securityInfo = securityInfo;
@@ -124,7 +127,7 @@ public class TDLibRemoteClient implements AutoCloseable {
if (mapResult.succeeded()) {
var deployableBotAddresses = mapResult.result();
- clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> {
+ clusterManager.getSharedData().getLock("deployment", lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
var deploymentLock = lockAcquisitionResult.result();
putAllAsync(deployableBotAddresses, botAddresses.values(), (AsyncResult putAllResult) -> {
@@ -168,7 +171,7 @@ public class TDLibRemoteClient implements AutoCloseable {
AsyncMap