diff --git a/pom.xml b/pom.xml index 76cdb6c..8a75918 100644 --- a/pom.xml +++ b/pom.xml @@ -143,6 +143,10 @@ io.projectreactor reactor-tools + + io.projectreactor.kafka + reactor-kafka + org.apache.logging.log4j log4j-core diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index 16fad0e..2735e5b 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -31,7 +31,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -50,6 +49,8 @@ public class AtomixReactiveApi implements ReactiveApi { @Nullable private final String nodeId; private final Atomix atomix; + private final KafkaProducer kafkaProducer; + private final KafkaConsumer kafkaConsumer; private final Set resultingEventTransformerSet; private final AsyncAtomicIdGenerator nextSessionLiveId; @@ -67,10 +68,13 @@ public class AtomixReactiveApi implements ReactiveApi { public AtomixReactiveApi(@Nullable String nodeId, Atomix atomix, + KafkaParameters kafkaParameters, @Nullable DiskSessionsManager diskSessions, @NotNull Set resultingEventTransformerSet) { this.nodeId = nodeId; this.atomix = atomix; + this.kafkaProducer = new KafkaProducer(kafkaParameters); + this.kafkaConsumer = new KafkaConsumer(kafkaParameters); this.resultingEventTransformerSet = resultingEventTransformerSet; if (nodeId == null) { @@ -277,7 +281,7 @@ public class AtomixReactiveApi implements ReactiveApi { userId = createBotSessionRequest.userId(); botToken = createBotSessionRequest.token(); phoneNumber = null; - reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, resultingEventTransformerSet, + reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId, botToken @@ -287,7 +291,7 @@ public class AtomixReactiveApi implements ReactiveApi { userId = createUserSessionRequest.userId(); botToken = null; phoneNumber = createUserSessionRequest.phoneNumber(); - reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, resultingEventTransformerSet, + reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId, phoneNumber @@ -298,13 +302,13 @@ public class AtomixReactiveApi implements ReactiveApi { botToken = loadSessionFromDiskRequest.token(); phoneNumber = loadSessionFromDiskRequest.phoneNumber(); if (loadSessionFromDiskRequest.phoneNumber() != null) { - reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, resultingEventTransformerSet, + reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId, phoneNumber ); } else { - reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, resultingEventTransformerSet, + reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId, botToken @@ -475,22 +479,24 @@ public class AtomixReactiveApi implements ReactiveApi { @Override public ReactiveApiClient dynamicClient(long userId) { - return new DynamicAtomixReactiveApiClient(this, userId); + return new DynamicAtomixReactiveApiClient(this, kafkaConsumer, userId); } @Override public ReactiveApiClient liveClient(long liveId, long userId) { - return new LiveAtomixReactiveApiClient(atomix, liveId, userId); + return new LiveAtomixReactiveApiClient(atomix, kafkaConsumer, liveId, userId); } @Override public ReactiveApiMultiClient multiClient() { - return new AtomixReactiveApiMultiClient(this); + return new AtomixReactiveApiMultiClient(this, kafkaConsumer); } @Override public Mono close() { - return Mono.fromCompletionStage(this.atomix::stop).timeout(Duration.ofSeconds(8), Mono.empty()); + var atomixStopper = Mono.fromCompletionStage(this.atomix::stop).timeout(Duration.ofSeconds(8), Mono.empty()); + var kafkaStopper = Mono.fromRunnable(kafkaProducer::close).subscribeOn(Schedulers.boundedElastic()); + return Mono.when(atomixStopper, kafkaStopper); } private record DiskSessionAndId(DiskSession diskSession, long id) {} diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java index a2e36eb..7ff1979 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java @@ -20,35 +20,21 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut private final ClusterEventService eventService; - private final Flux clientBoundEvents; + private final KafkaConsumer kafkaConsumer; private volatile boolean closed = false; - AtomixReactiveApiMultiClient(AtomixReactiveApi api) { + AtomixReactiveApiMultiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer) { this.eventService = api.getAtomix().getEventService(); - - clientBoundEvents = Flux - .>push(sink -> { - var subscriptionFuture = eventService.subscribe("session-client-bound-events", - LiveAtomixReactiveApiClient::deserializeEvents, - s -> { - sink.next(s); - return CompletableFuture.completedFuture(null); - }, - (a) -> null - ); - sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); - }, OverflowStrategy.ERROR) - .subscribeOn(Schedulers.boundedElastic()) - .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) - .flatMapIterable(list -> list) - .takeUntil(s -> closed) - .share(); + this.kafkaConsumer = kafkaConsumer; } @Override - public Flux clientBoundEvents() { - return clientBoundEvents; + public Flux clientBoundEvents(boolean ack) { + if (closed) { + return Flux.empty(); + } + return kafkaConsumer.consumeMessages(kafkaConsumer.newRandomGroupId(), ack).takeUntil(s -> closed); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java new file mode 100644 index 0000000..95dc0ea --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventDeserializer.java @@ -0,0 +1,12 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import org.apache.kafka.common.serialization.Deserializer; + +public class ClientBoundEventDeserializer implements Deserializer { + + @Override + public ClientBoundEvent deserialize(String topic, byte[] data) { + return LiveAtomixReactiveApiClient.deserializeEvent(data); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java new file mode 100644 index 0000000..e9138f8 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ClientBoundEventSerializer.java @@ -0,0 +1,12 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import org.apache.kafka.common.serialization.Serializer; + +public class ClientBoundEventSerializer implements Serializer { + + @Override + public byte[] serialize(String topic, ClientBoundEvent data) { + return ReactiveApiPublisher.serializeEvent(data); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java b/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java index 58352a1..4cff6dc 100644 --- a/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java +++ b/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java @@ -10,12 +10,15 @@ import java.util.List; public class ClusterSettings { public String id; + public List kafkaBootstrapServers; public List nodes; @JsonCreator public ClusterSettings(@JsonProperty(required = true, value = "id") String id, + @JsonProperty(required = true, value = "kafkaBootstrapServers") List kafkaBootstrapServers, @JsonProperty(required = true, value = "nodes") List nodes) { this.id = id; + this.kafkaBootstrapServers = kafkaBootstrapServers; this.nodes = nodes; } } diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index e2fb2e7..c183393 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -33,27 +33,12 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl private final Flux liveIdChange; private final Mono liveIdResolution; - DynamicAtomixReactiveApiClient(AtomixReactiveApi api, long userId) { + DynamicAtomixReactiveApiClient(AtomixReactiveApi api, KafkaConsumer kafkaConsumer, long userId) { this.api = api; this.eventService = api.getAtomix().getEventService(); this.userId = userId; - clientBoundEvents = Flux - .>push(sink -> { - var subscriptionFuture = eventService.subscribe("session-client-bound-events", - LiveAtomixReactiveApiClient::deserializeEvents, - s -> { - sink.next(s); - return CompletableFuture.completedFuture(null); - }, - (a) -> null - ); - sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); - }, OverflowStrategy.ERROR) - .subscribeOn(Schedulers.boundedElastic()) - .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) - .flatMapIterable(list -> list) - .filter(e -> e.userId() == userId) + clientBoundEvents = kafkaConsumer.consumeMessages(kafkaConsumer.newRandomGroupId(), true, userId) .doOnNext(e -> liveId.set(e.liveId())) .share(); diff --git a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java index d60cef4..6b19d51 100644 --- a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java +++ b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java @@ -27,7 +27,7 @@ public class Entrypoint { private static final Logger LOG = LoggerFactory.getLogger(Entrypoint.class); - public static record ValidEntrypointArgs(String clusterPath, String instancePath, String diskSessionsPath) {} + public record ValidEntrypointArgs(String clusterPath, String instancePath, String diskSessionsPath) {} public static ValidEntrypointArgs parseArguments(String[] args) { // Check arguments validity @@ -176,7 +176,9 @@ public class Entrypoint { atomix.start().join(); - var api = new AtomixReactiveApi(nodeId, atomix, diskSessions, resultingEventTransformerSet); + var kafkaParameters = new KafkaParameters(clusterSettings, instanceSettings.id); + + var api = new AtomixReactiveApi(nodeId, atomix, kafkaParameters, diskSessions, resultingEventTransformerSet); LOG.info("Starting ReactiveApi..."); diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java new file mode 100644 index 0000000..9170c80 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -0,0 +1,91 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.regex.Pattern; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import reactor.core.publisher.Flux; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; + +public class KafkaConsumer { + + private static final Logger LOG = LogManager.getLogger(KafkaConsumer.class); + + private final KafkaParameters kafkaParameters; + + public KafkaConsumer(KafkaParameters kafkaParameters) { + this.kafkaParameters = kafkaParameters; + } + + public KafkaReceiver createReceiver(@NotNull String groupId, + @Nullable Long liveId, + @Nullable Long userId) { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers()); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClientBoundEventDeserializer.class); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + ReceiverOptions receiverOptions = ReceiverOptions.create(props); + ReceiverOptions.create(receiverOptions.consumerProperties()); + Pattern pattern; + if (liveId == null && userId == null) { + pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\.[0-9]+"); + } else if (liveId == null) { + pattern = Pattern.compile("tdlib\\.event\\." + userId + "\\.[0-9]+"); + } else if (userId == null) { + pattern = Pattern.compile("tdlib\\.event\\.[0-9]+\\." + liveId); + } else { + pattern = Pattern.compile("tdlib\\.event\\." + userId + "\\." + liveId); + } + ReceiverOptions options = receiverOptions + .subscription(pattern) + .addAssignListener(partitions -> LOG.debug("onPartitionsAssigned {}", partitions)) + .addRevokeListener(partitions -> LOG.debug("onPartitionsRevoked {}", partitions)); + return KafkaReceiver.create(options); + } + + public Flux consumeMessages(@NotNull String groupId, boolean ack, long userId, long liveId) { + if (ack) { + return createReceiver(groupId, liveId, userId) + .receiveAutoAck() + .flatMapSequential(a -> a) + .map(ConsumerRecord::value); + } else { + return createReceiver(groupId, liveId, userId).receive().map(ConsumerRecord::value); + } + } + + public Flux consumeMessages(@NotNull String groupId, boolean ack, long userId) { + if (ack) { + return createReceiver(groupId, null, userId) + .receiveAutoAck() + .flatMapSequential(a -> a) + .map(ConsumerRecord::value); + } else { + return createReceiver(groupId, null, userId).receive().map(ConsumerRecord::value); + } + } + + public Flux consumeMessages(@NotNull String groupId, boolean ack) { + if (ack) { + return createReceiver(groupId, null, null).receiveAutoAck().flatMapSequential(a -> a).map(ConsumerRecord::value); + } else { + return createReceiver(groupId, null, null).receive().map(ConsumerRecord::value); + } + } + + public String newRandomGroupId() { + return UUID.randomUUID().toString(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java new file mode 100644 index 0000000..abb3497 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java @@ -0,0 +1,10 @@ +package it.tdlight.reactiveapi; + +import java.util.stream.Collectors; + +public record KafkaParameters(String clientId, String bootstrapServers) { + + public KafkaParameters(ClusterSettings clusterSettings, String clientId) { + this(clientId, String.join(",", clusterSettings.kafkaBootstrapServers)); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java new file mode 100644 index 0000000..7af2596 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java @@ -0,0 +1,52 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderRecord; + +public class KafkaProducer { + + private static final Logger LOG = LogManager.getLogger(KafkaProducer.class); + + private final KafkaSender sender; + + + public KafkaProducer(KafkaParameters kafkaParameters) { + Map props = new HashMap<>(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers()); + props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId()); + props.put(ProducerConfig.ACKS_CONFIG, "all"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ClientBoundEventSerializer.class); + SenderOptions senderOptions = SenderOptions.create(props); + + sender = KafkaSender.create(senderOptions); + } + + public Mono sendMessages(long liveId, long userId, Flux eventsFlux) { + return eventsFlux + .>map(event -> SenderRecord.create(new ProducerRecord<>( + "tdlib.event.%d.%d".formatted(userId, liveId), + event + ), null)) + .windowTimeout(1024, Duration.ofMillis(10)) + .flatMap(sender::send) + .doOnError(e -> LOG.error("Send failed", e)) + .then(); + } + + public void close() { + sender.close(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index a93ca95..a99466b 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -1,7 +1,6 @@ package it.tdlight.reactiveapi; import io.atomix.cluster.messaging.ClusterEventService; -import io.atomix.cluster.messaging.Subscription; import io.atomix.core.Atomix; import it.tdlight.jni.TdApi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; @@ -21,11 +20,8 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; import org.apache.commons.lang3.SerializationException; -import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -37,26 +33,11 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { private final Flux clientBoundEvents; - LiveAtomixReactiveApiClient(Atomix atomix, long liveId, long userId) { + LiveAtomixReactiveApiClient(Atomix atomix, KafkaConsumer kafkaConsumer, long liveId, long userId) { this.eventService = atomix.getEventService(); this.liveId = liveId; this.userId = userId; - this.clientBoundEvents = Flux - .>push(sink -> { - var subscriptionFuture = eventService.subscribe("session-client-bound-events", - LiveAtomixReactiveApiClient::deserializeEvents, - s -> { - sink.next(s); - return CompletableFuture.completedFuture(null); - }, - (a) -> null - ); - sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); - }, OverflowStrategy.ERROR) - .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) - .flatMapIterable(list -> list) - .filter(e -> e.userId() == userId && e.liveId() == liveId) - .share(); + this.clientBoundEvents = kafkaConsumer.consumeMessages(kafkaConsumer.newRandomGroupId(), true, userId, liveId).share(); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java index 13a7189..5461f32 100644 --- a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java +++ b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java @@ -53,7 +53,7 @@ public class PeriodicRestarter { public Mono start() { return Mono.fromRunnable(() -> { LOG.info("Starting periodic restarter..."); - multiClient.clientBoundEvents().doOnNext(event -> { + multiClient.clientBoundEvents(true).doOnNext(event -> { if (event instanceof OnUpdateData onUpdate) { if (onUpdate.update() instanceof UpdateAuthorizationState updateAuthorizationState) { if (updateAuthorizationState.authorizationState instanceof AuthorizationStateReady) { diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java index 63dc7d6..c7d8775 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java @@ -8,7 +8,7 @@ import reactor.core.publisher.Mono; public interface ReactiveApiMultiClient { - Flux clientBoundEvents(); + Flux clientBoundEvents(boolean ack); Mono request(long userId, long liveId, TdApi.Function request, Instant timeout); diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index e2c7a2c..c0080bb 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -55,7 +55,6 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; -import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -66,6 +65,7 @@ public abstract class ReactiveApiPublisher { private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class); private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofSeconds(10); + private final KafkaProducer kafkaProducer; private final ClusterEventService eventService; private final Set resultingEventTransformerSet; private final ReactiveTelegramClient rawTelegramClient; @@ -80,9 +80,11 @@ public abstract class ReactiveApiPublisher { private final AtomicReference path = new AtomicReference<>(); private ReactiveApiPublisher(Atomix atomix, + KafkaProducer kafkaProducer, Set resultingEventTransformerSet, long liveId, long userId) { + this.kafkaProducer = kafkaProducer; this.eventService = atomix.getEventService(); this.resultingEventTransformerSet = resultingEventTransformerSet; this.userId = userId; @@ -101,19 +103,27 @@ public abstract class ReactiveApiPublisher { } public static ReactiveApiPublisher fromToken(Atomix atomix, + KafkaProducer kafkaProducer, Set resultingEventTransformerSet, Long liveId, long userId, String token) { - return new ReactiveApiPublisherToken(atomix, resultingEventTransformerSet, liveId, userId, token); + return new ReactiveApiPublisherToken(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId, token); } public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, + KafkaProducer kafkaProducer, Set resultingEventTransformerSet, Long liveId, long userId, long phoneNumber) { - return new ReactiveApiPublisherPhoneNumber(atomix, resultingEventTransformerSet, liveId, userId, phoneNumber); + return new ReactiveApiPublisherPhoneNumber(atomix, + kafkaProducer, + resultingEventTransformerSet, + liveId, + userId, + phoneNumber + ); } public void start(Path path, @Nullable Runnable onClose) { @@ -170,23 +180,16 @@ public abstract class ReactiveApiPublisher { .subscribeOn(Schedulers.parallel()) .subscribe(); - publishedResultingEvents + var messagesToSend = publishedResultingEvents // Obtain only client-bound events .filter(s -> s instanceof ClientBoundResultingEvent) .cast(ClientBoundResultingEvent.class) .map(ClientBoundResultingEvent::event) // Buffer requests to avoid halting the event loop - .onBackpressureBuffer() + .onBackpressureBuffer(); - .limitRate(1) - .bufferTimeout(512, Duration.ofMillis(100)) - - // Send events to the client - .subscribeOn(Schedulers.parallel()) - .publishOn(Schedulers.boundedElastic()) - .subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events", - clientBoundEvent, ReactiveApiPublisher::serializeEvents)); + kafkaProducer.sendMessages(liveId, userId, messagesToSend).subscribeOn(Schedulers.parallel()).subscribe(); publishedResultingEvents // Obtain only cluster-bound events @@ -361,7 +364,7 @@ public abstract class ReactiveApiPublisher { return List.of(); } - private static byte[] serializeEvents(List clientBoundEvents) { + public static byte[] serializeEvents(List clientBoundEvents) { try (var byteArrayOutputStream = new ByteArrayOutputStream()) { try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { dataOutputStream.writeInt(clientBoundEvents.size()); @@ -375,7 +378,7 @@ public abstract class ReactiveApiPublisher { } } - private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { + public static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { try (var byteArrayOutputStream = new ByteArrayOutputStream()) { try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { writeClientBoundEvent(clientBoundEvent, dataOutputStream); @@ -496,11 +499,12 @@ public abstract class ReactiveApiPublisher { private final String botToken; public ReactiveApiPublisherToken(Atomix atomix, + KafkaProducer kafkaProducer, Set resultingEventTransformerSet, Long liveId, long userId, String botToken) { - super(atomix, resultingEventTransformerSet, liveId, userId); + super(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId); this.botToken = botToken; } @@ -529,11 +533,12 @@ public abstract class ReactiveApiPublisher { private final long phoneNumber; public ReactiveApiPublisherPhoneNumber(Atomix atomix, + KafkaProducer kafkaProducer, Set resultingEventTransformerSet, Long liveId, long userId, long phoneNumber) { - super(atomix, resultingEventTransformerSet, liveId, userId); + super(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId); this.phoneNumber = phoneNumber; } diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 3adab6f..c238aa9 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -11,12 +11,14 @@ + + - + - + - +