diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index f30137e..c2167b7 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -45,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.warp.commonutils.metrics.AtomicDetailedTimeAbsoluteSamples; import reactor.core.Disposable; +import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -94,33 +95,53 @@ public abstract class ReactiveApiPublisher { LOG.info("Starting session \"{}\" in path \"{}\"", this, path); var publishedResultingEvents = telegramClient .subscribeOn(Schedulers.parallel()) + // Handle signals, then return a ResultingEvent .mapNotNull(this::onSignal) .publish(); publishedResultingEvents + // Obtain only TDLib-bound events .filter(s -> s instanceof TDLibBoundResultingEvent) .map(s -> ((TDLibBoundResultingEvent) s).action()) - .flatMapSequential(function -> Mono + // Buffer up to 64 requests to avoid halting the event loop, throw an error if too many requests are buffered + .limitRate(4) + .onBackpressureBuffer(64, BufferOverflowStrategy.ERROR) + // Send requests to tdlib + .concatMap(function -> Mono .from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION)) .mapNotNull(resp -> { if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) { - LOG.error("Received error for special request {}: {}", function, resp); + LOG.error("Received error for special request {}: {}\nThe instance will be closed", function, resp); return new OnUpdateError(liveId, userId, (TdApi.Error) resp); } else { return null; } }) - .doOnError(ex -> LOG.error("Failed to receive the response for special request {}", function, ex)) + .doOnError(ex -> LOG.error("Failed to receive the response for special request {}\n" + + " The instance will be closed", function, ex)) .onErrorResume(ex -> Mono.just(new OnUpdateError(liveId, userId, new TdApi.Error(500, ex.getMessage())))) ) + .doOnError(ex -> LOG.error("Failed to receive resulting events. The instance will be closed", ex)) + .onErrorResume(ex -> Mono.just(new OnUpdateError(liveId, userId, new TdApi.Error(500, ex.getMessage())))) + + // when an error arrives, close the session + .flatMap(ignored -> Mono + .from(rawTelegramClient.send(new TdApi.Close(), Duration.ofMinutes(1))) + .then(Mono.empty()) + ) .subscribeOn(Schedulers.parallel()) - .subscribe(this::sendClientBoundEvent); + .subscribe(); + publishedResultingEvents + // Obtain only client-bound events .filter(s -> s instanceof ClientBoundResultingEvent) .cast(ClientBoundResultingEvent.class) .map(ClientBoundResultingEvent::event) + + // Send events to the client .subscribeOn(Schedulers.parallel()) - .subscribe(this::sendClientBoundEvent); + .subscribe(clientBoundEvent -> eventService.broadcast("session-" + liveId + "-client-bound-events", + clientBoundEvent, ReactiveApiPublisher::serializeEvent)); var prev = this.disposable.getAndSet(publishedResultingEvents.connect()); @@ -130,13 +151,6 @@ public abstract class ReactiveApiPublisher { } } - private void sendClientBoundEvent(ClientBoundEvent clientBoundResultingEvent) { - eventService.broadcast("session-" + liveId + "-client-bound-events", - clientBoundResultingEvent, - ReactiveApiPublisher::serializeEvent - ); - } - @Nullable private ResultingEvent onSignal(Signal signal) { // Update the state @@ -158,6 +172,11 @@ public abstract class ReactiveApiPublisher { LOG.error("Received an error signal", signal.getException()); return null; } + if (signal.isClosed()) { + signal.getClosed(); + LOG.info("Received a closed signal"); + return null; + } if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) { var error = ((TdApi.Error) signal.getUpdate()); LOG.error("Received a TDLib error signal! Error {}: {}", error.code, error.message);