Check if the client is really closed

This commit is contained in:
Andrea Cavalli 2022-01-09 20:50:58 +01:00
parent fd0bfda2eb
commit 2156ec9ed7

View File

@ -16,13 +16,13 @@ import reactor.core.publisher.Mono;
public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable { public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable {
private final ReactiveApi api;
private final ClusterEventService eventService; private final ClusterEventService eventService;
private final Flux<ClientBoundEvent> clientBoundEvents; private final Flux<ClientBoundEvent> clientBoundEvents;
private volatile boolean closed = false;
AtomixReactiveApiMultiClient(AtomixReactiveApi api) { AtomixReactiveApiMultiClient(AtomixReactiveApi api) {
this.api = api;
this.eventService = api.getAtomix().getEventService(); this.eventService = api.getAtomix().getEventService();
clientBoundEvents = Flux clientBoundEvents = Flux
@ -38,6 +38,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
}, OverflowStrategy.ERROR) }, OverflowStrategy.ERROR)
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
.takeUntil(s -> closed)
.share(); .share();
} }
@ -48,30 +49,34 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
@Override @Override
public <T extends TdApi.Object> Mono<T> request(long userId, long liveId, TdApi.Function<T> request, Instant timeout) { public <T extends TdApi.Object> Mono<T> request(long userId, long liveId, TdApi.Function<T> request, Instant timeout) {
return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", return Mono.fromCompletionStage(() -> {
new Request<>(liveId, request, timeout), if (closed) {
LiveAtomixReactiveApiClient::serializeRequest, return CompletableFuture.failedFuture(new TdError(500, "Session is closed"));
LiveAtomixReactiveApiClient::deserializeResponse, }
Duration.between(Instant.now(), timeout) return eventService.send("session-" + liveId + "-requests",
)) new Request<>(liveId, request, timeout),
.<T>handle((item, sink) -> { LiveAtomixReactiveApiClient::serializeRequest,
if (item instanceof TdApi.Error error) { LiveAtomixReactiveApiClient::deserializeResponse,
sink.error(new TdError(error.code, error.message)); Duration.between(Instant.now(), timeout)
} else { );
//noinspection unchecked }).<T>handle((item, sink) -> {
sink.next((T) item); if (item instanceof TdApi.Error error) {
} sink.error(new TdError(error.code, error.message));
}) } else {
.onErrorMap(ex -> { //noinspection unchecked
if (ex instanceof MessagingException.NoRemoteHandler) { sink.next((T) item);
return new TdError(404, "Bot #IDU" + userId + " (live id: " + liveId + ") is not found on the cluster"); }
} else { }).onErrorMap(ex -> {
return ex; if (ex instanceof MessagingException.NoRemoteHandler) {
} return new TdError(404, "Bot #IDU" + userId + " (live id: " + liveId + ") is not found on the cluster");
}); } else {
return ex;
}
});
} }
@Override @Override
public void close() { public void close() {
closed = true;
} }
} }