From 2156ec9ed72bb961eb270ca2734bb1c85e562e7b Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 9 Jan 2022 20:50:58 +0100 Subject: [PATCH] Check if the client is really closed --- .../AtomixReactiveApiMultiClient.java | 51 ++++++++++--------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java index 9592827..3da7140 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java @@ -16,13 +16,13 @@ import reactor.core.publisher.Mono; public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable { - private final ReactiveApi api; private final ClusterEventService eventService; private final Flux clientBoundEvents; + private volatile boolean closed = false; + AtomixReactiveApiMultiClient(AtomixReactiveApi api) { - this.api = api; this.eventService = api.getAtomix().getEventService(); clientBoundEvents = Flux @@ -38,6 +38,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); }, OverflowStrategy.ERROR) .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) + .takeUntil(s -> closed) .share(); } @@ -48,30 +49,34 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut @Override public Mono request(long userId, long liveId, TdApi.Function request, Instant timeout) { - return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", - new Request<>(liveId, request, timeout), - LiveAtomixReactiveApiClient::serializeRequest, - LiveAtomixReactiveApiClient::deserializeResponse, - Duration.between(Instant.now(), timeout) - )) - .handle((item, sink) -> { - if (item instanceof TdApi.Error error) { - sink.error(new TdError(error.code, error.message)); - } else { - //noinspection unchecked - sink.next((T) item); - } - }) - .onErrorMap(ex -> { - if (ex instanceof MessagingException.NoRemoteHandler) { - return new TdError(404, "Bot #IDU" + userId + " (live id: " + liveId + ") is not found on the cluster"); - } else { - return ex; - } - }); + return Mono.fromCompletionStage(() -> { + if (closed) { + return CompletableFuture.failedFuture(new TdError(500, "Session is closed")); + } + return eventService.send("session-" + liveId + "-requests", + new Request<>(liveId, request, timeout), + LiveAtomixReactiveApiClient::serializeRequest, + LiveAtomixReactiveApiClient::deserializeResponse, + Duration.between(Instant.now(), timeout) + ); + }).handle((item, sink) -> { + if (item instanceof TdApi.Error error) { + sink.error(new TdError(error.code, error.message)); + } else { + //noinspection unchecked + sink.next((T) item); + } + }).onErrorMap(ex -> { + if (ex instanceof MessagingException.NoRemoteHandler) { + return new TdError(404, "Bot #IDU" + userId + " (live id: " + liveId + ") is not found on the cluster"); + } else { + return ex; + } + }); } @Override public void close() { + closed = true; } }