// File: tdlib-session-container/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
// 128 lines, 5.2 KiB, Java (captured from a repository blame view: Raw / Normal View / History)
// Blame stamp of the first line: 2022-01-13 01:59:26 +01:00
package it.tdlight.reactiveapi;
2022-06-27 00:06:53 +02:00
import static java.lang.Math.toIntExact;
2022-03-21 01:08:12 +01:00
import it.tdlight.common.Init;
import it.tdlight.common.utils.CantLoadLibrary;
2022-01-13 11:20:44 +01:00
import java.time.Duration;
2022-01-13 01:59:26 +01:00
import java.util.HashMap;
import java.util.Map;
2022-01-13 16:19:10 +01:00
import java.util.logging.Level;
2022-01-13 01:59:26 +01:00
import java.util.regex.Pattern;
2022-07-25 22:10:24 +02:00
import org.apache.kafka.clients.consumer.CommitFailedException;
2022-01-13 01:59:26 +01:00
import org.apache.kafka.clients.consumer.ConsumerConfig;
2022-01-13 16:19:10 +01:00
import org.apache.kafka.common.errors.RebalanceInProgressException;
2022-01-22 17:45:56 +01:00
import org.apache.kafka.common.record.TimestampType;
2022-01-13 01:59:26 +01:00
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
2022-01-13 16:19:10 +01:00
import reactor.core.publisher.SignalType;
2022-01-13 01:59:26 +01:00
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
2022-01-13 16:19:10 +01:00
import reactor.util.retry.Retry;
2022-01-13 01:59:26 +01:00
2022-06-27 00:06:53 +02:00
public abstract class KafkaConsumer<K> {
2022-01-13 01:59:26 +01:00
private static final Logger LOG = LogManager.getLogger(KafkaConsumer.class);
private final KafkaParameters kafkaParameters;
public KafkaConsumer(KafkaParameters kafkaParameters) {
this.kafkaParameters = kafkaParameters;
}
2022-06-27 00:06:53 +02:00
public KafkaReceiver<Integer, K> createReceiver(@NotNull String groupId, @Nullable Long userId) {
2022-03-21 01:08:12 +01:00
try {
Init.start();
} catch (CantLoadLibrary e) {
LOG.error("Can't load TDLight library", e);
throw new RuntimeException(e);
}
2022-01-13 01:59:26 +01:00
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers());
2022-03-19 00:06:30 +01:00
props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId() + (userId != null ? ("_" + userId) : ""));
2022-01-13 01:59:26 +01:00
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
2022-06-27 00:06:53 +02:00
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getChannelName().getDeserializerClass());
2022-01-14 16:33:54 +01:00
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
2022-06-27 00:06:53 +02:00
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, toIntExact(Duration.ofMinutes(5).toMillis()));
2022-09-10 13:36:38 +02:00
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100");
2022-06-27 00:06:53 +02:00
ReceiverOptions<Integer, K> receiverOptions = ReceiverOptions
.<Integer, K>create(props)
2022-01-13 16:19:10 +01:00
.commitInterval(Duration.ofSeconds(10))
2022-07-28 23:40:26 +02:00
.commitBatchSize(65535)
2022-01-13 16:19:10 +01:00
.maxCommitAttempts(100)
.maxDeferredCommits(100);
2022-01-13 01:59:26 +01:00
Pattern pattern;
2022-01-14 19:32:33 +01:00
if (userId == null) {
2022-06-27 00:06:53 +02:00
pattern = Pattern.compile("tdlib\\." + getChannelName() + "\\.\\d+");
2022-01-13 01:59:26 +01:00
} else {
2022-06-27 00:06:53 +02:00
pattern = Pattern.compile("tdlib\\." + getChannelName() + "\\." + userId);
2022-01-13 01:59:26 +01:00
}
2022-06-27 00:06:53 +02:00
ReceiverOptions<Integer, K> options = receiverOptions
2022-01-13 01:59:26 +01:00
.subscription(pattern)
.addAssignListener(partitions -> LOG.debug("onPartitionsAssigned {}", partitions))
.addRevokeListener(partitions -> LOG.debug("onPartitionsRevoked {}", partitions));
return KafkaReceiver.create(options);
}
2022-06-27 00:06:53 +02:00
public abstract KafkaChannelName getChannelName();
protected Flux<Timestamped<K>> retryIfCleanup(Flux<Timestamped<K>> eventFlux) {
return eventFlux.retryWhen(Retry
2022-01-13 16:19:10 +01:00
.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
.maxBackoff(Duration.ofSeconds(5))
2022-07-25 22:10:24 +02:00
.transientErrors(true)
2022-01-13 16:19:10 +01:00
.filter(ex -> ex instanceof RebalanceInProgressException)
.doBeforeRetry(s -> LOG.warn("Rebalancing in progress")));
}
2022-07-25 22:10:24 +02:00
protected Flux<Timestamped<K>> retryIfCommitFailed(Flux<Timestamped<K>> eventFlux) {
return eventFlux.retryWhen(Retry
.backoff(10, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(5))
.transientErrors(true)
.filter(ex -> ex instanceof CommitFailedException)
.doBeforeRetry(s -> LOG.warn("Commit cannot be completed since the group has already rebalanced"
+ " and assigned the partitions to another member. This means that the time between subsequent"
+ " calls to poll() was longer than the configured max.poll.interval.ms, which typically implies"
+ " that the poll loop is spending too much time message processing. You can address this either"
+ " by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll()"
+ " with max.poll.records.")));
}
2022-06-27 00:06:53 +02:00
public Flux<Timestamped<K>> consumeMessages(@NotNull String subGroupId, long userId) {
2022-01-14 20:04:29 +01:00
return consumeMessagesInternal(subGroupId, userId);
2022-01-13 01:59:26 +01:00
}
2022-06-27 00:06:53 +02:00
public Flux<Timestamped<K>> consumeMessages(@NotNull String subGroupId) {
2022-01-14 20:04:29 +01:00
return consumeMessagesInternal(subGroupId, null);
}
2022-06-27 00:06:53 +02:00
private Flux<Timestamped<K>> consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) {
2022-01-14 20:04:29 +01:00
return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId)
.receive()
2022-03-21 01:08:12 +01:00
.log("consume-messages" + (userId != null ? "-" + userId : ""),
Level.FINEST,
SignalType.REQUEST,
SignalType.ON_NEXT,
SignalType.ON_ERROR,
SignalType.ON_COMPLETE
)
2022-01-14 20:04:29 +01:00
.doOnNext(result -> result.receiverOffset().acknowledge())
2022-01-22 17:45:56 +01:00
.map(record -> {
if (record.timestampType() == TimestampType.CREATE_TIME) {
2022-06-27 00:06:53 +02:00
return new Timestamped<>(record.timestamp(), record.value());
2022-01-22 17:45:56 +01:00
} else {
2022-06-27 00:06:53 +02:00
return new Timestamped<>(1, record.value());
2022-01-22 17:45:56 +01:00
}
})
2022-07-25 22:10:24 +02:00
.transform(this::retryIfCleanup)
.transform(this::retryIfCommitFailed);
2022-01-13 01:59:26 +01:00
}
}