Fix resubscription failures
This commit is contained in:
parent
2b2e690da4
commit
116e082d56
|
@ -120,8 +120,8 @@ public abstract class KafkaConsumer<K> {
|
||||||
return new Timestamped<>(1, record.value());
|
return new Timestamped<>(1, record.value());
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.transform(ReactorUtils::subscribeOnce)
|
|
||||||
.transform(this::retryIfCleanup)
|
.transform(this::retryIfCleanup)
|
||||||
.transform(this::retryIfCommitFailed);
|
.transform(this::retryIfCommitFailed)
|
||||||
|
.transform(ReactorUtils::subscribeOnce);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue