Reduce duplicate code

This commit is contained in:
Andrea Cavalli 2022-01-14 20:04:29 +01:00
parent 2d0ab31fd0
commit 2e21f765ab
4 changed files with 18 additions and 37 deletions

View File

@ -2,17 +2,13 @@ package it.tdlight.reactiveapi;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.cluster.messaging.Subscription;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.Request;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -35,7 +31,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
if (closed) {
return Flux.empty();
}
return kafkaConsumer.consumeMessages(subGroupId, ack).takeUntil(s -> closed);
return kafkaConsumer.consumeMessages(subGroupId).takeUntil(s -> closed);
}
@Override

View File

@ -2,19 +2,14 @@ package it.tdlight.reactiveapi;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.cluster.messaging.Subscription;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.Request;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import reactor.core.Disposable;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -40,7 +35,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
this.eventService = api.getAtomix().getEventService();
this.userId = userId;
clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId)
clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId)
.doOnNext(e -> liveId.set(e.liveId()))
.takeWhile(n -> !closed)
.share();

View File

@ -14,7 +14,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.kafka.receiver.KafkaReceiver;
@ -66,33 +65,24 @@ public class KafkaConsumer {
.doBeforeRetry(s -> LOG.warn("Rebalancing in progress")));
}
public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId, boolean ack, long userId, long liveId) {
return consumeMessages(subGroupId, ack, userId).filter(e -> e.liveId() == liveId);
public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId, long userId, long liveId) {
return consumeMessagesInternal(subGroupId, userId).filter(e -> e.liveId() == liveId);
}
public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId, boolean ack, long userId) {
if (ack) {
return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId)
.receive()
.log("consume-messages", Level.FINEST, SignalType.REQUEST)
.doOnNext(result -> result.receiverOffset().acknowledge())
.map(ConsumerRecord::value)
.transform(this::retryIfCleanup);
} else {
return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId).receive().map(ConsumerRecord::value);
}
public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId, long userId) {
return consumeMessagesInternal(subGroupId, userId);
}
public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId, boolean ack) {
if (ack) {
return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null)
.receive()
.log("consume-messages", Level.FINEST, SignalType.REQUEST)
.doOnNext(result -> result.receiverOffset().acknowledge())
.map(ConsumerRecord::value)
.transform(this::retryIfCleanup);
} else {
return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null).receive().map(ConsumerRecord::value);
}
public Flux<ClientBoundEvent> consumeMessages(@NotNull String subGroupId) {
return consumeMessagesInternal(subGroupId, null);
}
private Flux<ClientBoundEvent> consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) {
return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId)
.receive()
.log("consume-messages", Level.FINEST, SignalType.REQUEST)
.doOnNext(result -> result.receiverOffset().acknowledge())
.map(ConsumerRecord::value)
.transform(this::retryIfCleanup);
}
}

View File

@ -37,7 +37,7 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
this.eventService = atomix.getEventService();
this.liveId = liveId;
this.userId = userId;
this.clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId, liveId).share();
this.clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, userId, liveId).share();
}
@Override