From 3a74997b493c984e796426efa4fde8528f1ec6c2 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 22 Jan 2022 17:45:56 +0100 Subject: [PATCH] Optimize dynamic live id resolution --- .../reactiveapi/AtomixReactiveApi.java | 10 +- .../AtomixReactiveApiMultiClient.java | 2 +- .../BaseAtomixReactiveApiClient.java | 44 ++++++-- .../DynamicAtomixReactiveApiClient.java | 103 +++++++++++------- .../it/tdlight/reactiveapi/KafkaConsumer.java | 22 ++-- .../LiveAtomixReactiveApiClient.java | 14 ++- .../reactiveapi/PeriodicRestarter.java | 2 +- .../reactiveapi/ReactiveApiPublisher.java | 10 +- .../TimestampedClientBoundEvent.java | 5 + 9 files changed, 135 insertions(+), 77 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/TimestampedClientBoundEvent.java diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index b666d93..726b02a 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -163,17 +163,17 @@ public class AtomixReactiveApi implements ReactiveApi { var removeObsoleteDiskSessions = diskChangesMono .flatMapIterable(diskChanges -> diskChanges.removedIds) - .flatMap(removedIds -> fromCompletionStage(() -> destroySession(removedIds, nodeId))) + .concatMap(removedIds -> fromCompletionStage(() -> destroySession(removedIds, nodeId))) .then(); var addedDiskSessionsFlux = diskChangesMono .flatMapIterable(diskChanges -> diskChanges.addedIds) - .flatMap(this::getLocalDiskSession); + .concatMap(this::getLocalDiskSession); var normalDiskSessionsFlux = diskChangesMono .flatMapIterable(diskChanges -> diskChanges.normalIds) - .flatMap(this::getLocalDiskSession); + .concatMap(this::getLocalDiskSession); - var addNewDiskSessions = addedDiskSessionsFlux.flatMap(diskSessionAndId -> { + var addNewDiskSessions = addedDiskSessionsFlux.concatMap(diskSessionAndId -> { var id = diskSessionAndId.id; var diskSession = diskSessionAndId.diskSession; return createSession(new LoadSessionFromDiskRequest(id, @@ -183,7 +183,7 @@ public class AtomixReactiveApi implements ReactiveApi { )); }).then(); - var loadExistingDiskSessions = normalDiskSessionsFlux.flatMap(diskSessionAndId -> { + var loadExistingDiskSessions = normalDiskSessionsFlux.concatMap(diskSessionAndId -> { var id = diskSessionAndId.id; var diskSession = diskSessionAndId.diskSession; return createSession(new LoadSessionFromDiskRequest(id, diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java index 9a0b34c..7129844 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java @@ -31,7 +31,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut if (closed) { return Flux.empty(); } - return kafkaConsumer.consumeMessages(subGroupId).takeUntil(s -> closed); + return kafkaConsumer.consumeMessages(subGroupId).map(TimestampedClientBoundEvent::event).takeUntil(s -> closed); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index e7bf19b..32802a8 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -23,30 +23,45 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.SerializationException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient { +abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(BaseAtomixReactiveApiClient.class); protected final ClusterEventService eventService; protected final long userId; - private Mono liveIdMono; + private Disposable liveIdChangeSubscription; + private Flux liveIdChange; + private Mono emptyIdErrorMono; public BaseAtomixReactiveApiClient(Atomix atomix, long userId) { this.eventService = atomix.getEventService(); this.userId = userId; } + protected void initialize() { + this.liveIdChange = liveIdChange().cache(1); + this.liveIdChangeSubscription = liveIdChange + .subscribeOn(Schedulers.parallel()) + .subscribe(v -> LOG.debug("Live id of user {} changed: {}", userId, v), + ex -> LOG.error("Failed to retrieve live id of user {}", userId) + ); + this.emptyIdErrorMono = Mono.error(() -> new TdError(404, "Bot #IDU" + this.userId + + " is not found on the cluster, no live id has been associated with it locally")); + } + @Override public final Mono request(TdApi.Function request, Instant timeout) { - // Don't care about race conditions here, because the mono is always the same. - // This variable is set just to avoid creating the mono every time - Mono liveIdMono = this.liveIdMono; - if (liveIdMono == null) { - liveIdMono = (this.liveIdMono = resolveLiveId()); - } - - return liveIdMono + return liveIdChange + .take(1, true) + .singleOrEmpty() + .switchIfEmpty(emptyIdErrorMono) .flatMap(liveId -> Mono .fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", new Request<>(liveId, request, timeout), @@ -75,7 +90,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient { }); } - protected abstract Mono resolveLiveId(); + protected abstract Flux liveIdChange(); @Override public final long getUserId() { @@ -151,4 +166,11 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient { default -> throw new IllegalStateException("Unexpected value: " + is.readByte()); }; } + + @Override + public void close() { + if (liveIdChangeSubscription != null) { + liveIdChangeSubscription.dispose(); + } + } } diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index e41d537..54ae31b 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -2,24 +2,32 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import java.time.Duration; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; public class DynamicAtomixReactiveApiClient extends BaseAtomixReactiveApiClient implements AutoCloseable { - private static final long LIVE_ID_UNSET = -1L; - private static final long LIVE_ID_FAILED = -2L; + private static final Logger LOG = LoggerFactory.getLogger(DynamicAtomixReactiveApiClient.class); + + private record CurrentLiveId(long sinceTimestamp, long liveId) implements Comparable { + + @Override + public int compareTo(@NotNull DynamicAtomixReactiveApiClient.CurrentLiveId o) { + return Long.compare(this.sinceTimestamp, o.sinceTimestamp); + } + } private final ReactiveApi api; - private final AtomicLong liveId = new AtomicLong(LIVE_ID_UNSET); - private final Disposable liveIdSubscription; + private final AtomicReference clientBoundEventsSubscription = new AtomicReference<>(null); private final long userId; - private final Flux clientBoundEvents; - private final Flux liveIdChange; + private final Flux clientBoundEvents; + private final Flux liveIdChange; private volatile boolean closed; @@ -28,52 +36,67 @@ public class DynamicAtomixReactiveApiClient extends BaseAtomixReactiveApiClient this.api = api; this.userId = userId; - clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId) - .doOnNext(e -> liveId.set(e.liveId())) + var clientBoundEvents = kafkaConsumer + .consumeMessages(subGroupId, userId) .takeWhile(n -> !closed) - .share(); + .publish() + .autoConnect(3, clientBoundEventsSubscription::set); - liveIdChange = this.clientBoundEvents() + var firstLiveId = clientBoundEvents + .take(1, true) + .singleOrEmpty() + .map(e -> new CurrentLiveId(e.timestamp(), e.event().liveId())); + var sampledLiveIds = clientBoundEvents + .skip(1) .sample(Duration.ofSeconds(1)) - .map(Event::liveId) - .distinctUntilChanged(); + .map(e -> new CurrentLiveId(e.timestamp(), e.event().liveId())); + var startupLiveId = api + .resolveUserLiveId(userId) + .doOnError(ex -> LOG.error("Failed to resolve live id of user {}", userId, ex)) + .onErrorResume(ex -> Mono.empty()) + .map(liveId -> new CurrentLiveId(System.currentTimeMillis(), liveId)); - this.liveIdSubscription = liveIdChange.subscribeOn(Schedulers.parallel()).subscribe(liveId::set); + liveIdChange = startupLiveId + .concatWith(Flux.merge(firstLiveId, sampledLiveIds)) + .scan((prev, next) -> { + if (next.compareTo(prev) > 0) { + LOG.trace("Replaced id {} with id {}", prev, next); + return next; + } else { + return prev; + } + }) + .distinctUntilChanged(CurrentLiveId::liveId); + + // minimum 3 subscribers: + // - firstClientBoundEvent + // - sampledClientBoundEvents + // - clientBoundEvents + this.clientBoundEvents = clientBoundEvents; + + super.initialize(); } @Override public Flux clientBoundEvents() { - return clientBoundEvents; + return clientBoundEvents.doFirst(() -> { + if (this.clientBoundEventsSubscription.get() != null) { + throw new UnsupportedOperationException("Already subscribed"); + } + }).map(TimestampedClientBoundEvent::event); } @Override - protected Mono resolveLiveId() { - return Mono - .fromSupplier(this.liveId::get) - .flatMap(liveId -> { - if (liveId == LIVE_ID_UNSET) { - return api.resolveUserLiveId(userId) - .switchIfEmpty(Mono.error(this::createLiveIdFailed)) - .doOnError(ex -> this.liveId.compareAndSet(LIVE_ID_UNSET, LIVE_ID_FAILED)); - } else if (liveId == LIVE_ID_FAILED) { - return Mono.error(createLiveIdFailed()); - } else { - return Mono.just(liveId); - } - }); - } - - private Throwable createLiveIdFailed() { - return new TdError(404, "Bot #IDU" + this.userId - + " is not found on the cluster, no live id has been associated with it locally"); - } - - public Flux liveIdChange() { - return liveIdChange; + protected Flux liveIdChange() { + return liveIdChange.map(CurrentLiveId::liveId); } public void close() { this.closed = true; - liveIdSubscription.dispose(); + var clientBoundEventsSubscription = this.clientBoundEventsSubscription.get(); + if (clientBoundEventsSubscription != null) { + clientBoundEventsSubscription.dispose(); + } + super.close(); } } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index b004122..5136923 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -7,8 +7,8 @@ import java.util.Map; import java.util.logging.Level; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.errors.RebalanceInProgressException; +import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -57,7 +57,7 @@ public class KafkaConsumer { return KafkaReceiver.create(options); } - private Flux retryIfCleanup(Flux clientBoundEventFlux) { + private Flux retryIfCleanup(Flux clientBoundEventFlux) { return clientBoundEventFlux.retryWhen(Retry .backoff(Long.MAX_VALUE, Duration.ofMillis(100)) .maxBackoff(Duration.ofSeconds(5)) @@ -65,24 +65,30 @@ public class KafkaConsumer { .doBeforeRetry(s -> LOG.warn("Rebalancing in progress"))); } - public Flux consumeMessages(@NotNull String subGroupId, long userId, long liveId) { - return consumeMessagesInternal(subGroupId, userId).filter(e -> e.liveId() == liveId); + public Flux consumeMessages(@NotNull String subGroupId, long userId, long liveId) { + return consumeMessagesInternal(subGroupId, userId).filter(e -> e.event().liveId() == liveId); } - public Flux consumeMessages(@NotNull String subGroupId, long userId) { + public Flux consumeMessages(@NotNull String subGroupId, long userId) { return consumeMessagesInternal(subGroupId, userId); } - public Flux consumeMessages(@NotNull String subGroupId) { + public Flux consumeMessages(@NotNull String subGroupId) { return consumeMessagesInternal(subGroupId, null); } - private Flux consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) { + private Flux consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) { return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId) .receive() .log("consume-messages", Level.FINEST, SignalType.REQUEST) .doOnNext(result -> result.receiverOffset().acknowledge()) - .map(ConsumerRecord::value) + .map(record -> { + if (record.timestampType() == TimestampType.CREATE_TIME) { + return new TimestampedClientBoundEvent(record.timestamp(), record.value()); + } else { + return new TimestampedClientBoundEvent(1, record.value()); + } + }) .transform(this::retryIfCleanup); } } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index 556139c..f3fb9d5 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -8,7 +8,7 @@ import reactor.core.publisher.Mono; public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient { private final Flux clientBoundEvents; - private final Mono liveId; + private final long liveId; LiveAtomixReactiveApiClient(Atomix atomix, KafkaConsumer kafkaConsumer, @@ -16,8 +16,11 @@ public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient { long userId, String subGroupId) { super(atomix, userId); - this.clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId, liveId).share(); - this.liveId = Mono.just(liveId); + this.clientBoundEvents = kafkaConsumer + .consumeMessages(subGroupId, userId, liveId) + .map(TimestampedClientBoundEvent::event); + this.liveId = liveId; + super.initialize(); } @Override @@ -26,8 +29,7 @@ public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient { } @Override - public Mono resolveLiveId() { - return liveId; + protected Flux liveIdChange() { + return Flux.just(liveId); } - } diff --git a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java index 59e3199..d65e79e 100644 --- a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java +++ b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java @@ -152,7 +152,7 @@ public class PeriodicRestarter { if (!requireNonNullElse(closingByPeriodicRestarter.put(liveId, true), false)) { // Request restart multiClient - .request(userId, liveId, new Close(), Instant.now().plus(Duration.ofSeconds(15))) + .request(userId, liveId, new Close(), Instant.now().plus(Duration.ofMinutes(5))) .subscribeOn(Schedulers.parallel()) .retryWhen(Retry.backoff(5, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(5))) .doOnError(ex -> { diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index ba969f9..b31deb5 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -64,9 +64,8 @@ import reactor.core.scheduler.Schedulers; public abstract class ReactiveApiPublisher { - private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class); - private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(3); + private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(5); private final KafkaProducer kafkaProducer; private final ClusterEventService eventService; @@ -102,7 +101,7 @@ public abstract class ReactiveApiPublisher { subscription.close(); rawTelegramClient.dispose(); }); - })).publishOn(Schedulers.parallel()).share(); + })); } public static ReactiveApiPublisher fromToken(Atomix atomix, @@ -181,8 +180,9 @@ public abstract class ReactiveApiPublisher { .onErrorResume(ex -> Mono.just(new OnUpdateError(liveId, userId, new TdApi.Error(500, ex.getMessage())))) // when an error arrives, close the session - .flatMap(ignored -> Mono - .from(rawTelegramClient.send(new TdApi.Close(), Duration.ofMinutes(1))) + .take(1, true) + .concatMap(ignored -> Mono + .from(rawTelegramClient.send(new TdApi.Close(), SPECIAL_RAW_TIMEOUT_DURATION)) .then(Mono.empty()) ) .subscribeOn(Schedulers.parallel()) diff --git a/src/main/java/it/tdlight/reactiveapi/TimestampedClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/TimestampedClientBoundEvent.java new file mode 100644 index 0000000..ec404d5 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/TimestampedClientBoundEvent.java @@ -0,0 +1,5 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.Event.ClientBoundEvent; + +public record TimestampedClientBoundEvent(long timestamp, ClientBoundEvent event) {}