From 83c064220fbc0a882e2601928c2f572b12fd0a20 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 22 Sep 2022 15:46:31 +0200 Subject: [PATCH] Implement lanes --- .../reactiveapi/AtomixReactiveApi.java | 45 ++++++++++++------- src/main/java/it/tdlight/reactiveapi/Cli.java | 14 ++++-- .../tdlight/reactiveapi/ClusterSettings.java | 5 ++- .../reactiveapi/CreateSessionRequest.java | 14 ++---- .../it/tdlight/reactiveapi/DiskSession.java | 7 ++- ...hannelName.java => KafkaChannelCodec.java} | 6 +-- .../reactiveapi/KafkaClientBoundConsumer.java | 20 +++++++-- .../reactiveapi/KafkaClientBoundProducer.java | 19 ++++++-- .../it/tdlight/reactiveapi/KafkaConsumer.java | 35 ++++++--------- .../tdlight/reactiveapi/KafkaParameters.java | 19 ++++++-- .../it/tdlight/reactiveapi/KafkaProducer.java | 19 ++++---- .../reactiveapi/KafkaSharedTdlibClients.java | 20 +++++++-- .../reactiveapi/KafkaSharedTdlibServers.java | 8 ++-- .../KafkaTdlibClientsChannels.java | 3 +- .../KafkaTdlibRequestConsumer.java | 11 +++-- .../KafkaTdlibRequestProducer.java | 9 +++- .../KafkaTdlibResponseConsumer.java | 10 +++-- .../KafkaTdlibResponseProducer.java | 10 +++-- .../KafkaTdlibServersChannels.java | 13 +++++- .../LiveAtomixReactiveApiClient.java | 16 +++++-- .../reactiveapi/ReactiveApiMultiClient.java | 5 ++- .../reactiveapi/ReactiveApiPublisher.java | 28 +++++++----- .../it/tdlight/reactiveapi/UserTopic.java | 40 ----------------- 23 files changed, 221 insertions(+), 155 deletions(-) rename src/main/java/it/tdlight/reactiveapi/{KafkaChannelName.java => KafkaChannelCodec.java} (87%) delete mode 100644 src/main/java/it/tdlight/reactiveapi/UserTopic.java diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index 86bff6b..c7d2be8 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.HashMap; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; @@ -22,7 +23,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.scheduler.Schedulers; public class AtomixReactiveApi implements ReactiveApi { @@ -60,14 +60,17 @@ public class AtomixReactiveApi implements ReactiveApi { @Nullable DiskSessionsManager diskSessions, @NotNull Set resultingEventTransformerSet) { this.mode = mode; - var kafkaTDLibRequestProducer = new KafkaTdlibRequestProducer(kafkaParameters); - var kafkaTDLibResponseConsumer = new KafkaTdlibResponseConsumer(kafkaParameters); - var kafkaClientBoundConsumer = new KafkaClientBoundConsumer(kafkaParameters); - var kafkaTdlibClientsChannels = new KafkaTdlibClientsChannels(kafkaTDLibRequestProducer, - kafkaTDLibResponseConsumer, - kafkaClientBoundConsumer - ); if (mode != AtomixReactiveApiMode.SERVER) { + var kafkaTDLibRequestProducer = new KafkaTdlibRequestProducer(kafkaParameters); + var kafkaTDLibResponseConsumer = new KafkaTdlibResponseConsumer(kafkaParameters); + var kafkaClientBoundConsumers = new HashMap(); + for (String lane : kafkaParameters.getAllLanes()) { + kafkaClientBoundConsumers.put(lane, new KafkaClientBoundConsumer(kafkaParameters, lane)); + } + var kafkaTdlibClientsChannels = new KafkaTdlibClientsChannels(kafkaTDLibRequestProducer, + kafkaTDLibResponseConsumer, + kafkaClientBoundConsumers + ); this.kafkaSharedTdlibClients = new KafkaSharedTdlibClients(kafkaTdlibClientsChannels); this.client = new LiveAtomixReactiveApiClient(kafkaSharedTdlibClients); } else { @@ -77,10 +80,13 @@ public class AtomixReactiveApi implements ReactiveApi { if (mode != AtomixReactiveApiMode.CLIENT) { var kafkaTDLibRequestConsumer = new KafkaTdlibRequestConsumer(kafkaParameters); var kafkaTDLibResponseProducer = new KafkaTdlibResponseProducer(kafkaParameters); - var kafkaClientBoundProducer = new KafkaClientBoundProducer(kafkaParameters); + var kafkaClientBoundProducers = new HashMap(); + for (String lane : kafkaParameters.getAllLanes()) { + kafkaClientBoundProducers.put(lane, new KafkaClientBoundProducer(kafkaParameters, lane)); + } var kafkaTDLibServer = new KafkaTdlibServersChannels(kafkaTDLibRequestConsumer, kafkaTDLibResponseProducer, - kafkaClientBoundProducer + kafkaClientBoundProducers ); this.kafkaSharedTdlibServers = new KafkaSharedTdlibServers(kafkaTDLibServer); } else { @@ -122,6 +128,7 @@ public class AtomixReactiveApi implements ReactiveApi { return createSession(new LoadSessionFromDiskRequest(id, diskSession.token, diskSession.phoneNumber, + diskSession.lane, true )); }) @@ -152,41 +159,49 @@ public class AtomixReactiveApi implements ReactiveApi { boolean loadedFromDisk; long userId; String botToken; + String lane; Long phoneNumber; if (req instanceof CreateBotSessionRequest createBotSessionRequest) { loadedFromDisk = false; userId = createBotSessionRequest.userId(); botToken = createBotSessionRequest.token(); phoneNumber = null; + lane = createBotSessionRequest.lane(); reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, - botToken + botToken, + lane ); } else if (req instanceof CreateUserSessionRequest createUserSessionRequest) { loadedFromDisk = false; userId = createUserSessionRequest.userId(); botToken = null; phoneNumber = createUserSessionRequest.phoneNumber(); + lane = createUserSessionRequest.lane(); reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, - phoneNumber + phoneNumber, + lane ); } else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) { loadedFromDisk = true; userId = loadSessionFromDiskRequest.userId(); botToken = loadSessionFromDiskRequest.token(); phoneNumber = loadSessionFromDiskRequest.phoneNumber(); + lane = loadSessionFromDiskRequest.lane(); if (loadSessionFromDiskRequest.phoneNumber() != null) { reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, - phoneNumber + phoneNumber, + lane ); } else { reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, - botToken + botToken, + lane ); } } else { @@ -231,7 +246,7 @@ public class AtomixReactiveApi implements ReactiveApi { if (!loadedFromDisk) { // Create the disk session configuration - var diskSession = new DiskSession(botToken, phoneNumber); + var diskSession = new DiskSession(botToken, phoneNumber, lane); return Mono.fromCallable(() -> { Objects.requireNonNull(diskSessions); synchronized (diskSessions) { diff --git a/src/main/java/it/tdlight/reactiveapi/Cli.java b/src/main/java/it/tdlight/reactiveapi/Cli.java index d632c51..13fd15b 100644 --- a/src/main/java/it/tdlight/reactiveapi/Cli.java +++ b/src/main/java/it/tdlight/reactiveapi/Cli.java @@ -109,11 +109,17 @@ public class Cli { private static void createSession(ReactiveApi api, String commandArgs) { var parts = commandArgs.split(" "); boolean invalid = false; - if (parts.length == 3) { + if (parts.length == 4 || parts.length == 3) { + String lane; + if (parts.length == 4) { + lane = parts[3]; + } else { + lane = ""; + } CreateSessionRequest request = switch (parts[0]) { - case "bot" -> new CreateBotSessionRequest(Long.parseLong(parts[1]), parts[2]); + case "bot" -> new CreateBotSessionRequest(Long.parseLong(parts[1]), parts[2], lane); case "user" -> new CreateUserSessionRequest(Long.parseLong(parts[1]), - Long.parseLong(parts[2])); + Long.parseLong(parts[2]), lane); default -> { invalid = true; yield null; @@ -129,7 +135,7 @@ public class Cli { invalid = true; } if (invalid) { - LOG.error("Syntax: CreateSession <\"bot\"|\"user\"> "); + LOG.error("Syntax: CreateSession <\"bot\"|\"user\"> [lane]"); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java b/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java index 4c0410f..61672e4 100644 --- a/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java +++ b/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java @@ -11,11 +11,14 @@ public class ClusterSettings { public String id; public List kafkaBootstrapServers; + public List lanes; @JsonCreator public ClusterSettings(@JsonProperty(required = true, value = "id") String id, - @JsonProperty(required = true, value = "kafkaBootstrapServers") List kafkaBootstrapServers) { + @JsonProperty(required = true, value = "kafkaBootstrapServers") List kafkaBootstrapServers, + @JsonProperty(required = true, value = "lanes") List lanes) { this.id = id; this.kafkaBootstrapServers = kafkaBootstrapServers; + this.lanes = lanes; } } diff --git a/src/main/java/it/tdlight/reactiveapi/CreateSessionRequest.java b/src/main/java/it/tdlight/reactiveapi/CreateSessionRequest.java index 9ec948d..9a0f6e0 100644 --- a/src/main/java/it/tdlight/reactiveapi/CreateSessionRequest.java +++ b/src/main/java/it/tdlight/reactiveapi/CreateSessionRequest.java @@ -1,26 +1,20 @@ package it.tdlight.reactiveapi; -import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import org.apache.kafka.common.errors.SerializationException; public sealed interface CreateSessionRequest permits CreateUserSessionRequest, CreateBotSessionRequest, LoadSessionFromDiskRequest { long userId(); - record CreateUserSessionRequest(long userId, long phoneNumber) implements CreateSessionRequest {} + record CreateUserSessionRequest(long userId, long phoneNumber, String lane) implements CreateSessionRequest {} - record CreateBotSessionRequest(long userId, String token) implements CreateSessionRequest {} + record CreateBotSessionRequest(long userId, String token, String lane) implements CreateSessionRequest {} - record LoadSessionFromDiskRequest(long userId, String token, Long phoneNumber, boolean createNew) implements - CreateSessionRequest { + record LoadSessionFromDiskRequest(long userId, String token, Long phoneNumber, String lane, + boolean createNew) implements CreateSessionRequest { public LoadSessionFromDiskRequest { if ((token == null) == (phoneNumber == null)) { diff --git a/src/main/java/it/tdlight/reactiveapi/DiskSession.java b/src/main/java/it/tdlight/reactiveapi/DiskSession.java index 70f05b1..36cf567 100644 --- a/src/main/java/it/tdlight/reactiveapi/DiskSession.java +++ b/src/main/java/it/tdlight/reactiveapi/DiskSession.java @@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; import org.jetbrains.annotations.Nullable; @JsonInclude(Include.NON_NULL) @@ -13,12 +14,16 @@ public class DiskSession { public String token; @Nullable public Long phoneNumber; + @Nullable + public String lane; @JsonCreator public DiskSession(@JsonProperty("token") @Nullable String token, - @JsonProperty("phoneNumber") @Nullable Long phoneNumber) { + @JsonProperty("phoneNumber") @Nullable Long phoneNumber, + @JsonProperty("lane") @Nullable String lane) { this.token = token; this.phoneNumber = phoneNumber; + this.lane = Objects.requireNonNullElse(lane, ""); this.validate(); } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaChannelName.java b/src/main/java/it/tdlight/reactiveapi/KafkaChannelCodec.java similarity index 87% rename from src/main/java/it/tdlight/reactiveapi/KafkaChannelName.java rename to src/main/java/it/tdlight/reactiveapi/KafkaChannelCodec.java index f560974..53344d4 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaChannelName.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaChannelCodec.java @@ -1,8 +1,6 @@ package it.tdlight.reactiveapi; -import org.apache.kafka.common.serialization.Serializer; - -public enum KafkaChannelName { +public enum KafkaChannelCodec { CLIENT_BOUND_EVENT("event", ClientBoundEventSerializer.class, ClientBoundEventDeserializer.class), TDLIB_REQUEST("request", TdlibRequestSerializer.class, TdlibRequestDeserializer.class), TDLIB_RESPONSE("response", TdlibResponseSerializer.class, TdlibResponseDeserializer.class); @@ -11,7 +9,7 @@ public enum KafkaChannelName { private final Class serializerClass; private final Class deserializerClass; - KafkaChannelName(String kafkaName, + KafkaChannelCodec(String kafkaName, Class serializerClass, Class deserializerClass) { this.name = kafkaName; diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java index 28a0a40..671b7ba 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java @@ -4,13 +4,27 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; public class KafkaClientBoundConsumer extends KafkaConsumer { - public KafkaClientBoundConsumer(KafkaParameters kafkaParameters) { + private final String lane; + private final String name; + + public KafkaClientBoundConsumer(KafkaParameters kafkaParameters, String lane) { super(kafkaParameters); + this.lane = lane; + if (lane.isBlank()) { + this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName(); + } else { + this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName() + "-" + lane; + } } @Override - public KafkaChannelName getChannelName() { - return KafkaChannelName.CLIENT_BOUND_EVENT; + public KafkaChannelCodec getChannelCodec() { + return KafkaChannelCodec.CLIENT_BOUND_EVENT; + } + + @Override + public String getChannelName() { + return name; } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java index b3bffc7..bc05555 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java @@ -1,16 +1,27 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; -import it.tdlight.reactiveapi.Event.OnRequest; public class KafkaClientBoundProducer extends KafkaProducer { - public KafkaClientBoundProducer(KafkaParameters kafkaParameters) { + private final String name; + + public KafkaClientBoundProducer(KafkaParameters kafkaParameters, String lane) { super(kafkaParameters); + if (lane.isBlank()) { + this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName(); + } else { + this.name = KafkaChannelCodec.CLIENT_BOUND_EVENT.getKafkaName() + "-" + lane; + } } @Override - public KafkaChannelName getChannelName() { - return KafkaChannelName.CLIENT_BOUND_EVENT; + public KafkaChannelCodec getChannelCodec() { + return KafkaChannelCodec.CLIENT_BOUND_EVENT; + } + + @Override + public String getChannelName() { + return name; } } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index 47bcc41..e86b3e9 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -6,8 +6,8 @@ import it.tdlight.common.Init; import it.tdlight.common.utils.CantLoadLibrary; import java.time.Duration; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.logging.Level; import java.util.regex.Pattern; @@ -20,7 +20,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.SignalType; import reactor.kafka.receiver.KafkaReceiver; @@ -37,7 +36,7 @@ public abstract class KafkaConsumer { this.kafkaParameters = kafkaParameters; } - public KafkaReceiver createReceiver(@NotNull String groupId, @Nullable Long userId) { + public KafkaReceiver createReceiver(@NotNull String kafkaGroupId) { try { Init.start(); } catch (CantLoadLibrary e) { @@ -46,10 +45,10 @@ public abstract class KafkaConsumer { } Map props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers()); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId() + (userId != null ? ("_" + userId) : "")); - props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getChannelName().getDeserializerClass()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getChannelCodec().getDeserializerClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, toIntExact(Duration.ofMinutes(5).toMillis())); if (!isQuickResponse()) { @@ -63,20 +62,16 @@ public abstract class KafkaConsumer { .commitBatchSize(isQuickResponse() ? 64 : 1024) .maxCommitAttempts(5) .maxDeferredCommits((isQuickResponse() ? 64 : 1024) * 5); - Pattern pattern; - if (userId == null) { - pattern = Pattern.compile("tdlib\\." + getChannelName() + "\\.\\d+"); - } else { - pattern = Pattern.compile("tdlib\\." + getChannelName() + "\\." + userId); - } ReceiverOptions options = receiverOptions - .subscription(pattern) + .subscription(List.of("tdlib." + getChannelName())) .addAssignListener(partitions -> LOG.debug("onPartitionsAssigned {}", partitions)) .addRevokeListener(partitions -> LOG.debug("onPartitionsRevoked {}", partitions)); return KafkaReceiver.create(options); } - public abstract KafkaChannelName getChannelName(); + public abstract KafkaChannelCodec getChannelCodec(); + + public abstract String getChannelName(); public abstract boolean isQuickResponse(); @@ -103,19 +98,15 @@ public abstract class KafkaConsumer { + " with max.poll.records."))); } - public Flux> consumeMessages(@NotNull String subGroupId, long userId) { - return consumeMessagesInternal(subGroupId, userId); - } - public Flux> consumeMessages(@NotNull String subGroupId) { - return consumeMessagesInternal(subGroupId, null); + return consumeMessagesInternal(subGroupId); } - private Flux> consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) { - return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId) + private Flux> consumeMessagesInternal(@NotNull String subGroupId) { + return createReceiver(kafkaParameters.groupId() + (subGroupId.isBlank() ? "" : ("-" + subGroupId))) .receiveAutoAck(isQuickResponse() ? 1 : 4) .concatMap(Function.identity()) - .log("consume-messages" + (userId != null ? "-" + userId : ""), + .log("consume-messages", Level.FINEST, SignalType.REQUEST, SignalType.ON_NEXT, diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java index 08d4e44..8ee515f 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java @@ -1,10 +1,23 @@ package it.tdlight.reactiveapi; -import java.util.stream.Collectors; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; -public record KafkaParameters(String groupId, String clientId, String bootstrapServers) { +public record KafkaParameters(String groupId, String clientId, String bootstrapServers, List lanes) { public KafkaParameters(ClusterSettings clusterSettings, String clientId) { - this(clientId, clientId, String.join(",", clusterSettings.kafkaBootstrapServers)); + this(clientId, + clientId, + String.join(",", clusterSettings.kafkaBootstrapServers), + List.copyOf(clusterSettings.lanes) + ); + } + + public Set getAllLanes() { + var lanes = new LinkedHashSet(this.lanes.size() + 1); + lanes.add(""); + lanes.addAll(this.lanes); + return lanes; } } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java index ce5ee26..970a816 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java @@ -1,6 +1,5 @@ package it.tdlight.reactiveapi; -import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.logging.Level; @@ -31,22 +30,22 @@ public abstract class KafkaProducer { props.put(ProducerConfig.LINGER_MS_CONFIG, "20"); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getChannelName().getSerializerClass()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getChannelCodec().getSerializerClass()); SenderOptions senderOptions = SenderOptions.create(props); sender = KafkaSender.create(senderOptions.maxInFlight(1024)); } - public abstract KafkaChannelName getChannelName(); + public abstract KafkaChannelCodec getChannelCodec(); - public Mono sendMessages(long userId, Flux eventsFlux) { - var userTopic = new UserTopic(getChannelName(), userId); + public abstract String getChannelName(); + + public Mono sendMessages(Flux eventsFlux) { + var channelName = getChannelName(); return eventsFlux - .>map(event -> SenderRecord.create(new ProducerRecord<>( - userTopic.getTopic(), - event - ), null)) - .log("produce-messages-" + userTopic, + .>map(event -> + SenderRecord.create(new ProducerRecord<>(channelName, event), null)) + .log("produce-messages-" + channelName, Level.FINEST, SignalType.REQUEST, SignalType.ON_NEXT, diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java index 0dbe44e..498afc6 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java @@ -8,9 +8,12 @@ import it.tdlight.reactiveapi.Event.OnRequest; import it.tdlight.reactiveapi.Event.OnResponse; import java.io.Closeable; import java.time.Duration; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.logging.Level; +import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.Disposable; @@ -32,16 +35,17 @@ public class KafkaSharedTdlibClients implements Closeable { private final Disposable requestsSub; private final AtomicReference eventsSub = new AtomicReference<>(); private final Flux>> responses; - private final Flux> events; + private final Map>> events; private final Many> requests = Sinks.many().unicast() .onBackpressureBuffer(Queues.>get(65535).get()); public KafkaSharedTdlibClients(KafkaTdlibClientsChannels kafkaTdlibClientsChannels) { this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels; this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses"); - this.events = kafkaTdlibClientsChannels.events().consumeMessages("td-handler"); + this.events = kafkaTdlibClientsChannels.events().entrySet().stream() + .collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().consumeMessages(e.getKey()))); this.requestsSub = kafkaTdlibClientsChannels.request() - .sendMessages(0L, requests.asFlux()) + .sendMessages(requests.asFlux()) .subscribeOn(Schedulers.parallel()) .subscribe(); } @@ -50,7 +54,15 @@ public class KafkaSharedTdlibClients implements Closeable { return responses; } - public Flux> events() { + public Flux> events(String lane) { + var result = events.get(lane); + if (result == null) { + throw new IllegalArgumentException("No lane " + lane); + } + return result; + } + + public Map>> events() { return events; } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java index 4e51f22..67cab9b 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java @@ -34,7 +34,7 @@ public class KafkaSharedTdlibServers implements Closeable { public KafkaSharedTdlibServers(KafkaTdlibServersChannels kafkaTdlibServersChannels) { this.kafkaTdlibServersChannels = kafkaTdlibServersChannels; this.responsesSub = kafkaTdlibServersChannels.response() - .sendMessages(0L, responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT)) + .sendMessages(responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT)) .subscribeOn(Schedulers.parallel()) .subscribe(); this.requests = kafkaTdlibServersChannels.request() @@ -47,9 +47,9 @@ public class KafkaSharedTdlibServers implements Closeable { .log("requests", Level.FINEST, SignalType.REQUEST, SignalType.ON_NEXT); } - public Disposable events(Flux eventFlux) { - return kafkaTdlibServersChannels.events() - .sendMessages(0L, eventFlux) + public Disposable events(String lane, Flux eventFlux) { + return kafkaTdlibServersChannels.events(lane) + .sendMessages(eventFlux) .subscribeOn(Schedulers.parallel()) .subscribe(); } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java index c09cc80..626db35 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java @@ -1,10 +1,11 @@ package it.tdlight.reactiveapi; import java.io.Closeable; +import java.util.Map; public record KafkaTdlibClientsChannels(KafkaTdlibRequestProducer request, KafkaTdlibResponseConsumer response, - KafkaClientBoundConsumer events) implements Closeable { + Map events) implements Closeable { @Override public void close() { diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java index 8de0b75..4770670 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java @@ -1,9 +1,7 @@ package it.tdlight.reactiveapi; import it.tdlight.jni.TdApi; -import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.OnRequest; -import it.tdlight.reactiveapi.Event.ServerBoundEvent; public class KafkaTdlibRequestConsumer extends KafkaConsumer> { @@ -12,8 +10,13 @@ public class KafkaTdlibRequestConsumer extends KafkaConsumer> { } @Override - public KafkaChannelName getChannelName() { - return KafkaChannelName.TDLIB_REQUEST; + public KafkaChannelCodec getChannelCodec() { + return KafkaChannelCodec.TDLIB_REQUEST; + } + + @Override + public String getChannelName() { + return getChannelCodec().getKafkaName(); } } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java index 4c9814f..abb90d2 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java @@ -1,7 +1,6 @@ package it.tdlight.reactiveapi; import it.tdlight.jni.TdApi; -import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.OnResponse; public class KafkaTdlibResponseConsumer extends KafkaConsumer> { @@ -11,8 +10,13 @@ public class KafkaTdlibResponseConsumer extends KafkaConsumer> { @@ -11,7 +10,12 @@ public class KafkaTdlibResponseProducer extends KafkaProducer events) implements Closeable { + + public KafkaClientBoundProducer events(String lane) { + var p = events.get(lane); + if (p == null) { + throw new IllegalArgumentException("No lane " + lane); + } + return p; + } @Override public void close() { response.close(); - events.close(); + events.values().forEach(KafkaProducer::close); } } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index 8aa2866..b819826 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -1,20 +1,28 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; import reactor.core.publisher.Flux; public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient { - private final Flux clientBoundEvents; + private final KafkaSharedTdlibClients kafkaSharedTdlibClients; LiveAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients) { super(kafkaSharedTdlibClients); - this.clientBoundEvents = kafkaSharedTdlibClients.events().map(Timestamped::data); + this.kafkaSharedTdlibClients = kafkaSharedTdlibClients; } @Override - public Flux clientBoundEvents() { - return clientBoundEvents; + public Flux clientBoundEvents(String lane) { + return kafkaSharedTdlibClients.events(lane).map(Timestamped::data); } + @Override + public Map> clientBoundEvents() { + return kafkaSharedTdlibClients.events().entrySet().stream() + .collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().map(Timestamped::data))); + } } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java index 5c60172..fc8715b 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java @@ -5,12 +5,15 @@ import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import java.time.Instant; +import java.util.Map; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface ReactiveApiMultiClient { - Flux clientBoundEvents(); + Flux clientBoundEvents(String lane); + + Map> clientBoundEvents(); Mono request(long userId, TdApi.Function request, Instant timeout); diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 640e750..51ed63e 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -46,6 +46,7 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicReference; @@ -77,6 +78,7 @@ public abstract class ReactiveApiPublisher { private final AtomicReference state = new AtomicReference<>(new State(LOGGED_OUT)); protected final long userId; + protected final String lane; private final Many> responses; @@ -85,10 +87,11 @@ public abstract class ReactiveApiPublisher { private ReactiveApiPublisher(KafkaSharedTdlibServers kafkaSharedTdlibServers, Set resultingEventTransformerSet, - long userId) { + long userId, String lane) { this.kafkaSharedTdlibServers = kafkaSharedTdlibServers; this.resultingEventTransformerSet = resultingEventTransformerSet; this.userId = userId; + this.lane = Objects.requireNonNull(lane); this.responses = this.kafkaSharedTdlibServers.responses(); this.rawTelegramClient = ClientManager.createReactive(); try { @@ -114,18 +117,21 @@ public abstract class ReactiveApiPublisher { public static ReactiveApiPublisher fromToken(KafkaSharedTdlibServers kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, - String token) { - return new ReactiveApiPublisherToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, token); + String token, + String lane) { + return new ReactiveApiPublisherToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, token, lane); } public static ReactiveApiPublisher fromPhoneNumber(KafkaSharedTdlibServers kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, - long phoneNumber) { + long phoneNumber, + String lane) { return new ReactiveApiPublisherPhoneNumber(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, - phoneNumber + phoneNumber, + lane ); } @@ -203,7 +209,7 @@ public abstract class ReactiveApiPublisher { // Buffer requests to avoid halting the event loop .onBackpressureBuffer(); - kafkaSharedTdlibServers.events(messagesToSend); + kafkaSharedTdlibServers.events(lane, messagesToSend); publishedResultingEvents // Obtain only cluster-bound events @@ -548,8 +554,9 @@ public abstract class ReactiveApiPublisher { public ReactiveApiPublisherToken(KafkaSharedTdlibServers kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, - String botToken) { - super(kafkaSharedTdlibServers, resultingEventTransformerSet, userId); + String botToken, + String lane) { + super(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, lane); this.botToken = botToken; } @@ -579,8 +586,9 @@ public abstract class ReactiveApiPublisher { public ReactiveApiPublisherPhoneNumber(KafkaSharedTdlibServers kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, - long phoneNumber) { - super(kafkaSharedTdlibServers, resultingEventTransformerSet, userId); + long phoneNumber, + String lane) { + super(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, lane); this.phoneNumber = phoneNumber; } diff --git a/src/main/java/it/tdlight/reactiveapi/UserTopic.java b/src/main/java/it/tdlight/reactiveapi/UserTopic.java deleted file mode 100644 index 1a57ef6..0000000 --- a/src/main/java/it/tdlight/reactiveapi/UserTopic.java +++ /dev/null @@ -1,40 +0,0 @@ -package it.tdlight.reactiveapi; - -import java.util.Objects; - -public class UserTopic { - - private final String value; - - public UserTopic(KafkaChannelName channelName, long userId) { - value = "tdlib.%s.%d".formatted(channelName.getKafkaName(), userId); - } - - public String getTopic() { - return value; - } - - @Override - public String toString() { - return value; - } - - @Override - public int hashCode() { - return value.hashCode(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - UserTopic userTopic = (UserTopic) o; - - return Objects.equals(value, userTopic.value); - } -}