From 8645231031fea26876f32e861070ad1880809bf5 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 21 Mar 2022 01:08:12 +0100 Subject: [PATCH] Improve kafka logging, update tdlib --- pom.xml | 2 +- .../DynamicAtomixReactiveApiClient.java | 10 +++++++++- .../it/tdlight/reactiveapi/KafkaConsumer.java | 16 +++++++++++++++- .../it/tdlight/reactiveapi/KafkaProducer.java | 9 +++++++++ 4 files changed, 34 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 730ecfe..1bab614 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ it.tdlight tdlight-java-bom - 2.8.2.0 + 2.8.2.2 pom import diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index e3ed411..f3e10a0 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -41,7 +41,15 @@ public class DynamicAtomixReactiveApiClient extends BaseAtomixReactiveApiClient .consumeMessages(subGroupId, userId) .takeWhile(n -> !closed) .publish() - .autoConnect(3, clientBoundEventsSubscription::set); + .autoConnect(3, clientBoundEventsSubscription::set) + .onErrorResume(CancellationException.class, ex -> { + if ("Disconnected".equals(ex.getMessage())) { + LOG.debug("Disconnected client {}", userId, ex); + return Mono.empty(); + } else { + return Mono.error(ex); + } + }); var firstLiveId = clientBoundEvents .take(1, true) diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index 46bb24a..c47d0f2 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -1,5 +1,7 @@ package it.tdlight.reactiveapi; +import it.tdlight.common.Init; +import it.tdlight.common.utils.CantLoadLibrary; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import java.time.Duration; import java.util.HashMap; @@ -31,6 +33,12 @@ public class KafkaConsumer { } public KafkaReceiver createReceiver(@NotNull String groupId, @Nullable Long userId) { + try { + Init.start(); + } catch (CantLoadLibrary e) { + LOG.error("Can't load TDLight library", e); + throw new RuntimeException(e); + } Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId() + (userId != null ? ("_" + userId) : "")); @@ -80,7 +88,13 @@ public class KafkaConsumer { private Flux consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) { return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId) .receive() - .log("consume-messages", Level.FINEST, SignalType.REQUEST) + .log("consume-messages" + (userId != null ? "-" + userId : ""), + Level.FINEST, + SignalType.REQUEST, + SignalType.ON_NEXT, + SignalType.ON_ERROR, + SignalType.ON_COMPLETE + ) .doOnNext(result -> result.receiverOffset().acknowledge()) .map(record -> { if (record.timestampType() == TimestampType.CREATE_TIME) { diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java index 10351cf..a0fde6e 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java @@ -4,6 +4,7 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.logging.Level; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerSerializer; @@ -11,6 +12,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.SignalType; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderRecord; @@ -40,6 +42,13 @@ public class KafkaProducer { userId.getTopic(), event ), null)) + .log("produce-messages-" + userId, + Level.FINEST, + SignalType.REQUEST, + SignalType.ON_NEXT, + SignalType.ON_ERROR, + SignalType.ON_COMPLETE + ) .transform(sender::send) .doOnError(e -> LOG.error("Send failed", e)) .then();