diff --git a/pom.xml b/pom.xml
index 8a75918..0b902dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,6 +152,11 @@
log4j-core
2.17.1
+
+ com.lmax
+ disruptor
+ 3.4.4
+
com.fasterxml.jackson.dataformat
jackson-dataformat-yaml
diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
index 2735e5b..b666d93 100644
--- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
+++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
@@ -478,18 +478,18 @@ public class AtomixReactiveApi implements ReactiveApi {
}
@Override
- public ReactiveApiClient dynamicClient(long userId) {
- return new DynamicAtomixReactiveApiClient(this, kafkaConsumer, userId);
+ public ReactiveApiClient dynamicClient(String subGroupId, long userId) {
+ return new DynamicAtomixReactiveApiClient(this, kafkaConsumer, userId, subGroupId);
}
@Override
- public ReactiveApiClient liveClient(long liveId, long userId) {
- return new LiveAtomixReactiveApiClient(atomix, kafkaConsumer, liveId, userId);
+ public ReactiveApiClient liveClient(String subGroupId, long liveId, long userId) {
+ return new LiveAtomixReactiveApiClient(atomix, kafkaConsumer, liveId, userId, subGroupId);
}
@Override
- public ReactiveApiMultiClient multiClient() {
- return new AtomixReactiveApiMultiClient(this, kafkaConsumer);
+ public ReactiveApiMultiClient multiClient(String subGroupId) {
+ return new AtomixReactiveApiMultiClient(this, kafkaConsumer, subGroupId);
}
@Override
diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java
index 7ff1979..2346a20 100644
--- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java
@@ -19,14 +19,15 @@ import reactor.core.scheduler.Schedulers;
public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable {
private final ClusterEventService eventService;
-
private final KafkaConsumer kafkaConsumer;
+ private final String subGroupId;
private volatile boolean closed = false;
- AtomixReactiveApiMultiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer) {
+ AtomixReactiveApiMultiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, String subGroupId) {
this.eventService = api.getAtomix().getEventService();
this.kafkaConsumer = kafkaConsumer;
+ this.subGroupId = subGroupId;
}
@Override
@@ -34,7 +35,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
if (closed) {
return Flux.empty();
}
- return kafkaConsumer.consumeMessages(kafkaConsumer.newRandomGroupId(), ack).takeUntil(s -> closed);
+ return kafkaConsumer.consumeMessages(subGroupId, ack).takeUntil(s -> closed);
}
@Override
diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java
index c183393..0d43365 100644
--- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java
@@ -33,12 +33,12 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
private final Flux liveIdChange;
private final Mono liveIdResolution;
- DynamicAtomixReactiveApiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, long userId) {
+ DynamicAtomixReactiveApiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, long userId, String subGroupId) {
this.api = api;
this.eventService = api.getAtomix().getEventService();
this.userId = userId;
- clientBoundEvents = kafkaConsumer.consumeMessages(kafkaConsumer.newRandomGroupId(), true, userId)
+ clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId)
.doOnNext(e -> liveId.set(e.liveId()))
.share();
@@ -107,6 +107,11 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
return userId;
}
+ @Override
+ public boolean isPullMode() {
+ return true;
+ }
+
public Flux liveIdChange() {
return liveIdChange;
}
diff --git a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java
index 6b19d51..2f60ef2 100644
--- a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java
+++ b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java
@@ -19,6 +19,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,8 +85,9 @@ public class Entrypoint {
if (instanceSettings.clientAddress == null) {
throw new IllegalArgumentException("A client instance must have an address (host:port)");
}
+ var randomizedClientId = instanceSettings.id + "-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
var address = Address.fromString(instanceSettings.clientAddress);
- atomixBuilder.withMemberId(instanceSettings.id).withHost(address.host()).withPort(address.port());
+ atomixBuilder.withMemberId(randomizedClientId).withHost(address.host()).withPort(address.port());
nodeId = null;
resultingEventTransformerSet = Set.of();
} else {
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
index ab77cd9..e3c5307 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
@@ -4,18 +4,22 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
+import java.util.logging.Level;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.SignalType;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
+import reactor.util.retry.Retry;
public class KafkaConsumer {
@@ -36,10 +40,16 @@ public class KafkaConsumer {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClientBoundEventDeserializer.class);
+ props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, (int) Duration.ofMinutes(5).toMillis());
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) Duration.ofMinutes(5).toMillis());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ReceiverOptions receiverOptions = ReceiverOptions
.create(props)
- .commitInterval(Duration.ofSeconds(10));
+ .commitInterval(Duration.ofSeconds(10))
+ .commitBatchSize(64)
+ .pollTimeout(Duration.ofMinutes(2))
+ .maxCommitAttempts(100)
+ .maxDeferredCommits(100);
Pattern pattern;
if (liveId == null && userId == null) {
pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\.[0-9]+");
@@ -57,37 +67,50 @@ public class KafkaConsumer {
return KafkaReceiver.create(options);
}
- public Flux consumeMessages(@NotNull String groupId, boolean ack, long userId, long liveId) {
+ private Flux retryIfCleanup(Flux clientBoundEventFlux) {
+ return clientBoundEventFlux.retryWhen(Retry
+ .backoff(Long.MAX_VALUE, Duration.ofMillis(100))
+ .maxBackoff(Duration.ofSeconds(5))
+ .filter(ex -> ex instanceof RebalanceInProgressException)
+ .doBeforeRetry(s -> LOG.warn("Rebalancing in progress")));
+ }
+
+ public Flux consumeMessages(@NotNull String subGroupId, boolean ack, long userId, long liveId) {
if (ack) {
- return createReceiver(groupId, liveId, userId)
- .receiveAutoAck()
- .flatMapSequential(a -> a)
- .map(ConsumerRecord::value);
+ return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, liveId, userId)
+ .receive()
+ .log("consume-messages", Level.FINEST, SignalType.REQUEST)
+ .doOnNext(result -> result.receiverOffset().acknowledge())
+ .map(ConsumerRecord::value)
+ .transform(this::retryIfCleanup);
} else {
- return createReceiver(groupId, liveId, userId).receive().map(ConsumerRecord::value);
+ return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, liveId, userId).receive().map(ConsumerRecord::value);
}
}
- public Flux consumeMessages(@NotNull String groupId, boolean ack, long userId) {
+ public Flux consumeMessages(@NotNull String subGroupId, boolean ack, long userId) {
if (ack) {
- return createReceiver(groupId, null, userId)
- .receiveAutoAck()
- .flatMapSequential(a -> a)
- .map(ConsumerRecord::value);
+ return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, userId)
+ .receive()
+ .log("consume-messages", Level.FINEST, SignalType.REQUEST)
+ .doOnNext(result -> result.receiverOffset().acknowledge())
+ .map(ConsumerRecord::value)
+ .transform(this::retryIfCleanup);
} else {
- return createReceiver(groupId, null, userId).receive().map(ConsumerRecord::value);
+ return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, userId).receive().map(ConsumerRecord::value);
}
}
- public Flux consumeMessages(@NotNull String groupId, boolean ack) {
+ public Flux consumeMessages(@NotNull String subGroupId, boolean ack) {
if (ack) {
- return createReceiver(groupId, null, null).receiveAutoAck().flatMapSequential(a -> a).map(ConsumerRecord::value);
+ return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, null)
+ .receive()
+ .log("consume-messages", Level.FINEST, SignalType.REQUEST)
+ .doOnNext(result -> result.receiverOffset().acknowledge())
+ .map(ConsumerRecord::value)
+ .transform(this::retryIfCleanup);
} else {
- return createReceiver(groupId, null, null).receive().map(ConsumerRecord::value);
+ return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, null, null).receive().map(ConsumerRecord::value);
}
}
-
- public String newRandomGroupId() {
- return UUID.randomUUID().toString();
- }
}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java
index abb3497..08d4e44 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java
@@ -2,9 +2,9 @@ package it.tdlight.reactiveapi;
import java.util.stream.Collectors;
-public record KafkaParameters(String clientId, String bootstrapServers) {
+public record KafkaParameters(String groupId, String clientId, String bootstrapServers) {
public KafkaParameters(ClusterSettings clusterSettings, String clientId) {
- this(clientId, String.join(",", clusterSettings.kafkaBootstrapServers));
+ this(clientId, clientId, String.join(",", clusterSettings.kafkaBootstrapServers));
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
index a99466b..748755d 100644
--- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
@@ -33,11 +33,11 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
private final Flux clientBoundEvents;
- LiveAtomixReactiveApiClient(Atomix atomix, KafkaConsumer kafkaConsumer, long liveId, long userId) {
+ LiveAtomixReactiveApiClient(Atomix atomix, KafkaConsumer kafkaConsumer, long liveId, long userId, String subGroupId) {
this.eventService = atomix.getEventService();
this.liveId = liveId;
this.userId = userId;
- this.clientBoundEvents = kafkaConsumer.consumeMessages(kafkaConsumer.newRandomGroupId(), true, userId, liveId).share();
+ this.clientBoundEvents = kafkaConsumer.consumeMessages(subGroupId, true, userId, liveId).share();
}
@Override
@@ -70,6 +70,11 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
return userId;
}
+ @Override
+ public boolean isPullMode() {
+ return true;
+ }
+
static TdApi.Object deserializeResponse(byte[] bytes) {
try {
return TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
diff --git a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java
index 5461f32..979bdef 100644
--- a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java
+++ b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java
@@ -46,7 +46,7 @@ public class PeriodicRestarter {
this.api = api;
this.interval = interval;
- this.multiClient = api.multiClient();
+ this.multiClient = api.multiClient("periodic-restarter");
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java
index fcd2266..f207558 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java
@@ -27,11 +27,11 @@ public interface ReactiveApi {
*/
Mono resolveUserLiveId(long userId);
- ReactiveApiMultiClient multiClient();
+ ReactiveApiMultiClient multiClient(String subGroupId);
- ReactiveApiClient dynamicClient(long userId);
+ ReactiveApiClient dynamicClient(String subGroupId, long userId);
- ReactiveApiClient liveClient(long liveId, long userId);
+ ReactiveApiClient liveClient(String subGroupId, long liveId, long userId);
Mono close();
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java
index 93f7dc8..e92cbf0 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java
@@ -13,4 +13,6 @@ public interface ReactiveApiClient {
Mono request(TdApi.Function request, Instant timeout);
long getUserId();
+
+ boolean isPullMode();
}