From aa887ba9540cc10d85ba5cf9c4bedf818bb66fa9 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 23 Jan 2022 21:57:43 +0100 Subject: [PATCH] Highly optimize topic name --- .../java/it/tdlight/reactiveapi/KafkaProducer.java | 4 ++-- .../tdlight/reactiveapi/ReactiveApiPublisher.java | 4 +++- .../java/it/tdlight/reactiveapi/UserTopic.java | 14 ++++++++++++++ 3 files changed, 19 insertions(+), 3 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/UserTopic.java diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java index 64a8f57..10351cf 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java @@ -34,10 +34,10 @@ public class KafkaProducer { sender = KafkaSender.create(senderOptions.maxInFlight(1024)); } - public Mono sendMessages(long userId, Flux eventsFlux) { + public Mono sendMessages(UserTopic userId, Flux eventsFlux) { return eventsFlux .>map(event -> SenderRecord.create(new ProducerRecord<>( - "tdlib.event.%d".formatted(userId), + userId.getTopic(), event ), null)) .transform(sender::send) diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index b31deb5..b83eff2 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -75,6 +75,7 @@ public abstract class ReactiveApiPublisher { private final AtomicReference state = new AtomicReference<>(new State(LOGGED_OUT)); protected final long userId; + protected final UserTopic userTopic; protected final long liveId; private final String dynamicIdResolveSubject; @@ -90,6 +91,7 @@ public abstract class ReactiveApiPublisher { this.eventService = atomix.getEventService(); this.resultingEventTransformerSet = resultingEventTransformerSet; this.userId = userId; + this.userTopic = new UserTopic(userId); this.liveId = liveId; this.dynamicIdResolveSubject = SubjectNaming.getDynamicIdResolveSubject(userId); this.rawTelegramClient = ClientManager.createReactive(); @@ -197,7 +199,7 @@ public abstract class ReactiveApiPublisher { // Buffer requests to avoid halting the event loop .onBackpressureBuffer(); - kafkaProducer.sendMessages(userId, messagesToSend).subscribeOn(Schedulers.parallel()).subscribe(); + kafkaProducer.sendMessages(userTopic, messagesToSend).subscribeOn(Schedulers.parallel()).subscribe(); publishedResultingEvents // Obtain only cluster-bound events diff --git a/src/main/java/it/tdlight/reactiveapi/UserTopic.java b/src/main/java/it/tdlight/reactiveapi/UserTopic.java new file mode 100644 index 0000000..4746657 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/UserTopic.java @@ -0,0 +1,14 @@ +package it.tdlight.reactiveapi; + +public class UserTopic { + + private final String value; + + public UserTopic(long userId) { + value = "tdlib.event.%d".formatted(userId); + } + + public String getTopic() { + return value; + } +}