Configure kafka

This commit is contained in:
Andrea Cavalli 2022-01-13 11:20:44 +01:00
parent 006974ba23
commit a140e7a2b1
2 changed files with 6 additions and 4 deletions

View File

@ -1,6 +1,7 @@
package it.tdlight.reactiveapi; package it.tdlight.reactiveapi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import java.time.Duration;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -36,8 +37,9 @@ public class KafkaConsumer {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClientBoundEventDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClientBoundEventDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions<Integer, ClientBoundEvent> receiverOptions = ReceiverOptions.create(props); ReceiverOptions<Integer, ClientBoundEvent> receiverOptions = ReceiverOptions
ReceiverOptions.create(receiverOptions.consumerProperties()); .<Integer, ClientBoundEvent>create(props)
.commitInterval(Duration.ofSeconds(10));
Pattern pattern; Pattern pattern;
if (liveId == null && userId == null) { if (liveId == null && userId == null) {
pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\.[0-9]+"); pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\.[0-9]+");

View File

@ -31,7 +31,7 @@ public class KafkaProducer {
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ClientBoundEventSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ClientBoundEventSerializer.class);
SenderOptions<Integer, ClientBoundEvent> senderOptions = SenderOptions.create(props); SenderOptions<Integer, ClientBoundEvent> senderOptions = SenderOptions.create(props);
sender = KafkaSender.create(senderOptions); sender = KafkaSender.create(senderOptions.maxInFlight(1024));
} }
public Mono<Void> sendMessages(long liveId, long userId, Flux<ClientBoundEvent> eventsFlux) { public Mono<Void> sendMessages(long liveId, long userId, Flux<ClientBoundEvent> eventsFlux) {
@ -40,7 +40,7 @@ public class KafkaProducer {
"tdlib.event.%d.%d".formatted(userId, liveId), "tdlib.event.%d.%d".formatted(userId, liveId),
event event
), null)) ), null))
.flatMap(record -> sender.send(Mono.just(record))) .transform(sender::send)
.doOnError(e -> LOG.error("Send failed", e)) .doOnError(e -> LOG.error("Send failed", e))
.then(); .then();
} }