diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java index 2346a20..9a0b34c 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java @@ -2,17 +2,13 @@ package it.tdlight.reactiveapi; import io.atomix.cluster.messaging.ClusterEventService; import io.atomix.cluster.messaging.MessagingException; -import io.atomix.cluster.messaging.Subscription; import it.tdlight.jni.TdApi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.Request; import java.time.Duration; import java.time.Instant; -import java.util.List; import java.util.concurrent.CompletableFuture; -import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -35,7 +31,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut if (closed) { return Flux.empty(); } - return kafkaConsumer.consumeMessages(subGroupId, ack).takeUntil(s -> closed); + return kafkaConsumer.consumeMessages(subGroupId).takeUntil(s -> closed); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index 6d00853..08c9862 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -2,19 +2,14 @@ package it.tdlight.reactiveapi; import io.atomix.cluster.messaging.ClusterEventService; import io.atomix.cluster.messaging.MessagingException; -import io.atomix.cluster.messaging.Subscription; import it.tdlight.jni.TdApi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.Request; import java.time.Duration; import java.time.Instant; -import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; import reactor.core.Disposable; -import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -40,7 +35,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl this.eventService = api.getAtomix().getEventService(); this.userId = userId; - clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId) + clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId) .doOnNext(e -> liveId.set(e.liveId())) .takeWhile(n -> !closed) .share(); diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index 915ca86..b004122 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -14,7 +14,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.SignalType; import reactor.kafka.receiver.KafkaReceiver; @@ -66,33 +65,24 @@ public class KafkaConsumer { .doBeforeRetry(s -> LOG.warn("Rebalancing in progress"))); } - public Flux consumeMessages(@NotNull String subGroupId, boolean ack, long userId, long liveId) { - return consumeMessages(subGroupId, ack, userId).filter(e -> e.liveId() == liveId); + public Flux consumeMessages(@NotNull String subGroupId, long userId, long liveId) { + return consumeMessagesInternal(subGroupId, userId).filter(e -> e.liveId() == liveId); } - public Flux consumeMessages(@NotNull String subGroupId, boolean ack, long userId) { - if (ack) { - return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId) - .receive() - .log("consume-messages", Level.FINEST, SignalType.REQUEST) - .doOnNext(result -> result.receiverOffset().acknowledge()) - .map(ConsumerRecord::value) - .transform(this::retryIfCleanup); - } else { - return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId).receive().map(ConsumerRecord::value); - } + public Flux consumeMessages(@NotNull String subGroupId, long userId) { + return consumeMessagesInternal(subGroupId, userId); } - public Flux consumeMessages(@NotNull String subGroupId, boolean ack) { - if (ack) { - return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null) - .receive() - .log("consume-messages", Level.FINEST, SignalType.REQUEST) - .doOnNext(result -> result.receiverOffset().acknowledge()) - .map(ConsumerRecord::value) - .transform(this::retryIfCleanup); - } else { - return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null).receive().map(ConsumerRecord::value); - } + public Flux consumeMessages(@NotNull String subGroupId) { + return consumeMessagesInternal(subGroupId, null); + } + + private Flux consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) { + return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId) + .receive() + .log("consume-messages", Level.FINEST, SignalType.REQUEST) + .doOnNext(result -> result.receiverOffset().acknowledge()) + .map(ConsumerRecord::value) + .transform(this::retryIfCleanup); } } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index a3a5ce9..e044ff4 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -37,7 +37,7 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { this.eventService = atomix.getEventService(); this.liveId = liveId; this.userId = userId; - this.clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId, liveId).share(); + this.clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId, liveId).share(); } @Override