Retry commit failures
This commit is contained in:
parent
3eea163a73
commit
fd658e5e6c
|
@ -9,6 +9,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.logging.Level;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.kafka.clients.consumer.CommitFailedException;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.errors.RebalanceInProgressException;
|
||||
import org.apache.kafka.common.record.TimestampType;
|
||||
|
@ -73,10 +74,25 @@ public abstract class KafkaConsumer<K> {
|
|||
return eventFlux.retryWhen(Retry
|
||||
.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
|
||||
.maxBackoff(Duration.ofSeconds(5))
|
||||
.transientErrors(true)
|
||||
.filter(ex -> ex instanceof RebalanceInProgressException)
|
||||
.doBeforeRetry(s -> LOG.warn("Rebalancing in progress")));
|
||||
}
|
||||
|
||||
protected Flux<Timestamped<K>> retryIfCommitFailed(Flux<Timestamped<K>> eventFlux) {
|
||||
return eventFlux.retryWhen(Retry
|
||||
.backoff(10, Duration.ofSeconds(1))
|
||||
.maxBackoff(Duration.ofSeconds(5))
|
||||
.transientErrors(true)
|
||||
.filter(ex -> ex instanceof CommitFailedException)
|
||||
.doBeforeRetry(s -> LOG.warn("Commit cannot be completed since the group has already rebalanced"
|
||||
+ " and assigned the partitions to another member. This means that the time between subsequent"
|
||||
+ " calls to poll() was longer than the configured max.poll.interval.ms, which typically implies"
|
||||
+ " that the poll loop is spending too much time message processing. You can address this either"
|
||||
+ " by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll()"
|
||||
+ " with max.poll.records.")));
|
||||
}
|
||||
|
||||
public Flux<Timestamped<K>> consumeMessages(@NotNull String subGroupId, long userId) {
|
||||
return consumeMessagesInternal(subGroupId, userId);
|
||||
}
|
||||
|
@ -103,6 +119,7 @@ public abstract class KafkaConsumer<K> {
|
|||
return new Timestamped<>(1, record.value());
|
||||
}
|
||||
})
|
||||
.transform(this::retryIfCleanup);
|
||||
.transform(this::retryIfCleanup)
|
||||
.transform(this::retryIfCommitFailed);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
package it.tdlight.reactiveapi;
|
||||
|
||||
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
|
||||
import java.time.Duration;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient {
|
||||
|
||||
|
|
Loading…
Reference in New Issue