diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index 0ab2d52..8dac702 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -5,6 +5,7 @@ import static it.tdlight.reactiveapi.Event.SERIAL_VERSION; import it.tdlight.common.utils.LibraryVersion; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Error; +import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.Ignored; import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; @@ -46,6 +47,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; +import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Many; @@ -66,15 +68,16 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo private final Map>>> responses = new ConcurrentHashMap<>(); private final AtomicLong requestId = new AtomicLong(0); + private final Disposable subscription; public BaseAtomixReactiveApiClient(KafkaTdlibClient kafkaTdlibClient, long userId) { this.userId = userId; this.clientId = System.nanoTime(); - kafkaTdlibClient.request().sendMessages(userId, requests.asFlux()) + var subscription1 = kafkaTdlibClient.request().sendMessages(userId, requests.asFlux()) .subscribeOn(Schedulers.boundedElastic()) .subscribe(v -> {}, ex -> LOG.error("Failed to send requests", ex)); - kafkaTdlibClient.response() + var subscription2 = kafkaTdlibClient.response() .consumeMessages("td-responses", userId) .filter(response -> response.data().clientId() == clientId) .doOnNext(response -> { @@ -88,6 +91,10 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo }) .subscribeOn(Schedulers.parallel()) .subscribe(); + this.subscription = () -> { + subscription1.dispose(); + subscription2.dispose(); + }; } @Override @@ -121,7 +128,8 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo sink.error(new UnsupportedOperationException("Unknown response type: " + responseObj.data().getClass())); } }) - .timeout(timeoutDuration, timeoutErrorMono.doFirst(() -> this.responses.remove(requestId))); + .timeout(timeoutDuration, timeoutErrorMono) + .doFinally(s -> this.responses.remove(requestId)); requests.emitNext(new Request<>(clientId, requestId, request, timeout), EmitFailureHandler.busyLooping(TEN_MS)); return response; }); @@ -167,5 +175,11 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo @Override public void close() { + subscription.dispose(); + long now = System.currentTimeMillis(); + responses.forEach((requestId, cf) -> cf.complete(new Timestamped<>(now, + new Response<>(clientId, requestId, userId, new Error(408, "Request Timeout")) + ))); + responses.clear(); } }