diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index b9dcbff..f00afaa 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -34,6 +34,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; +import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; @@ -41,14 +43,12 @@ import reactor.core.scheduler.Schedulers; abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { private static final Logger LOG = LoggerFactory.getLogger(BaseAtomixReactiveApiClient.class); - - private static final Duration HUNDRED_MS = Duration.ofMillis(100); private static final long EMPTY_USER_ID = 0; // Temporary id used to make requests private final long clientId; private final Consumer> requests; - private final Map>>> responses + private final Map>>> responses = new ConcurrentHashMap<>(); private final AtomicLong requestId = new AtomicLong(0); private final Disposable subscription; @@ -63,7 +63,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { LOG.debug("Bot received a response for an unknown request id: {}", response.data().requestId()); return; } - responseSink.complete(response); + responseSink.success(response); }).subscribeOn(Schedulers.parallel()) .subscribe(v -> {}, ex -> LOG.error("Reactive api client responses flux has failed unexpectedly!", ex)); } @@ -78,9 +78,14 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { if (timeoutDuration.isNegative() || timeoutDuration.isZero()) { return Mono.error(timeoutError); } - var cf = new CompletableFuture>>(); - this.responses.put(requestId, cf); - Mono response = Mono.fromFuture(() -> cf) + Mono response = Mono + .>>create(sink -> { + sink.onDispose(() -> this.responses.remove(requestId, sink)); + var prev = this.responses.putIfAbsent(requestId, sink); + if (prev != null) { + sink.error(new IllegalStateException("Can't call the same request twice: " + requestId)); + } + }) .timeout(timeoutDuration, Mono.fromSupplier(() -> new Timestamped<>(requestTimestamp.toEpochMilli(), new Response<>(clientId, requestId, userId, new TdApi.Error(408, "Request Timeout"))))) .handle((responseObj, sink) -> { @@ -100,8 +105,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { } else { sink.error(new UnsupportedOperationException("Unknown response type: " + responseObj.data().getClass())); } - }) - .doFinally(s -> this.responses.remove(requestId)); + }); requests.accept(new Request<>(userId, clientId, requestId, request, timeout)); return response; }); @@ -139,7 +143,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { return Mono.fromRunnable(() -> { subscription.dispose(); long now = System.currentTimeMillis(); - responses.forEach((requestId, cf) -> cf.complete(new Timestamped<>(now, + responses.forEach((requestId, cf) -> cf.success(new Timestamped<>(now, new Response<>(clientId, requestId, EMPTY_USER_ID, new Error(408, "Request Timeout")) ))); responses.clear(); diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java index aa362ea..91e0b58 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java @@ -64,7 +64,9 @@ public class TdlibChannelsSharedReceive implements Closeable { .subscribeOn(Schedulers.parallel()) .subscribe(n -> {}, ex -> { LOG.error("An error when handling requests killed the requests subscriber!", ex); - requests.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + synchronized (requests) { + requests.emitError(ex, EmitFailureHandler.FAIL_FAST); + } }); }