diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
index 58c9822..915ca86 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
@@ -31,9 +31,7 @@ public class KafkaConsumer {
 		this.kafkaParameters = kafkaParameters;
 	}
 
-	public KafkaReceiver createReceiver(@NotNull String groupId,
-			@Nullable Long liveId,
-			@Nullable Long userId) {
+	public KafkaReceiver createReceiver(@NotNull String groupId, @Nullable Long userId) {
 		Map props = new HashMap<>();
 		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers());
 		props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId());
@@ -48,14 +46,10 @@ public class KafkaConsumer {
 				.maxCommitAttempts(100)
 				.maxDeferredCommits(100);
 		Pattern pattern;
-		if (liveId == null && userId == null) {
-			pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\.[0-9]+");
-		} else if (liveId == null) {
-			pattern = Pattern.compile("tdlib\\.event\\." + userId + "\\.[0-9]+");
-		} else if (userId == null) {
-			pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\." + liveId);
+		if (userId == null) {
+			pattern = Pattern.compile("tdlib\\.event\\.[0-9]+");
 		} else {
-			pattern = Pattern.compile("tdlib\\.event\\." + userId + "\\." + liveId);
+			pattern = Pattern.compile("tdlib\\.event\\." + userId);
 		}
 		ReceiverOptions options = receiverOptions
 				.subscription(pattern)
@@ -73,41 +67,32 @@ public class KafkaConsumer {
 	}
 
 	public Flux consumeMessages(@NotNull String subGroupId, boolean ack, long userId, long liveId) {
-		if (ack) {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, liveId, userId)
-					.receive()
-					.log("consume-messages", Level.FINEST, SignalType.REQUEST)
-					.doOnNext(result -> result.receiverOffset().acknowledge())
-					.map(ConsumerRecord::value)
-					.transform(this::retryIfCleanup);
-		} else {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, liveId, userId).receive().map(ConsumerRecord::value);
-		}
+		return consumeMessages(subGroupId, ack, userId).filter(e -> e.liveId() == liveId);
 	}
 
 	public Flux consumeMessages(@NotNull String subGroupId, boolean ack, long userId) {
 		if (ack) {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, userId)
+			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId)
 					.receive()
 					.log("consume-messages", Level.FINEST, SignalType.REQUEST)
 					.doOnNext(result -> result.receiverOffset().acknowledge())
 					.map(ConsumerRecord::value)
 					.transform(this::retryIfCleanup);
 		} else {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, userId).receive().map(ConsumerRecord::value);
+			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId).receive().map(ConsumerRecord::value);
 		}
 	}
 
 	public Flux consumeMessages(@NotNull String subGroupId, boolean ack) {
 		if (ack) {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, null)
+			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null)
 					.receive()
 					.log("consume-messages", Level.FINEST, SignalType.REQUEST)
 					.doOnNext(result -> result.receiverOffset().acknowledge())
 					.map(ConsumerRecord::value)
 					.transform(this::retryIfCleanup);
 		} else {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, null).receive().map(ConsumerRecord::value);
+			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null).receive().map(ConsumerRecord::value);
 		}
 	}
 }
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java
index 7521744..64a8f57 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java
@@ -34,10 +34,10 @@ public class KafkaProducer {
 		sender = KafkaSender.create(senderOptions.maxInFlight(1024));
 	}
 
-	public Mono sendMessages(long liveId, long userId, Flux eventsFlux) {
+	public Mono sendMessages(long userId, Flux eventsFlux) {
 		return eventsFlux
 				.>map(event -> SenderRecord.create(new ProducerRecord<>(
-						"tdlib.event.%d.%d".formatted(userId, liveId),
+						"tdlib.event.%d".formatted(userId),
 						event
 				), null))
 				.transform(sender::send)
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
index 91bcbf7..7c53b4f 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
@@ -189,7 +189,7 @@ public abstract class ReactiveApiPublisher {
 				// Buffer requests to avoid halting the event loop
 				.onBackpressureBuffer();
 
-		kafkaProducer.sendMessages(liveId, userId, messagesToSend).subscribeOn(Schedulers.parallel()).subscribe();
+		kafkaProducer.sendMessages(userId, messagesToSend).subscribeOn(Schedulers.parallel()).subscribe();
 
 		publishedResultingEvents
 				// Obtain only cluster-bound events