Do not create infinite topics

Andrea Cavalli 2022-01-14 19:32:33 +01:00
parent 48fbca5fad
commit 2d0ab31fd0
3 changed files with 12 additions and 27 deletions

KafkaConsumer.java

@@ -31,9 +31,7 @@ public class KafkaConsumer {
 		this.kafkaParameters = kafkaParameters;
 	}
 
-	public KafkaReceiver<Integer, ClientBoundEvent> createReceiver(@NotNull String groupId,
-			@Nullable Long liveId,
-			@Nullable Long userId) {
+	public KafkaReceiver<Integer, ClientBoundEvent> createReceiver(@NotNull String groupId, @Nullable Long userId) {
 		Map<String, Object> props = new HashMap<>();
 		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers());
 		props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId());
@@ -48,14 +46,10 @@ public class KafkaConsumer {
 				.maxCommitAttempts(100)
 				.maxDeferredCommits(100);
 		Pattern pattern;
-		if (liveId == null && userId == null) {
-			pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\.[0-9]+");
-		} else if (liveId == null) {
-			pattern = Pattern.compile("tdlib\\.event\\." + userId + "\\.[0-9]+");
-		} else if (userId == null) {
-			pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\." + liveId);
+		if (userId == null) {
+			pattern = Pattern.compile("tdlib\\.event\\.[0-9]+");
 		} else {
-			pattern = Pattern.compile("tdlib\\.event\\." + userId + "\\." + liveId);
+			pattern = Pattern.compile("tdlib\\.event\\." + userId);
 		}
 		ReceiverOptions<Integer, ClientBoundEvent> options = receiverOptions
 				.subscription(pattern)
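
Note (not part of the commit): a minimal, hypothetical sketch of why the old topic naming was unbounded. Every (userId, liveId) pair mapped to its own Kafka topic, and liveId appears to change with every live session, so topics kept accumulating; the new scheme keeps a single topic per user. All names below are illustrative only.

	public final class TopicNamingSketch {

		// Old scheme: one topic per user *and* per live session.
		static String oldTopic(long userId, long liveId) {
			return "tdlib.event.%d.%d".formatted(userId, liveId);
		}

		// New scheme: one topic per user; the liveId travels inside the event instead.
		static String newTopic(long userId) {
			return "tdlib.event.%d".formatted(userId);
		}

		public static void main(String[] args) {
			// Three sessions of the same user used to create three distinct topics...
			for (long liveId = 1; liveId <= 3; liveId++) {
				System.out.println(oldTopic(42L, liveId));
			}
			// ...while the new scheme always reuses a single per-user topic.
			System.out.println(newTopic(42L));
		}
	}
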
@@ -73,41 +67,32 @@ public class KafkaConsumer {
 	}
 
 	public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId, boolean ack, long userId, long liveId) {
-		if (ack) {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, liveId, userId)
-					.receive()
-					.log("consume-messages", Level.FINEST, SignalType.REQUEST)
-					.doOnNext(result -> result.receiverOffset().acknowledge())
-					.map(ConsumerRecord::value)
-					.transform(this::retryIfCleanup);
-		} else {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, liveId, userId).receive().map(ConsumerRecord::value);
-		}
+		return consumeMessages(subGroupId, ack, userId).filter(e -> e.liveId() == liveId);
 	}
 
 	public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId, boolean ack, long userId) {
 		if (ack) {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, userId)
+			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId)
 					.receive()
 					.log("consume-messages", Level.FINEST, SignalType.REQUEST)
 					.doOnNext(result -> result.receiverOffset().acknowledge())
 					.map(ConsumerRecord::value)
 					.transform(this::retryIfCleanup);
 		} else {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, userId).receive().map(ConsumerRecord::value);
+			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId).receive().map(ConsumerRecord::value);
 		}
 	}
 
 	public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId, boolean ack) {
 		if (ack) {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, null)
+			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null)
 					.receive()
 					.log("consume-messages", Level.FINEST, SignalType.REQUEST)
 					.doOnNext(result -> result.receiverOffset().acknowledge())
 					.map(ConsumerRecord::value)
 					.transform(this::retryIfCleanup);
 		} else {
-			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, null).receive().map(ConsumerRecord::value);
+			return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null).receive().map(ConsumerRecord::value);
 		}
 	}
 }
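
Note (not part of the commit): with this change the liveId-specific overload no longer subscribes to a dedicated topic; it consumes the per-user stream and filters client-side on e.liveId(). A rough, self-contained sketch of that pattern, using a stand-in event record and an in-memory Flux instead of the real Kafka receiver:

	import reactor.core.publisher.Flux;

	public final class LiveIdFilterSketch {

		// Stand-in for ClientBoundEvent; only the liveId accessor matters here.
		record Event(long userId, long liveId, String payload) {}

		// Stand-in for consumeMessages(subGroupId, ack, userId): every event of one user.
		static Flux<Event> consumePerUser(long userId) {
			return Flux.just(
					new Event(userId, 1L, "a"),
					new Event(userId, 2L, "b"),
					new Event(userId, 1L, "c"));
		}

		// Same shape as the new overload: delegate to the per-user stream, then filter.
		static Flux<Event> consumePerLive(long userId, long liveId) {
			return consumePerUser(userId).filter(e -> e.liveId() == liveId);
		}

		public static void main(String[] args) {
			// Prints only the two events whose liveId is 1.
			consumePerLive(42L, 1L).doOnNext(System.out::println).blockLast();
		}
	}
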

KafkaProducer.java

@@ -34,10 +34,10 @@ public class KafkaProducer {
 		sender = KafkaSender.create(senderOptions.maxInFlight(1024));
 	}
 
-	public Mono<Void> sendMessages(long liveId, long userId, Flux<ClientBoundEvent> eventsFlux) {
+	public Mono<Void> sendMessages(long userId, Flux<ClientBoundEvent> eventsFlux) {
 		return eventsFlux
 				.<SenderRecord<Integer, ClientBoundEvent, Integer>>map(event -> SenderRecord.create(new ProducerRecord<>(
-						"tdlib.event.%d.%d".formatted(userId, liveId),
+						"tdlib.event.%d".formatted(userId),
 						event
 				), null))
 				.transform(sender::send)
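
Note (not part of the commit): on the producer side every event for a user now lands on that user's single topic, so the topic count is bounded by the number of users rather than by live sessions. A hypothetical sketch of the mapping step, assuming reactor-kafka and kafka-clients on the classpath and a stand-in ClientBoundEvent record:

	import org.apache.kafka.clients.producer.ProducerRecord;
	import reactor.core.publisher.Flux;
	import reactor.kafka.sender.SenderRecord;

	public final class PerUserTopicSketch {

		// Stand-in for the real ClientBoundEvent type.
		record ClientBoundEvent(long liveId, String payload) {}

		// Map each event of a user to a record on that user's single topic.
		static Flux<SenderRecord<Integer, ClientBoundEvent, Integer>> toRecords(long userId,
				Flux<ClientBoundEvent> events) {
			String topic = "tdlib.event.%d".formatted(userId);
			return events.<SenderRecord<Integer, ClientBoundEvent, Integer>>map(event ->
					SenderRecord.create(new ProducerRecord<>(topic, event), null));
		}

		public static void main(String[] args) {
			toRecords(42L, Flux.just(new ClientBoundEvent(7L, "update")))
					.doOnNext(r -> System.out.println(r.topic() + " <- " + r.value()))
					.blockLast();
		}
	}
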

ReactiveApiPublisher.java

@@ -189,7 +189,7 @@ public abstract class ReactiveApiPublisher {
 				// Buffer requests to avoid halting the event loop
 				.onBackpressureBuffer();
-		kafkaProducer.sendMessages(liveId, userId, messagesToSend).subscribeOn(Schedulers.parallel()).subscribe();
+		kafkaProducer.sendMessages(userId, messagesToSend).subscribeOn(Schedulers.parallel()).subscribe();
 		publishedResultingEvents
 				// Obtain only cluster-bound events
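
Note (not part of the commit): the publisher keeps the same fire-and-forget pattern around the updated call, subscribing the send pipeline on the parallel scheduler so publishing never blocks the caller. A small stand-alone sketch of that pattern (the sendMessages body below is a stand-in, not the real KafkaProducer):

	import reactor.core.publisher.Flux;
	import reactor.core.publisher.Mono;
	import reactor.core.scheduler.Schedulers;

	public final class FireAndForgetSketch {

		// Stand-in for KafkaProducer#sendMessages(userId, eventsFlux).
		static Mono<Void> sendMessages(long userId, Flux<String> events) {
			return events.doOnNext(e -> System.out.println(userId + " <- " + e)).then();
		}

		public static void main(String[] args) throws InterruptedException {
			Flux<String> messagesToSend = Flux.just("update", "close");
			// Fire-and-forget: subscribe on the parallel scheduler and return immediately.
			sendMessages(42L, messagesToSend).subscribeOn(Schedulers.parallel()).subscribe();
			Thread.sleep(200); // only so this demo JVM does not exit before the async send runs
		}
	}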