diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/SecurityInfo.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/SecurityInfo.java index 225a197..bc5a4ea 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/SecurityInfo.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/SecurityInfo.java @@ -30,16 +30,17 @@ public class SecurityInfo { return keyStorePasswordPath; } - public String getKeyStorePassword() { + public String getKeyStorePassword(boolean required) { try { if (Files.isReadable(keyStorePasswordPath) && Files.size(keyStorePasswordPath) >= 6) { return Files.readString(keyStorePasswordPath, StandardCharsets.UTF_8).split("\n")[0]; - } else { + } else if (required) { throw new NoSuchElementException("No keystore password is set on '" + keyStorePasswordPath.toString() + "'"); } } catch (IOException ex) { throw new FileSystemException(ex); } + return null; } public Path getTrustStorePath() { @@ -50,16 +51,17 @@ public class SecurityInfo { return trustStorePasswordPath; } - public String getTrustStorePassword() { + public String getTrustStorePassword(boolean required) { try { if (Files.isReadable(trustStorePasswordPath) && Files.size(trustStorePasswordPath) >= 6) { return Files.readString(trustStorePasswordPath, StandardCharsets.UTF_8).split("\n")[0]; - } else { + } else if (required) { throw new NoSuchElementException("No truststore password is set on '" + trustStorePasswordPath.toString() + "'"); } } catch (IOException ex) { throw new FileSystemException(ex); } + return null; } @Override diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index a7a812b..0743004 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -21,6 +21,7 @@ import java.nio.file.Paths; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.LogManager; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -35,6 +36,7 @@ public class TDLibRemoteClient implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(TDLibRemoteClient.class); + @Nullable private final SecurityInfo securityInfo; private final String masterHostname; private final String netInterface; @@ -51,7 +53,7 @@ public class TDLibRemoteClient implements AutoCloseable { || System.getProperty("idea.test.cyclic.buffer.size") != null; } - public TDLibRemoteClient(SecurityInfo securityInfo, + public TDLibRemoteClient(@Nullable SecurityInfo securityInfo, String masterHostname, String netInterface, int port, @@ -124,13 +126,15 @@ public class TDLibRemoteClient implements AutoCloseable { } public Mono start() { - var keyStoreOptions = securityInfo == null ? null : new JksOptions() + var ksp = securityInfo == null ? null : securityInfo.getKeyStorePassword(false); + var keyStoreOptions = securityInfo == null || ksp == null ? null : new JksOptions() .setPath(securityInfo.getKeyStorePath().toAbsolutePath().toString()) - .setPassword(securityInfo.getKeyStorePassword()); + .setPassword(ksp); - var trustStoreOptions = securityInfo == null ? null : new JksOptions() + var tsp = securityInfo == null ? null : securityInfo.getTrustStorePassword(false); + var trustStoreOptions = securityInfo == null || tsp == null ? null : new JksOptions() .setPath(securityInfo.getTrustStorePath().toAbsolutePath().toString()) - .setPassword(securityInfo.getTrustStorePassword()); + .setPassword(tsp); return MonoUtils .fromBlockingEmpty(() -> { @@ -144,6 +148,8 @@ public class TDLibRemoteClient implements AutoCloseable { logger.info( "TDLib remote client is being hosted on" + netInterface + ":" + port + ". Master: " + masterHostname); + logger.info( + "TDLib remote client SSL enabled: " + (keyStoreOptions != null && trustStoreOptions != null)); }) .then(TdClusterManager.ofNodes(keyStoreOptions, trustStoreOptions, diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/ScannerParameterRequestHandler.java b/src/main/java/it/tdlight/tdlibsession/td/easy/ScannerParameterRequestHandler.java index e86d746..b8f0f8a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/ScannerParameterRequestHandler.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/ScannerParameterRequestHandler.java @@ -42,6 +42,6 @@ public class ScannerParameterRequestHandler implements ParameterRequestHandler { } else { return result; } - }).publishOn(Schedulers.boundedElastic()); + }).subscribeOn(Schedulers.boundedElastic()); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index 7ef71b8..fa8b932 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -65,7 +65,7 @@ public class TdClusterManager { } } - public static Mono ofMaster(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set nodesAddresses) { + public static Mono ofMaster(@Nullable JksOptions keyStoreOptions, @Nullable JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set nodesAddresses) { if (definedMasterCluster.compareAndSet(false, true)) { var vertxOptions = new VertxOptions(); netInterface = onlyLocal ? "127.0.0.1" : netInterface; @@ -84,7 +84,7 @@ public class TdClusterManager { } } - public static Mono ofNodes(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set nodesAddresses) { + public static Mono ofNodes(@Nullable JksOptions keyStoreOptions, @Nullable JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set nodesAddresses) { return Mono.defer(() -> { if (definedNodesCluster.compareAndSet(false, true)) { var vertxOptions = new VertxOptions(); @@ -105,8 +105,8 @@ public class TdClusterManager { public static Mono of(@Nullable Config cfg, VertxOptions vertxOptions, - JksOptions keyStoreOptions, - JksOptions trustStoreOptions, + @Nullable JksOptions keyStoreOptions, + @Nullable JksOptions trustStoreOptions, String masterHostname, String netInterface, int port, @@ -149,15 +149,21 @@ public class TdClusterManager { //vertxOptions.getEventBusOptions().setSsl(false); vertxOptions.getEventBusOptions().setSslHandshakeTimeout(120000).setSslHandshakeTimeoutUnit(TimeUnit.MILLISECONDS); - vertxOptions.getEventBusOptions().setKeyStoreOptions(keyStoreOptions); - vertxOptions.getEventBusOptions().setTrustStoreOptions(trustStoreOptions); + if (keyStoreOptions != null && trustStoreOptions != null) { + vertxOptions.getEventBusOptions().setKeyStoreOptions(keyStoreOptions); + vertxOptions.getEventBusOptions().setTrustStoreOptions(trustStoreOptions); + vertxOptions + .getEventBusOptions() + .setUseAlpn(true) + .setSsl(true) + .setEnabledSecureTransportProtocols(Set.of("TLSv1.3", "TLSv1.2")); + } else { + vertxOptions + .getEventBusOptions() + .setSsl(false); + } vertxOptions.getEventBusOptions().setHost(masterHostname); vertxOptions.getEventBusOptions().setPort(port + 1); - vertxOptions - .getEventBusOptions() - .setUseAlpn(true) - .setSsl(true) - .setEnabledSecureTransportProtocols(Set.of("TLSv1.3", "TLSv1.2")); vertxOptions.getEventBusOptions().setClientAuth(ClientAuth.REQUIRED); } else { mgr = null; @@ -187,12 +193,10 @@ public class TdClusterManager { return Mono.just(Vertx.vertx(vertxOptions)); } }) - .subscribeOn(Schedulers.boundedElastic()) .flatMap(vertx -> Mono .fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx)) .subscribeOn(Schedulers.boundedElastic()) - ) - .publishOn(Schedulers.parallel()); + ); } public Vertx getVertx() { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index fe65c50..c914d6c 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -141,8 +141,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .subscribeOn(Schedulers.boundedElastic()); })) .then(setupPing()); - }) - .publishOn(Schedulers.parallel()); + }); } private Mono setupPing() { @@ -206,9 +205,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono .fromRunnable(() -> logger.trace("Called receive() from parent")) - .then(updates.asMono().publishOn(Schedulers.parallel())) - .timeout(Duration.ofSeconds(5)) + .then(updates.asMono()) .publishOn(Schedulers.parallel()) + .timeout(Duration.ofSeconds(5)) .flatMap(MonoUtils::fromMessageConsumer) .flatMapMany(registration -> Mono .fromRunnable(() -> logger.trace("Registering updates flux")) @@ -240,7 +239,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow()); } }) - .publishOn(Schedulers.parallel()) .flatMapSequential(this::interceptUpdate) // Redirect errors to crash sink .doOnError(crash::tryEmitError) @@ -249,8 +247,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { return Mono.empty(); }) - .doOnTerminate(updatesStreamEnd::tryEmitEmpty) - .publishOn(Schedulers.parallel()); + .doOnTerminate(updatesStreamEnd::tryEmitEmpty); } private Mono interceptUpdate(TdApi.Object update) { @@ -266,7 +263,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) .doOnSuccess(s -> logger.info("Overwritten binlog from server")) - .publishOn(Schedulers.parallel()) .thenReturn(update); } break; @@ -297,7 +293,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { ) .switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> { throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty")); - }))) - .publishOn(Schedulers.parallel()); + }))); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index c33bc17..2849c04 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -98,7 +98,6 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd public Mono> execute(Function requestFunction, boolean executeDirectly) { return td .execute(requestFunction, executeDirectly) - .onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error)) - .publishOn(Schedulers.parallel()); + .onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error)); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index b312a63..c99739b 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -107,9 +107,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local); }) - .flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.parallel())) + .flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic())) .doOnSuccess(s -> logger.trace("Stated verticle")) - .publishOn(Schedulers.parallel()) ); } @@ -258,8 +257,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .subscribeOn(Schedulers.parallel()) .subscribe(v -> {}, registrationSink::error, registrationSink::success); }) - .subscribeOn(Schedulers.boundedElastic()) - .publishOn(Schedulers.parallel()); + .subscribeOn(Schedulers.boundedElastic()); } /** @@ -298,10 +296,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { // Since every consumer of ReadBinLog is identical, this should not pose a problem. .delay(Duration.ofMinutes(30)) .then(ec.rxUnregister().as(MonoUtils::toMono)) - .publishOn(Schedulers.parallel()) .subscribe(); return null; - }).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.parallel())) + }).subscribeOn(Schedulers.boundedElastic())) ) .then(readyToReceiveConsumer .asMono() @@ -314,7 +311,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex)) .doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped")) ) - .publishOn(Schedulers.parallel()) ); } diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index edebb76..edc8f05 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -477,7 +477,7 @@ public class MonoUtils { } return this; - }).publishOn(Schedulers.boundedElastic()); + }).subscribeOn(Schedulers.boundedElastic()); } public static Mono> create(Many sink, @@ -636,7 +636,7 @@ public class MonoUtils { } public Flux readAsFlux() { - return flux.publishOn(Schedulers.parallel()); + return flux; } public ReactiveReactorReadStream readAsStream() {