This commit is contained in:
Andrea Cavalli 2022-01-14 16:33:54 +01:00
parent 4c4b7a3677
commit 48fbca5fad
3 changed files with 79 additions and 49 deletions

View File

@ -40,14 +40,11 @@ public class KafkaConsumer {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClientBoundEventDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClientBoundEventDeserializer.class);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, (int) Duration.ofMinutes(5).toMillis()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) Duration.ofMinutes(5).toMillis());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<Integer, ClientBoundEvent> receiverOptions = ReceiverOptions ReceiverOptions<Integer, ClientBoundEvent> receiverOptions = ReceiverOptions
.<Integer, ClientBoundEvent>create(props) .<Integer, ClientBoundEvent>create(props)
.commitInterval(Duration.ofSeconds(10)) .commitInterval(Duration.ofSeconds(10))
.commitBatchSize(64) .commitBatchSize(64)
.pollTimeout(Duration.ofMinutes(2))
.maxCommitAttempts(100) .maxCommitAttempts(100)
.maxDeferredCommits(100); .maxDeferredCommits(100);
Pattern pattern; Pattern pattern;

View File

@ -1,5 +1,8 @@
package it.tdlight.reactiveapi; package it.tdlight.reactiveapi;
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNullElse;
import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.AuthorizationStateClosing; import it.tdlight.jni.TdApi.AuthorizationStateClosing;
import it.tdlight.jni.TdApi.AuthorizationStateLoggingOut; import it.tdlight.jni.TdApi.AuthorizationStateLoggingOut;
@ -10,9 +13,12 @@ import it.tdlight.jni.TdApi.UpdateNewMessage;
import it.tdlight.reactiveapi.Event.OnUpdateData; import it.tdlight.reactiveapi.Event.OnUpdateData;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.core.Disposable; import reactor.core.Disposable;
@ -25,6 +31,7 @@ public class PeriodicRestarter {
private final ReactiveApi api; private final ReactiveApi api;
private final Duration interval; private final Duration interval;
private final Set<Long> restartUserIds;
private final ReactiveApiMultiClient multiClient; private final ReactiveApiMultiClient multiClient;
/** /**
@ -42,9 +49,10 @@ public class PeriodicRestarter {
*/ */
private final ConcurrentMap<Long, Boolean> sessionAuthReady = new ConcurrentHashMap<>(); private final ConcurrentMap<Long, Boolean> sessionAuthReady = new ConcurrentHashMap<>();
public PeriodicRestarter(ReactiveApi api, Duration interval) { public PeriodicRestarter(ReactiveApi api, Duration interval, Set<Long> restartUserIds) {
this.api = api; this.api = api;
this.interval = interval; this.interval = interval;
this.restartUserIds = restartUserIds;
this.multiClient = api.multiClient("periodic-restarter"); this.multiClient = api.multiClient("periodic-restarter");
@ -53,45 +61,62 @@ public class PeriodicRestarter {
public Mono<Void> start() { public Mono<Void> start() {
return Mono.fromRunnable(() -> { return Mono.fromRunnable(() -> {
LOG.info("Starting periodic restarter..."); LOG.info("Starting periodic restarter...");
multiClient.clientBoundEvents(true).doOnNext(event -> { multiClient
if (event instanceof OnUpdateData onUpdate) { .clientBoundEvents(true)
if (onUpdate.update() instanceof UpdateAuthorizationState updateAuthorizationState) {
if (updateAuthorizationState.authorizationState instanceof AuthorizationStateReady) { // Filter events to reduce overhead
// Session is now ready .filter(event -> {
this.sessionAuthReady.put(event.liveId(), true); boolean isAllowedUpdate;
onSessionReady(event.liveId(), event.userId()); if (event instanceof OnUpdateData onUpdateData) {
} else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateLoggingOut) { isAllowedUpdate = switch (onUpdateData.update().getConstructor()) {
// Session is not ready anymore case UpdateAuthorizationState.CONSTRUCTOR, UpdateNewMessage.CONSTRUCTOR -> true;
this.sessionAuthReady.remove(event.liveId(), false); default -> false;
} else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosing) { };
// Session is not ready anymore } else {
this.sessionAuthReady.remove(event.liveId(), false); isAllowedUpdate = false;
} else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosed) { }
// Session is not ready anymore return isAllowedUpdate && restartUserIds.contains(event.userId());
this.sessionAuthReady.remove(event.liveId(), false); })
Boolean prev = closingByPeriodicRestarter.remove(event.liveId()); .cast(OnUpdateData.class)
var disposable = closeManagedByPeriodicRestarter.remove(event.userId());
boolean managed = prev != null && prev; .doOnNext(event -> {
// Check if the live session is managed by the periodic restarter if (event.update() instanceof UpdateAuthorizationState updateAuthorizationState) {
if (managed) { if (updateAuthorizationState.authorizationState instanceof AuthorizationStateReady) {
LOG.info("The session #IDU{} (liveId: {}) is being started", event.userId(), event.liveId()); // Session is now ready
// Restart the session this.sessionAuthReady.put(event.liveId(), true);
api.tryReviveSession(event.userId()).subscribeOn(Schedulers.parallel()).subscribe(); onSessionReady(event.liveId(), event.userId());
} else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateLoggingOut) {
// Session is not ready anymore
this.sessionAuthReady.remove(event.liveId(), false);
} else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosing) {
// Session is not ready anymore
this.sessionAuthReady.remove(event.liveId(), false);
} else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosed) {
// Session is not ready anymore
this.sessionAuthReady.remove(event.liveId(), false);
Boolean prev = closingByPeriodicRestarter.remove(event.liveId());
var disposable = closeManagedByPeriodicRestarter.remove(event.userId());
boolean managed = prev != null && prev;
// Check if the live session is managed by the periodic restarter
if (managed) {
LOG.info("The session #IDU{} (liveId: {}) is being started", event.userId(), event.liveId());
// Restart the session
api.tryReviveSession(event.userId()).subscribeOn(Schedulers.parallel()).subscribe();
}
// Dispose restarter anyway
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
} }
// Dispose restarter anyway } else if (event.update() instanceof UpdateNewMessage) {
if (disposable != null && !disposable.isDisposed()) { var wasReady = requireNonNullElse(this.sessionAuthReady.put(event.liveId(), true), false);
disposable.dispose(); if (!wasReady) {
onSessionReady(event.liveId(), event.userId());
} }
} }
} else if (onUpdate.update() instanceof UpdateNewMessage) { })
var wasReady = this.sessionAuthReady.getOrDefault(event.liveId(), false); .subscribeOn(Schedulers.parallel())
if (!wasReady) { .subscribe();
this.sessionAuthReady.put(event.liveId(), true);
onSessionReady(event.liveId(), event.userId());
}
}
}
}).subscribeOn(Schedulers.parallel()).subscribe();
LOG.info("Started periodic restarter"); LOG.info("Started periodic restarter");
}); });
} }
@ -104,19 +129,27 @@ public class PeriodicRestarter {
); );
// Restart after x time // Restart after x time
AtomicReference<Disposable> disposableRef = new AtomicReference<>();
var disposable = Schedulers var disposable = Schedulers
.parallel() .parallel()
.schedule(() -> { .schedule(() -> {
closeManagedByPeriodicRestarter.remove(liveId, disposableRef.get());
LOG.info("The session #IDU{} (liveId: {}) is being stopped", userId, liveId); LOG.info("The session #IDU{} (liveId: {}) is being stopped", userId, liveId);
closingByPeriodicRestarter.put(liveId, true); if (!requireNonNullElse(closingByPeriodicRestarter.put(liveId, true), false)) {
// Request restart // Request restart
multiClient multiClient
.request(userId, liveId, new Close(), Instant.now().plus(Duration.ofSeconds(15))) .request(userId, liveId, new Close(), Instant.now().plus(Duration.ofSeconds(15)))
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(); .retry(5)
.doOnError(ex -> LOG.error("Failed to restart bot {} (liveId={})", userId, liveId, ex))
.onErrorResume(ex -> Mono.empty())
.subscribe();
}
}, interval.toMillis(), TimeUnit.MILLISECONDS); }, interval.toMillis(), TimeUnit.MILLISECONDS);
closeManagedByPeriodicRestarter.put(liveId, disposable); disposableRef.set(disposable);
var prev = closeManagedByPeriodicRestarter.put(liveId, disposable);
if (prev != null) prev.dispose();
} }
public Mono<Void> stop() { public Mono<Void> stop() {

View File

@ -471,7 +471,7 @@ public abstract class ReactiveApiPublisher {
return Mono.from(rawTelegramClient.send(request, timeoutDuration)); return Mono.from(rawTelegramClient.send(request, timeoutDuration));
} else { } else {
LOG.error("Ignored a request because the current state is {}", state); LOG.error("Ignored a request because the current state is {}. Request: {}", state, requestObj);
return Mono.empty(); return Mono.empty();
} }
}) })