From bd463a74d21304363dcf32cc6b40aec544b4c789 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 10 Sep 2022 20:25:54 +0200 Subject: [PATCH] Single consumers and producers --- .../reactiveapi/AtomixReactiveApi.java | 30 +++--- .../BaseAtomixReactiveApiClient.java | 35 ++----- .../java/it/tdlight/reactiveapi/Event.java | 6 +- .../it/tdlight/reactiveapi/KafkaConsumer.java | 1 + .../reactiveapi/KafkaSharedTdlibClients.java | 92 +++++++++++++++++++ .../reactiveapi/KafkaSharedTdlibServers.java | 76 +++++++++++++++ .../tdlight/reactiveapi/KafkaTdlibClient.java | 15 --- .../KafkaTdlibClientsChannels.java | 13 +++ .../tdlight/reactiveapi/KafkaTdlibServer.java | 16 ---- .../KafkaTdlibServersChannels.java | 14 +++ .../LiveAtomixReactiveApiClient.java | 11 +-- .../it/tdlight/reactiveapi/ReactiveApi.java | 2 +- .../reactiveapi/ReactiveApiPublisher.java | 42 +++------ .../reactiveapi/TdlibRequestDeserializer.java | 5 +- .../reactiveapi/TdlibRequestSerializer.java | 1 + 15 files changed, 245 insertions(+), 114 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java create mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java delete mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaTdlibClient.java create mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java delete mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaTdlibServer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index bd3acb9..3fb1295 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -29,9 +29,9 @@ public class AtomixReactiveApi implements ReactiveApi { private final boolean clientOnly; - private final KafkaTdlibClient kafkaTDLibClient; + private final KafkaSharedTdlibClients kafkaSharedTdlibClients; @Nullable - private final KafkaTdlibServer kafkaTDLibServer; + private final KafkaSharedTdlibServers kafkaSharedTdlibServers; private final Set resultingEventTransformerSet; /** @@ -53,20 +53,22 @@ public class AtomixReactiveApi implements ReactiveApi { var kafkaTDLibRequestProducer = new KafkaTdlibRequestProducer(kafkaParameters); var kafkaTDLibResponseConsumer = new KafkaTdlibResponseConsumer(kafkaParameters); var kafkaClientBoundConsumer = new KafkaClientBoundConsumer(kafkaParameters); - this.kafkaTDLibClient = new KafkaTdlibClient(kafkaTDLibRequestProducer, + var kafkaTdlibClientsChannels = new KafkaTdlibClientsChannels(kafkaTDLibRequestProducer, kafkaTDLibResponseConsumer, kafkaClientBoundConsumer ); + this.kafkaSharedTdlibClients = new KafkaSharedTdlibClients(kafkaTdlibClientsChannels); if (clientOnly) { - this.kafkaTDLibServer = null; + this.kafkaSharedTdlibServers = null; } else { var kafkaTDLibRequestConsumer = new KafkaTdlibRequestConsumer(kafkaParameters); var kafkaTDLibResponseProducer = new KafkaTdlibResponseProducer(kafkaParameters); var kafkaClientBoundProducer = new KafkaClientBoundProducer(kafkaParameters); - this.kafkaTDLibServer = new KafkaTdlibServer(kafkaTDLibRequestConsumer, + var kafkaTDLibServer = new KafkaTdlibServersChannels(kafkaTDLibRequestConsumer, kafkaTDLibResponseProducer, kafkaClientBoundProducer ); + this.kafkaSharedTdlibServers = new KafkaSharedTdlibServers(kafkaTDLibServer); } this.resultingEventTransformerSet = resultingEventTransformerSet; @@ -130,7 +132,7 @@ public class AtomixReactiveApi implements ReactiveApi { userId = createBotSessionRequest.userId(); botToken = createBotSessionRequest.token(); phoneNumber = null; - reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaTDLibServer, resultingEventTransformerSet, + reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, botToken ); @@ -139,7 +141,7 @@ public class AtomixReactiveApi implements ReactiveApi { userId = createUserSessionRequest.userId(); botToken = null; phoneNumber = createUserSessionRequest.phoneNumber(); - reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaTDLibServer, resultingEventTransformerSet, + reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, phoneNumber ); @@ -149,13 +151,13 @@ public class AtomixReactiveApi implements ReactiveApi { botToken = loadSessionFromDiskRequest.token(); phoneNumber = loadSessionFromDiskRequest.phoneNumber(); if (loadSessionFromDiskRequest.phoneNumber() != null) { - reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaTDLibServer, + reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, phoneNumber ); } else { - reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaTDLibServer, + reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, botToken @@ -223,21 +225,21 @@ public class AtomixReactiveApi implements ReactiveApi { } @Override - public ReactiveApiClient client(String subGroupId, long userId) { - return new LiveAtomixReactiveApiClient(kafkaTDLibClient, userId, subGroupId); + public ReactiveApiClient client(long userId) { + return new LiveAtomixReactiveApiClient(kafkaSharedTdlibClients, userId); } @Override public Mono close() { closeRequested = true; Mono kafkaServerProducersStopper; - if (kafkaTDLibServer != null) { - kafkaServerProducersStopper = Mono.fromRunnable(kafkaTDLibServer::close).subscribeOn(Schedulers.boundedElastic()); + if (kafkaSharedTdlibServers != null) { + kafkaServerProducersStopper = Mono.fromRunnable(kafkaSharedTdlibServers::close).subscribeOn(Schedulers.boundedElastic()); } else { kafkaServerProducersStopper = Mono.empty(); } Mono kafkaClientProducersStopper = Mono - .fromRunnable(kafkaTDLibClient::close) + .fromRunnable(kafkaSharedTdlibClients::close) .subscribeOn(Schedulers.boundedElastic()); return Mono.when(kafkaServerProducersStopper, kafkaClientProducersStopper); } diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index 0afa055..1691dcc 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -2,10 +2,8 @@ package it.tdlight.reactiveapi; import static it.tdlight.reactiveapi.Event.SERIAL_VERSION; -import it.tdlight.common.utils.LibraryVersion; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Error; -import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.Ignored; import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; @@ -14,40 +12,25 @@ import it.tdlight.reactiveapi.Event.OnPasswordRequested; import it.tdlight.reactiveapi.Event.OnRequest; import it.tdlight.reactiveapi.Event.OnRequest.Request; import it.tdlight.reactiveapi.Event.OnResponse; -import it.tdlight.reactiveapi.Event.OnResponse.InvalidResponse; import it.tdlight.reactiveapi.Event.OnResponse.Response; import it.tdlight.reactiveapi.Event.OnUpdateData; import it.tdlight.reactiveapi.Event.OnUpdateError; import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; -import java.net.ConnectException; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.common.errors.SerializationException; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoSink; -import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Many; @@ -63,23 +46,18 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo protected final long userId; // Temporary id used to make requests private final long clientId; - private final Many> requests - = Sinks.many().unicast().onBackpressureBuffer(Queues.>small().get()); + private final Many> requests; private final Map>>> responses = new ConcurrentHashMap<>(); private final AtomicLong requestId = new AtomicLong(0); private final Disposable subscription; - public BaseAtomixReactiveApiClient(KafkaTdlibClient kafkaTdlibClient, long userId) { + public BaseAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients, long userId) { this.userId = userId; this.clientId = System.nanoTime(); - var subscription1 = kafkaTdlibClient.request().sendMessages(userId, requests.asFlux()) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe(v -> {}, ex -> LOG.error("Failed to send requests", ex)); + this.requests = kafkaSharedTdlibClients.requests(); - var subscription2 = kafkaTdlibClient.response() - .consumeMessages("td-responses", userId) - .filter(response -> response.data().clientId() == clientId) + var disposable2 = kafkaSharedTdlibClients.responses(clientId) .doOnNext(response -> { var responseSink = responses.get(response.data().requestId()); if (responseSink == null) { @@ -92,8 +70,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo .subscribeOn(Schedulers.parallel()) .subscribe(); this.subscription = () -> { - subscription1.dispose(); - subscription2.dispose(); + disposable2.dispose(); }; } @@ -131,7 +108,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo } }) .doFinally(s -> this.responses.remove(requestId)); - requests.emitNext(new Request<>(clientId, requestId, request, timeout), EmitFailureHandler.busyLooping(TEN_MS)); + requests.emitNext(new Request<>(userId, clientId, requestId, request, timeout), EmitFailureHandler.busyLooping(TEN_MS)); return response; }); } diff --git a/src/main/java/it/tdlight/reactiveapi/Event.java b/src/main/java/it/tdlight/reactiveapi/Event.java index ea2b7b5..be0f49a 100644 --- a/src/main/java/it/tdlight/reactiveapi/Event.java +++ b/src/main/java/it/tdlight/reactiveapi/Event.java @@ -54,10 +54,12 @@ public sealed interface Event { sealed interface OnRequest extends ServerBoundEvent { - record Request(long clientId, long requestId, TdApi.Function request, + record Request(long userId, long clientId, long requestId, TdApi.Function request, Instant timeout) implements OnRequest {} - record InvalidRequest(long clientId, long requestId) implements OnRequest {} + record InvalidRequest(long userId, long clientId, long requestId) implements OnRequest {} + + long userId(); long clientId(); diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index 3d28e6f..ec640b4 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -51,6 +51,7 @@ public abstract class KafkaConsumer { props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, toIntExact(Duration.ofMinutes(5).toMillis())); props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576"); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100"); + props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); ReceiverOptions receiverOptions = ReceiverOptions .create(props) .commitInterval(Duration.ofSeconds(10)) diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java new file mode 100644 index 0000000..7a7b406 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java @@ -0,0 +1,92 @@ +package it.tdlight.reactiveapi; + +import static java.util.Objects.requireNonNullElse; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; + +import it.tdlight.jni.TdApi.Object; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.Event.OnRequest; +import it.tdlight.reactiveapi.Event.OnResponse; +import java.io.Closeable; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.logging.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.GroupedFlux; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; +import reactor.core.scheduler.Schedulers; +import reactor.util.concurrent.Queues; + +public class KafkaSharedTdlibClients implements Closeable { + + private static final Logger LOG = LogManager.getLogger(KafkaSharedTdlibClients.class); + + private final KafkaTdlibClientsChannels kafkaTdlibClientsChannels; + private final AtomicReference responsesSub = new AtomicReference<>(); + private final Disposable requestsSub; + private final AtomicReference eventsSub = new AtomicReference<>(); + private final Flux>>> responses; + private final Flux>> events; + private final Many> requests = Sinks.many().unicast() + .onBackpressureBuffer(Queues.>get(65535).get()); + + public KafkaSharedTdlibClients(KafkaTdlibClientsChannels kafkaTdlibClientsChannels) { + this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels; + this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses") + .onBackpressureBuffer() + .groupBy(k1 -> k1.data().clientId(), 1) + .replay() + .autoConnect(1, this.responsesSub::set); + this.events = kafkaTdlibClientsChannels.events().consumeMessages("td-handler") + .onBackpressureBuffer() + .groupBy(k -> k.data().userId(), 1) + .doOnNext(g -> LOG.info("Receiving updates of client: {}", g.key())) + .replay() + .autoConnect(1, this.eventsSub::set); + this.requestsSub = kafkaTdlibClientsChannels.request() + .sendMessages(0L, requests.asFlux()) + .subscribeOn(Schedulers.parallel()) + .subscribe(); + } + + public Flux>> responses(long clientId) { + return responses.filter(group -> group.key() == clientId) + .take(1, true) + .singleOrEmpty() + .flatMapMany(Function.identity()) + .log("req-" + clientId, Level.FINE, SignalType.REQUEST); + } + + public Flux> events(long userId) { + return events.filter(group -> group.key() == userId) + .take(1, true) + .singleOrEmpty() + .flatMapMany(Function.identity()) + .doOnSubscribe(s -> LOG.info("Reading updates of client: {}", userId)); + //.log("event-" + userId, Level.FINE, SignalType.REQUEST); + } + + public Many> requests() { + return requests; + } + + @Override + public void close() { + requestsSub.dispose(); + var responsesSub = this.responsesSub.get(); + if (responsesSub != null) { + responsesSub.dispose(); + } + var eventsSub = this.eventsSub.get(); + if (eventsSub != null) { + eventsSub.dispose(); + } + kafkaTdlibClientsChannels.close(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java new file mode 100644 index 0000000..44cf37c --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java @@ -0,0 +1,76 @@ +package it.tdlight.reactiveapi; + +import static java.util.Objects.requireNonNullElse; +import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; + +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Object; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.Event.OnRequest; +import it.tdlight.reactiveapi.Event.OnResponse; +import java.io.Closeable; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.logging.Level; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.GroupedFlux; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; +import reactor.core.scheduler.Schedulers; +import reactor.util.concurrent.Queues; + +public class KafkaSharedTdlibServers implements Closeable { + + private final KafkaTdlibServersChannels kafkaTdlibServersChannels; + private final Disposable responsesSub; + private final AtomicReference requestsSub = new AtomicReference<>(); + private final Many> responses = Sinks.many().unicast().onBackpressureBuffer( + Queues.>get(65535).get()); + private final Flux>>> requests; + + public KafkaSharedTdlibServers(KafkaTdlibServersChannels kafkaTdlibServersChannels) { + this.kafkaTdlibServersChannels = kafkaTdlibServersChannels; + this.responsesSub = kafkaTdlibServersChannels.response() + .sendMessages(0L, responses.asFlux()) + .subscribeOn(Schedulers.parallel()) + .subscribe(); + this.requests = kafkaTdlibServersChannels.request() + .consumeMessages("td-requests") + .onBackpressureBuffer() + .groupBy(k -> k.data().userId(), 1) + .replay() + .autoConnect(1, this.requestsSub::set); + } + + public Flux>> requests(long userId) { + return requests.filter(group -> group.key() == userId) + .take(1, true) + .singleOrEmpty() + .flatMapMany(Function.identity()) + .log("req-" + userId, Level.FINE, SignalType.REQUEST, SignalType.ON_NEXT); + } + + public Disposable events(Flux eventFlux) { + return kafkaTdlibServersChannels.events() + .sendMessages(0L, eventFlux) + .subscribeOn(Schedulers.parallel()) + .subscribe(); + } + + public Many> responses() { + return responses; + } + + @Override + public void close() { + responsesSub.dispose(); + var requestsSub = this.requestsSub.get(); + if (requestsSub != null) { + requestsSub.dispose(); + } + kafkaTdlibServersChannels.close(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClient.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClient.java deleted file mode 100644 index 0457b41..0000000 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClient.java +++ /dev/null @@ -1,15 +0,0 @@ -package it.tdlight.reactiveapi; - -import it.tdlight.jni.TdApi; -import java.io.Closeable; -import java.io.IOException; - -public record KafkaTdlibClient(KafkaTdlibRequestProducer request, - KafkaTdlibResponseConsumer response, - KafkaClientBoundConsumer events) implements Closeable { - - @Override - public void close() { - request.close(); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java new file mode 100644 index 0000000..c09cc80 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java @@ -0,0 +1,13 @@ +package it.tdlight.reactiveapi; + +import java.io.Closeable; + +public record KafkaTdlibClientsChannels(KafkaTdlibRequestProducer request, + KafkaTdlibResponseConsumer response, + KafkaClientBoundConsumer events) implements Closeable { + + @Override + public void close() { + request.close(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServer.java deleted file mode 100644 index 4fc067d..0000000 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServer.java +++ /dev/null @@ -1,16 +0,0 @@ -package it.tdlight.reactiveapi; - -import it.tdlight.jni.TdApi; -import java.io.Closeable; -import java.io.IOException; - -public record KafkaTdlibServer(KafkaTdlibRequestConsumer request, - KafkaTdlibResponseProducer response, - KafkaClientBoundProducer events) implements Closeable { - - @Override - public void close() { - response.close(); - events.close(); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java new file mode 100644 index 0000000..347ee9f --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java @@ -0,0 +1,14 @@ +package it.tdlight.reactiveapi; + +import java.io.Closeable; + +public record KafkaTdlibServersChannels(KafkaTdlibRequestConsumer request, + KafkaTdlibResponseProducer response, + KafkaClientBoundProducer events) implements Closeable { + + @Override + public void close() { + response.close(); + events.close(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index 6702530..10e66e9 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -1,20 +1,15 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; -import java.time.Duration; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.util.retry.Retry; public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient { private final Flux clientBoundEvents; - LiveAtomixReactiveApiClient(KafkaTdlibClient kafkaTdlibClient, long userId, String subGroupId) { - super(kafkaTdlibClient, userId); - this.clientBoundEvents = kafkaTdlibClient.events() - .consumeMessages(subGroupId, userId) - .map(Timestamped::data); + LiveAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients, long userId) { + super(kafkaSharedTdlibClients, userId); + this.clientBoundEvents = kafkaSharedTdlibClients.events(userId).map(Timestamped::data); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java index cc6de82..e1fae43 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java @@ -11,7 +11,7 @@ public interface ReactiveApi { Mono createSession(CreateSessionRequest req); - ReactiveApiClient client(String subGroupId, long userId); + ReactiveApiClient client(long userId); Mono close(); diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 0901a57..b5a6720 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -4,9 +4,7 @@ import static it.tdlight.reactiveapi.AuthPhase.LOGGED_IN; import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT; import static it.tdlight.reactiveapi.Event.SERIAL_VERSION; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.CompletableFuture.completedFuture; -import com.google.common.primitives.Longs; import it.tdlight.common.Init; import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.common.Response; @@ -39,9 +37,7 @@ import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent; import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed; import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; import it.tdlight.tdlight.ClientManager; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -52,7 +48,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.StringJoiner; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.common.errors.SerializationException; import org.jetbrains.annotations.NotNull; @@ -74,8 +69,7 @@ public abstract class ReactiveApiPublisher { private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(5); private static final Duration TEN_MS = Duration.ofMillis(10); - - private final KafkaTdlibServer kafkaTdlibServer; + private final KafkaSharedTdlibServers kafkaSharedTdlibServers; private final Set resultingEventTransformerSet; private final ReactiveTelegramClient rawTelegramClient; private final Flux telegramClient; @@ -83,18 +77,18 @@ public abstract class ReactiveApiPublisher { private final AtomicReference state = new AtomicReference<>(new State(LOGGED_OUT)); protected final long userId; - private final Many> responses - = Sinks.many().unicast().onBackpressureBuffer(Queues.>small().get()); + private final Many> responses; private final AtomicReference disposable = new AtomicReference<>(); private final AtomicReference path = new AtomicReference<>(); - private ReactiveApiPublisher(KafkaTdlibServer kafkaTdlibServer, + private ReactiveApiPublisher(KafkaSharedTdlibServers kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId) { - this.kafkaTdlibServer = kafkaTdlibServer; + this.kafkaSharedTdlibServers = kafkaSharedTdlibServers; this.resultingEventTransformerSet = resultingEventTransformerSet; this.userId = userId; + this.responses = this.kafkaSharedTdlibServers.responses(); this.rawTelegramClient = ClientManager.createReactive(); try { Init.start(); @@ -118,18 +112,18 @@ public abstract class ReactiveApiPublisher { }); } - public static ReactiveApiPublisher fromToken(KafkaTdlibServer kafkaTdlibServer, + public static ReactiveApiPublisher fromToken(KafkaSharedTdlibServers kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, String token) { - return new ReactiveApiPublisherToken(kafkaTdlibServer, resultingEventTransformerSet, userId, token); + return new ReactiveApiPublisherToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, token); } - public static ReactiveApiPublisher fromPhoneNumber(KafkaTdlibServer kafkaTdlibServer, + public static ReactiveApiPublisher fromPhoneNumber(KafkaSharedTdlibServers kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, long phoneNumber) { - return new ReactiveApiPublisherPhoneNumber(kafkaTdlibServer, + return new ReactiveApiPublisherPhoneNumber(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, phoneNumber @@ -210,7 +204,7 @@ public abstract class ReactiveApiPublisher { // Buffer requests to avoid halting the event loop .onBackpressureBuffer(); - kafkaTdlibServer.events().sendMessages(userId, messagesToSend).subscribeOn(Schedulers.parallel()).subscribe(); + kafkaSharedTdlibServers.events(messagesToSend); publishedResultingEvents // Obtain only cluster-bound events @@ -463,7 +457,7 @@ public abstract class ReactiveApiPublisher { @SuppressWarnings("unchecked") private Disposable registerTopics() { - var subscription1 = kafkaTdlibServer.request().consumeMessages("td-requests-handler", userId) + var subscription1 = kafkaSharedTdlibServers.requests(userId) .flatMapSequential(req -> this .handleRequest(req.data()) .doOnNext(response -> this.responses.emitNext(response, EmitFailureHandler.busyLooping(TEN_MS))) @@ -471,14 +465,8 @@ public abstract class ReactiveApiPublisher { ) .subscribeOn(Schedulers.parallel()) .subscribe(); - var subscription2 = this.kafkaTdlibServer - .response() - .sendMessages(userId, responses.asFlux()) - .subscribeOn(Schedulers.parallel()) - .subscribe(); return () -> { subscription1.dispose(); - subscription2.dispose(); }; } @@ -543,11 +531,11 @@ public abstract class ReactiveApiPublisher { private final String botToken; - public ReactiveApiPublisherToken(KafkaTdlibServer kafkaTdlibServer, + public ReactiveApiPublisherToken(KafkaSharedTdlibServers kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, String botToken) { - super(kafkaTdlibServer, resultingEventTransformerSet, userId); + super(kafkaSharedTdlibServers, resultingEventTransformerSet, userId); this.botToken = botToken; } @@ -574,11 +562,11 @@ public abstract class ReactiveApiPublisher { private final long phoneNumber; - public ReactiveApiPublisherPhoneNumber(KafkaTdlibServer kafkaTdlibServer, + public ReactiveApiPublisherPhoneNumber(KafkaSharedTdlibServers kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, long phoneNumber) { - super(kafkaTdlibServer, resultingEventTransformerSet, userId); + super(kafkaSharedTdlibServers, resultingEventTransformerSet, userId); this.phoneNumber = phoneNumber; } diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibRequestDeserializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibRequestDeserializer.java index cae067c..cf53703 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibRequestDeserializer.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibRequestDeserializer.java @@ -27,11 +27,12 @@ public class TdlibRequestDeserializer implements Deseria try { var bais = new ByteArrayInputStream(data); var dais = new DataInputStream(bais); + var userId = dais.readLong(); var clientId = dais.readLong(); var requestId = dais.readLong(); if (dais.readInt() != SERIAL_VERSION) { // Deprecated request - return new InvalidRequest<>(clientId, requestId); + return new InvalidRequest<>(userId, clientId, requestId); } else { long millis = dais.readLong(); Instant timeout; @@ -42,7 +43,7 @@ public class TdlibRequestDeserializer implements Deseria } @SuppressWarnings("unchecked") TdApi.Function request = (TdApi.Function) TdApi.Deserializer.deserialize(dais); - return new Request<>(clientId, requestId, request, timeout); + return new Request<>(userId, clientId, requestId, request, timeout); } } catch (UnsupportedOperationException | IOException e) { throw new SerializationException(e); diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibRequestSerializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibRequestSerializer.java index aeb36fe..a6f1704 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibRequestSerializer.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibRequestSerializer.java @@ -25,6 +25,7 @@ public class TdlibRequestSerializer implements Serialize } else { try(var baos = new ByteArrayOutputStream()) { try (var daos = new DataOutputStream(baos)) { + daos.writeLong(data.userId()); daos.writeLong(data.clientId()); daos.writeLong(data.requestId()); daos.writeInt(SERIAL_VERSION);