This commit is contained in:
Andrea Cavalli 2021-01-23 22:33:52 +01:00
parent 355b419736
commit 82ba0bcd39
6 changed files with 202 additions and 42 deletions

View File

@ -95,7 +95,7 @@ public class TDLibRemoteClient implements AutoCloseable {
.setPassword(securityInfo.getTrustStorePassword());
return MonoUtils
.fromBlockingSingle(() -> {
.fromBlockingMaybe(() -> {
// Set verbosity level here, before creating the bots
if (Files.notExists(Paths.get("logs"))) {
try {
@ -123,6 +123,7 @@ public class TDLibRemoteClient implements AutoCloseable {
clusterManager.tryEmitEmpty();
}
})
.single()
.flatMap(clusterManager -> {
MessageConsumer<StartSessionMessage> startBotConsumer = clusterManager.getEventBus().consumer("bots.start-bot");
startBotConsumer.handler(msg -> {
@ -137,10 +138,12 @@ public class TDLibRemoteClient implements AutoCloseable {
var verticle = new AsyncTdMiddleEventBusServer();
// Binlog path
var blPath = Paths.get(".sessions-cache").resolve("id" + req.id()).resolve("td.binlog");
var sessPath = getSessionDirectory(req.id());
var blPath = getSessionBinlogDirectory(req.id());
BinlogUtils
.chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate())
.then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath))
.then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono))
.subscribeOn(Schedulers.single())
.subscribe(
@ -152,11 +155,23 @@ public class TDLibRemoteClient implements AutoCloseable {
() -> msg.reply(new byte[0])
);
});
return Mono.empty();
return startBotConsumer.rxCompletionHandler().as(MonoUtils::toMono);
})
.then();
}
public static Path getSessionDirectory(int botId) {
return Paths.get(".sessions-cache").resolve("id" + botId);
}
public static Path getMediaDirectory(int botId) {
return Paths.get(".cache").resolve("media").resolve("id" + botId);
}
public static Path getSessionBinlogDirectory(int botId) {
return getSessionDirectory(botId).resolve("td.binlog");
}
@Override
public void close() {
this.clusterManager

View File

@ -8,7 +8,7 @@ public final class EndSessionMessage {
private final int id;
private final byte[] binlog;
EndSessionMessage(int id, byte[] binlog) {
public EndSessionMessage(int id, byte[] binlog) {
this.id = id;
this.binlog = binlog;
}

View File

@ -3,6 +3,7 @@ package it.tdlight.tdlibsession.td.middle.client;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.tdlibsession.td.ResponseError;
@ -10,6 +11,7 @@ import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.tdlibsession.td.middle.EndSessionMessage;
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
import it.tdlight.tdlibsession.td.middle.StartSessionMessage;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
@ -19,6 +21,7 @@ import it.tdlight.utils.BinlogUtils;
import it.tdlight.utils.MonoUtils;
import it.tdlight.utils.MonoUtils.SinkRWStream;
import java.nio.file.Path;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@ -65,11 +68,19 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
Path binlogsArchiveDirectory) {
var instance = new AsyncTdMiddleEventBusClient(clusterManager);
return retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId)
.single()
.flatMap(binlog -> instance
.flatMap(binlog -> binlog
.getLastModifiedTime()
.filter(modTime -> modTime == 0)
.doOnNext(v -> LoggerFactory
.getLogger(AsyncTdMiddleEventBusClient.class)
.error("Can't retrieve binlog of bot " + botId + " " + botAlias + ". Creating a new one..."))
.thenReturn(binlog)
)
.<AsyncTdMiddle>flatMap(binlog -> instance
.start(botId, botAlias, local, binlog)
.thenReturn(instance)
);
)
.single();
}
/**
@ -108,7 +119,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
@SuppressWarnings("CallingSubscribeInNonBlockingScope")
private Mono<Void> setupUpdatesListener() {
var updateConsumer = cluster.getEventBus().<TdResultList>consumer(botAddress + ".update");
MessageConsumer<TdResultList> updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().<TdResultList>consumer(botAddress + ".updates").getDelegate());
updateConsumer.endHandler(h -> {
logger.error("<<<<<<<<<<<<<<<<EndHandler?>>>>>>>>>>>>>");
});
@ -129,12 +140,33 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return updates
.readAsFlux()
.subscribeOn(Schedulers.single())
.flatMap(updates -> Flux.fromIterable(updates.body().getValues()))
.cast(io.vertx.core.eventbus.Message.class)
.flatMap(updates -> Flux.fromIterable(((TdResultList) updates.body()).getValues()))
.flatMap(update -> Mono.fromCallable(update::orElseThrow))
.flatMap(this::interceptUpdate)
.doOnError(crash::tryEmitError)
.doOnTerminate(updatesStreamEnd::tryEmitEmpty);
}
private Publisher<TdApi.Object> interceptUpdate(TdApi.Object update) {
switch (update.getConstructor()) {
case TdApi.UpdateAuthorizationState.CONSTRUCTOR:
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
switch (updateAuthorizationState.authorizationState.getConstructor()) {
case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
return cluster
.getEventBus()
.<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)
.doOnNext(l -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(l.body().binlog().length)))
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.body().binlog()))
.doOnSuccess(s -> logger.info("Overwritten binlog from server"))
.thenReturn(update);
}
break;
}
return Mono.just(update);
}
@Override
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean executeDirectly) {
var req = new ExecuteObject(executeDirectly, request);
@ -152,8 +184,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
}
}))
)
.switchIfEmpty(Mono.fromCallable(() -> {
.switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> {
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty"));
}));
})));
}
}

View File

@ -9,14 +9,20 @@ import io.vertx.reactivex.core.eventbus.MessageProducer;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.SetTdlibParameters;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions;
import it.tdlight.tdlibsession.td.middle.EndSessionMessage;
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
import it.tdlight.tdlibsession.td.middle.TdResultList;
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
import it.tdlight.utils.BinlogAsyncFile;
import it.tdlight.utils.BinlogUtils;
import it.tdlight.utils.MonoUtils;
import java.time.Duration;
import java.util.Collections;
@ -48,6 +54,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
// Variables configured at startup
private final One<AsyncTdDirectImpl> td = Sinks.one();
private final One<MessageConsumer<ExecuteObject>> executeConsumer = Sinks.one();
private final One<MessageConsumer<byte[]>> readBinlogConsumer = Sinks.one();
public AsyncTdMiddleEventBusServer() {
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 100);
@ -87,15 +94,15 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
if (this.td.tryEmitValue(td).isFailure()) {
throw new IllegalStateException("Failed to set td instance");
}
return onSuccessfulStartRequest(td, botAddress, botAlias, local);
return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local);
})
.flatMap(Mono::hide));
}
private Mono<Void> onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, boolean local) {
private Mono<Void> onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {
return this
.listen(td, botAddress, botAlias, local)
.then(this.pipe(td, botAddress, botAlias, local))
.listen(td, botAddress, botAlias, botId, local)
.then(this.pipe(td, botAddress, botAlias, botId, local))
.doOnSuccess(s -> {
logger.info("Deploy and start of bot \"" + botAlias + "\": ✅ Succeeded");
})
@ -104,7 +111,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
});
}
private Mono<Void> listen(AsyncTdDirectImpl td, String botAddress, String botAlias, boolean local) {
private Mono<Void> listen(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {
return Mono.<Void>create(registrationSink -> {
MessageConsumer<ExecuteObject> executeConsumer = vertx.eventBus().consumer(botAddress + ".execute");
if (this.executeConsumer.tryEmitValue(executeConsumer).isFailure()) {
@ -112,13 +119,17 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
return;
}
Flux.<Message<ExecuteObject>>create(sink -> {
executeConsumer.handler(sink::next);
executeConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
})
.flatMap(msg -> td
.execute(msg.body().getRequest(), msg.body().isExecuteDirectly())
.map(result -> Tuples.of(msg, result)))
Flux
.<Message<ExecuteObject>>create(sink -> {
executeConsumer.handler(sink::next);
executeConsumer.endHandler(h -> sink.complete());
})
.flatMap(msg -> {
var request = overrideRequest(msg.body().getRequest(), botId);
return td
.execute(request, msg.body().isExecuteDirectly())
.map(result -> Tuples.of(msg, result));
})
.handle((tuple, sink) -> {
var msg = tuple.getT1();
var response = tuple.getT2();
@ -133,13 +144,60 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
})
.then()
.doOnError(ex -> {
logger.error("Error when processing a request", ex);
.subscribeOn(Schedulers.single())
.subscribe(v -> {}, ex -> logger.error("Error when processing an execute request", ex));
MessageConsumer<byte[]> readBinlogConsumer = vertx.eventBus().consumer(botAddress + ".read-binlog");
if (this.readBinlogConsumer.tryEmitValue(readBinlogConsumer).isFailure()) {
registrationSink.error(new IllegalStateException("Failed to set readBinlogConsumer"));
return;
}
Flux
.<Message<byte[]>>create(sink -> {
readBinlogConsumer.handler(sink::next);
readBinlogConsumer.endHandler(h -> sink.complete());
})
.subscribe();
.flatMap(req -> BinlogUtils
.retrieveBinlog(vertx.fileSystem(), TDLibRemoteClient.getSessionBinlogDirectory(botId))
.flatMap(BinlogAsyncFile::readFullyBytes)
.single()
.map(binlog -> Tuples.of(req, binlog))
)
.doOnNext(tuple -> {
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts);
})
.then()
.subscribeOn(Schedulers.single())
.subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex));
//noinspection ResultOfMethodCallIgnored
executeConsumer
.rxCompletionHandler()
.andThen(readBinlogConsumer.rxCompletionHandler())
.subscribeOn(io.reactivex.schedulers.Schedulers.single())
.subscribe(registrationSink::success, registrationSink::error);
});
}
/**
* Override some requests
*/
private Function overrideRequest(Function request, int botId) {
switch (request.getConstructor()) {
case SetTdlibParameters.CONSTRUCTOR:
// Fix session directory locations
var setTdlibParamsObj = (SetTdlibParameters) request;
setTdlibParamsObj.parameters.databaseDirectory = TDLibRemoteClient.getSessionDirectory(botId).toString();
setTdlibParamsObj.parameters.filesDirectory = TDLibRemoteClient.getMediaDirectory(botId).toString();
return request;
default:
return request;
}
}
@Override
public Completable rxStop() {
return MonoUtils.toCompletable(botAlias
@ -155,7 +213,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped"))));
}
private Mono<Void> pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, boolean local) {
private Mono<Void> pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {
Flux<TdResultList> updatesFlux = td
.receive(tdOptions)
.flatMap(item -> Mono.defer(() -> {

View File

@ -4,10 +4,14 @@ import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.file.AsyncFile;
import io.vertx.reactivex.core.file.FileProps;
import io.vertx.reactivex.core.file.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
public class BinlogAsyncFile {
private static final Logger logger = LoggerFactory.getLogger(BinlogAsyncFile.class);
private final FileSystem filesystem;
private final String path;
private final AsyncFile file;
@ -25,6 +29,7 @@ public class BinlogAsyncFile {
.as(MonoUtils::toMono)
.flatMap(size -> {
var buf = Buffer.buffer(size);
logger.debug("Reading binlog from disk. Size: " + BinlogUtils.humanReadableByteCountBin(size));
return file.rxRead(buf, 0, 0, size).as(MonoUtils::toMono).thenReturn(buf);
});
}
@ -38,10 +43,16 @@ public class BinlogAsyncFile {
}
public Mono<Void> overwrite(Buffer newData) {
return file.rxWrite(newData, 0)
.andThen(file.rxFlush())
.andThen(filesystem.rxTruncate(path, newData.length()))
.as(MonoUtils::toMono);
return getSize()
.doOnNext(size -> logger.debug("Preparing to overwrite binlog. Initial size: " + BinlogUtils.humanReadableByteCountBin(size)))
.then(file.rxWrite(newData, 0)
.andThen(file.rxFlush())
.andThen(filesystem.rxTruncate(path, newData.length()))
.as(MonoUtils::toMono)
)
.then(getSize())
.doOnNext(size -> logger.debug("Overwritten binlog. Final size: " + BinlogUtils.humanReadableByteCountBin(size)))
.then();
}
public Mono<Void> overwrite(byte[] newData) {
@ -59,7 +70,14 @@ public class BinlogAsyncFile {
public Mono<Long> getLastModifiedTime() {
return filesystem
.rxProps(path)
.map(FileProps::lastModifiedTime)
.map(fileProps -> fileProps.size() == 0 ? 0 : fileProps.lastModifiedTime())
.as(MonoUtils::toMono);
}
public Mono<Long> getSize() {
return filesystem
.rxProps(path)
.map(FileProps::size)
.as(MonoUtils::toMono);
}
}

View File

@ -4,8 +4,13 @@ import io.vertx.core.file.OpenOptions;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.file.FileSystem;
import java.nio.file.Path;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import java.time.Instant;
import java.time.ZoneOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
@ -13,17 +18,20 @@ public class BinlogUtils {
private static final Logger logger = LoggerFactory.getLogger(BinlogUtils.class);
/**
*
* @return optional result
*/
public static Mono<BinlogAsyncFile> retrieveBinlog(FileSystem vertxFilesystem, Path binlogPath) {
var path = binlogPath.toString();
var openOptions = new OpenOptions().setWrite(true).setRead(true).setCreate(false).setDsync(true);
return vertxFilesystem.rxExists(path).filter(exists -> exists)
.flatMapSingle(x -> vertxFilesystem.rxOpen(path, openOptions))
return vertxFilesystem
// Create file if not exist to avoid errors
.rxExists(path).filter(exists -> exists).as(MonoUtils::toMono)
.switchIfEmpty(Mono.defer(() -> vertxFilesystem.rxMkdirs(binlogPath.getParent().toString()).as(MonoUtils::toMono))
.then(vertxFilesystem.rxCreateFile(path).as(MonoUtils::toMono))
.thenReturn(true)
)
// Open file
.flatMap(x -> vertxFilesystem.rxOpen(path, openOptions).as(MonoUtils::toMono))
.map(file -> new BinlogAsyncFile(vertxFilesystem, path, file))
.as(MonoUtils::toMono);
.single();
}
public static Mono<Void> saveBinlog(BinlogAsyncFile binlog, byte[] data) {
@ -40,15 +48,44 @@ public class BinlogUtils {
.just(binlog)
.zipWith(binlog.getLastModifiedTime())
)
.doOnSuccess(s -> logger.info("Local binlog: " + binlogPath + ". Local date: " + Instant.ofEpochMilli(s == null ? 0 : s.getT2()).atZone(ZoneOffset.UTC).toString() + " Remote date: " + Instant.ofEpochMilli(remoteBinlogDate).atZone(ZoneOffset.UTC).toString()))
// Files older than the remote file will be overwritten
.filter(tuple -> tuple.getT2() < remoteBinlogDate)
.filter(tuple -> tuple.getT2() >= remoteBinlogDate)
.doOnNext(v -> logger.info("Using local binlog: " + binlogPath))
.map(Tuple2::getT1)
.switchIfEmpty(Mono
.fromRunnable(() -> logger.info("Overwriting local binlog: " + binlogPath))
.switchIfEmpty(Mono.defer(() -> Mono.fromRunnable(() -> logger.info("Using remote binlog. Overwriting " + binlogPath)))
.then(vertxFilesystem.rxWriteFile(path, Buffer.buffer(remoteBinlog)).as(MonoUtils::toMono))
.then(retrieveBinlog(vertxFilesystem, binlogPath))
)
.single()
.then();
}
public static Mono<Void> cleanSessionPath(FileSystem vertxFilesystem, Path binlogPath, Path sessionPath) {
return vertxFilesystem
.rxReadFile(binlogPath.toString()).as(MonoUtils::toMono)
.flatMap(buffer -> vertxFilesystem
.rxReadDir(sessionPath.toString(), "^(?!td.binlog$).*").as(MonoUtils::toMono)
.flatMapMany(Flux::fromIterable)
.doOnNext(file -> logger.debug("Deleting file {}", file))
.flatMap(file -> vertxFilesystem.rxDeleteRecursive(file, true).as(MonoUtils::toMono))
.onErrorResume(ex -> Mono.empty())
.then()
);
}
public static String humanReadableByteCountBin(long bytes) {
long absB = bytes == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(bytes);
if (absB < 1024) {
return bytes + " B";
}
long value = absB;
CharacterIterator ci = new StringCharacterIterator("KMGTPE");
for (int i = 40; i >= 0 && absB > 0xfffccccccccccccL >> i; i -= 10) {
value >>= 10;
ci.next();
}
value *= Long.signum(bytes);
return String.format("%.1f %ciB", value / 1024.0, ci.current());
}
}