Optimize dynamic live id resolution

Andrea Cavalli 2022-01-22 17:45:56 +01:00
parent 76ba67b760
commit 3a74997b49
9 changed files with 135 additions and 77 deletions

View File

@@ -163,17 +163,17 @@ public class AtomixReactiveApi implements ReactiveApi {
 		var removeObsoleteDiskSessions = diskChangesMono
 				.flatMapIterable(diskChanges -> diskChanges.removedIds)
-				.flatMap(removedIds -> fromCompletionStage(() -> destroySession(removedIds, nodeId)))
+				.concatMap(removedIds -> fromCompletionStage(() -> destroySession(removedIds, nodeId)))
 				.then();
 		var addedDiskSessionsFlux = diskChangesMono
 				.flatMapIterable(diskChanges -> diskChanges.addedIds)
-				.flatMap(this::getLocalDiskSession);
+				.concatMap(this::getLocalDiskSession);
 		var normalDiskSessionsFlux = diskChangesMono
 				.flatMapIterable(diskChanges -> diskChanges.normalIds)
-				.flatMap(this::getLocalDiskSession);
-		var addNewDiskSessions = addedDiskSessionsFlux.flatMap(diskSessionAndId -> {
+				.concatMap(this::getLocalDiskSession);
+		var addNewDiskSessions = addedDiskSessionsFlux.concatMap(diskSessionAndId -> {
 			var id = diskSessionAndId.id;
 			var diskSession = diskSessionAndId.diskSession;
 			return createSession(new LoadSessionFromDiskRequest(id,
@@ -183,7 +183,7 @@ public class AtomixReactiveApi implements ReactiveApi {
 			));
 		}).then();
-		var loadExistingDiskSessions = normalDiskSessionsFlux.flatMap(diskSessionAndId -> {
+		var loadExistingDiskSessions = normalDiskSessionsFlux.concatMap(diskSessionAndId -> {
 			var id = diskSessionAndId.id;
 			var diskSession = diskSessionAndId.diskSession;
 			return createSession(new LoadSessionFromDiskRequest(id,
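
The flatMap-to-concatMap change above trades concurrency for ordering: concatMap subscribes to one inner publisher at a time, so disk sessions are destroyed and created strictly in the order their ids arrive. A minimal standalone sketch of the difference, assuming only reactor-core on the classpath (class and variable names are illustrative, not from this repo):

import java.time.Duration;
import reactor.core.publisher.Flux;

public class ConcatMapVsFlatMap {
    public static void main(String[] args) {
        Flux<Integer> ids = Flux.just(1, 2, 3);
        // flatMap subscribes to all inner publishers eagerly; results interleave
        // by completion time, so slower early elements finish last.
        ids.flatMap(id -> Flux.just(id).delayElements(Duration.ofMillis(30L * (4 - id))))
                .doOnNext(id -> System.out.println("flatMap: " + id))
                .blockLast(); // typically prints 3, 2, 1
        // concatMap subscribes to one inner publisher at a time; order is preserved.
        ids.concatMap(id -> Flux.just(id).delayElements(Duration.ofMillis(30L * (4 - id))))
                .doOnNext(id -> System.out.println("concatMap: " + id))
                .blockLast(); // always prints 1, 2, 3
    }
}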

View File

@@ -31,7 +31,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
 		if (closed) {
 			return Flux.empty();
 		}
-		return kafkaConsumer.consumeMessages(subGroupId).takeUntil(s -> closed);
+		return kafkaConsumer.consumeMessages(subGroupId).map(TimestampedClientBoundEvent::event).takeUntil(s -> closed);
 	}
 
 	@Override

View File

@@ -23,30 +23,45 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 import org.apache.commons.lang3.SerializationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
-abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient {
+abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoCloseable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BaseAtomixReactiveApiClient.class);
 
 	protected final ClusterEventService eventService;
 	protected final long userId;
 
-	private Mono<Long> liveIdMono;
+	private Disposable liveIdChangeSubscription;
+	private Flux<Long> liveIdChange;
+	private Mono<Long> emptyIdErrorMono;
 
 	public BaseAtomixReactiveApiClient(Atomix atomix, long userId) {
 		this.eventService = atomix.getEventService();
 		this.userId = userId;
 	}
 
+	protected void initialize() {
+		this.liveIdChange = liveIdChange().cache(1);
+		this.liveIdChangeSubscription = liveIdChange
+				.subscribeOn(Schedulers.parallel())
+				.subscribe(v -> LOG.debug("Live id of user {} changed: {}", userId, v),
+						ex -> LOG.error("Failed to retrieve live id of user {}", userId)
+				);
+		this.emptyIdErrorMono = Mono.error(() -> new TdError(404, "Bot #IDU" + this.userId
+				+ " is not found on the cluster, no live id has been associated with it locally"));
+	}
+
 	@Override
 	public final <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) {
-		// Don't care about race conditions here, because the mono is always the same.
-		// This variable is set just to avoid creating the mono every time
-		Mono<Long> liveIdMono = this.liveIdMono;
-		if (liveIdMono == null) {
-			liveIdMono = (this.liveIdMono = resolveLiveId());
-		}
-		return liveIdMono
+		return liveIdChange
+				.take(1, true)
+				.singleOrEmpty()
+				.switchIfEmpty(emptyIdErrorMono)
 				.flatMap(liveId -> Mono
 						.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
 								new Request<>(liveId, request, timeout),
@@ -75,7 +90,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient {
 		});
 	}
 
-	protected abstract Mono<Long> resolveLiveId();
+	protected abstract Flux<Long> liveIdChange();
 
 	@Override
 	public final long getUserId() {
@@ -151,4 +166,11 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient {
 			default -> throw new IllegalStateException("Unexpected value: " + is.readByte());
 		};
 	}
+
+	@Override
+	public void close() {
+		if (liveIdChangeSubscription != null) {
+			liveIdChangeSubscription.dispose();
+		}
+	}
 }
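
With this change, request() no longer memoizes a Mono<Long>: initialize() turns the subclass-provided liveIdChange() flux into a cache(1) view and keeps one background subscription on it, so each request just takes the newest cached id (or fails with the 404 TdError when none was ever emitted). A minimal sketch of the cache(1) + take(1) pattern, assuming reactor-core; the interval source and names are illustrative:

import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CachedLatestValue {
    public static void main(String[] args) throws InterruptedException {
        Flux<Long> liveIdChange = Flux.interval(Duration.ofMillis(100))
                .map(i -> 1000 + i) // pretend every tick carries a new live id
                .cache(1); // replay only the most recent id to late subscribers

        // Keep one background subscription alive, as initialize() does,
        // so the cache is already populated when a request arrives.
        Disposable subscription = liveIdChange
                .subscribe(v -> System.out.println("live id changed: " + v));
        Thread.sleep(350); // let a few ids flow

        // Each request takes exactly one element: the latest cached id.
        Mono<Long> latest = liveIdChange.take(1, true).singleOrEmpty();
        System.out.println("request sees live id: " + latest.block());

        subscription.dispose();
    }
}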

View File

@@ -2,24 +2,32 @@ package it.tdlight.reactiveapi;
 import it.tdlight.reactiveapi.Event.ClientBoundEvent;
 import java.time.Duration;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 
 public class DynamicAtomixReactiveApiClient extends BaseAtomixReactiveApiClient implements AutoCloseable {
 
-	private static final long LIVE_ID_UNSET = -1L;
-	private static final long LIVE_ID_FAILED = -2L;
+	private static final Logger LOG = LoggerFactory.getLogger(DynamicAtomixReactiveApiClient.class);
+
+	private record CurrentLiveId(long sinceTimestamp, long liveId) implements Comparable<CurrentLiveId> {
+
+		@Override
+		public int compareTo(@NotNull DynamicAtomixReactiveApiClient.CurrentLiveId o) {
+			return Long.compare(this.sinceTimestamp, o.sinceTimestamp);
+		}
+	}
 
 	private final ReactiveApi api;
-	private final AtomicLong liveId = new AtomicLong(LIVE_ID_UNSET);
-	private final Disposable liveIdSubscription;
+	private final AtomicReference<Disposable> clientBoundEventsSubscription = new AtomicReference<>(null);
 	private final long userId;
 
-	private final Flux<ClientBoundEvent> clientBoundEvents;
-	private final Flux<Long> liveIdChange;
+	private final Flux<TimestampedClientBoundEvent> clientBoundEvents;
+	private final Flux<CurrentLiveId> liveIdChange;
 
 	private volatile boolean closed;
@@ -28,52 +36,67 @@ public class DynamicAtomixReactiveApiClient extends BaseAtomixReactiveApiClient
 		this.api = api;
 		this.userId = userId;
 
-		clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId)
-				.doOnNext(e -> liveId.set(e.liveId()))
+		var clientBoundEvents = kafkaConsumer
+				.consumeMessages(subGroupId, userId)
 				.takeWhile(n -> !closed)
-				.share();
+				.publish()
+				.autoConnect(3, clientBoundEventsSubscription::set);
 
-		liveIdChange = this.clientBoundEvents()
-				.sample(Duration.ofSeconds(1))
-				.map(Event::liveId)
-				.distinctUntilChanged();
-
-		this.liveIdSubscription = liveIdChange.subscribeOn(Schedulers.parallel()).subscribe(liveId::set);
+		var firstLiveId = clientBoundEvents
+				.take(1, true)
+				.singleOrEmpty()
+				.map(e -> new CurrentLiveId(e.timestamp(), e.event().liveId()));
+		var sampledLiveIds = clientBoundEvents
+				.skip(1)
+				.sample(Duration.ofSeconds(1))
+				.map(e -> new CurrentLiveId(e.timestamp(), e.event().liveId()));
+		var startupLiveId = api
+				.resolveUserLiveId(userId)
+				.doOnError(ex -> LOG.error("Failed to resolve live id of user {}", userId, ex))
+				.onErrorResume(ex -> Mono.empty())
+				.map(liveId -> new CurrentLiveId(System.currentTimeMillis(), liveId));
+
+		liveIdChange = startupLiveId
+				.concatWith(Flux.merge(firstLiveId, sampledLiveIds))
+				.scan((prev, next) -> {
+					if (next.compareTo(prev) > 0) {
+						LOG.trace("Replaced id {} with id {}", prev, next);
+						return next;
+					} else {
+						return prev;
+					}
+				})
+				.distinctUntilChanged(CurrentLiveId::liveId);
 
+		// minimum 3 subscribers:
+		// - firstClientBoundEvent
+		// - sampledClientBoundEvents
+		// - clientBoundEvents
+		this.clientBoundEvents = clientBoundEvents;
+
+		super.initialize();
 	}
 
 	@Override
 	public Flux<ClientBoundEvent> clientBoundEvents() {
-		return clientBoundEvents;
+		return clientBoundEvents.doFirst(() -> {
+			if (this.clientBoundEventsSubscription.get() != null) {
+				throw new UnsupportedOperationException("Already subscribed");
+			}
+		}).map(TimestampedClientBoundEvent::event);
 	}
 
 	@Override
-	protected Mono<Long> resolveLiveId() {
-		return Mono
-				.fromSupplier(this.liveId::get)
-				.flatMap(liveId -> {
-					if (liveId == LIVE_ID_UNSET) {
-						return api.resolveUserLiveId(userId)
-								.switchIfEmpty(Mono.error(this::createLiveIdFailed))
-								.doOnError(ex -> this.liveId.compareAndSet(LIVE_ID_UNSET, LIVE_ID_FAILED));
-					} else if (liveId == LIVE_ID_FAILED) {
-						return Mono.error(createLiveIdFailed());
-					} else {
-						return Mono.just(liveId);
-					}
-				});
-	}
-
-	private Throwable createLiveIdFailed() {
-		return new TdError(404, "Bot #IDU" + this.userId
-				+ " is not found on the cluster, no live id has been associated with it locally");
-	}
-
-	public Flux<Long> liveIdChange() {
-		return liveIdChange;
+	protected Flux<Long> liveIdChange() {
+		return liveIdChange.map(CurrentLiveId::liveId);
 	}
 
 	public void close() {
 		this.closed = true;
-		liveIdSubscription.dispose();
+		var clientBoundEventsSubscription = this.clientBoundEventsSubscription.get();
+		if (clientBoundEventsSubscription != null) {
+			clientBoundEventsSubscription.dispose();
+		}
+		super.close();
 	}
 }
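
The constructor now merges three id sources (the startup resolution, the first consumed event, and events sampled once per second) and scans them with a timestamp comparison, so the current live id can only move forward in time; a stale sample can no longer overwrite a newer id the way the old AtomicLong could. A standalone sketch of the scan step with illustrative data:

import reactor.core.publisher.Flux;

public class MonotonicLiveId {
    record CurrentLiveId(long sinceTimestamp, long liveId) implements Comparable<CurrentLiveId> {
        @Override
        public int compareTo(CurrentLiveId o) {
            return Long.compare(this.sinceTimestamp, o.sinceTimestamp);
        }
    }

    public static void main(String[] args) {
        Flux.just(
                        new CurrentLiveId(100, 7), // startup resolution
                        new CurrentLiveId(300, 9), // first event of a newer session
                        new CurrentLiveId(200, 8)  // stale sampled event, must lose
                )
                .scan((prev, next) -> next.compareTo(prev) > 0 ? next : prev)
                .distinctUntilChanged(CurrentLiveId::liveId)
                .doOnNext(id -> System.out.println("current: " + id))
                .blockLast();
        // prints: current: CurrentLiveId[sinceTimestamp=100, liveId=7]
        //         current: CurrentLiveId[sinceTimestamp=300, liveId=9]
    }
}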

View File

@@ -7,8 +7,8 @@ import java.util.Map;
 import java.util.logging.Level;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -57,7 +57,7 @@ public class KafkaConsumer {
 		return KafkaReceiver.create(options);
 	}
 
-	private Flux<ClientBoundEvent> retryIfCleanup(Flux<ClientBoundEvent> clientBoundEventFlux) {
+	private Flux<TimestampedClientBoundEvent> retryIfCleanup(Flux<TimestampedClientBoundEvent> clientBoundEventFlux) {
 		return clientBoundEventFlux.retryWhen(Retry
 				.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
 				.maxBackoff(Duration.ofSeconds(5))
@@ -65,24 +65,30 @@ public class KafkaConsumer {
 				.doBeforeRetry(s -> LOG.warn("Rebalancing in progress")));
 	}
 
-	public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId, long userId, long liveId) {
-		return consumeMessagesInternal(subGroupId, userId).filter(e -> e.liveId() == liveId);
+	public Flux<TimestampedClientBoundEvent> consumeMessages(@NotNull String subGroupId, long userId, long liveId) {
+		return consumeMessagesInternal(subGroupId, userId).filter(e -> e.event().liveId() == liveId);
 	}
 
-	public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId, long userId) {
+	public Flux<TimestampedClientBoundEvent> consumeMessages(@NotNull String subGroupId, long userId) {
 		return consumeMessagesInternal(subGroupId, userId);
 	}
 
-	public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId) {
+	public Flux<TimestampedClientBoundEvent> consumeMessages(@NotNull String subGroupId) {
 		return consumeMessagesInternal(subGroupId, null);
 	}
 
-	private Flux<ClientBoundEvent> consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) {
+	private Flux<TimestampedClientBoundEvent> consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) {
 		return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId)
 				.receive()
 				.log("consume-messages", Level.FINEST, SignalType.REQUEST)
 				.doOnNext(result -> result.receiverOffset().acknowledge())
-				.map(ConsumerRecord::value)
+				.map(record -> {
+					if (record.timestampType() == TimestampType.CREATE_TIME) {
+						return new TimestampedClientBoundEvent(record.timestamp(), record.value());
+					} else {
+						return new TimestampedClientBoundEvent(1, record.value());
+					}
+				})
 				.transform(this::retryIfCleanup);
 	}
 }
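
The consumer now attaches each record's broker timestamp to the event, but only trusts it when timestampType() is CREATE_TIME (producer-assigned); any other type falls back to the constant 1, which every real timestamp outranks in the scan above, so untimestamped events can never displace a newer live id. A standalone sketch of that mapping, using stand-in record types instead of the project's ClientBoundEvent:

import org.apache.kafka.common.record.TimestampType;

public class TimestampMapping {
    record Event(long liveId) {} // stand-in for ClientBoundEvent
    record TimestampedEvent(long timestamp, Event event) {}

    static TimestampedEvent timestamped(TimestampType type, long timestamp, Event value) {
        if (type == TimestampType.CREATE_TIME) {
            return new TimestampedEvent(timestamp, value); // trust producer time
        } else {
            return new TimestampedEvent(1, value); // sorts older than any real time
        }
    }

    public static void main(String[] args) {
        System.out.println(timestamped(TimestampType.CREATE_TIME, 1642867556000L, new Event(42)));
        System.out.println(timestamped(TimestampType.LOG_APPEND_TIME, 1642867556000L, new Event(42)));
    }
}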

View File

@@ -8,7 +8,7 @@ import reactor.core.publisher.Mono;
 public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient {
 
 	private final Flux<ClientBoundEvent> clientBoundEvents;
-	private final Mono<Long> liveId;
+	private final long liveId;
 
 	LiveAtomixReactiveApiClient(Atomix atomix,
 			KafkaConsumer kafkaConsumer,
@@ -16,8 +16,11 @@ public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient {
 			long userId,
 			String subGroupId) {
 		super(atomix, userId);
-		this.clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId, liveId).share();
-		this.liveId = Mono.just(liveId);
+		this.clientBoundEvents = kafkaConsumer
+				.consumeMessages(subGroupId, userId, liveId)
+				.map(TimestampedClientBoundEvent::event);
+		this.liveId = liveId;
+		super.initialize();
 	}
 
 	@Override
@@ -26,8 +29,7 @@ public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient {
 	}
 
 	@Override
-	public Mono<Long> resolveLiveId() {
-		return liveId;
+	protected Flux<Long> liveIdChange() {
+		return Flux.just(liveId);
 	}
 }

View File

@@ -152,7 +152,7 @@ public class PeriodicRestarter {
 			if (!requireNonNullElse(closingByPeriodicRestarter.put(liveId, true), false)) {
 				// Request restart
 				multiClient
-						.request(userId, liveId, new Close(), Instant.now().plus(Duration.ofSeconds(15)))
+						.request(userId, liveId, new Close(), Instant.now().plus(Duration.ofMinutes(5)))
 						.subscribeOn(Schedulers.parallel())
 						.retryWhen(Retry.backoff(5, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5)))
 						.doOnError(ex -> {

View File

@@ -64,9 +64,8 @@ import reactor.core.scheduler.Schedulers;
 public abstract class ReactiveApiPublisher {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class);
-	private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(3);
+	private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(5);
 
 	private final KafkaProducer kafkaProducer;
 	private final ClusterEventService eventService;
@@ -102,7 +101,7 @@ public abstract class ReactiveApiPublisher {
 				subscription.close();
 				rawTelegramClient.dispose();
 			});
-		})).publishOn(Schedulers.parallel()).share();
+		}));
 	}
 
 	public static ReactiveApiPublisher fromToken(Atomix atomix,
@@ -181,8 +180,9 @@ public abstract class ReactiveApiPublisher {
 				.onErrorResume(ex -> Mono.just(new OnUpdateError(liveId, userId, new TdApi.Error(500, ex.getMessage()))))
 				// when an error arrives, close the session
-				.flatMap(ignored -> Mono
-						.from(rawTelegramClient.send(new TdApi.Close(), Duration.ofMinutes(1)))
+				.take(1, true)
+				.concatMap(ignored -> Mono
						.from(rawTelegramClient.send(new TdApi.Close(), SPECIAL_RAW_TIMEOUT_DURATION))
 						.then(Mono.empty())
 				)
 				.subscribeOn(Schedulers.parallel())
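
The take(1, true) added above caps the error stream at a single element before concatMap, so TdApi.Close is sent at most once per failing session rather than once per error, and the timeout now reuses the shared SPECIAL_RAW_TIMEOUT_DURATION constant. A minimal sketch of that capping behavior, with an illustrative string stream standing in for the update flux:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CloseOnFirstError {
    public static void main(String[] args) {
        Flux.just("error-1", "error-2", "error-3")
                .take(1, true) // cap upstream demand at exactly one element
                .concatMap(ignored -> Mono
                        .fromRunnable(() -> System.out.println("sending TdApi.Close once"))
                        .then(Mono.empty()))
                .blockLast(); // prints the close message a single time
    }
}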

View File

@@ -0,0 +1,5 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.reactiveapi.Event.ClientBoundEvent;
+
+public record TimestampedClientBoundEvent(long timestamp, ClientBoundEvent event) {}