From fd658e5e6cb6e8a4760b6fc4ddd016b91ba0b380 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 25 Jul 2022 22:10:24 +0200 Subject: [PATCH] Retry commit failures --- .../it/tdlight/reactiveapi/KafkaConsumer.java | 19 ++++++++++++++++++- .../LiveAtomixReactiveApiClient.java | 2 ++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index 53fe8d2..3590ae7 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -9,6 +9,7 @@ import java.util.HashMap; import java.util.Map; import java.util.logging.Level; import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.record.TimestampType; @@ -73,10 +74,25 @@ public abstract class KafkaConsumer { return eventFlux.retryWhen(Retry .backoff(Long.MAX_VALUE, Duration.ofMillis(100)) .maxBackoff(Duration.ofSeconds(5)) + .transientErrors(true) .filter(ex -> ex instanceof RebalanceInProgressException) .doBeforeRetry(s -> LOG.warn("Rebalancing in progress"))); } + protected Flux> retryIfCommitFailed(Flux> eventFlux) { + return eventFlux.retryWhen(Retry + .backoff(10, Duration.ofSeconds(1)) + .maxBackoff(Duration.ofSeconds(5)) + .transientErrors(true) + .filter(ex -> ex instanceof CommitFailedException) + .doBeforeRetry(s -> LOG.warn("Commit cannot be completed since the group has already rebalanced" + + " and assigned the partitions to another member. This means that the time between subsequent" + + " calls to poll() was longer than the configured max.poll.interval.ms, which typically implies" + + " that the poll loop is spending too much time message processing. You can address this either" + + " by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll()" + + " with max.poll.records."))); + } + public Flux> consumeMessages(@NotNull String subGroupId, long userId) { return consumeMessagesInternal(subGroupId, userId); } @@ -103,6 +119,7 @@ public abstract class KafkaConsumer { return new Timestamped<>(1, record.value()); } }) - .transform(this::retryIfCleanup); + .transform(this::retryIfCleanup) + .transform(this::retryIfCommitFailed); } } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index 85cf047..6702530 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -1,8 +1,10 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import java.time.Duration; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient {