This commit is contained in:
Andrea Cavalli 2021-01-13 21:09:29 +01:00
parent 4ca148531c
commit 9258cc4d1b
2 changed files with 8 additions and 3 deletions

View File

@ -23,6 +23,8 @@ import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class TDLibRemoteClient implements AutoCloseable { public class TDLibRemoteClient implements AutoCloseable {
@ -34,6 +36,7 @@ public class TDLibRemoteClient implements AutoCloseable {
private final int port; private final int port;
private final Set<String> membersAddresses; private final Set<String> membersAddresses;
private final Many<TdClusterManager> clusterManager = Sinks.many().replay().latest(); private final Many<TdClusterManager> clusterManager = Sinks.many().replay().latest();
private final Scheduler deploymentScheduler = Schedulers.newSingle("TDLib", false);
public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set<String> membersAddresses) { public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set<String> membersAddresses) {
this.securityInfo = securityInfo; this.securityInfo = securityInfo;
@ -151,7 +154,7 @@ public class TDLibRemoteClient implements AutoCloseable {
}) })
.doOnError(ex -> { .doOnError(ex -> {
logger.error(ex.getLocalizedMessage(), ex); logger.error(ex.getLocalizedMessage(), ex);
}).subscribe(i -> {}, e -> { }).subscribeOn(deploymentScheduler).subscribe(i -> {}, e -> {
logger.error("Remote client error", e); logger.error("Remote client error", e);
}, () -> startedEventHandler.handle(null)); }, () -> startedEventHandler.handle(null));
} catch (IOException ex) { } catch (IOException ex) {
@ -292,5 +295,6 @@ public class TDLibRemoteClient implements AutoCloseable {
@Override @Override
public void close() { public void close() {
clusterManager.asFlux().blockFirst(); clusterManager.asFlux().blockFirst();
deploymentScheduler.dispose();
} }
} }

View File

@ -149,6 +149,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
if (msg.succeeded()) { if (msg.succeeded()) {
this.listen() this.listen()
.timeout(Duration.ofSeconds(30)) .timeout(Duration.ofSeconds(30))
.subscribeOn(tdMiddleScheduler)
.subscribe(v -> {}, future::fail, future::complete); .subscribe(v -> {}, future::fail, future::complete);
} else { } else {
future.fail(msg.cause()); future.fail(msg.cause());
@ -243,7 +244,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
tdClosed.tryEmitNext(true); tdClosed.tryEmitNext(true);
} }
} }
})); })).subscribeOn(tdMiddleScheduler);
} }
@Override @Override
@ -299,6 +300,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
} }
}).switchIfEmpty(Mono.fromSupplier(() -> { }).switchIfEmpty(Mono.fromSupplier(() -> {
return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty")); return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty"));
})); })).subscribeOn(tdMiddleScheduler);
} }
} }