From 3dd6241e2c2d8920335cfd5ada43d2b245fe0252 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 13 Jan 2022 16:19:10 +0100 Subject: [PATCH] Improve kafka grouping --- pom.xml | 5 ++ .../reactiveapi/AtomixReactiveApi.java | 12 ++-- .../AtomixReactiveApiMultiClient.java | 7 +- .../DynamicAtomixReactiveApiClient.java | 9 ++- .../it/tdlight/reactiveapi/Entrypoint.java | 4 +- .../it/tdlight/reactiveapi/KafkaConsumer.java | 65 +++++++++++++------ .../tdlight/reactiveapi/KafkaParameters.java | 4 +- .../LiveAtomixReactiveApiClient.java | 9 ++- .../reactiveapi/PeriodicRestarter.java | 2 +- .../it/tdlight/reactiveapi/ReactiveApi.java | 6 +- .../reactiveapi/ReactiveApiClient.java | 2 + 11 files changed, 84 insertions(+), 41 deletions(-) diff --git a/pom.xml b/pom.xml index 8a75918..0b902dc 100644 --- a/pom.xml +++ b/pom.xml @@ -152,6 +152,11 @@ log4j-core 2.17.1 + + com.lmax + disruptor + 3.4.4 + com.fasterxml.jackson.dataformat jackson-dataformat-yaml diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index 2735e5b..b666d93 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -478,18 +478,18 @@ public class AtomixReactiveApi implements ReactiveApi { } @Override - public ReactiveApiClient dynamicClient(long userId) { - return new DynamicAtomixReactiveApiClient(this, kafkaConsumer, userId); + public ReactiveApiClient dynamicClient(String subGroupId, long userId) { + return new DynamicAtomixReactiveApiClient(this, kafkaConsumer, userId, subGroupId); } @Override - public ReactiveApiClient liveClient(long liveId, long userId) { - return new LiveAtomixReactiveApiClient(atomix, kafkaConsumer, liveId, userId); + public ReactiveApiClient liveClient(String subGroupId, long liveId, long userId) { + return new LiveAtomixReactiveApiClient(atomix, kafkaConsumer, liveId, userId, subGroupId); } @Override - public ReactiveApiMultiClient multiClient() { - return new AtomixReactiveApiMultiClient(this, kafkaConsumer); + public ReactiveApiMultiClient multiClient(String subGroupId) { + return new AtomixReactiveApiMultiClient(this, kafkaConsumer, subGroupId); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java index 7ff1979..2346a20 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java @@ -19,14 +19,15 @@ import reactor.core.scheduler.Schedulers; public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable { private final ClusterEventService eventService; - private final KafkaConsumer kafkaConsumer; + private final String subGroupId; private volatile boolean closed = false; - AtomixReactiveApiMultiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer) { + AtomixReactiveApiMultiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, String subGroupId) { this.eventService = api.getAtomix().getEventService(); this.kafkaConsumer = kafkaConsumer; + this.subGroupId = subGroupId; } @Override @@ -34,7 +35,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut if (closed) { return Flux.empty(); } - return kafkaConsumer.consumeMessages(kafkaConsumer.newRandomGroupId(), ack).takeUntil(s -> closed); + return kafkaConsumer.consumeMessages(subGroupId, ack).takeUntil(s -> closed); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index c183393..0d43365 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -33,12 +33,12 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl private final Flux liveIdChange; private final Mono liveIdResolution; - DynamicAtomixReactiveApiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, long userId) { + DynamicAtomixReactiveApiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, long userId, String subGroupId) { this.api = api; this.eventService = api.getAtomix().getEventService(); this.userId = userId; - clientBoundEvents = kafkaConsumer.consumeMessages(kafkaConsumer.newRandomGroupId(), true, userId) + clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId) .doOnNext(e -> liveId.set(e.liveId())) .share(); @@ -107,6 +107,11 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl return userId; } + @Override + public boolean isPullMode() { + return true; + } + public Flux liveIdChange() { return liveIdChange; } diff --git a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java index 6b19d51..2f60ef2 100644 --- a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java +++ b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,8 +85,9 @@ public class Entrypoint { if (instanceSettings.clientAddress == null) { throw new IllegalArgumentException("A client instance must have an address (host:port)"); } + var randomizedClientId = instanceSettings.id + "-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE); var address = Address.fromString(instanceSettings.clientAddress); - atomixBuilder.withMemberId(instanceSettings.id).withHost(address.host()).withPort(address.port()); + atomixBuilder.withMemberId(randomizedClientId).withHost(address.host()).withPort(address.port()); nodeId = null; resultingEventTransformerSet = Set.of(); } else { diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index ab77cd9..e3c5307 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -4,18 +4,22 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; import java.time.Duration; import java.util.HashMap; import java.util.Map; -import java.util.UUID; +import java.util.logging.Level; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.SignalType; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; +import reactor.util.retry.Retry; public class KafkaConsumer { @@ -36,10 +40,16 @@ public class KafkaConsumer { props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClientBoundEventDeserializer.class); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, (int) Duration.ofMinutes(5).toMillis()); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) Duration.ofMinutes(5).toMillis()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ReceiverOptions receiverOptions = ReceiverOptions .create(props) - .commitInterval(Duration.ofSeconds(10)); + .commitInterval(Duration.ofSeconds(10)) + .commitBatchSize(64) + .pollTimeout(Duration.ofMinutes(2)) + .maxCommitAttempts(100) + .maxDeferredCommits(100); Pattern pattern; if (liveId == null && userId == null) { pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\.[0-9]+"); @@ -57,37 +67,50 @@ public class KafkaConsumer { return KafkaReceiver.create(options); } - public Flux consumeMessages(@NotNull String groupId, boolean ack, long userId, long liveId) { + private Flux retryIfCleanup(Flux clientBoundEventFlux) { + return clientBoundEventFlux.retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofMillis(100)) + .maxBackoff(Duration.ofSeconds(5)) + .filter(ex -> ex instanceof RebalanceInProgressException) + .doBeforeRetry(s -> LOG.warn("Rebalancing in progress"))); + } + + public Flux consumeMessages(@NotNull String subGroupId, boolean ack, long userId, long liveId) { if (ack) { - return createReceiver(groupId, liveId, userId) - .receiveAutoAck() - .flatMapSequential(a -> a) - .map(ConsumerRecord::value); + return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, liveId, userId) + .receive() + .log("consume-messages", Level.FINEST, SignalType.REQUEST) + .doOnNext(result -> result.receiverOffset().acknowledge()) + .map(ConsumerRecord::value) + .transform(this::retryIfCleanup); } else { - return createReceiver(groupId, liveId, userId).receive().map(ConsumerRecord::value); + return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, liveId, userId).receive().map(ConsumerRecord::value); } } - public Flux consumeMessages(@NotNull String groupId, boolean ack, long userId) { + public Flux consumeMessages(@NotNull String subGroupId, boolean ack, long userId) { if (ack) { - return createReceiver(groupId, null, userId) - .receiveAutoAck() - .flatMapSequential(a -> a) - .map(ConsumerRecord::value); + return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, userId) + .receive() + .log("consume-messages", Level.FINEST, SignalType.REQUEST) + .doOnNext(result -> result.receiverOffset().acknowledge()) + .map(ConsumerRecord::value) + .transform(this::retryIfCleanup); } else { - return createReceiver(groupId, null, userId).receive().map(ConsumerRecord::value); + return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, userId).receive().map(ConsumerRecord::value); } } - public Flux consumeMessages(@NotNull String groupId, boolean ack) { + public Flux consumeMessages(@NotNull String subGroupId, boolean ack) { if (ack) { - return createReceiver(groupId, null, null).receiveAutoAck().flatMapSequential(a -> a).map(ConsumerRecord::value); + return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, null) + .receive() + .log("consume-messages", Level.FINEST, SignalType.REQUEST) + .doOnNext(result -> result.receiverOffset().acknowledge()) + .map(ConsumerRecord::value) + .transform(this::retryIfCleanup); } else { - return createReceiver(groupId, null, null).receive().map(ConsumerRecord::value); + return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, null).receive().map(ConsumerRecord::value); } } - - public String newRandomGroupId() { - return UUID.randomUUID().toString(); - } } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java index abb3497..08d4e44 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java @@ -2,9 +2,9 @@ package it.tdlight.reactiveapi; import java.util.stream.Collectors; -public record KafkaParameters(String clientId, String bootstrapServers) { +public record KafkaParameters(String groupId, String clientId, String bootstrapServers) { public KafkaParameters(ClusterSettings clusterSettings, String clientId) { - this(clientId, String.join(",", clusterSettings.kafkaBootstrapServers)); + this(clientId, clientId, String.join(",", clusterSettings.kafkaBootstrapServers)); } } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index a99466b..748755d 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -33,11 +33,11 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { private final Flux clientBoundEvents; - LiveAtomixReactiveApiClient(Atomix atomix, KafkaConsumer kafkaConsumer, long liveId, long userId) { + LiveAtomixReactiveApiClient(Atomix atomix, KafkaConsumer kafkaConsumer, long liveId, long userId, String subGroupId) { this.eventService = atomix.getEventService(); this.liveId = liveId; this.userId = userId; - this.clientBoundEvents = kafkaConsumer.consumeMessages(kafkaConsumer.newRandomGroupId(), true, userId, liveId).share(); + this.clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId, liveId).share(); } @Override @@ -70,6 +70,11 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { return userId; } + @Override + public boolean isPullMode() { + return true; + } + static TdApi.Object deserializeResponse(byte[] bytes) { try { return TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes))); diff --git a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java index 5461f32..979bdef 100644 --- a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java +++ b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java @@ -46,7 +46,7 @@ public class PeriodicRestarter { this.api = api; this.interval = interval; - this.multiClient = api.multiClient(); + this.multiClient = api.multiClient("periodic-restarter"); } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java index fcd2266..f207558 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java @@ -27,11 +27,11 @@ public interface ReactiveApi { */ Mono resolveUserLiveId(long userId); - ReactiveApiMultiClient multiClient(); + ReactiveApiMultiClient multiClient(String subGroupId); - ReactiveApiClient dynamicClient(long userId); + ReactiveApiClient dynamicClient(String subGroupId, long userId); - ReactiveApiClient liveClient(long liveId, long userId); + ReactiveApiClient liveClient(String subGroupId, long liveId, long userId); Mono close(); } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java index 93f7dc8..e92cbf0 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java @@ -13,4 +13,6 @@ public interface ReactiveApiClient { Mono request(TdApi.Function request, Instant timeout); long getUserId(); + + boolean isPullMode(); }