This commit is contained in:
Andrea Cavalli 2022-01-22 13:08:11 +01:00
parent 797808114c
commit 76ba67b760
1 changed files with 9 additions and 4 deletions

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -31,17 +30,23 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient {
protected final ClusterEventService eventService;
protected final long userId;
private final Mono<Long> liveId;
private Mono<Long> liveIdMono;
public BaseAtomixReactiveApiClient(Atomix atomix, long userId) {
this.eventService = atomix.getEventService();
this.userId = userId;
this.liveId = resolveLiveId();
}
@Override
public final <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) {
return liveId
// Don't care about race conditions here, because the mono is always the same.
// This variable is set just to avoid creating the mono every time
Mono<Long> liveIdMono = this.liveIdMono;
if (liveIdMono == null) {
liveIdMono = (this.liveIdMono = resolveLiveId());
}
return liveIdMono
.flatMap(liveId -> Mono
.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
new Request<>(liveId, request, timeout),