This commit is contained in:
Andrea Cavalli 2022-09-11 17:36:15 +02:00
parent 83613b2d01
commit 5e40530a20
3 changed files with 10 additions and 4 deletions

View File

@ -51,11 +51,13 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo
= new ConcurrentHashMap<>(); = new ConcurrentHashMap<>();
private final AtomicLong requestId = new AtomicLong(0); private final AtomicLong requestId = new AtomicLong(0);
private final Disposable subscription; private final Disposable subscription;
private final boolean pullMode;
public BaseAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients, long userId) { public BaseAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients, long userId) {
this.userId = userId; this.userId = userId;
this.clientId = System.nanoTime(); this.clientId = System.nanoTime();
this.requests = kafkaSharedTdlibClients.requests(); this.requests = kafkaSharedTdlibClients.requests();
this.pullMode = kafkaSharedTdlibClients.canRequestsWait();
var disposable2 = kafkaSharedTdlibClients.responses(clientId) var disposable2 = kafkaSharedTdlibClients.responses(clientId)
.doOnNext(response -> { .doOnNext(response -> {
@ -120,7 +122,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo
@Override @Override
public final boolean isPullMode() { public final boolean isPullMode() {
return true; return pullMode;
} }

View File

@ -40,10 +40,10 @@ public class KafkaSharedTdlibClients implements Closeable {
public KafkaSharedTdlibClients(KafkaTdlibClientsChannels kafkaTdlibClientsChannels) { public KafkaSharedTdlibClients(KafkaTdlibClientsChannels kafkaTdlibClientsChannels) {
this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels; this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels;
this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses") this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses")
.publish() .publish(65535)
.autoConnect(1, this.responsesSub::set); .autoConnect(1, this.responsesSub::set);
this.events = kafkaTdlibClientsChannels.events().consumeMessages("td-handler") this.events = kafkaTdlibClientsChannels.events().consumeMessages("td-handler")
.publish() .publish(65535)
.autoConnect(1, this.eventsSub::set); .autoConnect(1, this.eventsSub::set);
this.requestsSub = kafkaTdlibClientsChannels.request() this.requestsSub = kafkaTdlibClientsChannels.request()
.sendMessages(0L, requests.asFlux()) .sendMessages(0L, requests.asFlux())
@ -83,4 +83,8 @@ public class KafkaSharedTdlibClients implements Closeable {
} }
kafkaTdlibClientsChannels.close(); kafkaTdlibClientsChannels.close();
} }
public boolean canRequestsWait() {
return false;
}
} }

View File

@ -40,7 +40,7 @@ public class KafkaSharedTdlibServers implements Closeable {
.subscribe(); .subscribe();
this.requests = kafkaTdlibServersChannels.request() this.requests = kafkaTdlibServersChannels.request()
.consumeMessages("td-requests") .consumeMessages("td-requests")
.publish() .publish(65535)
.autoConnect(1, this.requestsSub::set); .autoConnect(1, this.requestsSub::set);
} }