diff --git a/pom.xml b/pom.xml index 4e672e7..fa2d01a 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ it.tdlight tdlight-java - 2.7.8.23 + 2.7.8.40 diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index 86cc4fd..2ca6901 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -23,6 +23,7 @@ import java.nio.file.Paths; import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; import org.jetbrains.annotations.Nullable; import org.warp.commonutils.log.Logger; @@ -44,7 +45,7 @@ public class TDLibRemoteClient implements AutoCloseable { private final String netInterface; private final int port; private final Set membersAddresses; - private final One clusterManager = Sinks.one(); + private final AtomicReference clusterManager = new AtomicReference<>(); public static boolean runningFromIntelliJ() { return System.getProperty("java.class.path").contains("idea_rt.jar") @@ -125,8 +126,13 @@ public class TDLibRemoteClient implements AutoCloseable { .block(); // Close vert.x on shutdown - var vertx = client.clusterManager.asMono().blockOptional().orElseThrow().getVertx(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> MonoUtils.toMono(vertx.rxClose()).blockOptional())); + var vertxMono = Mono.fromCallable(() -> client.clusterManager.get().getVertx()); + Runtime + .getRuntime() + .addShutdownHook(new Thread(() -> vertxMono + .flatMap(vertx -> MonoUtils.toMono(vertx.rxClose())) + .blockOptional()) + ); } public Mono start() { @@ -163,14 +169,9 @@ public class TDLibRemoteClient implements AutoCloseable { port, membersAddresses )) - .doOnNext(clusterManager::tryEmitValue) - .doOnError(clusterManager::tryEmitError) - .doOnSuccess(s -> { - if (s == null) { - clusterManager.tryEmitEmpty(); - } - }) + .doOnSuccess(clusterManager::set) .single() + .doOnError(ex -> logger.error("Failed to set cluster manager", ex)) .flatMap(clusterManager -> { MessageConsumer startBotConsumer = clusterManager.getEventBus().consumer("bots.start-bot"); @@ -236,10 +237,6 @@ public class TDLibRemoteClient implements AutoCloseable { @Override public void close() { - this.clusterManager - .asMono() - .blockOptional() - .map(TdClusterManager::getVertx) - .ifPresent(v -> v.rxClose().blockingAwait()); + this.clusterManager.get().getVertx().rxClose().blockingAwait(); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index eb28e5f..51cba2c 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -92,8 +92,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { @Override public Flux receive(AsyncTdDirectOptions options) { - // If closed it will be either true or false - final One closedFromTd = Sinks.one(); return Mono .fromCallable(td::get) .single() @@ -104,13 +102,8 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { && ((UpdateAuthorizationState) update).authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { logger.debug("Received closed status from tdlib"); - closedFromTd.tryEmitValue(true); } }) - .doOnCancel(() -> { - // Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false. - closedFromTd.tryEmitValue(false); - }) .subscribeOn(Schedulers.boundedElastic()); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index da44f11..04b4fa1 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -53,17 +53,17 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final One binlog = Sinks.one(); - private final One> updates = Sinks.one(); + private final AtomicReference> updates = new AtomicReference<>(); // This will only result in a successful completion, never completes in other ways - private final Empty updatesStreamEnd = Sinks.one(); + private final Empty updatesStreamEnd = Sinks.empty(); // This will only result in a crash, never completes in other ways - private final Empty crash = Sinks.one(); + private final Empty crash = Sinks.empty(); // This will only result in a successful completion, never completes in other ways - private final Empty pingFail = Sinks.one(); + private final Empty pingFail = Sinks.empty(); // This will only result in a successful completion, never completes in other ways. // It will be called when UpdateAuthorizationStateClosing is intercepted. // If it's completed stop checking if the ping works or not - private final Empty authStateClosing = Sinks.one(); + private final Empty authStateClosing = Sinks.empty(); private long botId; private String botAddress; @@ -229,12 +229,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono .fromRunnable(() -> logger.trace("Emitting updates flux to sink")) .then(MonoUtils.fromBlockingEmpty(() -> { - EmitResult result; - while ((result = this.updates.tryEmitValue(updateConsumer)) == EmitResult.FAIL_NON_SERIALIZED) { - // 10ms - LockSupport.parkNanos(10000000); + var previous = this.updates.getAndSet(updateConsumer); + if (previous != null) { + logger.error("Already subscribed a consumer to the updates"); } - result.orThrow(); })) .doOnSuccess(s -> logger.trace("Emitted updates flux to sink")) .doOnSuccess(s -> logger.trace("Waiting to register update consumer across the cluster")) @@ -251,9 +249,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono .fromRunnable(() -> logger.trace("Called receive() from parent")) - .then(updates.asMono()) - .publishOn(Schedulers.parallel()) - .timeout(Duration.ofSeconds(30)) + .then(Mono.fromCallable(() -> updates.get())) .doOnSuccess(s -> logger.trace("Registering updates flux")) .flatMapMany(updatesMessageConsumer -> MonoUtils.fromMessageConsumer(Mono .empty() diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index 15c6a99..bcabdbc 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -17,8 +17,10 @@ import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.utils.MonoUtils; import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; +import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -80,14 +82,12 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd @Override public Completable rxStop() { - closeRequest.tryEmitEmpty(); - return Completable.complete(); + return Completable.fromRunnable(closeRequest::tryEmitEmpty); } @Override public Mono initialize() { - return td - .initialize(); + return td.initialize(); } @Override