Performance improvement

This commit is contained in:
Andrea Cavalli 2021-10-26 00:01:44 +02:00
parent 630b293e8e
commit 00fb002457
5 changed files with 26 additions and 40 deletions

View File

@ -66,7 +66,7 @@
<dependency> <dependency>
<groupId>it.tdlight</groupId> <groupId>it.tdlight</groupId>
<artifactId>tdlight-java</artifactId> <artifactId>tdlight-java</artifactId>
<version>2.7.8.23</version> <version>2.7.8.40</version>
</dependency> </dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>

View File

@ -23,6 +23,7 @@ import java.nio.file.Paths;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.Logger;
@ -44,7 +45,7 @@ public class TDLibRemoteClient implements AutoCloseable {
private final String netInterface; private final String netInterface;
private final int port; private final int port;
private final Set<String> membersAddresses; private final Set<String> membersAddresses;
private final One<TdClusterManager> clusterManager = Sinks.one(); private final AtomicReference<TdClusterManager> clusterManager = new AtomicReference<>();
public static boolean runningFromIntelliJ() { public static boolean runningFromIntelliJ() {
return System.getProperty("java.class.path").contains("idea_rt.jar") return System.getProperty("java.class.path").contains("idea_rt.jar")
@ -125,8 +126,13 @@ public class TDLibRemoteClient implements AutoCloseable {
.block(); .block();
// Close vert.x on shutdown // Close vert.x on shutdown
var vertx = client.clusterManager.asMono().blockOptional().orElseThrow().getVertx(); var vertxMono = Mono.fromCallable(() -> client.clusterManager.get().getVertx());
Runtime.getRuntime().addShutdownHook(new Thread(() -> MonoUtils.toMono(vertx.rxClose()).blockOptional())); Runtime
.getRuntime()
.addShutdownHook(new Thread(() -> vertxMono
.flatMap(vertx -> MonoUtils.toMono(vertx.rxClose()))
.blockOptional())
);
} }
public Mono<Void> start() { public Mono<Void> start() {
@ -163,14 +169,9 @@ public class TDLibRemoteClient implements AutoCloseable {
port, port,
membersAddresses membersAddresses
)) ))
.doOnNext(clusterManager::tryEmitValue) .doOnSuccess(clusterManager::set)
.doOnError(clusterManager::tryEmitError)
.doOnSuccess(s -> {
if (s == null) {
clusterManager.tryEmitEmpty();
}
})
.single() .single()
.doOnError(ex -> logger.error("Failed to set cluster manager", ex))
.flatMap(clusterManager -> { .flatMap(clusterManager -> {
MessageConsumer<StartSessionMessage> startBotConsumer MessageConsumer<StartSessionMessage> startBotConsumer
= clusterManager.getEventBus().consumer("bots.start-bot"); = clusterManager.getEventBus().consumer("bots.start-bot");
@ -236,10 +237,6 @@ public class TDLibRemoteClient implements AutoCloseable {
@Override @Override
public void close() { public void close() {
this.clusterManager this.clusterManager.get().getVertx().rxClose().blockingAwait();
.asMono()
.blockOptional()
.map(TdClusterManager::getVertx)
.ifPresent(v -> v.rxClose().blockingAwait());
} }
} }

View File

@ -92,8 +92,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
@Override @Override
public Flux<TdApi.Object> receive(AsyncTdDirectOptions options) { public Flux<TdApi.Object> receive(AsyncTdDirectOptions options) {
// If closed it will be either true or false
final One<Boolean> closedFromTd = Sinks.one();
return Mono return Mono
.fromCallable(td::get) .fromCallable(td::get)
.single() .single()
@ -104,13 +102,8 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
&& ((UpdateAuthorizationState) update).authorizationState.getConstructor() && ((UpdateAuthorizationState) update).authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR) { == AuthorizationStateClosed.CONSTRUCTOR) {
logger.debug("Received closed status from tdlib"); logger.debug("Received closed status from tdlib");
closedFromTd.tryEmitValue(true);
} }
}) })
.doOnCancel(() -> {
// Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false.
closedFromTd.tryEmitValue(false);
})
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(Schedulers.boundedElastic());
} }
} }

View File

@ -53,17 +53,17 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
private final One<BinlogAsyncFile> binlog = Sinks.one(); private final One<BinlogAsyncFile> binlog = Sinks.one();
private final One<MessageConsumer<TdResultList>> updates = Sinks.one(); private final AtomicReference<MessageConsumer<TdResultList>> updates = new AtomicReference<>();
// This will only result in a successful completion, never completes in other ways // This will only result in a successful completion, never completes in other ways
private final Empty<Void> updatesStreamEnd = Sinks.one(); private final Empty<Void> updatesStreamEnd = Sinks.empty();
// This will only result in a crash, never completes in other ways // This will only result in a crash, never completes in other ways
private final Empty<Void> crash = Sinks.one(); private final Empty<Void> crash = Sinks.empty();
// This will only result in a successful completion, never completes in other ways // This will only result in a successful completion, never completes in other ways
private final Empty<Void> pingFail = Sinks.one(); private final Empty<Void> pingFail = Sinks.empty();
// This will only result in a successful completion, never completes in other ways. // This will only result in a successful completion, never completes in other ways.
// It will be called when UpdateAuthorizationStateClosing is intercepted. // It will be called when UpdateAuthorizationStateClosing is intercepted.
// If it's completed stop checking if the ping works or not // If it's completed stop checking if the ping works or not
private final Empty<Void> authStateClosing = Sinks.one(); private final Empty<Void> authStateClosing = Sinks.empty();
private long botId; private long botId;
private String botAddress; private String botAddress;
@ -229,12 +229,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono return Mono
.fromRunnable(() -> logger.trace("Emitting updates flux to sink")) .fromRunnable(() -> logger.trace("Emitting updates flux to sink"))
.then(MonoUtils.fromBlockingEmpty(() -> { .then(MonoUtils.fromBlockingEmpty(() -> {
EmitResult result; var previous = this.updates.getAndSet(updateConsumer);
while ((result = this.updates.tryEmitValue(updateConsumer)) == EmitResult.FAIL_NON_SERIALIZED) { if (previous != null) {
// 10ms logger.error("Already subscribed a consumer to the updates");
LockSupport.parkNanos(10000000);
} }
result.orThrow();
})) }))
.doOnSuccess(s -> logger.trace("Emitted updates flux to sink")) .doOnSuccess(s -> logger.trace("Emitted updates flux to sink"))
.doOnSuccess(s -> logger.trace("Waiting to register update consumer across the cluster")) .doOnSuccess(s -> logger.trace("Waiting to register update consumer across the cluster"))
@ -251,9 +249,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono return Mono
.fromRunnable(() -> logger.trace("Called receive() from parent")) .fromRunnable(() -> logger.trace("Called receive() from parent"))
.then(updates.asMono()) .then(Mono.fromCallable(() -> updates.get()))
.publishOn(Schedulers.parallel())
.timeout(Duration.ofSeconds(30))
.doOnSuccess(s -> logger.trace("Registering updates flux")) .doOnSuccess(s -> logger.trace("Registering updates flux"))
.flatMapMany(updatesMessageConsumer -> MonoUtils.fromMessageConsumer(Mono .flatMapMany(updatesMessageConsumer -> MonoUtils.fromMessageConsumer(Mono
.empty() .empty()

View File

@ -17,8 +17,10 @@ import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.log.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
@ -80,14 +82,12 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
@Override @Override
public Completable rxStop() { public Completable rxStop() {
closeRequest.tryEmitEmpty(); return Completable.fromRunnable(closeRequest::tryEmitEmpty);
return Completable.complete();
} }
@Override @Override
public Mono<Void> initialize() { public Mono<Void> initialize() {
return td return td.initialize();
.initialize();
} }
@Override @Override