diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index 3fb1295..2133e50 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -20,18 +20,21 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.Disposable; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.scheduler.Schedulers; public class AtomixReactiveApi implements ReactiveApi { private static final Logger LOG = LoggerFactory.getLogger(AtomixReactiveApi.class); - private final boolean clientOnly; + private final AtomixReactiveApiMode mode; private final KafkaSharedTdlibClients kafkaSharedTdlibClients; @Nullable private final KafkaSharedTdlibServers kafkaSharedTdlibServers; + private final ReactiveApiMultiClient client; private final Set resultingEventTransformerSet; /** @@ -44,12 +47,19 @@ public class AtomixReactiveApi implements ReactiveApi { @Nullable private final DiskSessionsManager diskSessions; private volatile boolean closeRequested; + private volatile Disposable requestsSub; - public AtomixReactiveApi(boolean clientOnly, + public enum AtomixReactiveApiMode { + CLIENT, + SERVER, + FULL + } + + public AtomixReactiveApi(AtomixReactiveApiMode mode, KafkaParameters kafkaParameters, @Nullable DiskSessionsManager diskSessions, @NotNull Set resultingEventTransformerSet) { - this.clientOnly = clientOnly; + this.mode = mode; var kafkaTDLibRequestProducer = new KafkaTdlibRequestProducer(kafkaParameters); var kafkaTDLibResponseConsumer = new KafkaTdlibResponseConsumer(kafkaParameters); var kafkaClientBoundConsumer = new KafkaClientBoundConsumer(kafkaParameters); @@ -57,10 +67,14 @@ public class AtomixReactiveApi implements ReactiveApi { kafkaTDLibResponseConsumer, kafkaClientBoundConsumer ); - this.kafkaSharedTdlibClients = new KafkaSharedTdlibClients(kafkaTdlibClientsChannels); - if (clientOnly) { - this.kafkaSharedTdlibServers = null; + if (mode != AtomixReactiveApiMode.SERVER) { + this.kafkaSharedTdlibClients = new KafkaSharedTdlibClients(kafkaTdlibClientsChannels); + this.client = new LiveAtomixReactiveApiClient(kafkaSharedTdlibClients); } else { + this.kafkaSharedTdlibClients = null; + this.client = null; + } + if (mode != AtomixReactiveApiMode.CLIENT) { var kafkaTDLibRequestConsumer = new KafkaTdlibRequestConsumer(kafkaParameters); var kafkaTDLibResponseProducer = new KafkaTdlibResponseProducer(kafkaParameters); var kafkaClientBoundProducer = new KafkaClientBoundProducer(kafkaParameters); @@ -69,6 +83,8 @@ public class AtomixReactiveApi implements ReactiveApi { kafkaClientBoundProducer ); this.kafkaSharedTdlibServers = new KafkaSharedTdlibServers(kafkaTDLibServer); + } else { + this.kafkaSharedTdlibServers = null; } this.resultingEventTransformerSet = resultingEventTransformerSet; @@ -90,7 +106,7 @@ public class AtomixReactiveApi implements ReactiveApi { .flatMapIterable(a -> a) .map(a -> new DiskSessionAndId(a.getValue(), a.getKey())); - return idsSavedIntoLocalConfiguration + var loadSessions = idsSavedIntoLocalConfiguration .filter(diskSessionAndId -> { try { diskSessionAndId.diskSession().validate(); @@ -111,13 +127,22 @@ public class AtomixReactiveApi implements ReactiveApi { }) .then() .doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk")); + + return loadSessions.then(Mono.fromRunnable(() -> { + if (kafkaSharedTdlibServers != null) { + requestsSub = kafkaSharedTdlibServers.requests() + .doOnNext(req -> localSessions.get(req.data().userId()).handleRequest(req.data())) + .subscribeOn(Schedulers.parallel()) + .subscribe(); + } + })); } @Override public Mono createSession(CreateSessionRequest req) { LOG.debug("Received create session request: {}", req); - if (clientOnly) { + if (mode == AtomixReactiveApiMode.CLIENT) { return Mono.error(new UnsupportedOperationException("This is a client, it can't have own sessions")); } @@ -225,8 +250,8 @@ public class AtomixReactiveApi implements ReactiveApi { } @Override - public ReactiveApiClient client(long userId) { - return new LiveAtomixReactiveApiClient(kafkaSharedTdlibClients, userId); + public ReactiveApiMultiClient client() { + return client; } @Override @@ -238,9 +263,17 @@ public class AtomixReactiveApi implements ReactiveApi { } else { kafkaServerProducersStopper = Mono.empty(); } - Mono kafkaClientProducersStopper = Mono - .fromRunnable(kafkaSharedTdlibClients::close) - .subscribeOn(Schedulers.boundedElastic()); + Mono kafkaClientProducersStopper; + if (kafkaSharedTdlibClients != null) { + kafkaClientProducersStopper = Mono + .fromRunnable(kafkaSharedTdlibClients::close) + .subscribeOn(Schedulers.boundedElastic()); + } else { + kafkaClientProducersStopper = Mono.empty(); + } + if (requestsSub != null) { + requestsSub.dispose(); + } return Mono.when(kafkaServerProducersStopper, kafkaClientProducersStopper); } diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index 5d73a5f..b9fadc9 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -4,6 +4,8 @@ import static it.tdlight.reactiveapi.Event.SERIAL_VERSION; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Error; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.Ignored; import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested; @@ -31,19 +33,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; -import reactor.util.concurrent.Queues; -abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoCloseable { +abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { private static final Logger LOG = LoggerFactory.getLogger(BaseAtomixReactiveApiClient.class); - private static final Duration TEN_MS = Duration.ofMillis(10); + private static final Duration HUNDRED_MS = Duration.ofMillis(100); + private static final long EMPTY_USER_ID = 0; - protected final long userId; // Temporary id used to make requests private final long clientId; private final Many> requests; @@ -51,33 +51,23 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo = new ConcurrentHashMap<>(); private final AtomicLong requestId = new AtomicLong(0); private final Disposable subscription; - private final boolean pullMode; - public BaseAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients, long userId) { - this.userId = userId; + public BaseAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients) { this.clientId = System.nanoTime(); this.requests = kafkaSharedTdlibClients.requests(); - this.pullMode = kafkaSharedTdlibClients.canRequestsWait(); - var disposable2 = kafkaSharedTdlibClients.responses(clientId) - .doOnNext(response -> { - var responseSink = responses.get(response.data().requestId()); - if (responseSink == null) { - LOG.debug("Bot #IDU{} received a response for an unknown request id: {}", - userId, response.data().requestId()); - return; - } - responseSink.complete(response); - }) - .subscribeOn(Schedulers.parallel()) - .subscribe(); - this.subscription = () -> { - disposable2.dispose(); - }; + this.subscription = kafkaSharedTdlibClients.responses().doOnNext(response -> { + var responseSink = responses.get(response.data().requestId()); + if (responseSink == null) { + LOG.debug("Bot received a response for an unknown request id: {}", response.data().requestId()); + return; + } + responseSink.complete(response); + }).subscribeOn(Schedulers.parallel()).subscribe(); } @Override - public final Mono request(TdApi.Function request, Instant timeout) { + public Mono request(long userId, Function request, Instant timeout) { return Mono.defer(() -> { var requestId = this.requestId.getAndIncrement(); var timeoutError = new TdError(408, "Request Timeout"); @@ -110,22 +100,12 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo } }) .doFinally(s -> this.responses.remove(requestId)); - requests.emitNext(new Request<>(userId, clientId, requestId, request, timeout), EmitFailureHandler.busyLooping(TEN_MS)); + requests.emitNext(new Request<>(userId, clientId, requestId, request, timeout), EmitFailureHandler.busyLooping( + HUNDRED_MS)); return response; }); } - @Override - public final long getUserId() { - return userId; - } - - @Override - public final boolean isPullMode() { - return pullMode; - } - - static ClientBoundEvent deserializeEvent(byte[] bytes) { try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) { try (var is = new DataInputStream(byteArrayInputStream)) { @@ -154,12 +134,14 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo } @Override - public void close() { - subscription.dispose(); - long now = System.currentTimeMillis(); - responses.forEach((requestId, cf) -> cf.complete(new Timestamped<>(now, - new Response<>(clientId, requestId, userId, new Error(408, "Request Timeout")) - ))); - responses.clear(); + public Mono close() { + return Mono.fromRunnable(() -> { + subscription.dispose(); + long now = System.currentTimeMillis(); + responses.forEach((requestId, cf) -> cf.complete(new Timestamped<>(now, + new Response<>(clientId, requestId, EMPTY_USER_ID, new Error(408, "Request Timeout")) + ))); + responses.clear(); + }); } } diff --git a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java index 4e549cc..c69b027 100644 --- a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java +++ b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java @@ -4,6 +4,7 @@ import static java.util.Collections.unmodifiableSet; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import it.tdlight.reactiveapi.AtomixReactiveApi.AtomixReactiveApiMode; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.nio.file.Files; @@ -66,7 +67,7 @@ public class Entrypoint { @Nullable DiskSessionsManager diskSessions) { Set resultingEventTransformerSet; - boolean clientOnly = false; + AtomixReactiveApiMode mode = AtomixReactiveApiMode.SERVER; if (instanceSettings.client) { if (diskSessions != null) { throw new IllegalArgumentException("A client instance can't have a session manager!"); @@ -74,7 +75,7 @@ public class Entrypoint { if (instanceSettings.clientAddress == null) { throw new IllegalArgumentException("A client instance must have an address (host:port)"); } - clientOnly = true; + mode = AtomixReactiveApiMode.CLIENT; resultingEventTransformerSet = Set.of(); } else { if (diskSessions == null) { @@ -103,7 +104,7 @@ public class Entrypoint { var kafkaParameters = new KafkaParameters(clusterSettings, instanceSettings.id); - var api = new AtomixReactiveApi(clientOnly, kafkaParameters, diskSessions, resultingEventTransformerSet); + var api = new AtomixReactiveApi(mode, kafkaParameters, diskSessions, resultingEventTransformerSet); LOG.info("Starting ReactiveApi..."); diff --git a/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java b/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java index 7cf2734..abd86c3 100644 --- a/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java +++ b/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java @@ -13,9 +13,7 @@ public class InstanceSettings { public String id; /** - * True if this is just a client, false if this is a complete node - *

- * A client is a lightweight node + * True if this is just a client, false if this is a server */ public boolean client; diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index 31f824d..a8c05e1 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -7,6 +7,7 @@ import it.tdlight.common.utils.CantLoadLibrary; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.function.Function; import java.util.logging.Level; import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.CommitFailedException; @@ -49,16 +50,16 @@ public abstract class KafkaConsumer { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getChannelName().getDeserializerClass()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, toIntExact(Duration.ofMinutes(5).toMillis())); - if (isQuickResponse()) { - props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000"); - props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); - } else { + if (!isQuickResponse()) { props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576"); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100"); } - ReceiverOptions receiverOptions = ReceiverOptions.create(props); + ReceiverOptions receiverOptions = ReceiverOptions.create(props) + .commitInterval(Duration.ofSeconds(10)) + .commitBatchSize(65535) + .maxCommitAttempts(100); Pattern pattern; if (userId == null) { pattern = Pattern.compile("tdlib\\." + getChannelName() + "\\.\\d+"); @@ -109,7 +110,9 @@ public abstract class KafkaConsumer { private Flux> consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) { return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId) - .receive() + .receiveAutoAck(isQuickResponse() ? null : 32) + .concatMap(Flux::collectList) + .flatMapIterable(Function.identity()) .log("consume-messages" + (userId != null ? "-" + userId : ""), Level.FINEST, SignalType.REQUEST, @@ -117,7 +120,6 @@ public abstract class KafkaConsumer { SignalType.ON_ERROR, SignalType.ON_COMPLETE ) - //.doOnNext(result -> result.receiverOffset().acknowledge()) .map(record -> { if (record.timestampType() == TimestampType.CREATE_TIME) { return new Timestamped<>(record.timestamp(), record.value()); diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java index cedbc9f..ce5ee26 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java @@ -26,9 +26,9 @@ public abstract class KafkaProducer { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers()); props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId()); - //props.put(ProducerConfig.ACKS_CONFIG, "1"); - props.put(ProducerConfig.ACKS_CONFIG, "0"); - props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); + props.put(ProducerConfig.ACKS_CONFIG, "1"); + props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); + props.put(ProducerConfig.LINGER_MS_CONFIG, "20"); props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getChannelName().getSerializerClass()); diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java index 916ef82..0dbe44e 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java @@ -1,7 +1,6 @@ package it.tdlight.reactiveapi; import static java.util.Objects.requireNonNullElse; -import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.Event.ClientBoundEvent; @@ -39,31 +38,20 @@ public class KafkaSharedTdlibClients implements Closeable { public KafkaSharedTdlibClients(KafkaTdlibClientsChannels kafkaTdlibClientsChannels) { this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels; - this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses") - .publish(65535) - .autoConnect(1, this.responsesSub::set); - this.events = kafkaTdlibClientsChannels.events().consumeMessages("td-handler") - .publish(65535) - .autoConnect(1, this.eventsSub::set); + this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses"); + this.events = kafkaTdlibClientsChannels.events().consumeMessages("td-handler"); this.requestsSub = kafkaTdlibClientsChannels.request() .sendMessages(0L, requests.asFlux()) .subscribeOn(Schedulers.parallel()) .subscribe(); } - public Flux>> responses(long clientId) { - return responses - .filter(group -> group.data().clientId() == clientId) - //.onBackpressureBuffer(8192, BufferOverflowStrategy.DROP_OLDEST) - .log("req-" + clientId, Level.FINEST, SignalType.REQUEST); + public Flux>> responses() { + return responses; } - public Flux> events(long userId) { - return events - .filter(group -> group.data().userId() == userId) - //.onBackpressureBuffer(8192, BufferOverflowStrategy.DROP_OLDEST) - .doOnSubscribe(s -> LOG.info("Reading updates of client: {}", userId)) - .log("event-" + userId, Level.FINEST, SignalType.REQUEST); + public Flux> events() { + return events; } public Many> requests() { @@ -83,8 +71,4 @@ public class KafkaSharedTdlibClients implements Closeable { } kafkaTdlibClientsChannels.close(); } - - public boolean canRequestsWait() { - return false; - } } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java index e9e133d..4e51f22 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java @@ -1,7 +1,6 @@ package it.tdlight.reactiveapi; import static java.util.Objects.requireNonNullElse; -import static reactor.core.publisher.Sinks.EmitFailureHandler.busyLooping; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Object; @@ -39,16 +38,13 @@ public class KafkaSharedTdlibServers implements Closeable { .subscribeOn(Schedulers.parallel()) .subscribe(); this.requests = kafkaTdlibServersChannels.request() - .consumeMessages("td-requests") - .publish(65535) - .autoConnect(1, this.requestsSub::set); + .consumeMessages("td-requests"); } - public Flux>> requests(long userId) { + public Flux>> requests() { return requests - .filter(group -> group.data().userId() == userId) //.onBackpressureBuffer(8192, BufferOverflowStrategy.DROP_OLDEST) - .log("requests-" + userId, Level.FINEST, SignalType.REQUEST, SignalType.ON_NEXT); + .log("requests", Level.FINEST, SignalType.REQUEST, SignalType.ON_NEXT); } public Disposable events(Flux eventFlux) { diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index 10e66e9..8aa2866 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -7,9 +7,9 @@ public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient { private final Flux clientBoundEvents; - LiveAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients, long userId) { - super(kafkaSharedTdlibClients, userId); - this.clientBoundEvents = kafkaSharedTdlibClients.events(userId).map(Timestamped::data); + LiveAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients) { + super(kafkaSharedTdlibClients); + this.clientBoundEvents = kafkaSharedTdlibClients.events().map(Timestamped::data); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java index e1fae43..93fb7be 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java @@ -11,7 +11,7 @@ public interface ReactiveApi { Mono createSession(CreateSessionRequest req); - ReactiveApiClient client(long userId); + ReactiveApiMultiClient client(); Mono close(); diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java index e92cbf0..837cfe9 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiClient.java @@ -6,13 +6,9 @@ import java.time.Instant; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public interface ReactiveApiClient { +public interface ReactiveApiClient extends ReactiveApiThinClient { Flux clientBoundEvents(); - Mono request(TdApi.Function request, Instant timeout); - - long getUserId(); - boolean isPullMode(); } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java index c7d8775..5c60172 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java @@ -1,6 +1,8 @@ package it.tdlight.reactiveapi; import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import java.time.Instant; import reactor.core.publisher.Flux; @@ -8,9 +10,23 @@ import reactor.core.publisher.Mono; public interface ReactiveApiMultiClient { - Flux clientBoundEvents(boolean ack); + Flux clientBoundEvents(); - Mono request(long userId, long liveId, TdApi.Function request, Instant timeout); + Mono request(long userId, TdApi.Function request, Instant timeout); - void close(); + Mono close(); + + default ReactiveApiThinClient view(long userId) { + return new ReactiveApiThinClient() { + @Override + public Mono request(Function request, Instant timeout) { + return ReactiveApiMultiClient.this.request(userId, request, timeout); + } + + @Override + public long getUserId() { + return userId; + } + }; + } } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index b5a6720..716e20b 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -49,26 +49,27 @@ import java.util.List; import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.apache.kafka.common.errors.SerializationException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; -import reactor.util.concurrent.Queues; public abstract class ReactiveApiPublisher { private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class); private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(5); - private static final Duration TEN_MS = Duration.ofMillis(10); + private static final Duration HUNDRED_MS = Duration.ofMillis(100); private final KafkaSharedTdlibServers kafkaSharedTdlibServers; private final Set resultingEventTransformerSet; private final ReactiveTelegramClient rawTelegramClient; @@ -96,7 +97,6 @@ public abstract class ReactiveApiPublisher { throw new RuntimeException("Can't load TDLight", e); } this.telegramClient = Flux.create(sink -> { - var subscription = this.registerTopics(); try { rawTelegramClient.createAndRegisterClient(); } catch (Throwable ex) { @@ -106,7 +106,6 @@ public abstract class ReactiveApiPublisher { rawTelegramClient.setListener(sink::next); sink.onCancel(rawTelegramClient::cancel); sink.onDispose(() -> { - subscription.dispose(); rawTelegramClient.dispose(); }); }); @@ -455,21 +454,6 @@ public abstract class ReactiveApiPublisher { } } - @SuppressWarnings("unchecked") - private Disposable registerTopics() { - var subscription1 = kafkaSharedTdlibServers.requests(userId) - .flatMapSequential(req -> this - .handleRequest(req.data()) - .doOnNext(response -> this.responses.emitNext(response, EmitFailureHandler.busyLooping(TEN_MS))) - .then() - ) - .subscribeOn(Schedulers.parallel()) - .subscribe(); - return () -> { - subscription1.dispose(); - }; - } - private static byte[] serializeResponse(Response response) { if (response == null) return null; var id = response.getId(); @@ -486,13 +470,19 @@ public abstract class ReactiveApiPublisher { } } - private Mono> handleRequest(OnRequest onRequestObj) { + public void handleRequest(OnRequest onRequestObj) { + handleRequestInternal(onRequestObj, + response -> this.responses.emitNext(response, EmitFailureHandler.busyLooping(HUNDRED_MS))); + } + + private void handleRequestInternal(OnRequest onRequestObj, Consumer> r) { if (onRequestObj instanceof OnRequest.InvalidRequest invalidRequest) { - return Mono.just(new Event.OnResponse.Response<>(invalidRequest.clientId(), + r.accept(new Event.OnResponse.Response<>(invalidRequest.clientId(), invalidRequest.requestId(), userId, new TdApi.Error(400, "Conflicting protocol version") )); + return; } var requestObj = (Request) onRequestObj; var requestWithTimeoutInstant = new RequestWithTimeoutInstant<>(requestObj.request(), requestObj.timeout()); @@ -504,15 +494,36 @@ public abstract class ReactiveApiPublisher { LOG.warn("Received an expired request. Expiration: {}", requestWithTimeoutInstant.timeout()); } - return Mono - .from(rawTelegramClient.send(request, timeoutDuration)) - .map(responseObj -> new Event.OnResponse.Response<>(onRequestObj.clientId(), + rawTelegramClient.send(request, timeoutDuration).subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(1); + } + + @Override + public void onNext(Object responseObj) { + r.accept(new Event.OnResponse.Response<>(onRequestObj.clientId(), onRequestObj.requestId(), - userId, responseObj)) - .publishOn(Schedulers.parallel()); + userId, responseObj)); + } + + @Override + public void onError(Throwable throwable) { + LOG.error("Unexpected error while processing response for update {}, user {}, client {}", + onRequestObj.requestId(), + onRequestObj.userId(), + onRequestObj.clientId() + ); + } + + @Override + public void onComplete() { + + } + }); } else { LOG.error("Ignored a request to {} because the current state is {}. Request: {}", userId, state, requestObj); - return Mono.just(new Event.OnResponse.Response<>(onRequestObj.clientId(), + r.accept(new Event.OnResponse.Response<>(onRequestObj.clientId(), onRequestObj.requestId(), userId, new TdApi.Error(503, "Service Unavailable: " + state))); } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiThinClient.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiThinClient.java new file mode 100644 index 0000000..64fd7e4 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiThinClient.java @@ -0,0 +1,14 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import java.time.Instant; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface ReactiveApiThinClient { + + Mono request(TdApi.Function request, Instant timeout); + + long getUserId(); +}