diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index b11f606..1406dc3 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -26,6 +26,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; @@ -469,7 +470,7 @@ public class AtomixReactiveApi implements ReactiveApi { Duration.ofSeconds(1) )) .onErrorResume(ex -> { - if (ex instanceof MessagingException.NoRemoteHandler) { + if (ex instanceof MessagingException.NoRemoteHandler || ex instanceof CancellationException) { return Mono.empty(); } else { return Mono.error(ex); diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index 54ae31b..e3ed411 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -2,6 +2,7 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import java.time.Duration; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicReference; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -94,8 +95,12 @@ public class DynamicAtomixReactiveApiClient extends BaseAtomixReactiveApiClient public void close() { this.closed = true; var clientBoundEventsSubscription = this.clientBoundEventsSubscription.get(); - if (clientBoundEventsSubscription != null) { - clientBoundEventsSubscription.dispose(); + if (clientBoundEventsSubscription != null && !clientBoundEventsSubscription.isDisposed()) { + try { + clientBoundEventsSubscription.dispose(); + } catch (CancellationException ignored) { + LOG.debug("Reactive api client for user {} has been cancelled", userId); + } } super.close(); } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index 5136923..46bb24a 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -33,7 +33,7 @@ public class KafkaConsumer { public KafkaReceiver createReceiver(@NotNull String groupId, @Nullable Long userId) { Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers()); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId()); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId() + (userId != null ? ("_" + userId) : "")); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClientBoundEventDeserializer.class);