From a140e7a2b1f22753a8008464a8907d48e0ec3fb3 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 13 Jan 2022 11:20:44 +0100 Subject: [PATCH] Configure kafka --- src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java | 6 ++++-- src/main/java/it/tdlight/reactiveapi/KafkaProducer.java | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index 9170c80..ab77cd9 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -1,6 +1,7 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -36,8 +37,9 @@ public class KafkaConsumer { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClientBoundEventDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - ReceiverOptions receiverOptions = ReceiverOptions.create(props); - ReceiverOptions.create(receiverOptions.consumerProperties()); + ReceiverOptions receiverOptions = ReceiverOptions + .create(props) + .commitInterval(Duration.ofSeconds(10)); Pattern pattern; if (liveId == null && userId == null) { pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\.[0-9]+"); diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java index 8748ab0..7521744 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java @@ -31,7 +31,7 @@ public class KafkaProducer { props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ClientBoundEventSerializer.class); SenderOptions senderOptions = SenderOptions.create(props); - sender = KafkaSender.create(senderOptions); + sender = KafkaSender.create(senderOptions.maxInFlight(1024)); } public Mono sendMessages(long liveId, long userId, Flux eventsFlux) { @@ -40,7 +40,7 @@ public class KafkaProducer { "tdlib.event.%d.%d".formatted(userId, liveId), event ), null)) - .flatMap(record -> sender.send(Mono.just(record))) + .transform(sender::send) .doOnError(e -> LOG.error("Send failed", e)) .then(); }