Handle errors

This commit is contained in:
Andrea Cavalli 2021-12-10 16:25:18 +01:00
parent ee19a97b00
commit 430dbeb261
1 changed files with 31 additions and 12 deletions

View File

@ -45,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.warp.commonutils.metrics.AtomicDetailedTimeAbsoluteSamples;
import reactor.core.Disposable;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -94,33 +95,53 @@ public abstract class ReactiveApiPublisher {
LOG.info("Starting session \"{}\" in path \"{}\"", this, path);
var publishedResultingEvents = telegramClient
.subscribeOn(Schedulers.parallel())
// Handle signals, then return a ResultingEvent
.mapNotNull(this::onSignal)
.publish();
publishedResultingEvents
// Obtain only TDLib-bound events
.filter(s -> s instanceof TDLibBoundResultingEvent<?>)
.map(s -> ((TDLibBoundResultingEvent<?>) s).action())
.flatMapSequential(function -> Mono
// Buffer up to 64 requests to avoid halting the event loop, throw an error if too many requests are buffered
.limitRate(4)
.onBackpressureBuffer(64, BufferOverflowStrategy.ERROR)
// Send requests to tdlib
.concatMap(function -> Mono
.from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION))
.mapNotNull(resp -> {
if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) {
LOG.error("Received error for special request {}: {}", function, resp);
LOG.error("Received error for special request {}: {}\nThe instance will be closed", function, resp);
return new OnUpdateError(liveId, userId, (TdApi.Error) resp);
} else {
return null;
}
})
.doOnError(ex -> LOG.error("Failed to receive the response for special request {}", function, ex))
.doOnError(ex -> LOG.error("Failed to receive the response for special request {}\n"
+ " The instance will be closed", function, ex))
.onErrorResume(ex -> Mono.just(new OnUpdateError(liveId, userId, new TdApi.Error(500, ex.getMessage()))))
)
.doOnError(ex -> LOG.error("Failed to receive resulting events. The instance will be closed", ex))
.onErrorResume(ex -> Mono.just(new OnUpdateError(liveId, userId, new TdApi.Error(500, ex.getMessage()))))
// when an error arrives, close the session
.flatMap(ignored -> Mono
.from(rawTelegramClient.send(new TdApi.Close(), Duration.ofMinutes(1)))
.then(Mono.empty())
)
.subscribeOn(Schedulers.parallel())
.subscribe(this::sendClientBoundEvent);
.subscribe();
publishedResultingEvents
// Obtain only client-bound events
.filter(s -> s instanceof ClientBoundResultingEvent)
.cast(ClientBoundResultingEvent.class)
.map(ClientBoundResultingEvent::event)
// Send events to the client
.subscribeOn(Schedulers.parallel())
.subscribe(this::sendClientBoundEvent);
.subscribe(clientBoundEvent -> eventService.broadcast("session-" + liveId + "-client-bound-events",
clientBoundEvent, ReactiveApiPublisher::serializeEvent));
var prev = this.disposable.getAndSet(publishedResultingEvents.connect());
@ -130,13 +151,6 @@ public abstract class ReactiveApiPublisher {
}
}
private void sendClientBoundEvent(ClientBoundEvent clientBoundResultingEvent) {
eventService.broadcast("session-" + liveId + "-client-bound-events",
clientBoundResultingEvent,
ReactiveApiPublisher::serializeEvent
);
}
@Nullable
private ResultingEvent onSignal(Signal signal) {
// Update the state
@ -158,6 +172,11 @@ public abstract class ReactiveApiPublisher {
LOG.error("Received an error signal", signal.getException());
return null;
}
if (signal.isClosed()) {
signal.getClosed();
LOG.info("Received a closed signal");
return null;
}
if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) {
var error = ((TdApi.Error) signal.getUpdate());
LOG.error("Received a TDLib error signal! Error {}: {}", error.code, error.message);