diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index 1691dcc..5d73a5f 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -51,11 +51,13 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo = new ConcurrentHashMap<>(); private final AtomicLong requestId = new AtomicLong(0); private final Disposable subscription; + private final boolean pullMode; public BaseAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients, long userId) { this.userId = userId; this.clientId = System.nanoTime(); this.requests = kafkaSharedTdlibClients.requests(); + this.pullMode = kafkaSharedTdlibClients.canRequestsWait(); var disposable2 = kafkaSharedTdlibClients.responses(clientId) .doOnNext(response -> { @@ -120,7 +122,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo @Override public final boolean isPullMode() { - return true; + return pullMode; } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java index 8d92ace..916ef82 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java @@ -40,10 +40,10 @@ public class KafkaSharedTdlibClients implements Closeable { public KafkaSharedTdlibClients(KafkaTdlibClientsChannels kafkaTdlibClientsChannels) { this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels; this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses") - .publish() + .publish(65535) .autoConnect(1, this.responsesSub::set); this.events = kafkaTdlibClientsChannels.events().consumeMessages("td-handler") - .publish() + .publish(65535) .autoConnect(1, this.eventsSub::set); this.requestsSub = kafkaTdlibClientsChannels.request() .sendMessages(0L, requests.asFlux()) @@ -83,4 +83,8 @@ public class KafkaSharedTdlibClients implements Closeable { } kafkaTdlibClientsChannels.close(); } + + public boolean canRequestsWait() { + return false; + } } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java index c0be0e2..e9e133d 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java @@ -40,7 +40,7 @@ public class KafkaSharedTdlibServers implements Closeable { .subscribe(); this.requests = kafkaTdlibServersChannels.request() .consumeMessages("td-requests") - .publish() + .publish(65535) .autoConnect(1, this.requestsSub::set); }