Version 5

This commit is contained in:
Andrea Cavalli 2021-01-23 18:49:21 +01:00
parent 8137067edf
commit 355b419736
22 changed files with 1303 additions and 795 deletions

View File

@ -2,12 +2,11 @@ package it.tdlight.tdlibsession;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import java.nio.charset.StandardCharsets;
import org.warp.commonutils.stream.SafeDataInputStream;
import org.warp.commonutils.stream.SafeDataOutputStream;
public class SignalMessageCodec<T> implements MessageCodec<SignalMessage<T>, SignalMessage<T>> {
@ -22,8 +21,8 @@ public class SignalMessageCodec<T> implements MessageCodec<SignalMessage<T>, Sig
@Override
public void encodeToWire(Buffer buffer, SignalMessage<T> t) {
try (var bos = new FastByteArrayOutputStream()) {
try (var dos = new DataOutputStream(bos)) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new SafeDataOutputStream(bos)) {
switch (t.getSignalType()) {
case ITEM:
dos.writeByte(0x01);
@ -35,11 +34,9 @@ public class SignalMessageCodec<T> implements MessageCodec<SignalMessage<T>, Sig
dos.writeByte(0x03);
break;
default:
throw new UnsupportedOperationException();
throw new IllegalStateException("Unexpected value: " + t.getSignalType());
}
}
bos.trim();
buffer.appendBytes(bos.array);
switch (t.getSignalType()) {
case ITEM:
typeCodec.encodeToWire(buffer, t.getItem());
@ -50,15 +47,13 @@ public class SignalMessageCodec<T> implements MessageCodec<SignalMessage<T>, Sig
buffer.appendBytes(stringBytes);
break;
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
@Override
public SignalMessage<T> decodeFromWire(int pos, Buffer buffer) {
try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) {
try (var dis = new DataInputStream(fis)) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
switch (dis.readByte()) {
case 0x01:
return SignalMessage.onNext(typeCodec.decodeFromWire(pos + 1, buffer));
@ -68,13 +63,10 @@ public class SignalMessageCodec<T> implements MessageCodec<SignalMessage<T>, Sig
case 0x03:
return SignalMessage.onComplete();
default:
throw new UnsupportedOperationException();
throw new IllegalStateException("Unexpected value: " + dis.readByte());
}
}
} catch (IOException ex) {
ex.printStackTrace();
}
return null;
}
@Override

View File

@ -0,0 +1,3 @@
package it.tdlight.tdlibsession.remoteclient;
public class BinlogManager {}

View File

@ -1,17 +1,16 @@
package it.tdlight.tdlibsession.remoteclient;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.reactivex.core.Promise;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.reactivex.core.shareddata.AsyncMap;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import it.tdlight.common.Init;
import it.tdlight.common.utils.CantLoadLibrary;
import it.tdlight.tdlibsession.td.middle.StartSessionMessage;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer;
import java.io.IOException;
import it.tdlight.utils.BinlogUtils;
import it.tdlight.utils.MonoUtils;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
@ -22,10 +21,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Schedulers;
public class TDLibRemoteClient implements AutoCloseable {
@ -37,8 +35,10 @@ public class TDLibRemoteClient implements AutoCloseable {
private final String netInterface;
private final int port;
private final Set<String> membersAddresses;
private final Many<TdClusterManager> clusterManager = Sinks.many().replay().latest();
private final Scheduler deploymentScheduler = Schedulers.newSingle("TDLib", false);
private final One<TdClusterManager> clusterManager = Sinks.one();
/**
* Statistic about active deployments count
*/
private final AtomicInteger statsActiveDeployments = new AtomicInteger();
public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set<String> membersAddresses) {
@ -80,237 +80,89 @@ public class TDLibRemoteClient implements AutoCloseable {
var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath);
new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses).run(x -> {});
new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses)
.start()
.block();
}
public void start(Handler<Void> startedEventHandler) throws IllegalStateException {
run(startedEventHandler);
}
public Mono<Void> start() {
var keyStoreOptions = new JksOptions()
.setPath(securityInfo.getKeyStorePath().toAbsolutePath().toString())
.setPassword(securityInfo.getKeyStorePassword());
public void run(Handler<Void> startedEventHandler) {
try {
// Set verbosity level here, before creating the bots
if (Files.notExists(Paths.get("logs"))) {
try {
Files.createDirectory(Paths.get("logs"));
} catch (FileAlreadyExistsException ignored) {
}
}
var trustStoreOptions = new JksOptions()
.setPath(securityInfo.getTrustStorePath().toAbsolutePath().toString())
.setPassword(securityInfo.getTrustStorePassword());
logger.info("TDLib remote client is being hosted on" + netInterface + ":" + port + ". Master: " + masterHostname);
return MonoUtils
.fromBlockingSingle(() -> {
// Set verbosity level here, before creating the bots
if (Files.notExists(Paths.get("logs"))) {
try {
Files.createDirectory(Paths.get("logs"));
} catch (FileAlreadyExistsException ignored) {
}
}
var botAddresses = new RemoteClientBotAddresses(Paths.get("remote_client_bot_addresses.txt"));
botAddresses.values().forEach(botAddress -> logger.info("Bot address is registered on this cluster:" + botAddress));
var keyStoreOptions = new JksOptions()
.setPath(securityInfo.getKeyStorePath().toAbsolutePath().toString())
.setPassword(securityInfo.getKeyStorePassword());
var trustStoreOptions = new JksOptions()
.setPath(securityInfo.getTrustStorePath().toAbsolutePath().toString())
.setPassword(securityInfo.getTrustStorePassword());
TdClusterManager.ofNodes(keyStoreOptions,
logger.info(
"TDLib remote client is being hosted on" + netInterface + ":" + port + ". Master: " + masterHostname);
return null;
})
.then(TdClusterManager.ofNodes(keyStoreOptions,
trustStoreOptions,
false,
masterHostname,
netInterface,
port,
membersAddresses
)
.doOnNext(clusterManager::tryEmitNext)
.doOnTerminate(clusterManager::tryEmitComplete)
.doOnError(clusterManager::tryEmitError)
.flatMapMany(clusterManager -> {
return Flux.create(sink -> {
clusterManager.getSharedData().getClusterWideMap("deployableBotAddresses", mapResult -> {
if (mapResult.succeeded()) {
var deployableBotAddresses = mapResult.result();
))
.doOnNext(clusterManager::tryEmitValue)
.doOnError(clusterManager::tryEmitError)
.doOnSuccess(s -> {
if (s == null) {
clusterManager.tryEmitEmpty();
}
})
.flatMap(clusterManager -> {
MessageConsumer<StartSessionMessage> startBotConsumer = clusterManager.getEventBus().consumer("bots.start-bot");
startBotConsumer.handler(msg -> {
clusterManager.getSharedData().getLock("deployment", lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
var deploymentLock = lockAcquisitionResult.result();
putAllAsync(deployableBotAddresses, botAddresses.values(), (AsyncResult<Void> putAllResult) -> {
if (putAllResult.succeeded()) {
listenForDeployRequests(botAddresses,
clusterManager,
sink,
deployableBotAddresses
);
} else {
logger.error("Can't update shared map", putAllResult.cause());
sink.error(putAllResult.cause());
}
deploymentLock.release();
});
} else {
logger.error("Can't obtain deployment lock", lockAcquisitionResult.cause());
sink.error(lockAcquisitionResult.cause());
}
});
} else {
logger.error("Can't get shared map", mapResult.cause());
sink.error(mapResult.cause());
}
});
});
})
.doOnError(ex -> {
logger.error(ex.getLocalizedMessage(), ex);
}).subscribeOn(deploymentScheduler).subscribe(i -> {}, e -> {
logger.error("Remote client error", e);
}, () -> startedEventHandler.handle(null));
} catch (IOException ex) {
logger.error("Remote client error", ex);
}
}
StartSessionMessage req = msg.body();
DeploymentOptions deploymentOptions = clusterManager
.newDeploymentOpts()
.setConfig(new JsonObject()
.put("botId", req.id())
.put("botAlias", req.alias())
.put("local", false));
var verticle = new AsyncTdMiddleEventBusServer();
private void listenForDeployRequests(RemoteClientBotAddresses botAddresses,
TdClusterManager clusterManager,
reactor.core.publisher.FluxSink<Object> sink,
AsyncMap<Object, Object> deployableBotAddresses) {
clusterManager.getEventBus().consumer("tdlib.remoteclient.clients.deploy", (Message<String> msg) -> {
// Binlog path
var blPath = Paths.get(".sessions-cache").resolve("id" + req.id()).resolve("td.binlog");
clusterManager.getSharedData().getLock("deployment", lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
var deploymentLock = lockAcquisitionResult.result();
var botAddress = msg.body();
if (botAddresses.has(botAddress)) {
deployBot(clusterManager, botAddress, deploymentResult -> {
if (deploymentResult.failed()) {
msg.fail(500,
"Failed to deploy existing bot \"" + botAddress + "\": " + deploymentResult
.cause()
.getLocalizedMessage()
BinlogUtils
.chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate())
.then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono))
.subscribeOn(Schedulers.single())
.subscribe(
v -> {},
ex -> {
logger.error("Failed to deploy bot verticle", ex);
msg.fail(500, "Failed to deploy bot verticle: " + ex.getMessage());
},
() -> msg.reply(new byte[0])
);
sink.error(deploymentResult.cause());
} else {
sink.next(botAddress);
}
deploymentLock.release();
});
} else {
botAddresses.putTempAddress(botAddress);
deployableBotAddresses.putIfAbsent(botAddress, netInterface, putResult -> {
if (putResult.succeeded()) {
if (putResult.result() == null) {
logger.info("Deploying new bot at address \"" + botAddress + "\"");
try {
botAddresses.putAddress(botAddress);
} catch (IOException e) {
logger.error("Can't save bot address \"" + botAddress + "\" to addresses file", e);
}
deployBot(clusterManager, botAddress, deploymentResult -> {
if (deploymentResult.failed()) {
msg.fail(500,
"Failed to deploy new bot \"" + botAddress + "\": " + deploymentResult
.cause()
.getLocalizedMessage()
);
sink.error(deploymentResult.cause());
} else {
sink.next(botAddress);
}
deploymentLock.release();
});
} else {
// Ignore this bot because it's present on another cluster
deploymentLock.release();
}
} else {
logger.error("Can't update shared map", putResult.cause());
sink.error(putResult.cause());
deploymentLock.release();
}
});
}
} else {
logger.error("Can't obtain deployment lock", lockAcquisitionResult.cause());
sink.error(lockAcquisitionResult.cause());
}
});
});
}
private void deployBot(TdClusterManager clusterManager, String botAddress, Handler<AsyncResult<String>> deploymentHandler) {
logger.info("Active deployments: " + statsActiveDeployments.incrementAndGet());
AsyncTdMiddleEventBusServer verticle = new AsyncTdMiddleEventBusServer(clusterManager);
var afterStopPromise = Promise.promise();
if (verticle.onAfterStop(handler -> {
afterStopPromise.complete();
handler.complete();
}, true).isFailure()) {
deploymentHandler.handle(Future.failedFuture(new IllegalStateException("Failed to register to event onAfterStop")));
return;
}
if (verticle.onBeforeStop(handler -> {
clusterManager.getSharedData().getLock("deployment", lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
var deploymentLock = lockAcquisitionResult.result();
afterStopPromise.future().onComplete(handler2 -> {
deploymentLock.release();
logger.info("Active deployments: " + statsActiveDeployments.decrementAndGet());
});
clusterManager.getSharedData().getClusterWideMap("runningBotAddresses", (AsyncResult<AsyncMap<String, String>> mapResult) -> {
if (mapResult.succeeded()) {
var runningBotAddresses = mapResult.result();
runningBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> {
if (putResult.succeeded()) {
if (putResult.result() != null) {
handler.complete();
} else {
handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed");
}
} else {
handler.fail(putResult.cause());
}
});
} else {
handler.fail(mapResult.cause());
}
});
} else {
handler.fail(lockAcquisitionResult.cause());
}
});
}, false).isFailure()) {
deploymentHandler.handle(Future.failedFuture(new IllegalStateException("Failed to register to event onBeforeStop")));
return;
}
verticle.start(botAddress, botAddress, false).doOnError(error -> {
logger.error("Can't deploy bot \"" + botAddress + "\"", error);
}).subscribeOn(deploymentScheduler).subscribe(v -> {}, err -> {
deploymentHandler.handle(Future.failedFuture(err));
}, () -> {
deploymentHandler.handle(Future.succeededFuture());
});
}
private void putAllAsync(AsyncMap<Object, Object> sharedMap,
Set<String> valuesToAdd,
Handler<AsyncResult<Void>> resultHandler) {
if (valuesToAdd.isEmpty()) {
resultHandler.handle(Future.succeededFuture());
} else {
var valueToAdd = valuesToAdd.stream().findFirst().get();
valuesToAdd.remove(valueToAdd);
sharedMap.putIfAbsent(valueToAdd, netInterface, result -> {
if (result.succeeded()) {
if (result.result() == null || result.result().equals(netInterface)) {
putAllAsync(sharedMap, valuesToAdd, resultHandler);
} else {
resultHandler.handle(Future.failedFuture(new UnsupportedOperationException("Key already present! Key: \"" + valueToAdd + "\", Value: \"" + result.result() + "\"")));
}
} else {
resultHandler.handle(Future.failedFuture(result.cause()));
}
});
}
return Mono.empty();
})
.then();
}
@Override
public void close() {
clusterManager.asFlux().blockFirst();
deploymentScheduler.dispose();
this.clusterManager
.asMono()
.blockOptional()
.map(TdClusterManager::getVertx)
.ifPresent(v -> v.rxClose().blockingAwait());
}
}

View File

@ -10,13 +10,13 @@ import it.tdlight.jni.TdApi.Ok;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlight.ClientManager;
import it.tdlight.utils.MonoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class AsyncTdDirectImpl implements AsyncTdDirect {
@ -24,7 +24,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdDirect.class);
private final One<TelegramClient> td = Sinks.one();
private final Scheduler tdScheduler = Schedulers.newSingle("TdMain", false);
private final String botAlias;
@ -35,7 +34,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
@Override
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean synchronous) {
if (synchronous) {
return td.asMono().single().flatMap(td -> Mono.fromCallable(() -> {
return td.asMono().single().flatMap(td -> MonoUtils.fromBlockingSingle(() -> {
if (td != null) {
return TdResult.<T>of(td.execute(request));
} else {
@ -44,7 +43,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}
throw new IllegalStateException("TDLib client is destroyed");
}
}).publishOn(Schedulers.boundedElastic()).single()).subscribeOn(tdScheduler);
}));
} else {
return td.asMono().single().flatMap(td -> Mono.<TdResult<T>>create(sink -> {
if (td != null) {
@ -58,7 +57,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
sink.error(new IllegalStateException("TDLib client is destroyed"));
}
}
})).single().subscribeOn(tdScheduler);
})).single();
}
}
@ -90,18 +89,17 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
closedFromTd.tryEmitValue(false);
closedFromTd.asMono()
.doOnNext(isClosedFromTd -> {
if (!isClosedFromTd) {
logger.warn("The stream has been disposed without closing tdlib. Sending TdApi.Close()...");
client.send(new Close(),
result -> logger.warn("Close result: {}", result),
ex -> logger.error("Error when disposing td client", ex)
);
}
.filter(isClosedFromTd -> !isClosedFromTd)
.doOnNext(x -> {
logger.warn("The stream has been disposed without closing tdlib. Sending TdApi.Close()...");
client.send(new Close(),
result -> logger.warn("Close result: {}", result),
ex -> logger.error("Error when disposing td client", ex)
);
})
.subscribeOn(tdScheduler)
.subscribeOn(Schedulers.single())
.subscribe();
});
}).subscribeOn(tdScheduler);
});
}
}

View File

@ -1,10 +0,0 @@
package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.AbstractVerticle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AsyncTdMiddleCommon extends AbstractVerticle {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleCommon.class);
}

View File

@ -0,0 +1,45 @@
package it.tdlight.tdlibsession.td.middle;
import java.util.Arrays;
import java.util.Objects;
public final class EndSessionMessage {
private final int id;
private final byte[] binlog;
EndSessionMessage(int id, byte[] binlog) {
this.id = id;
this.binlog = binlog;
}
public int id() {
return id;
}
public byte[] binlog() {
return binlog;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null || obj.getClass() != this.getClass()) {
return false;
}
var that = (EndSessionMessage) obj;
return this.id == that.id && Arrays.equals(this.binlog, that.binlog);
}
@Override
public int hashCode() {
return Objects.hash(id, binlog);
}
@Override
public String toString() {
return "EndSessionMessage[" + "id=" + id + ", " + "binlog=" + Arrays.hashCode(binlog) + ']';
}
}

View File

@ -0,0 +1,56 @@
package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import org.warp.commonutils.stream.SafeDataInputStream;
import org.warp.commonutils.stream.SafeDataOutputStream;
public class EndSessionMessageCodec implements MessageCodec<EndSessionMessage, EndSessionMessage> {
private final String codecName;
public EndSessionMessageCodec() {
super();
this.codecName = "EndSessionMessageCodec";
}
@Override
public void encodeToWire(Buffer buffer, EndSessionMessage t) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new SafeDataOutputStream(bos)) {
dos.writeInt(t.id());
dos.writeInt(t.binlog().length);
dos.write(t.binlog());
}
}
}
@Override
public EndSessionMessage decodeFromWire(int pos, Buffer buffer) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
return new EndSessionMessage(dis.readInt(), dis.readNBytes(dis.readInt()));
}
}
}
@Override
public EndSessionMessage transform(EndSessionMessage t) {
// If a message is sent *locally* across the event bus.
// This sends message just as is
return t;
}
@Override
public String name() {
return codecName;
}
@Override
public byte systemCodecID() {
// Always -1
return -1;
}
}

View File

@ -0,0 +1,78 @@
package it.tdlight.tdlibsession.td.middle;
import java.util.Arrays;
import java.util.Objects;
import java.util.StringJoiner;
public final class StartSessionMessage {
private final int id;
private final String alias;
private final byte[] binlog;
private final long binlogDate;
public StartSessionMessage(int id, String alias, byte[] binlog, long binlogDate) {
this.id = id;
this.alias = alias;
this.binlog = binlog;
this.binlogDate = binlogDate;
}
public int id() {
return id;
}
public String alias() {
return alias;
}
public byte[] binlog() {
return binlog;
}
public long binlogDate() {
return binlogDate;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StartSessionMessage that = (StartSessionMessage) o;
if (id != that.id) {
return false;
}
if (binlogDate != that.binlogDate) {
return false;
}
if (!Objects.equals(alias, that.alias)) {
return false;
}
return Arrays.equals(binlog, that.binlog);
}
@Override
public int hashCode() {
int result = id;
result = 31 * result + (alias != null ? alias.hashCode() : 0);
result = 31 * result + Arrays.hashCode(binlog);
result = 31 * result + (int) (binlogDate ^ (binlogDate >>> 32));
return result;
}
@Override
public String toString() {
return new StringJoiner(", ", StartSessionMessage.class.getSimpleName() + "[", "]")
.add("id=" + id)
.add("alias='" + alias + "'")
.add("binlog=" + Arrays.toString(binlog))
.add("binlogDate=" + binlogDate)
.toString();
}
}

View File

@ -0,0 +1,58 @@
package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import org.warp.commonutils.stream.SafeDataInputStream;
import org.warp.commonutils.stream.SafeDataOutputStream;
public class StartSessionMessageCodec implements MessageCodec<StartSessionMessage, StartSessionMessage> {
private final String codecName;
public StartSessionMessageCodec() {
super();
this.codecName = "StartSessionMessageCodec";
}
@Override
public void encodeToWire(Buffer buffer, StartSessionMessage t) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new SafeDataOutputStream(bos)) {
dos.writeInt(t.id());
dos.writeUTF(t.alias());
dos.writeInt(t.binlog().length);
dos.write(t.binlog());
dos.writeLong(t.binlogDate());
}
}
}
@Override
public StartSessionMessage decodeFromWire(int pos, Buffer buffer) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
return new StartSessionMessage(dis.readInt(), dis.readUTF(), dis.readNBytes(dis.readInt()), dis.readLong());
}
}
}
@Override
public StartSessionMessage transform(StartSessionMessage t) {
// If a message is sent *locally* across the event bus.
// This sends message just as is
return t;
}
@Override
public String name() {
return codecName;
}
@Override
public byte systemCodecID() {
// Always -1
return -1;
}
}

View File

@ -43,6 +43,7 @@ public class TdClusterManager {
private final VertxOptions vertxOptions;
private final Vertx vertx;
@SuppressWarnings({"unchecked", "rawtypes"})
public TdClusterManager(ClusterManager mgr, VertxOptions vertxOptions, Vertx vertx) {
this.mgr = mgr;
this.vertxOptions = vertxOptions;
@ -54,7 +55,9 @@ public class TdClusterManager {
.getDelegate()
.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())
.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec())
.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec());
.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec())
.registerDefaultCodec(StartSessionMessage.class, new StartSessionMessageCodec())
.registerDefaultCodec(EndSessionMessage.class, new EndSessionMessageCodec());
for (Class<?> value : ConstructorDetector.getTDConstructorsUnsafe().values()) {
vertx.eventBus().getDelegate().registerDefaultCodec(value, new TdMessageCodec(value));
}

View File

@ -4,11 +4,11 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream;
import it.tdlight.utils.VertxBufferInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.warp.commonutils.stream.SafeDataInputStream;
public class TdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject, ExecuteObject> {
@ -32,8 +32,8 @@ public class TdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject,
@Override
public ExecuteObject decodeFromWire(int pos, Buffer buffer) {
try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) {
try (var dis = new DataInputStream(fis)) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
return new ExecuteObject(dis.readBoolean(), (Function) TdApi.Deserializer.deserialize(dis));
}
} catch (IOException ex) {

View File

@ -3,11 +3,11 @@ package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream;
import it.tdlight.utils.VertxBufferInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.warp.commonutils.stream.SafeDataInputStream;
public class TdMessageCodec<T extends TdApi.Object> implements MessageCodec<T, T> {
@ -35,8 +35,8 @@ public class TdMessageCodec<T extends TdApi.Object> implements MessageCodec<T, T
@Override
public T decodeFromWire(int pos, Buffer buffer) {
try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) {
try (var dis = new DataInputStream(fis)) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
//noinspection unchecked
return (T) TdApi.Deserializer.deserialize(dis);
}

View File

@ -7,11 +7,11 @@ import it.tdlight.jni.TdApi.Error;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import org.warp.commonutils.stream.SafeDataInputStream;
import org.warp.commonutils.stream.SafeDataOutputStream;
public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdResultList> {
@ -22,16 +22,16 @@ public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdRe
@Override
public void encodeToWire(Buffer buffer, TdResultList ts) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new DataOutputStream(bos)) {
try (var dos = new SafeDataOutputStream(bos)) {
var t = ts.getValues();
dos.writeInt(t.size());
for (TdResult<TdApi.Object> t1 : t) {
if (t1.succeeded()) {
dos.writeBoolean(true);
t1.result().serialize(dos);
t1.result().serialize(dos.asDataOutputStream());
} else {
dos.writeBoolean(false);
t1.cause().serialize(dos);
t1.cause().serialize(dos.asDataOutputStream());
}
}
}
@ -42,8 +42,8 @@ public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdRe
@Override
public TdResultList decodeFromWire(int pos, Buffer buffer) {
try (var fis = new VertxBufferInputStream(buffer, pos, buffer.length())) {
try (var dis = new DataInputStream(fis)) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
var size = dis.readInt();
ArrayList<TdResult<TdApi.Object>> list = new ArrayList<>(size);
for (int i = 0; i < size; i++) {

View File

@ -4,11 +4,11 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import java.io.IOException;
import org.warp.commonutils.stream.SafeDataInputStream;
import org.warp.commonutils.stream.SafeDataOutputStream;
@SuppressWarnings("rawtypes")
public class TdResultMessageCodec implements MessageCodec<TdResultMessage, TdResultMessage> {
@ -22,18 +22,16 @@ public class TdResultMessageCodec implements MessageCodec<TdResultMessage, TdRes
@Override
public void encodeToWire(Buffer buffer, TdResultMessage t) {
try (var bos = new FastByteArrayOutputStream()) {
try (var dos = new DataOutputStream(bos)) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new SafeDataOutputStream(bos)) {
if (t.value != null) {
dos.writeBoolean(true);
t.value.serialize(dos);
t.value.serialize(dos.asDataOutputStream());
} else {
dos.writeBoolean(false);
t.cause.serialize(dos);
t.cause.serialize(dos.asDataOutputStream());
}
}
bos.trim();
buffer.appendBytes(bos.array);
} catch (IOException ex) {
ex.printStackTrace();
}
@ -41,8 +39,8 @@ public class TdResultMessageCodec implements MessageCodec<TdResultMessage, TdRes
@Override
public TdResultMessage decodeFromWire(int pos, Buffer buffer) {
try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) {
try (var dis = new DataInputStream(fis)) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
if (dis.readBoolean()) {
return new TdResultMessage(TdApi.Deserializer.deserialize(dis), null);
} else {

View File

@ -1,281 +1,159 @@
package it.tdlight.tdlibsession.td.middle.client;
import io.reactivex.Completable;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.eventbus.Message;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.EventBusFlux;
import it.tdlight.tdlibsession.td.ResponseError;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
import it.tdlight.tdlibsession.td.middle.StartSessionMessage;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.TdResultList;
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
import it.tdlight.utils.BinlogAsyncFile;
import it.tdlight.utils.BinlogUtils;
import it.tdlight.utils.MonoUtils;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Objects;
import it.tdlight.utils.MonoUtils.SinkRWStream;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.warp.commonutils.error.InitializationException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Schedulers;
public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle {
public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusClient.class );
private Logger logger;
public static final boolean OUTPUT_REQUESTS = false;
public static final byte[] EMPTY = new byte[0];
private final TdClusterManager cluster;
private final DeliveryOptions deliveryOptions;
private final DeliveryOptions deliveryOptionsWithTimeout;
private final Empty<Void> tdCloseRequested = Sinks.empty();
private final Empty<Void> tdClosed = Sinks.empty();
private final One<Error> tdCrashed = Sinks.one();
private final One<BinlogAsyncFile> binlog = Sinks.one();
SinkRWStream<Message<TdResultList>> updates = MonoUtils.unicastBackpressureStream(1000);
// This will only result in a successful completion, never completes in other ways
private final Empty<Void> updatesStreamEnd = Sinks.one();
// This will only result in a crash, never completes in other ways
private final Empty<Void> crash = Sinks.one();
private int botId;
private String botAddress;
private String botAlias;
private boolean local;
private long initTime;
public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) {
cluster = clusterManager;
this.logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusClient.class);
this.cluster = clusterManager;
this.deliveryOptions = cluster.newDeliveryOpts().setLocalOnly(local);
this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000);
}
public static Mono<AsyncTdMiddle> getAndDeployInstance(TdClusterManager clusterManager,
int botId,
String botAlias,
String botAddress,
boolean local) {
boolean local,
Path binlogsArchiveDirectory) {
var instance = new AsyncTdMiddleEventBusClient(clusterManager);
var options = clusterManager
.newDeploymentOpts()
.setConfig(new JsonObject().put("botAddress", botAddress).put("botAlias", botAlias).put("local", local));
return clusterManager
.getVertx()
.rxDeployVerticle(instance, options)
.as(MonoUtils::toMono)
.doOnSuccess(s -> logger.trace("Deployed verticle for bot address: " + botAddress))
.thenReturn(instance);
return retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId)
.single()
.flatMap(binlog -> instance
.start(botId, botAlias, local, binlog)
.thenReturn(instance)
);
}
@Override
public Completable rxStart() {
return Mono
.fromCallable(() -> {
var botAddress = config().getString("botAddress");
if (botAddress == null || botAddress.isEmpty()) {
throw new IllegalArgumentException("botAddress is not set!");
}
this.botAddress = botAddress;
var botAlias = config().getString("botAlias");
if (botAlias == null || botAlias.isEmpty()) {
throw new IllegalArgumentException("botAlias is not set!");
}
this.botAlias = botAlias;
var local = config().getBoolean("local");
if (local == null) {
throw new IllegalArgumentException("local is not set!");
}
this.local = local;
this.initTime = System.currentTimeMillis();
return null;
})
.then(Mono.create(future -> {
logger.debug("Waiting for " + botAddress + ".readyToStart");
var readyToStartConsumer = cluster.getEventBus().<byte[]>consumer(botAddress + ".readyToStart");
readyToStartConsumer.handler((Message<byte[]> pingMsg) -> {
logger.debug("Received ping reply (succeeded)");
readyToStartConsumer
.rxUnregister()
.as(MonoUtils::toMono)
.doOnError(ex -> {
logger.error("Failed to unregister readyToStartConsumer", ex);
})
.then(Mono.fromCallable(() -> {
pingMsg.reply(new byte[0]);
logger.debug("Requesting " + botAddress + ".start");
return null;
}))
.then(cluster
.getEventBus()
.rxRequest(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout)
.as(MonoUtils::toMono)
.doOnError(ex -> logger.error("Failed to request bot start", ex)))
.doOnNext(msg -> logger.debug("Requesting " + botAddress + ".isWorking"))
.then(cluster.getEventBus()
.rxRequest(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout)
.as(MonoUtils::toMono)
.doOnError(ex -> logger.error("Failed to request isWorking", ex)))
.subscribeOn(Schedulers.single())
.subscribe(v -> {}, future::error, future::success);
});
readyToStartConsumer
.rxCompletionHandler()
.as(MonoUtils::toMono)
.doOnSuccess(s -> logger.debug("Requesting tdlib.remoteclient.clients.deploy.existing"))
.then(Mono.fromCallable(() -> cluster.getEventBus().publish("tdlib.remoteclient.clients.deploy", botAddress, deliveryOptions)))
.subscribe(v -> {}, future::error);
})
)
.onErrorMap(ex -> {
logger.error("Failure when starting bot " + botAddress, ex);
return new InitializationException("Can't connect tdlib middle client to tdlib middle server!");
})
.as(MonoUtils::toCompletable);
/**
*
* @return optional result
*/
public static Mono<BinlogAsyncFile> retrieveBinlog(Vertx vertx, Path binlogsArchiveDirectory, int botId) {
return BinlogUtils.retrieveBinlog(vertx.fileSystem(), binlogsArchiveDirectory.resolve(botId + ".binlog"));
}
@Override
public Completable rxStop() {
return Mono
.fromRunnable(() -> logger.debug("Stopping AsyncTdMiddle client verticle..."))
.then(Mono
.firstWithSignal(
tdCloseRequested.asMono(),
tdClosed.asMono(),
Mono.firstWithSignal(
tdCrashed.asMono(),
Mono
.fromRunnable(() -> logger.warn("Verticle is being stopped before closing TDLib with Close()! Sending Close() before stopping..."))
.then(this.execute(new TdApi.Close(), false))
.then()
)
.doOnTerminate(() -> {
logger.debug("Close() sent to td");
markCloseRequested();
})
.then(tdClosed.asMono())
)
)
.doOnError(ex -> logger.debug("Failed to stop AsyncTdMiddle client verticle"))
.doOnSuccess(s -> logger.debug("Stopped AsyncTdMiddle client verticle"))
.as(MonoUtils::toCompletable);
private Mono<Void> saveBinlog(byte[] data) {
return this.binlog.asMono().flatMap(binlog -> BinlogUtils.saveBinlog(binlog, data));
}
public Mono<Void> start(int botId, String botAlias, boolean local, BinlogAsyncFile binlog) {
this.botId = botId;
this.botAlias = botAlias;
this.botAddress = "bots.bot." + this.botId;
this.local = local;
this.logger = LoggerFactory.getLogger(this.botId + " " + botAlias);
return MonoUtils
.emitValue(this.binlog, binlog)
.then(binlog.getLastModifiedTime())
.zipWith(binlog.readFullyBytes())
.single()
.flatMap(tuple -> {
var binlogLastModifiedTime = tuple.getT1();
var binlogData = tuple.getT2();
var msg = new StartSessionMessage(this.botId, this.botAlias, binlogData, binlogLastModifiedTime);
return setupUpdatesListener()
.then(cluster.getEventBus().<byte[]>rxRequest("bots.start-bot", msg).as(MonoUtils::toMono))
.then();
});
}
@SuppressWarnings("CallingSubscribeInNonBlockingScope")
private Mono<Void> setupUpdatesListener() {
var updateConsumer = cluster.getEventBus().<TdResultList>consumer(botAddress + ".update");
updateConsumer.endHandler(h -> {
logger.error("<<<<<<<<<<<<<<<<EndHandler?>>>>>>>>>>>>>");
});
// Here the updates will be piped from the server to the client
updateConsumer
.rxPipeTo(updates.writeAsStream()).as(MonoUtils::toMono)
.subscribeOn(Schedulers.single())
.subscribe();
// Return when the registration of all the consumers has been done across the cluster
return updateConsumer.rxCompletionHandler().as(MonoUtils::toMono);
}
@Override
public Flux<TdApi.Object> receive() {
var fluxCodec = new TdResultListMessageCodec();
return Flux
.firstWithSignal(
tdCloseRequested.asMono().flux().cast(TdApi.Object.class),
tdClosed.asMono().flux().cast(TdApi.Object.class),
EventBusFlux
.<TdResultList>connect(cluster.getEventBus(),
botAddress + ".updates",
deliveryOptions,
fluxCodec,
Duration.ofMillis(deliveryOptionsWithTimeout.getSendTimeout())
)
.filter(Objects::nonNull)
.flatMap(block -> Flux.fromIterable(block.getValues()))
.filter(Objects::nonNull)
.onErrorResume(error -> {
TdApi.Error theError;
if (error instanceof ConnectException) {
theError = new Error(444, "CONNECTION_KILLED");
} else if (error.getMessage().contains("Timed out")) {
theError = new Error(444, "CONNECTION_KILLED");
} else {
theError = new Error(406, "INVALID_UPDATE");
logger.error("Bot updates request failed! Marking as closed.", error);
}
tdCrashed.tryEmitValue(theError);
return Flux.just(TdResult.failed(theError));
}).flatMap(item -> Mono.fromCallable(item::orElseThrow))
.filter(Objects::nonNull)
.doOnNext(item -> {
if (OUTPUT_REQUESTS) {
System.out.println(" <- " + item.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "=")
);
}
if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
var state = (UpdateAuthorizationState) item;
if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
// Send tdClosed early to avoid errors
logger.debug("Received AuthorizationStateClosed from td. Marking td as closed");
markCloseRequested();
markClosed();
}
}
}).doFinally(s -> {
if (s == SignalType.ON_ERROR) {
// Send tdClosed early to avoid errors
logger.debug("Updates flux terminated with an error signal. Marking td as closed");
markCloseRequested();
markClosed();
}
})
);
}
private void markCloseRequested() {
if (tdCloseRequested.tryEmitEmpty().isFailure()) {
logger.error("Failed to set tdCloseRequested");
}
}
private void markClosed() {
if (tdClosed.tryEmitEmpty().isFailure()) {
logger.error("Failed to set tdClosed");
}
if (tdCrashed.tryEmitEmpty().isFailure()) {
logger.debug("TDLib already crashed");
}
// Here the updates will be received
return updates
.readAsFlux()
.subscribeOn(Schedulers.single())
.flatMap(updates -> Flux.fromIterable(updates.body().getValues()))
.flatMap(update -> Mono.fromCallable(update::orElseThrow))
.doOnError(crash::tryEmitError)
.doOnTerminate(updatesStreamEnd::tryEmitEmpty);
}
@Override
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean executeDirectly) {
var req = new ExecuteObject(executeDirectly, request);
return Mono
.fromRunnable(() -> {
if (OUTPUT_REQUESTS) {
System.out.println(" -> " + request.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "="));
}
})
.then(Mono.firstWithSignal(
tdCloseRequested.asMono().flatMap(t -> Mono.empty()),
tdClosed.asMono().flatMap(t -> Mono.empty()),
.firstWithSignal(
MonoUtils.castVoid(crash.asMono()),
cluster.getEventBus()
.<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptions)
.as(MonoUtils::toMono)
.<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptionsWithTimeout).as(MonoUtils::toMono)
.onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex))
.<TdResult<T>>flatMap(resp -> resp.body() == null ? Mono.<TdResult<T>>error(new NullPointerException("Response is empty")) : Mono.just(resp.body().toTdResult()))
.switchIfEmpty(Mono.defer(() -> Mono.just(TdResult.<T>failed(new TdApi.Error(500, "Client is closed or response is empty")))))
.doOnNext(response -> {
if (OUTPUT_REQUESTS) {
System.out.println(" <- " + response.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "="));
.<TdResult<T>>flatMap(resp -> Mono.fromCallable(() -> {
if (resp.body() == null) {
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty"));
} else {
return resp.body().toTdResult();
}
})
));
}))
)
.switchIfEmpty(Mono.fromCallable(() -> {
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty"));
}));
}
}

View File

@ -1,5 +1,8 @@
package it.tdlight.tdlibsession.td.middle.direct;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object;
@ -8,6 +11,8 @@ import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient;
import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer;
import it.tdlight.utils.MonoUtils;
import java.nio.file.Path;
import org.warp.commonutils.error.InitializationException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -16,26 +21,38 @@ import reactor.core.publisher.Sinks.One;
public class AsyncTdMiddleLocal implements AsyncTdMiddle {
private final AsyncTdMiddleEventBusServer srv;
private final TdClusterManager masterClusterManager;
private final One<AsyncTdMiddle> cli = Sinks.one();
private final String botAlias;
private final String botAddress;
private final int botId;
private final DeploymentOptions deploymentOptions;
private final Vertx vertx;
public AsyncTdMiddleLocal(TdClusterManager masterClusterManager, String botAlias, String botAddress) {
this.srv = new AsyncTdMiddleEventBusServer(masterClusterManager);
private final AsyncTdMiddleEventBusServer srv;
private final One<AsyncTdMiddle> cli = Sinks.one();
public AsyncTdMiddleLocal(TdClusterManager masterClusterManager, String botAlias, int botId) {
this.masterClusterManager = masterClusterManager;
this.botAlias = botAlias;
this.botAddress = botAddress;
this.botId = botId;
this.vertx = masterClusterManager.getVertx();
this.deploymentOptions = masterClusterManager
.newDeploymentOpts()
.setConfig(new JsonObject()
.put("botId", botId)
.put("botAlias", botAlias)
.put("local", true));
this.srv = new AsyncTdMiddleEventBusServer();
}
public Mono<AsyncTdMiddleLocal> start() {
return srv
.start(botAddress, botAlias, true)
return vertx
.rxDeployVerticle(srv, deploymentOptions).as(MonoUtils::toMono)
.single()
.then(Mono.fromSupplier(() -> new AsyncTdMiddleEventBusClient(masterClusterManager)))
.zipWith(AsyncTdMiddleEventBusClient.retrieveBinlog(vertx, Path.of("binlogs"), botId))
.flatMap(tuple -> tuple.getT1().start(botId, botAlias, true, tuple.getT2()).thenReturn(tuple.getT1()))
.onErrorMap(InitializationException::new)
.single()
.then(AsyncTdMiddleEventBusClient.getAndDeployInstance(masterClusterManager, botAlias, botAddress, true))
.single()
.doOnNext(this.cli::tryEmitValue)
.doOnError(this.cli::tryEmitError)
.thenReturn(this);

View File

@ -1,229 +1,112 @@
package it.tdlight.tdlibsession.td.middle.server;
import static it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient.OUTPUT_REQUESTS;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.reactivex.Completable;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import io.vertx.reactivex.core.eventbus.MessageProducer;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.EventBusFlux;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions;
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.TdResultList;
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
import it.tdlight.utils.MonoUtils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuples;
public class AsyncTdMiddleEventBusServer {
public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
// Static values
private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusServer.class);
protected static final Logger logger = LoggerFactory.getLogger("TdMiddleServer");
public static final byte[] EMPTY = new byte[0];
public static final Duration WAIT_DURATION = Duration.ofSeconds(1);
// Values configured from constructor
private final TdClusterManager cluster;
private final AsyncTdDirectOptions tdOptions;
// Variables configured by the user at startup
private final One<String> botAlias = Sinks.one();
private final One<Integer> botId = Sinks.one();
private final One<String> botAddress = Sinks.one();
private final One<String> botAlias = Sinks.one();
private final One<Boolean> local = Sinks.one();
private final Many<Consumer<Promise<Void>>> onBeforeStopListeners = Sinks.many().replay().all();
private final Many<Consumer<Promise<Void>>> onAfterStopListeners = Sinks.many().replay().all();
// Variables configured at startup
private final One<AsyncTdDirectImpl> td = Sinks.one();
private final One<MessageConsumer<byte[]>> startConsumer = Sinks.one();
private final One<MessageConsumer<byte[]>> isWorkingConsumer = Sinks.one();
private final One<MessageConsumer<ExecuteObject>> executeConsumer = Sinks.one();
@SuppressWarnings({"unchecked", "rawtypes"})
public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) {
this.cluster = clusterManager;
public AsyncTdMiddleEventBusServer() {
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 100);
}
public Mono<AsyncTdMiddleEventBusServer> start(String botAddress, String botAlias, boolean local) {
return Mono.<Void>create(sink -> {
if (botAddress == null || botAddress.isEmpty()) {
sink.error(new IllegalArgumentException("botAddress is not set!"));
return;
}
if (this.botAddress.tryEmitValue(botAddress).isFailure()) {
sink.error(new IllegalStateException("Failed to set botAddress"));
return;
}
if (botAlias == null || botAlias.isEmpty()) {
sink.error(new IllegalArgumentException("botAlias is not set!"));
return;
}
if (this.botAlias.tryEmitValue(botAlias).isFailure()) {
sink.error(new IllegalStateException("Failed to set botAlias"));
return;
}
if (this.local.tryEmitValue(local).isFailure()) {
sink.error(new IllegalStateException("Failed to set local"));
return;
}
var td = new AsyncTdDirectImpl(botAlias);
if (this.td.tryEmitValue(td).isFailure()) {
sink.error(new IllegalStateException("Failed to set td instance"));
return;
}
if (this.onBeforeStopListeners.tryEmitComplete().isFailure()) {
sink.error(new IllegalStateException("Failed to finalize \"before stop\" listeners"));
return;
}
if (this.onAfterStopListeners.tryEmitComplete().isFailure()) {
sink.error(new IllegalStateException("Failed to finalize \"after stop\" listeners"));
return;
}
@Override
public Completable rxStart() {
return MonoUtils.toCompletable(Mono
.fromCallable(() -> {
var botId = config().getInteger("botId");
if (botId == null || botId <= 0) {
throw new IllegalArgumentException("botId is not set!");
}
if (this.botId.tryEmitValue(botId).isFailure()) {
throw new IllegalStateException("Failed to set botId");
}
var botAddress = "bots.bot." + botId;
if (this.botAddress.tryEmitValue(botAddress).isFailure()) {
throw new IllegalStateException("Failed to set botAddress");
}
var botAlias = config().getString("botAlias");
if (botAlias == null || botAlias.isEmpty()) {
throw new IllegalArgumentException("botAlias is not set!");
}
if (this.botAlias.tryEmitValue(botAlias).isFailure()) {
throw new IllegalStateException("Failed to set botAlias");
}
var local = config().getBoolean("local");
if (local == null) {
throw new IllegalArgumentException("local is not set!");
}
if (this.local.tryEmitValue(local).isFailure()) {
throw new IllegalStateException("Failed to set local");
}
AtomicBoolean alreadyDeployed = new AtomicBoolean(false);
var startConsumer = cluster.getEventBus().<byte[]>consumer(botAddress + ".start");
if (this.startConsumer.tryEmitValue(startConsumer).isSuccess()) {
startConsumer.handler((Message<byte[]> msg) -> {
if (alreadyDeployed.compareAndSet(false, true)) {
startConsumer.unregister(startConsumerUnregistered -> {
if (startConsumerUnregistered.succeeded()) {
onSuccessfulStartRequest(msg, td, botAddress, botAlias, local);
} else {
logger.error("Failed to unregister start consumer");
}
});
} else {
msg.reply(EMPTY);
var td = new AsyncTdDirectImpl(botAlias);
if (this.td.tryEmitValue(td).isFailure()) {
throw new IllegalStateException("Failed to set td instance");
}
});
startConsumer.completionHandler(h -> {
logger.info(botAddress + " server deployed. succeeded: " + h.succeeded());
if (h.succeeded()) {
var readyToStartOpts = cluster.newDeliveryOpts().setSendTimeout(30000);
logger.debug("Sending " + botAddress + ".readyToStart");
cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, readyToStartOpts, msg -> sink.success());
} else {
sink.error(h.cause());
}
});
} else {
sink.error(new IllegalStateException("Failed to set startConsumer"));
}
}).thenReturn(this);
return onSuccessfulStartRequest(td, botAddress, botAlias, local);
})
.flatMap(Mono::hide));
}
private void onSuccessfulStartRequest(Message<byte[]> msg, AsyncTdDirectImpl td, String botAddress, String botAlias, boolean local) {
this.listen(td, botAddress, botAlias, local).then(this.pipe(td, botAddress, botAlias, local)).then(Mono.<Void>create(registrationSink -> {
var isWorkingConsumer = cluster.getEventBus().<byte[]>consumer(botAddress + ".isWorking");
if (this.isWorkingConsumer.tryEmitValue(isWorkingConsumer).isSuccess()) {
isWorkingConsumer.handler((Message<byte[]> workingMsg) -> {
workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local));
private Mono<Void> onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, boolean local) {
return this
.listen(td, botAddress, botAlias, local)
.then(this.pipe(td, botAddress, botAlias, local))
.doOnSuccess(s -> {
logger.info("Deploy and start of bot \"" + botAlias + "\": ✅ Succeeded");
})
.doOnError(ex -> {
logger.error("Deploy and start of bot \"" + botAlias + "\": ❌ Failed", ex);
});
isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
} else {
logger.error("Failed to set isWorkingConsumer");
msg.fail(500, "Failed to set isWorkingConsumer");
}
})).subscribe(v -> {}, ex -> {
logger.error("Deploy and start of bot \"" + botAlias + "\": ❌ Failed", ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {
logger.info("Deploy and start of bot \"" + botAlias + "\": ✅ Succeeded");
msg.reply(EMPTY);
});
}
/**
* Register a before stop listener
* @param eventListener listener
* @return success if the listener has been registered correctly
*/
public EmitResult onBeforeStop(Consumer<Promise<Void>> eventListener, boolean crashOnFail) {
if (crashOnFail) {
return this.onBeforeStopListeners.tryEmitNext(eventListener);
} else {
return this.onBeforeStopListeners.tryEmitNext(promise -> {
Promise<Void> falliblePromise = Promise.promise();
falliblePromise.future().onComplete(result -> {
if (result.failed()) {
logger.warn("A beforeStop listener failed. Ignored error", result.cause());
}
promise.complete();
});
eventListener.accept(falliblePromise);
});
}
}
/**
* Register an after stop listener
* @param eventListener listener
* @return success if the listener has been registered correctly
*/
public EmitResult onAfterStop(Consumer<Promise<Void>> eventListener, boolean crashOnFail) {
if (crashOnFail) {
return this.onAfterStopListeners.tryEmitNext(eventListener);
} else {
return this.onAfterStopListeners.tryEmitNext(promise -> {
Promise<Void> falliblePromise = Promise.promise();
falliblePromise.future().onComplete(result -> {
if (result.failed()) {
logger.warn("An afterStop listener failed. Ignored error", result.cause());
}
promise.complete();
});
eventListener.accept(falliblePromise);
});
}
}
private void runAll(List<Consumer<Promise<Void>>> actions, Handler<AsyncResult<Void>> resultHandler) {
if (actions.isEmpty()) {
resultHandler.handle(Future.succeededFuture());
} else {
var firstAction = actions.remove(0);
Promise<Void> promise = Promise.promise();
firstAction.accept(promise);
promise.future().onComplete(handler -> {
if (handler.succeeded()) {
runAll(new ArrayList<>(actions), resultHandler);
} else {
resultHandler.handle(Future.failedFuture(handler.cause()));
}
});
}
}
private Mono<Void> listen(AsyncTdDirectImpl td, String botAddress, String botAlias, boolean local) {
return Mono.<Void>create(registrationSink -> {
MessageConsumer<ExecuteObject> executeConsumer = cluster.getEventBus().consumer(botAddress + ".execute");
MessageConsumer<ExecuteObject> executeConsumer = vertx.eventBus().consumer(botAddress + ".execute");
if (this.executeConsumer.tryEmitValue(executeConsumer).isFailure()) {
registrationSink.error(new IllegalStateException("Failed to set executeConsumer"));
return;
@ -233,25 +116,13 @@ public class AsyncTdMiddleEventBusServer {
executeConsumer.handler(sink::next);
executeConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
})
.doOnNext(msg -> {
if (OUTPUT_REQUESTS) {
System.out.println(":=> " + msg
.body()
.getRequest()
.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "="));
}
})
.flatMap(msg -> td
.execute(msg.body().getRequest(), msg.body().isExecuteDirectly())
.map(result -> Tuples.of(msg, result)))
.handle((tuple, sink) -> {
var msg = tuple.getT1();
var response = tuple.getT2();
var replyOpts = cluster.newDeliveryOpts().setLocalOnly(local);
var replyOpts = new DeliveryOptions().setLocalOnly(local);
var replyValue = new TdResultMessage(response.result(), response.cause());
try {
msg.reply(replyValue, replyOpts);
@ -269,62 +140,25 @@ public class AsyncTdMiddleEventBusServer {
});
}
private void undeploy(Runnable whenUndeployed) {
botAlias.asMono().single().flatMap(botAlias -> {
logger.info("Undeploy of bot \"" + botAlias + "\": stopping");
return onBeforeStopListeners.asFlux()
.collectList()
.single()
.flatMap(onBeforeStopListeners ->
Mono.<Void>create(sink -> runAll(onBeforeStopListeners, MonoUtils.toHandler(sink)))
.doOnError(ex -> logger.error("A beforeStop listener failed", ex)))
.then(Flux
.merge(unregisterConsumerOrLog(this.isWorkingConsumer.asMono(), "isWorkingConsumer"),
unregisterConsumerOrLog(this.startConsumer.asMono(), "isWorkingConsumer"),
unregisterConsumerOrLog(this.executeConsumer.asMono(), "isWorkingConsumer"))
.then())
.then(onAfterStopListeners.asFlux().collectList())
.single()
.doOnNext(onAfterStopListeners -> {
logger.info("TdMiddle verticle \"" + botAddress + "\" stopped");
runAll(onAfterStopListeners, onAfterStopHandler -> {
if (onAfterStopHandler.failed()) {
logger.error("An afterStop listener failed: " + onAfterStopHandler.cause());
}
logger.info("Undeploy of bot \"" + botAlias + "\": stopped");
whenUndeployed.run();
});
});
})
.doOnError(ex -> logger.error("Error when stopping", ex))
.subscribe();
}
private <T> Mono<Void> unregisterConsumerOrLog(Mono<MessageConsumer<T>> consumerMono, String consumerName) {
return consumerMono
.flatMap(consumer -> Mono
.<Void>create(sink -> consumer.unregister(MonoUtils.toHandler(sink)))
.onErrorResume(ex -> Mono.fromRunnable(() -> {
logger.error("Can't unregister consumer \"" + consumerName + "\"", ex);
})));
@Override
public Completable rxStop() {
return MonoUtils.toCompletable(botAlias
.asMono()
.timeout(Duration.ofSeconds(1), Mono.just("???"))
.flatMap(botAlias -> Mono
.fromRunnable(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopping"))
.then(executeConsumer
.asMono()
.timeout(Duration.ofSeconds(5), Mono.empty())
.flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)))
.doOnError(ex -> logger.error("Undeploy of bot \"" + botAlias + "\": stop failed", ex))
.doOnTerminate(() -> logger.info("Undeploy of bot \"" + botAlias + "\": stopped"))));
}
private Mono<Void> pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, boolean local) {
var updatesFlux = td
Flux<TdResultList> updatesFlux = td
.receive(tdOptions)
.doOnNext(update -> {
if (OUTPUT_REQUESTS) {
System.out.println("<=: " + update
.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "="));
}
})
.flatMap(item -> Mono.<TdResult<TdApi.Object>>create(sink -> {
.flatMap(item -> Mono.defer(() -> {
if (item.succeeded()) {
var tdObject = item.result();
if (tdObject instanceof Update) {
@ -334,47 +168,50 @@ public class AsyncTdMiddleEventBusServer {
if (tdUpdateAuthorizationState.authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR) {
logger.debug("Undeploying after receiving AuthorizationStateClosed");
this.undeploy(() -> sink.success(item));
return;
return rxStop().as(MonoUtils::toMono).thenReturn(item);
}
}
} else if (tdObject instanceof Error) {
// An error in updates means that a fatal error occurred
logger.debug("Undeploying after receiving a fatal error");
this.undeploy(() -> sink.success(item));
return;
return rxStop().as(MonoUtils::toMono).thenReturn(item);
}
sink.success(item);
return Mono.just(item);
} else {
return Mono.just(item);
}
}))
.bufferTimeout(tdOptions.getEventsSize(), local ? Duration.ofMillis(1) : Duration.ofMillis(100))
.windowTimeout(1, Duration.ofSeconds(5))
.flatMap(w -> w.defaultIfEmpty(Collections.emptyList()))
.map(TdResultList::new).doOnTerminate(() -> {
if (OUTPUT_REQUESTS) {
System.out.println("<=: end (3)");
}
});
.map(TdResultList::new);
var fluxCodec = new TdResultListMessageCodec();
var tuple = EventBusFlux.<TdResultList>serve(updatesFlux,
cluster.getEventBus(),
botAddress + ".updates",
cluster.newDeliveryOpts().setLocalOnly(local),
fluxCodec,
Duration.ofSeconds(30)
);
var served = tuple.getT1();
var fatalError = tuple.getT2();
var opts = new DeliveryOptions()
.setLocalOnly(local)
.setSendTimeout(Duration.ofSeconds(30).toMillis())
.setCodecName(fluxCodec.name());
MessageProducer<TdResultList> updatesSender = vertx
.eventBus()
.sender(botAddress + ".updates", opts);
//noinspection CallingSubscribeInNonBlockingScope
fatalError
.doOnNext(e -> logger.warn("Undeploying after a fatal error in a served flux"))
.flatMap(error -> td.execute(new TdApi.Close(), false))
.doOnError(ex -> logger.error("Unexpected error", ex))
updatesFlux
.flatMap(update -> updatesSender.rxWrite(update).as(MonoUtils::toMono))
.doOnTerminate(() -> updatesSender.close(h -> {
if (h.failed()) {
logger.error("Failed to close \"updates\" message sender");
}
}))
.onErrorResume(ex -> {
logger.warn("Undeploying after a fatal error in a served flux", ex);
return td.execute(new TdApi.Close(), false)
.doOnError(ex2 -> logger.error("Unexpected error", ex2))
.then();
})
.subscribeOn(Schedulers.single())
.subscribe();
return served.doOnSubscribe(s -> {
logger.debug("Preparing to serve bot \"" + botAlias + "\" updates flux...");
}).doOnSuccess(v -> {
logger.debug("Ready to serve bot \"" + botAlias + "\" updates flux");
});
return Mono.empty();
}
}

View File

@ -0,0 +1,65 @@
package it.tdlight.utils;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.file.AsyncFile;
import io.vertx.reactivex.core.file.FileProps;
import io.vertx.reactivex.core.file.FileSystem;
import reactor.core.publisher.Mono;
public class BinlogAsyncFile {
private final FileSystem filesystem;
private final String path;
private final AsyncFile file;
public BinlogAsyncFile(FileSystem fileSystem, String path, AsyncFile file) {
this.filesystem = fileSystem;
this.path = path;
this.file = file;
}
public Mono<Buffer> readFully() {
return filesystem
.rxProps(path)
.map(props -> (int) props.size())
.as(MonoUtils::toMono)
.flatMap(size -> {
var buf = Buffer.buffer(size);
return file.rxRead(buf, 0, 0, size).as(MonoUtils::toMono).thenReturn(buf);
});
}
public Mono<byte[]> readFullyBytes() {
return this.readFully().map(Buffer::getBytes);
}
public AsyncFile getFile() {
return file;
}
public Mono<Void> overwrite(Buffer newData) {
return file.rxWrite(newData, 0)
.andThen(file.rxFlush())
.andThen(filesystem.rxTruncate(path, newData.length()))
.as(MonoUtils::toMono);
}
public Mono<Void> overwrite(byte[] newData) {
return this.overwrite(Buffer.buffer(newData));
}
public FileSystem getFilesystem() {
return filesystem;
}
public String getPath() {
return path;
}
public Mono<Long> getLastModifiedTime() {
return filesystem
.rxProps(path)
.map(FileProps::lastModifiedTime)
.as(MonoUtils::toMono);
}
}

View File

@ -0,0 +1,54 @@
package it.tdlight.utils;
import io.vertx.core.file.OpenOptions;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.file.FileSystem;
import java.nio.file.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
public class BinlogUtils {
private static final Logger logger = LoggerFactory.getLogger(BinlogUtils.class);
/**
*
* @return optional result
*/
public static Mono<BinlogAsyncFile> retrieveBinlog(FileSystem vertxFilesystem, Path binlogPath) {
var path = binlogPath.toString();
var openOptions = new OpenOptions().setWrite(true).setRead(true).setCreate(false).setDsync(true);
return vertxFilesystem.rxExists(path).filter(exists -> exists)
.flatMapSingle(x -> vertxFilesystem.rxOpen(path, openOptions))
.map(file -> new BinlogAsyncFile(vertxFilesystem, path, file))
.as(MonoUtils::toMono);
}
public static Mono<Void> saveBinlog(BinlogAsyncFile binlog, byte[] data) {
return binlog.overwrite(data);
}
public static Mono<Void> chooseBinlog(FileSystem vertxFilesystem,
Path binlogPath,
byte[] remoteBinlog,
long remoteBinlogDate) {
var path = binlogPath.toString();
return retrieveBinlog(vertxFilesystem, binlogPath)
.flatMap(binlog -> Mono
.just(binlog)
.zipWith(binlog.getLastModifiedTime())
)
// Files older than the remote file will be overwritten
.filter(tuple -> tuple.getT2() < remoteBinlogDate)
.map(Tuple2::getT1)
.switchIfEmpty(Mono
.fromRunnable(() -> logger.info("Overwriting local binlog: " + binlogPath))
.then(vertxFilesystem.rxWriteFile(path, Buffer.buffer(remoteBinlog)).as(MonoUtils::toMono))
.then(retrieveBinlog(vertxFilesystem, binlogPath))
)
.single()
.then();
}
}

View File

@ -1,28 +1,49 @@
package it.tdlight.utils;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.reactivex.core.streams.Pipe;
import io.vertx.reactivex.core.streams.ReadStream;
import io.vertx.reactivex.core.streams.WriteStream;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult;
import java.time.Duration;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.warp.commonutils.concurrency.future.CompletableFutureUtils;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmissionException;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
public class MonoUtils {
@ -77,6 +98,14 @@ public class MonoUtils {
});
}
public static <T> Mono<T> fromBlockingMaybe(Callable<T> callable) {
return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic());
}
public static <T> Mono<T> fromBlockingSingle(Callable<T> callable) {
return fromBlockingMaybe(callable).single();
}
public static <T> CoreSubscriber<? super T> toSubscriber(Promise<T> promise) {
return new CoreSubscriber<T>() {
@Override
@ -218,4 +247,540 @@ public class MonoUtils {
public static <T> Completable toCompletable(Mono<T> s) {
return Completable.fromPublisher(s);
}
public static Mono<Void> fromEmitResult(EmitResult emitResult) {
return Mono.fromCallable(() -> {
emitResult.orThrow();
return null;
});
}
public static Future<Void> fromEmitResultFuture(EmitResult emitResult) {
if (emitResult.isSuccess()) {
return Future.succeededFuture();
} else {
return Future.failedFuture(new EmissionException(emitResult));
}
}
public static <T> Mono<Void> emitValue(One<T> sink, T value) {
return fromEmitResult(sink.tryEmitValue(value));
}
public static <T> Mono<Void> emitNext(Many<T> sink, T value) {
return fromEmitResult(sink.tryEmitNext(value));
}
public static <T> Mono<Void> emitComplete(Many<T> sink) {
return fromEmitResult(sink.tryEmitComplete());
}
public static <T> Mono<Void> emitError(Empty<T> sink, Throwable value) {
return fromEmitResult(sink.tryEmitError(value));
}
public static <T> Future<Void> emitEmpty(Empty<T> sink) {
return fromEmitResultFuture(sink.tryEmitEmpty());
}
public static <T> Future<Void> emitValueFuture(One<T> sink, T value) {
return fromEmitResultFuture(sink.tryEmitValue(value));
}
public static <T> Future<Void> emitNextFuture(Many<T> sink, T value) {
return fromEmitResultFuture(sink.tryEmitNext(value));
}
public static <T> Future<Void> emitCompleteFuture(Many<T> sink) {
return fromEmitResultFuture(sink.tryEmitComplete());
}
public static <T> Future<Void> emitErrorFuture(Empty<T> sink, Throwable value) {
return fromEmitResultFuture(sink.tryEmitError(value));
}
public static <T> Future<Void> emitEmptyFuture(Empty<T> sink) {
return fromEmitResultFuture(sink.tryEmitEmpty());
}
public static <T> SinkRWStream<T> unicastBackpressureSinkStreak() {
Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
return asStream(sink, null, null, 1);
}
/**
* Create a sink that can be written from a writeStream
*/
public static <T> SinkRWStream<T> unicastBackpressureStream(int maxBackpressureQueueSize) {
Queue<T> boundedQueue = Queues.<T>get(maxBackpressureQueueSize).get();
var queueSize = Flux
.interval(Duration.ZERO, Duration.ofMillis(500))
.map(n -> boundedQueue.size());
Empty<Void> termination = Sinks.empty();
Many<T> sink = Sinks.many().unicast().onBackpressureBuffer(boundedQueue, termination::tryEmitEmpty);
return asStream(sink, queueSize, termination, maxBackpressureQueueSize);
}
public static <T> SinkRWStream<T> unicastBackpressureErrorStream() {
Many<T> sink = Sinks.many().unicast().onBackpressureError();
return asStream(sink, null, null, 1);
}
public static <T> SinkRWStream<T> asStream(Many<T> sink,
@Nullable Flux<Integer> backpressureSize,
@Nullable Empty<Void> termination,
int maxBackpressureQueueSize) {
return new SinkRWStream<>(sink, backpressureSize, termination, maxBackpressureQueueSize);
}
private static Future<Void> toVertxFuture(Mono<Void> toTransform) {
var promise = Promise.<Void>promise();
toTransform.subscribeOn(Schedulers.single()).subscribe(next -> {}, promise::fail, promise::complete);
return promise.future();
}
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T> Mono<T> castVoid(Mono<Void> mono) {
return (Mono) mono;
}
public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {
private final Many<T> sink;
private final @Nullable Disposable drainSubscription;
private Handler<Throwable> exceptionHandler = e -> {};
private Handler<Void> drainHandler = h -> {};
private final int maxBackpressureQueueSize;
private volatile int writeQueueMaxSize;
private volatile boolean writeQueueFull = false;
public SinkRWStream(Many<T> sink,
@Nullable Flux<Integer> backpressureSize,
@Nullable Empty<Void> termination,
int maxBackpressureQueueSize) {
this.maxBackpressureQueueSize = maxBackpressureQueueSize;
this.writeQueueMaxSize = this.maxBackpressureQueueSize;
this.sink = sink;
if (backpressureSize != null) {
AtomicBoolean drained = new AtomicBoolean(true);
this.drainSubscription = backpressureSize
.subscribeOn(Schedulers.single())
.subscribe(size -> {
writeQueueFull = size >= this.writeQueueMaxSize;
boolean newDrained = size <= this.writeQueueMaxSize / 2;
boolean oldDrained = drained.getAndSet(newDrained);
if (newDrained && !oldDrained) {
drainHandler.handle(null);
}
}, ex -> {
exceptionHandler.handle(ex);
}, () -> {
if (!drained.get()) {
drainHandler.handle(null);
}
});
if (termination != null) {
termination.asMono().subscribeOn(Schedulers.single()).doOnTerminate(drainSubscription::dispose).subscribe();
}
} else {
this.drainSubscription = null;
}
}
public Flux<T> readAsFlux() {
return sink.asFlux();
}
public ReactiveReactorReadStream<T> readAsStream() {
return new ReactiveReactorReadStream<>(this);
}
public Many<T> writeAsSink() {
return sink;
}
public ReactiveSinkWriteStream<T> writeAsStream() {
return new ReactiveSinkWriteStream<>(this);
}
@Override
public SinkRWStream<T> exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
//
// Read stream section
//
private Handler<Void> readEndHandler = v -> {};
private Subscription readCoreSubscription;
private final AtomicBoolean fetchMode = new AtomicBoolean(false);
@Override
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
sink.asFlux().subscribeWith(new CoreSubscriber<T>() {
@Override
public void onSubscribe(@NotNull Subscription s) {
readCoreSubscription = s;
if (!fetchMode.get()) {
readCoreSubscription.request(1);
}
}
@Override
public void onNext(T t) {
handler.handle(t);
if (!fetchMode.get()) {
readCoreSubscription.request(1);
}
}
@Override
public void onError(Throwable t) {
exceptionHandler.handle(t);
}
@Override
public void onComplete() {
readEndHandler.handle(null);
}
});
return this;
}
@Override
public io.vertx.core.streams.ReadStream<T> pause() {
fetchMode.set(true);
return this;
}
@Override
public io.vertx.core.streams.ReadStream<T> resume() {
if (fetchMode.compareAndSet(true, false)) {
readCoreSubscription.request(1);
}
return this;
}
@Override
public io.vertx.core.streams.ReadStream<T> fetch(long amount) {
if (fetchMode.get()) {
readCoreSubscription.request(amount);
}
return this;
}
@Override
public io.vertx.core.streams.ReadStream<T> endHandler(@io.vertx.codegen.annotations.Nullable Handler<Void> endHandler) {
this.readEndHandler = endHandler;
return this;
}
//
// Write stream section
//
@Override
public Future<Void> write(T data) {
return MonoUtils.emitNextFuture(sink, data);
}
@Override
public void write(T data, Handler<AsyncResult<Void>> handler) {
write(data).onComplete(handler);
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
MonoUtils.emitCompleteFuture(sink).onComplete(h -> {
if (drainSubscription != null) {
drainSubscription.dispose();
}
}).onComplete(handler);
}
@Override
public io.vertx.core.streams.WriteStream<T> setWriteQueueMaxSize(int maxSize) {
if (maxSize <= maxBackpressureQueueSize) {
this.writeQueueMaxSize = maxSize;
} else {
logger.error("Failed to set writeQueueMaxSize to " + maxSize + ", because it's bigger than the max backpressure queue size " + maxBackpressureQueueSize);
}
return this;
}
@Override
public boolean writeQueueFull() {
return writeQueueFull;
}
@Override
public io.vertx.core.streams.WriteStream<T> drainHandler(@Nullable Handler<Void> handler) {
this.drainHandler = handler;
return this;
}
}
public static class FluxReadStream<T> implements io.vertx.core.streams.ReadStream<T> {
private final Flux<T> flux;
private Handler<Throwable> exceptionHandler = e -> {};
public FluxReadStream(Flux<T> flux) {
this.flux = flux;
}
public Flux<T> readAsFlux() {
return flux;
}
public ReactiveReactorReadStream<T> readAsStream() {
return new ReactiveReactorReadStream<>(this);
}
@Override
public FluxReadStream<T> exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
//
// Read stream section
//
private Handler<Void> readEndHandler = v -> {};
private Subscription readCoreSubscription;
private final AtomicBoolean fetchMode = new AtomicBoolean(false);
@Override
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
flux.subscribeWith(new CoreSubscriber<T>() {
@Override
public void onSubscribe(@NotNull Subscription s) {
readCoreSubscription = s;
if (!fetchMode.get()) {
readCoreSubscription.request(1);
}
}
@Override
public void onNext(T t) {
handler.handle(t);
if (!fetchMode.get()) {
readCoreSubscription.request(1);
}
}
@Override
public void onError(Throwable t) {
exceptionHandler.handle(t);
}
@Override
public void onComplete() {
readEndHandler.handle(null);
}
});
return this;
}
@Override
public io.vertx.core.streams.ReadStream<T> pause() {
fetchMode.set(true);
return this;
}
@Override
public io.vertx.core.streams.ReadStream<T> resume() {
if (fetchMode.compareAndSet(true, false)) {
readCoreSubscription.request(1);
}
return this;
}
@Override
public io.vertx.core.streams.ReadStream<T> fetch(long amount) {
if (fetchMode.get()) {
readCoreSubscription.request(amount);
}
return this;
}
@Override
public io.vertx.core.streams.ReadStream<T> endHandler(@io.vertx.codegen.annotations.Nullable Handler<Void> endHandler) {
this.readEndHandler = endHandler;
return this;
}
}
public static class ReactiveSinkWriteStream<T> implements WriteStream<T> {
private final WriteStream<T> ws;
public ReactiveSinkWriteStream(SinkRWStream<T> ws) {
this.ws = WriteStream.newInstance(ws);
}
public io.vertx.core.streams.WriteStream<T> getDelegate() {
//noinspection unchecked
return ws.getDelegate();
}
@Override
public WriteStream<T> exceptionHandler(Handler<Throwable> handler) {
return ws.exceptionHandler(handler);
}
@Override
public void write(T data, Handler<AsyncResult<Void>> handler) {
ws.write(data, handler);
}
@Override
public void write(T data) {
ws.write(data);
}
@Override
public Completable rxWrite(T data) {
return ws.rxWrite(data);
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
ws.end(handler);
}
@Override
public void end() {
ws.end();
}
@Override
public Completable rxEnd() {
return ws.rxEnd();
}
@Override
public void end(T data, Handler<AsyncResult<Void>> handler) {
ws.end(data, handler);
}
@Override
public void end(T data) {
ws.end(data);
}
@Override
public Completable rxEnd(T data) {
return ws.rxEnd(data);
}
@Override
public WriteStream<T> setWriteQueueMaxSize(int maxSize) {
return ws.setWriteQueueMaxSize(maxSize);
}
@Override
public boolean writeQueueFull() {
return ws.writeQueueFull();
}
@Override
public WriteStream<T> drainHandler(Handler<Void> handler) {
return ws.drainHandler(handler);
}
}
public static class ReactiveReactorReadStream<T> implements ReadStream<T> {
private final ReadStream<T> rs;
public ReactiveReactorReadStream(SinkRWStream<T> rws) {
this.rs = ReadStream.newInstance(rws);
}
public ReactiveReactorReadStream(FluxReadStream<T> rs) {
this.rs = ReadStream.newInstance(rs);
}
public ReactiveReactorReadStream(Flux<T> s) {
this.rs = ReadStream.newInstance(new FluxReadStream<>(s));
}
@Override
public io.vertx.core.streams.ReadStream<T> getDelegate() {
//noinspection unchecked
return rs.getDelegate();
}
@Override
public ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
return rs.exceptionHandler(handler);
}
@Override
public ReadStream<T> handler(Handler<T> handler) {
return rs.handler(handler);
}
@Override
public ReadStream<T> pause() {
return rs.pause();
}
@Override
public ReadStream<T> resume() {
return rs.resume();
}
@Override
public ReadStream<T> fetch(long amount) {
return rs.fetch(amount);
}
@Override
public ReadStream<T> endHandler(Handler<Void> endHandler) {
return rs.endHandler(endHandler);
}
@Override
public Pipe<T> pipe() {
return rs.pipe();
}
@Override
public void pipeTo(WriteStream<T> dst, Handler<AsyncResult<Void>> handler) {
rs.pipeTo(dst, handler);
}
@Override
public void pipeTo(WriteStream<T> dst) {
rs.pipeTo(dst);
}
@Override
public Completable rxPipeTo(WriteStream<T> dst) {
return rs.rxPipeTo(dst);
}
@Override
public Observable<T> toObservable() {
return rs.toObservable();
}
@Override
public Flowable<T> toFlowable() {
return rs.toFlowable();
}
}
}

View File

@ -1,10 +1,10 @@
package it.tdlight.utils;
import io.vertx.core.buffer.Buffer;
import it.unimi.dsi.fastutil.io.MeasurableInputStream;
import it.unimi.dsi.fastutil.io.RepositionableStream;
import org.warp.commonutils.stream.SafeMeasurableInputStream;
import org.warp.commonutils.stream.SafeRepositionableStream;
public class VertxBufferInputStream extends MeasurableInputStream implements RepositionableStream {
public class VertxBufferInputStream extends SafeMeasurableInputStream implements SafeRepositionableStream {
private final Buffer buffer;
@ -32,6 +32,17 @@ public class VertxBufferInputStream extends MeasurableInputStream implements Rep
this.length = length;
}
/** Creates a new buffer input stream using a given buffer fragment.
*
* @param buffer the backing buffer.
* @param offset the first valid entry of the buffer.
*/
public VertxBufferInputStream(final Buffer buffer, final int offset) {
this.buffer = buffer;
this.offset = offset;
this.length = buffer.length();
}
/** Creates a new buffer input stream using a given buffer.
*
* @param buffer the backing buffer.
@ -40,6 +51,14 @@ public class VertxBufferInputStream extends MeasurableInputStream implements Rep
this(buffer, 0, buffer.length());
}
/** Creates a new buffer input stream using a given buffer.
*
* @param in the backing buffer.
*/
public VertxBufferInputStream(final VertxBufferInputStream in) {
this(in.buffer, in.offset + in.position, in.buffer.length());
}
@Override
public boolean markSupported() {
return true;

View File

@ -2,10 +2,10 @@ package it.tdlight.utils;
import io.vertx.core.buffer.Buffer;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import it.unimi.dsi.fastutil.io.MeasurableOutputStream;
import it.unimi.dsi.fastutil.io.RepositionableStream;
import org.warp.commonutils.stream.SafeMeasurableOutputStream;
import org.warp.commonutils.stream.SafeRepositionableStream;
public class VertxBufferOutputStream extends MeasurableOutputStream implements RepositionableStream {
public class VertxBufferOutputStream extends SafeMeasurableOutputStream implements SafeRepositionableStream {
/** The buffer backing the output stream. */
public Buffer buffer;