Update SecurityInfo

This commit is contained in:
Andrea Cavalli 2021-02-14 22:59:20 +01:00
parent 601c15d8ee
commit 1abbfcb19b
8 changed files with 47 additions and 45 deletions

View File

@ -30,16 +30,17 @@ public class SecurityInfo {
return keyStorePasswordPath;
}
public String getKeyStorePassword() {
public String getKeyStorePassword(boolean required) {
try {
if (Files.isReadable(keyStorePasswordPath) && Files.size(keyStorePasswordPath) >= 6) {
return Files.readString(keyStorePasswordPath, StandardCharsets.UTF_8).split("\n")[0];
} else {
} else if (required) {
throw new NoSuchElementException("No keystore password is set on '" + keyStorePasswordPath.toString() + "'");
}
} catch (IOException ex) {
throw new FileSystemException(ex);
}
return null;
}
public Path getTrustStorePath() {
@ -50,16 +51,17 @@ public class SecurityInfo {
return trustStorePasswordPath;
}
public String getTrustStorePassword() {
public String getTrustStorePassword(boolean required) {
try {
if (Files.isReadable(trustStorePasswordPath) && Files.size(trustStorePasswordPath) >= 6) {
return Files.readString(trustStorePasswordPath, StandardCharsets.UTF_8).split("\n")[0];
} else {
} else if (required) {
throw new NoSuchElementException("No truststore password is set on '" + trustStorePasswordPath.toString() + "'");
}
} catch (IOException ex) {
throw new FileSystemException(ex);
}
return null;
}
@Override

View File

@ -21,6 +21,7 @@ import java.nio.file.Paths;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@ -35,6 +36,7 @@ public class TDLibRemoteClient implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(TDLibRemoteClient.class);
@Nullable
private final SecurityInfo securityInfo;
private final String masterHostname;
private final String netInterface;
@ -51,7 +53,7 @@ public class TDLibRemoteClient implements AutoCloseable {
|| System.getProperty("idea.test.cyclic.buffer.size") != null;
}
public TDLibRemoteClient(SecurityInfo securityInfo,
public TDLibRemoteClient(@Nullable SecurityInfo securityInfo,
String masterHostname,
String netInterface,
int port,
@ -124,13 +126,15 @@ public class TDLibRemoteClient implements AutoCloseable {
}
public Mono<Void> start() {
var keyStoreOptions = securityInfo == null ? null : new JksOptions()
var ksp = securityInfo == null ? null : securityInfo.getKeyStorePassword(false);
var keyStoreOptions = securityInfo == null || ksp == null ? null : new JksOptions()
.setPath(securityInfo.getKeyStorePath().toAbsolutePath().toString())
.setPassword(securityInfo.getKeyStorePassword());
.setPassword(ksp);
var trustStoreOptions = securityInfo == null ? null : new JksOptions()
var tsp = securityInfo == null ? null : securityInfo.getTrustStorePassword(false);
var trustStoreOptions = securityInfo == null || tsp == null ? null : new JksOptions()
.setPath(securityInfo.getTrustStorePath().toAbsolutePath().toString())
.setPassword(securityInfo.getTrustStorePassword());
.setPassword(tsp);
return MonoUtils
.fromBlockingEmpty(() -> {
@ -144,6 +148,8 @@ public class TDLibRemoteClient implements AutoCloseable {
logger.info(
"TDLib remote client is being hosted on" + netInterface + ":" + port + ". Master: " + masterHostname);
logger.info(
"TDLib remote client SSL enabled: " + (keyStoreOptions != null && trustStoreOptions != null));
})
.then(TdClusterManager.ofNodes(keyStoreOptions,
trustStoreOptions,

View File

@ -42,6 +42,6 @@ public class ScannerParameterRequestHandler implements ParameterRequestHandler {
} else {
return result;
}
}).publishOn(Schedulers.boundedElastic());
}).subscribeOn(Schedulers.boundedElastic());
}
}

View File

@ -65,7 +65,7 @@ public class TdClusterManager {
}
}
public static Mono<TdClusterManager> ofMaster(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set<String> nodesAddresses) {
public static Mono<TdClusterManager> ofMaster(@Nullable JksOptions keyStoreOptions, @Nullable JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set<String> nodesAddresses) {
if (definedMasterCluster.compareAndSet(false, true)) {
var vertxOptions = new VertxOptions();
netInterface = onlyLocal ? "127.0.0.1" : netInterface;
@ -84,7 +84,7 @@ public class TdClusterManager {
}
}
public static Mono<TdClusterManager> ofNodes(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set<String> nodesAddresses) {
public static Mono<TdClusterManager> ofNodes(@Nullable JksOptions keyStoreOptions, @Nullable JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set<String> nodesAddresses) {
return Mono.defer(() -> {
if (definedNodesCluster.compareAndSet(false, true)) {
var vertxOptions = new VertxOptions();
@ -105,8 +105,8 @@ public class TdClusterManager {
public static Mono<TdClusterManager> of(@Nullable Config cfg,
VertxOptions vertxOptions,
JksOptions keyStoreOptions,
JksOptions trustStoreOptions,
@Nullable JksOptions keyStoreOptions,
@Nullable JksOptions trustStoreOptions,
String masterHostname,
String netInterface,
int port,
@ -149,15 +149,21 @@ public class TdClusterManager {
//vertxOptions.getEventBusOptions().setSsl(false);
vertxOptions.getEventBusOptions().setSslHandshakeTimeout(120000).setSslHandshakeTimeoutUnit(TimeUnit.MILLISECONDS);
vertxOptions.getEventBusOptions().setKeyStoreOptions(keyStoreOptions);
vertxOptions.getEventBusOptions().setTrustStoreOptions(trustStoreOptions);
if (keyStoreOptions != null && trustStoreOptions != null) {
vertxOptions.getEventBusOptions().setKeyStoreOptions(keyStoreOptions);
vertxOptions.getEventBusOptions().setTrustStoreOptions(trustStoreOptions);
vertxOptions
.getEventBusOptions()
.setUseAlpn(true)
.setSsl(true)
.setEnabledSecureTransportProtocols(Set.of("TLSv1.3", "TLSv1.2"));
} else {
vertxOptions
.getEventBusOptions()
.setSsl(false);
}
vertxOptions.getEventBusOptions().setHost(masterHostname);
vertxOptions.getEventBusOptions().setPort(port + 1);
vertxOptions
.getEventBusOptions()
.setUseAlpn(true)
.setSsl(true)
.setEnabledSecureTransportProtocols(Set.of("TLSv1.3", "TLSv1.2"));
vertxOptions.getEventBusOptions().setClientAuth(ClientAuth.REQUIRED);
} else {
mgr = null;
@ -187,12 +193,10 @@ public class TdClusterManager {
return Mono.just(Vertx.vertx(vertxOptions));
}
})
.subscribeOn(Schedulers.boundedElastic())
.flatMap(vertx -> Mono
.fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx))
.subscribeOn(Schedulers.boundedElastic())
)
.publishOn(Schedulers.parallel());
);
}
public Vertx getVertx() {

View File

@ -141,8 +141,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.subscribeOn(Schedulers.boundedElastic());
}))
.then(setupPing());
})
.publishOn(Schedulers.parallel());
});
}
private Mono<Void> setupPing() {
@ -206,9 +205,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono
.fromRunnable(() -> logger.trace("Called receive() from parent"))
.then(updates.asMono().publishOn(Schedulers.parallel()))
.timeout(Duration.ofSeconds(5))
.then(updates.asMono())
.publishOn(Schedulers.parallel())
.timeout(Duration.ofSeconds(5))
.flatMap(MonoUtils::fromMessageConsumer)
.flatMapMany(registration -> Mono
.fromRunnable(() -> logger.trace("Registering updates flux"))
@ -240,7 +239,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow());
}
})
.publishOn(Schedulers.parallel())
.flatMapSequential(this::interceptUpdate)
// Redirect errors to crash sink
.doOnError(crash::tryEmitError)
@ -249,8 +247,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono.empty();
})
.doOnTerminate(updatesStreamEnd::tryEmitEmpty)
.publishOn(Schedulers.parallel());
.doOnTerminate(updatesStreamEnd::tryEmitEmpty);
}
private Mono<TdApi.Object> interceptUpdate(TdApi.Object update) {
@ -266,7 +263,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length())))
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog()))
.doOnSuccess(s -> logger.info("Overwritten binlog from server"))
.publishOn(Schedulers.parallel())
.thenReturn(update);
}
break;
@ -297,7 +293,6 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
)
.switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> {
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty"));
})))
.publishOn(Schedulers.parallel());
})));
}
}

View File

@ -98,7 +98,6 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
public <T extends Object> Mono<TdResult<T>> execute(Function requestFunction, boolean executeDirectly) {
return td
.<T>execute(requestFunction, executeDirectly)
.onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error))
.publishOn(Schedulers.parallel());
.onErrorMap(error -> ResponseError.newResponseError(requestFunction, botAlias, error));
}
}

View File

@ -107,9 +107,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local);
})
.flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.parallel()))
.flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic()))
.doOnSuccess(s -> logger.trace("Stated verticle"))
.publishOn(Schedulers.parallel())
);
}
@ -258,8 +257,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.subscribeOn(Schedulers.parallel())
.subscribe(v -> {}, registrationSink::error, registrationSink::success);
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel());
.subscribeOn(Schedulers.boundedElastic());
}
/**
@ -298,10 +296,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
// Since every consumer of ReadBinLog is identical, this should not pose a problem.
.delay(Duration.ofMinutes(30))
.then(ec.rxUnregister().as(MonoUtils::toMono))
.publishOn(Schedulers.parallel())
.subscribe();
return null;
}).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.parallel()))
}).subscribeOn(Schedulers.boundedElastic()))
)
.then(readyToReceiveConsumer
.asMono()
@ -314,7 +311,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex))
.doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped"))
)
.publishOn(Schedulers.parallel())
);
}

View File

@ -477,7 +477,7 @@ public class MonoUtils {
}
return this;
}).publishOn(Schedulers.boundedElastic());
}).subscribeOn(Schedulers.boundedElastic());
}
public static <T> Mono<SinkRWStream<T>> create(Many<T> sink,
@ -636,7 +636,7 @@ public class MonoUtils {
}
public Flux<T> readAsFlux() {
return flux.publishOn(Schedulers.parallel());
return flux;
}
public ReactiveReactorReadStream<T> readAsStream() {