This commit is contained in:
Andrea Cavalli 2021-01-24 03:15:45 +01:00
parent 255aacaa4c
commit df116331d7
6 changed files with 145 additions and 40 deletions

View File

@ -80,17 +80,23 @@ public class TDLibRemoteClient implements AutoCloseable {
var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath);
new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses)
var client = new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses);
client
.start()
.block();
// Close vert.x on shutdown
var vertx = client.clusterManager.asMono().block().getVertx();
Runtime.getRuntime().addShutdownHook(new Thread(() -> MonoUtils.toMono(vertx.rxClose()).blockOptional()));
}
public Mono<Void> start() {
var keyStoreOptions = new JksOptions()
var keyStoreOptions = securityInfo == null ? null : new JksOptions()
.setPath(securityInfo.getKeyStorePath().toAbsolutePath().toString())
.setPassword(securityInfo.getKeyStorePassword());
var trustStoreOptions = new JksOptions()
var trustStoreOptions = securityInfo == null ? null : new JksOptions()
.setPath(securityInfo.getTrustStorePath().toAbsolutePath().toString())
.setPassword(securityInfo.getTrustStorePassword());

View File

@ -84,20 +84,22 @@ public class TdClusterManager {
}
public static Mono<TdClusterManager> ofNodes(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set<String> nodesAddresses) {
if (definedNodesCluster.compareAndSet(false, true)) {
var vertxOptions = new VertxOptions();
netInterface = onlyLocal ? "127.0.0.1" : netInterface;
Config cfg;
if (!onlyLocal) {
cfg = new Config();
cfg.setInstanceName("Node-" + new Random().nextLong());
return Mono.defer(() -> {
if (definedNodesCluster.compareAndSet(false, true)) {
var vertxOptions = new VertxOptions();
var netInterfaceF = onlyLocal ? "127.0.0.1" : netInterface;
Config cfg;
if (!onlyLocal) {
cfg = new Config();
cfg.setInstanceName("Node-" + new Random().nextLong());
} else {
cfg = null;
}
return of(cfg, vertxOptions, keyStoreOptions, trustStoreOptions, masterHostname, netInterfaceF, port, nodesAddresses);
} else {
cfg = null;
return Mono.error(new AlreadyBoundException());
}
return of(cfg, vertxOptions, keyStoreOptions, trustStoreOptions, masterHostname, netInterface, port, nodesAddresses);
} else {
return Mono.error(new AlreadyBoundException());
}
});
}
public static Mono<TdClusterManager> of(@Nullable Config cfg,
@ -157,6 +159,8 @@ public class TdClusterManager {
vertxOptions.setClusterManager(null);
}
vertxOptions.setPreferNativeTransport(true);
return Mono
.<Vertx>create(sink -> {
if (mgr != null) {

View File

@ -21,6 +21,7 @@ import it.tdlight.utils.BinlogUtils;
import it.tdlight.utils.MonoUtils;
import it.tdlight.utils.MonoUtils.SinkRWStream;
import java.nio.file.Path;
import java.time.Duration;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,7 +44,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
private final One<BinlogAsyncFile> binlog = Sinks.one();
SinkRWStream<Message<TdResultList>> updates = MonoUtils.unicastBackpressureStream(1000);
SinkRWStream<Message<TdResultList>> updates = MonoUtils.unicastBackpressureStream(10000);
// This will only result in a successful completion, never completes in other ways
private final Empty<Void> updatesStreamEnd = Sinks.one();
// This will only result in a crash, never completes in other ways
@ -137,10 +138,12 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
@Override
public Flux<TdApi.Object> receive() {
// Here the updates will be received
return updates
.readAsFlux()
.subscribeOn(Schedulers.single())
return cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ready-to-receive", EMPTY).as(MonoUtils::toMono)
.thenMany(updates.readAsFlux())
.cast(io.vertx.core.eventbus.Message.class)
.timeout(Duration.ofSeconds(20), Mono.fromCallable(() -> {
throw new IllegalStateException("Server did not respond to 4 pings after 20 seconds (5 seconds per ping)");
}))
.flatMap(updates -> Flux.fromIterable(((TdResultList) updates.body()).getValues()))
.flatMap(update -> Mono.fromCallable(update::orElseThrow))
.flatMap(this::interceptUpdate)
@ -174,7 +177,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.firstWithSignal(
MonoUtils.castVoid(crash.asMono()),
cluster.getEventBus()
.<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptionsWithTimeout).as(MonoUtils::toMono)
.<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono)
.onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex))
.<TdResult<T>>flatMap(resp -> Mono.fromCallable(() -> {
if (resp.body() == null) {

View File

@ -17,11 +17,9 @@ import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions;
import it.tdlight.tdlibsession.td.middle.EndSessionMessage;
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
import it.tdlight.tdlibsession.td.middle.TdResultList;
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
import it.tdlight.utils.BinlogAsyncFile;
import it.tdlight.utils.BinlogUtils;
import it.tdlight.utils.MonoUtils;
import java.time.Duration;
@ -55,6 +53,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private final One<AsyncTdDirectImpl> td = Sinks.one();
private final One<MessageConsumer<ExecuteObject>> executeConsumer = Sinks.one();
private final One<MessageConsumer<byte[]>> readBinlogConsumer = Sinks.one();
private final One<MessageConsumer<byte[]>> readyToReceiveConsumer = Sinks.one();
private final One<MessageConsumer<byte[]>> pingConsumer = Sinks.one();
private final One<Flux<Void>> pipeFlux = Sinks.one();
public AsyncTdMiddleEventBusServer() {
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 100);
@ -118,7 +119,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
registrationSink.error(new IllegalStateException("Failed to set executeConsumer"));
return;
}
Flux
.<Message<ExecuteObject>>create(sink -> {
executeConsumer.handler(sink::next);
@ -152,31 +152,64 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
registrationSink.error(new IllegalStateException("Failed to set readBinlogConsumer"));
return;
}
BinlogUtils
.readBinlogConsumer(vertx, readBinlogConsumer, botId, local)
.subscribeOn(Schedulers.single())
.subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex));
MessageConsumer<byte[]> readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive");
if (this.readyToReceiveConsumer.tryEmitValue(readyToReceiveConsumer).isFailure()) {
registrationSink.error(new IllegalStateException("Failed to set readyToReceiveConsumer"));
return;
}
Flux
.<Message<byte[]>>create(sink -> {
readBinlogConsumer.handler(sink::next);
readBinlogConsumer.endHandler(h -> sink.complete());
readyToReceiveConsumer.handler(sink::next);
readyToReceiveConsumer.endHandler(h -> sink.complete());
})
.flatMap(req -> BinlogUtils
.retrieveBinlog(vertx.fileSystem(), TDLibRemoteClient.getSessionBinlogDirectory(botId))
.flatMap(BinlogAsyncFile::readFullyBytes)
.single()
.map(binlog -> Tuples.of(req, binlog))
)
.flatMap(msg -> this.pipeFlux
.asMono()
.timeout(Duration.ofSeconds(5))
.map(pipeFlux -> Tuples.of(msg, pipeFlux)))
.doOnNext(tuple -> {
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts);
tuple.getT1().reply(EMPTY, opts);
// Start piping the data
//noinspection CallingSubscribeInNonBlockingScope
tuple.getT2()
.subscribeOn(Schedulers.single())
.subscribe();
})
.then()
.subscribeOn(Schedulers.single())
.subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex));
.subscribe(v -> {}, ex -> logger.error("Error when processing a ready-to-receive request", ex));
MessageConsumer<byte[]> pingConsumer = vertx.eventBus().consumer(botAddress + ".ping");
if (this.pingConsumer.tryEmitValue(pingConsumer).isFailure()) {
registrationSink.error(new IllegalStateException("Failed to set pingConsumer"));
return;
}
Flux
.<Message<byte[]>>create(sink -> {
pingConsumer.handler(sink::next);
pingConsumer.endHandler(h -> sink.complete());
})
.doOnNext(msg -> {
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
msg.reply(EMPTY, opts);
})
.then()
.subscribeOn(Schedulers.single())
.subscribe(v -> {}, ex -> logger.error("Error when processing a ping request", ex));
//noinspection ResultOfMethodCallIgnored
executeConsumer
.rxCompletionHandler()
.andThen(readBinlogConsumer.rxCompletionHandler())
.andThen(readyToReceiveConsumer.rxCompletionHandler())
.andThen(pingConsumer.rxCompletionHandler())
.subscribeOn(io.reactivex.schedulers.Schedulers.single())
.subscribe(registrationSink::success, registrationSink::error);
});
@ -209,6 +242,26 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.asMono()
.timeout(Duration.ofSeconds(5), Mono.empty())
.flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)))
.then(readBinlogConsumer
.asMono()
.timeout(Duration.ofSeconds(10), Mono.empty())
.doOnNext(ec -> Mono
// ReadBinLog will live for another 30 minutes.
// Since every consumer of ReadBinLog is identical, this should not pose a problem.
.delay(Duration.ofMinutes(30))
.then(ec.rxUnregister().as(MonoUtils::toMono))
.subscribeOn(Schedulers.single())
.subscribe()
)
)
.then(readyToReceiveConsumer
.asMono()
.timeout(Duration.ofSeconds(5), Mono.empty())
.flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)))
.then(pingConsumer
.asMono()
.timeout(Duration.ofSeconds(5), Mono.empty())
.flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)))
.doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex))
.doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped"))));
}
@ -254,9 +307,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.eventBus()
.sender(botAddress + ".updates", opts);
//noinspection CallingSubscribeInNonBlockingScope
updatesFlux
.flatMap(update -> updatesSender.rxWrite(update).as(MonoUtils::toMono))
var pipeFlux = updatesFlux
.flatMap(update -> updatesSender.rxWrite(update).as(MonoUtils::toMono).then())
.doOnTerminate(() -> updatesSender.close(h -> {
if (h.failed()) {
logger.error("Failed to close \"updates\" message sender");
@ -267,9 +319,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
return td.execute(new TdApi.Close(), false)
.doOnError(ex2 -> logger.error("Unexpected error", ex2))
.then();
})
.subscribeOn(Schedulers.single())
.subscribe();
});
MonoUtils.emitValue(this.pipeFlux, pipeFlux);
return Mono.empty();
}
}

View File

@ -1,11 +1,18 @@
package it.tdlight.utils;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.file.OpenOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import io.vertx.reactivex.core.file.FileSystem;
import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient;
import it.tdlight.tdlibsession.td.middle.EndSessionMessage;
import java.nio.file.Path;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import org.slf4j.Logger;
@ -13,6 +20,7 @@ import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class BinlogUtils {
@ -88,4 +96,26 @@ public class BinlogUtils {
value *= Long.signum(bytes);
return String.format("%.1f %ciB", value / 1024.0, ci.current());
}
public static Mono<Void> readBinlogConsumer(Vertx vertx,
MessageConsumer<byte[]> readBinlogConsumer,
int botId,
boolean local) {
return Flux
.<Message<byte[]>>create(sink -> {
readBinlogConsumer.handler(sink::next);
readBinlogConsumer.endHandler(h -> sink.complete());
})
.flatMap(req -> BinlogUtils
.retrieveBinlog(vertx.fileSystem(), TDLibRemoteClient.getSessionBinlogDirectory(botId))
.flatMap(BinlogAsyncFile::readFullyBytes)
.single()
.map(binlog -> Tuples.of(req, binlog))
)
.doOnNext(tuple -> {
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts);
})
.then();
}
}

View File

@ -498,7 +498,18 @@ public class MonoUtils {
@Override
public void end(Handler<AsyncResult<Void>> handler) {
MonoUtils.emitCompleteFuture(sink).onComplete(h -> {
MonoUtils.emitCompleteFuture(sink).recover(error -> {
if (error instanceof EmissionException) {
var sinkError = (EmissionException) error;
switch (sinkError.getReason()) {
case FAIL_CANCELLED:
case FAIL_ZERO_SUBSCRIBER:
case FAIL_TERMINATED:
return Future.succeededFuture();
}
}
return Future.failedFuture(error);
}).onComplete(h -> {
if (drainSubscription != null) {
drainSubscription.dispose();
}