From dfc393c953644d478d5895d4cc140f70d938c325 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 22 Sep 2022 03:01:59 +0200 Subject: [PATCH] Retries --- src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index a96013b..47bcc41 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -129,9 +129,8 @@ public abstract class KafkaConsumer { return new Timestamped<>(1, record.value()); } }) - .transform(ReactorUtils::subscribeOnce); - //todo: check if they must be re-enabled - //.transform(this::retryIfCleanup) - //.transform(this::retryIfCommitFailed); + .transform(ReactorUtils::subscribeOnce) + .transform(this::retryIfCleanup) + .transform(this::retryIfCommitFailed); } }