diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index e3c5307..58c9822 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -40,14 +40,11 @@ public class KafkaConsumer { props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClientBoundEventDeserializer.class); - props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, (int) Duration.ofMinutes(5).toMillis()); - props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) Duration.ofMinutes(5).toMillis()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); ReceiverOptions receiverOptions = ReceiverOptions .create(props) .commitInterval(Duration.ofSeconds(10)) .commitBatchSize(64) - .pollTimeout(Duration.ofMinutes(2)) .maxCommitAttempts(100) .maxDeferredCommits(100); Pattern pattern; diff --git a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java index 979bdef..882cf08 100644 --- a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java +++ b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java @@ -1,5 +1,8 @@ package it.tdlight.reactiveapi; +import static java.util.Objects.requireNonNull; +import static java.util.Objects.requireNonNullElse; + import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.AuthorizationStateClosing; import it.tdlight.jni.TdApi.AuthorizationStateLoggingOut; @@ -10,9 +13,12 @@ import it.tdlight.jni.TdApi.UpdateNewMessage; import it.tdlight.reactiveapi.Event.OnUpdateData; import java.time.Duration; import java.time.Instant; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; @@ -25,6 +31,7 @@ public class PeriodicRestarter { private final ReactiveApi api; private final Duration interval; + private final Set restartUserIds; private final ReactiveApiMultiClient multiClient; /** @@ -42,9 +49,10 @@ public class PeriodicRestarter { */ private final ConcurrentMap sessionAuthReady = new ConcurrentHashMap<>(); - public PeriodicRestarter(ReactiveApi api, Duration interval) { + public PeriodicRestarter(ReactiveApi api, Duration interval, Set restartUserIds) { this.api = api; this.interval = interval; + this.restartUserIds = restartUserIds; this.multiClient = api.multiClient("periodic-restarter"); @@ -53,45 +61,62 @@ public class PeriodicRestarter { public Mono start() { return Mono.fromRunnable(() -> { LOG.info("Starting periodic restarter..."); - multiClient.clientBoundEvents(true).doOnNext(event -> { - if (event instanceof OnUpdateData onUpdate) { - if (onUpdate.update() instanceof UpdateAuthorizationState updateAuthorizationState) { - if (updateAuthorizationState.authorizationState instanceof AuthorizationStateReady) { - // Session is now ready - this.sessionAuthReady.put(event.liveId(), true); - onSessionReady(event.liveId(), event.userId()); - } else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateLoggingOut) { - // Session is not ready anymore - this.sessionAuthReady.remove(event.liveId(), false); - } else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosing) { - // Session is not ready anymore - this.sessionAuthReady.remove(event.liveId(), false); - } else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosed) { - // Session is not ready anymore - this.sessionAuthReady.remove(event.liveId(), false); - Boolean prev = closingByPeriodicRestarter.remove(event.liveId()); - var disposable = closeManagedByPeriodicRestarter.remove(event.userId()); - boolean managed = prev != null && prev; - // Check if the live session is managed by the periodic restarter - if (managed) { - LOG.info("The session #IDU{} (liveId: {}) is being started", event.userId(), event.liveId()); - // Restart the session - api.tryReviveSession(event.userId()).subscribeOn(Schedulers.parallel()).subscribe(); + multiClient + .clientBoundEvents(true) + + // Filter events to reduce overhead + .filter(event -> { + boolean isAllowedUpdate; + if (event instanceof OnUpdateData onUpdateData) { + isAllowedUpdate = switch (onUpdateData.update().getConstructor()) { + case UpdateAuthorizationState.CONSTRUCTOR, UpdateNewMessage.CONSTRUCTOR -> true; + default -> false; + }; + } else { + isAllowedUpdate = false; + } + return isAllowedUpdate && restartUserIds.contains(event.userId()); + }) + .cast(OnUpdateData.class) + + .doOnNext(event -> { + if (event.update() instanceof UpdateAuthorizationState updateAuthorizationState) { + if (updateAuthorizationState.authorizationState instanceof AuthorizationStateReady) { + // Session is now ready + this.sessionAuthReady.put(event.liveId(), true); + onSessionReady(event.liveId(), event.userId()); + } else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateLoggingOut) { + // Session is not ready anymore + this.sessionAuthReady.remove(event.liveId(), false); + } else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosing) { + // Session is not ready anymore + this.sessionAuthReady.remove(event.liveId(), false); + } else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosed) { + // Session is not ready anymore + this.sessionAuthReady.remove(event.liveId(), false); + Boolean prev = closingByPeriodicRestarter.remove(event.liveId()); + var disposable = closeManagedByPeriodicRestarter.remove(event.userId()); + boolean managed = prev != null && prev; + // Check if the live session is managed by the periodic restarter + if (managed) { + LOG.info("The session #IDU{} (liveId: {}) is being started", event.userId(), event.liveId()); + // Restart the session + api.tryReviveSession(event.userId()).subscribeOn(Schedulers.parallel()).subscribe(); + } + // Dispose restarter anyway + if (disposable != null && !disposable.isDisposed()) { + disposable.dispose(); + } } - // Dispose restarter anyway - if (disposable != null && !disposable.isDisposed()) { - disposable.dispose(); + } else if (event.update() instanceof UpdateNewMessage) { + var wasReady = requireNonNullElse(this.sessionAuthReady.put(event.liveId(), true), false); + if (!wasReady) { + onSessionReady(event.liveId(), event.userId()); } } - } else if (onUpdate.update() instanceof UpdateNewMessage) { - var wasReady = this.sessionAuthReady.getOrDefault(event.liveId(), false); - if (!wasReady) { - this.sessionAuthReady.put(event.liveId(), true); - onSessionReady(event.liveId(), event.userId()); - } - } - } - }).subscribeOn(Schedulers.parallel()).subscribe(); + }) + .subscribeOn(Schedulers.parallel()) + .subscribe(); LOG.info("Started periodic restarter"); }); } @@ -104,19 +129,27 @@ public class PeriodicRestarter { ); // Restart after x time + AtomicReference disposableRef = new AtomicReference<>(); var disposable = Schedulers .parallel() .schedule(() -> { + closeManagedByPeriodicRestarter.remove(liveId, disposableRef.get()); LOG.info("The session #IDU{} (liveId: {}) is being stopped", userId, liveId); - closingByPeriodicRestarter.put(liveId, true); - // Request restart - multiClient - .request(userId, liveId, new Close(), Instant.now().plus(Duration.ofSeconds(15))) - .subscribeOn(Schedulers.parallel()) - .subscribe(); + if (!requireNonNullElse(closingByPeriodicRestarter.put(liveId, true), false)) { + // Request restart + multiClient + .request(userId, liveId, new Close(), Instant.now().plus(Duration.ofSeconds(15))) + .subscribeOn(Schedulers.parallel()) + .retry(5) + .doOnError(ex -> LOG.error("Failed to restart bot {} (liveId={})", userId, liveId, ex)) + .onErrorResume(ex -> Mono.empty()) + .subscribe(); + } }, interval.toMillis(), TimeUnit.MILLISECONDS); - closeManagedByPeriodicRestarter.put(liveId, disposable); + disposableRef.set(disposable); + var prev = closeManagedByPeriodicRestarter.put(liveId, disposable); + if (prev != null) prev.dispose(); } public Mono stop() { diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index c0080bb..91bcbf7 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -471,7 +471,7 @@ public abstract class ReactiveApiPublisher { return Mono.from(rawTelegramClient.send(request, timeoutDuration)); } else { - LOG.error("Ignored a request because the current state is {}", state); + LOG.error("Ignored a request because the current state is {}. Request: {}", state, requestObj); return Mono.empty(); } })