Better state management

This commit is contained in:
Andrea Cavalli 2021-01-13 04:00:43 +01:00
parent c9fa243a92
commit 7d6122e777
9 changed files with 238 additions and 336 deletions

View File

@ -1,28 +1,23 @@
package it.tdlight.tdlibsession.td.direct;
import io.vertx.core.AsyncResult;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.tdlibsession.td.TdResult;
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface AsyncTdDirect {
/**
* Receives incoming updates and request responses from TDLib. May be called from any thread, but
* shouldn't be called simultaneously from two different threads.
* Receives incoming updates and request responses from TDLib.
* Can be called only once.
*
* @param receiveDuration Maximum number of seconds allowed for this function to wait for new records. Default: 1 sec
* @param eventsSize Maximum number of events allowed in list. Default: 350 events
* @return An incoming update or request response list. The object returned in the response may be
* an empty list if the timeout expires.
*/
Flux<AsyncResult<TdResult<TdApi.Object>>> getUpdates(Duration receiveDuration, int eventsSize);
Flux<TdResult<TdApi.Object>> receive(AsyncTdDirectOptions options);
/**
* Sends request to TDLib. May be called from any thread.
* Sends request to TDLib.
* Should be called after receive.
*
* @param request Request to TDLib.
* @param synchronous Execute synchronously.
@ -30,13 +25,4 @@ public interface AsyncTdDirect {
*/
<T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean synchronous);
/**
* Initializes the client and TDLib instance.
*/
Mono<Void> initializeClient();
/**
* Destroys the client and TDLib instance.
*/
Mono<Void> destroyClient();
}

View File

@ -1,7 +1,5 @@
package it.tdlight.tdlibsession.td.direct;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import it.tdlight.common.TelegramClient;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
@ -13,11 +11,12 @@ import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlight.ClientManager;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -25,14 +24,9 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdDirect.class);
private final AtomicReference<TelegramClient> td = new AtomicReference<>();
private final One<TelegramClient> td = Sinks.one();
private final Scheduler tdScheduler = Schedulers.newSingle("TdMain");
private final Scheduler tdPollScheduler = Schedulers.newSingle("TdPoll");
private final Scheduler tdResponsesScheduler = Schedulers.newSingle("TdResponse");
private final Scheduler tdExecScheduler = Schedulers.newSingle("TdExec");
private final Scheduler tdResponsesOutputScheduler = Schedulers.boundedElastic();
private Flux<AsyncResult<TdResult<TdApi.Object>>> updatesProcessor;
private final String botAlias;
public AsyncTdDirectImpl(String botAlias) {
@ -42,86 +36,61 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
@Override
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean synchronous) {
if (synchronous) {
return Mono
.fromCallable(() -> {
var td = this.td.get();
if (td == null) {
if (request.getConstructor() == Close.CONSTRUCTOR) {
return TdResult.<T>of(new Ok());
}
throw new IllegalStateException("TDLib client is destroyed");
}
return TdResult.<T>of(td.execute(request));
})
.subscribeOn(tdResponsesScheduler)
.publishOn(tdExecScheduler);
} else {
return Mono.<TdResult<T>>create(sink -> {
try {
var td = this.td.get();
if (td == null) {
if (request.getConstructor() == Close.CONSTRUCTOR) {
sink.success(TdResult.<T>of(new Ok()));
}
sink.error(new IllegalStateException("TDLib client is destroyed"));
} else {
td.send(request, v -> {
sink.success(TdResult.of(v));
}, sink::error);
return td.asMono().flatMap(td -> Mono.fromCallable(() -> {
if (td != null) {
return TdResult.<T>of(td.execute(request));
} else {
if (request.getConstructor() == Close.CONSTRUCTOR) {
return TdResult.<T>of(new Ok());
}
} catch (Throwable t) {
sink.error(t);
throw new IllegalStateException("TDLib client is destroyed");
}
}).subscribeOn(tdResponsesScheduler).publishOn(tdResponsesOutputScheduler);
}).publishOn(Schedulers.boundedElastic()).single().subscribeOn(tdScheduler));
} else {
return td.asMono().flatMap(td -> Mono.<TdResult<T>>create(sink -> {
if (td != null) {
try {
td.send(request, v -> sink.success(TdResult.of(v)), sink::error);
} catch (Throwable t) {
sink.error(t);
}
} else {
if (request.getConstructor() == Close.CONSTRUCTOR) {
sink.success(TdResult.<T>of(new Ok()));
}
sink.error(new IllegalStateException("TDLib client is destroyed"));
}
})).single().subscribeOn(tdScheduler);
}
}
@Override
public Flux<AsyncResult<TdResult<TdApi.Object>>> getUpdates(Duration receiveDuration, int eventsSize) {
return updatesProcessor;
}
public Flux<TdResult<TdApi.Object>> receive(AsyncTdDirectOptions options) {
return Flux.<TdResult<TdApi.Object>>create(emitter -> {
One<java.lang.Object> closedFromTd = Sinks.one();
var client = ClientManager.create((Object object) -> {
emitter.next(TdResult.of(object));
// Close the emitter if receive closed state
if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR
&& ((UpdateAuthorizationState) object).authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR) {
closedFromTd.tryEmitValue(new java.lang.Object());
emitter.complete();
}
}, emitter::error, emitter::error);
try {
this.td.tryEmitValue(client).orThrow();
} catch (Exception ex) {
emitter.error(ex);
}
@Override
public Mono<Void> initializeClient() {
return Mono.<Boolean>create(sink -> {
var updatesConnectableFlux = Flux.<AsyncResult<TdResult<TdApi.Object>>>create(emitter -> {
var client = ClientManager.create((Object object) -> {
emitter.next(Future.succeededFuture(TdResult.of(object)));
// Close the emitter if receive closed state
if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR
&& ((UpdateAuthorizationState) object).authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR) {
emitter.complete();
}
}, updateError -> {
emitter.next(Future.failedFuture(updateError));
}, error -> {
emitter.next(Future.failedFuture(error));
});
this.td.set(client);
emitter.onDispose(() -> {
this.td.set(null);
});
}).subscribeOn(tdPollScheduler).publish();
// Complete initialization when receiving first update
updatesConnectableFlux.take(1).single()
.doOnSuccess(_v -> sink.success(true)).doOnError(sink::error).subscribe();
// Pass updates to UpdatesProcessor
updatesProcessor = updatesConnectableFlux.publish().refCount();
updatesConnectableFlux.connect();
}).single().then().subscribeOn(tdScheduler).publishOn(tdResponsesOutputScheduler);
}
@Override
public Mono<Void> destroyClient() {
return this
.execute(new TdApi.Close(), false)
.then()
.subscribeOn(tdScheduler)
.publishOn(tdResponsesOutputScheduler);
// Send close if the stream is disposed before tdlib is closed
emitter.onDispose(() -> {
closedFromTd.asMono().take(Duration.ofMillis(10)).switchIfEmpty(Mono.fromRunnable(() -> client.send(new Close(),
result -> logger.trace("Close result: {}", result),
ex -> logger.trace("Error when disposing td client", ex)
))).subscribe();
});
}).subscribeOn(tdScheduler);
}
}

View File

@ -0,0 +1,36 @@
package it.tdlight.tdlibsession.td.direct;
import java.time.Duration;
import java.util.StringJoiner;
public class AsyncTdDirectOptions {
private final Duration receiveDuration;
private final int eventsSize;
/**
*
* @param receiveDuration Maximum number of seconds allowed for this function to wait for new records. Default: 1 sec
* @param eventsSize Maximum number of events allowed in list. Default: 350 events
*/
public AsyncTdDirectOptions(Duration receiveDuration, int eventsSize) {
this.receiveDuration = receiveDuration;
this.eventsSize = eventsSize;
}
public Duration getReceiveDuration() {
return receiveDuration;
}
public int getEventsSize() {
return eventsSize;
}
@Override
public String toString() {
return new StringJoiner(", ", AsyncTdDirectOptions.class.getSimpleName() + "[", "]")
.add("receiveDuration=" + receiveDuration)
.add("eventsSize=" + eventsSize)
.toString();
}
}

View File

@ -49,6 +49,8 @@ import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Schedulers;
public class AsyncTdEasy {
@ -59,7 +61,7 @@ public class AsyncTdEasy {
private final ReplayProcessor<Boolean> requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false);
private final ReplayProcessor<TdEasySettings> settings = ReplayProcessor.cacheLast();
private final EmitterProcessor<Error> globalErrors = EmitterProcessor.create();
private final EmitterProcessor<FatalErrorType> fatalErrors = EmitterProcessor.create();
private final One<FatalErrorType> fatalError = Sinks.one();
private final AsyncTdMiddle td;
private final String logName;
private final Flux<Update> incomingUpdatesCo;
@ -71,7 +73,7 @@ public class AsyncTdEasy {
var sch = Schedulers.newSingle("TdEasyUpdates");
// todo: use Duration.ZERO instead of 10ms interval
this.incomingUpdatesCo = td.getUpdates()
this.incomingUpdatesCo = td.receive()
.filterWhen(update -> Mono.from(requestedDefinitiveExit).map(requestedDefinitiveExit -> !requestedDefinitiveExit))
.subscribeOn(sch)
.publishOn(sch)
@ -102,7 +104,7 @@ public class AsyncTdEasy {
}
// Register fatal error handler
fatalErrors.flatMap(settings.getFatalErrorHandler()::onFatalError).subscribe();
fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).subscribe();
return true;
})
@ -141,8 +143,8 @@ public class AsyncTdEasy {
/**
* Receives fatal errors from TDLib.
*/
public Flux<FatalErrorType> getFatalErrors() {
return Flux.from(fatalErrors).publishOn(Schedulers.boundedElastic());
public Mono<FatalErrorType> getFatalErrors() {
return fatalError.asMono();
}
/**
@ -349,19 +351,19 @@ public class AsyncTdEasy {
var error = (Error) obj;
switch (error.message) {
case "PHONE_NUMBER_INVALID":
fatalErrors.onNext(FatalErrorType.PHONE_NUMBER_INVALID);
fatalError.tryEmitValue(FatalErrorType.PHONE_NUMBER_INVALID);
break;
case "ACCESS_TOKEN_INVALID":
fatalErrors.onNext(FatalErrorType.ACCESS_TOKEN_INVALID);
fatalError.tryEmitValue(FatalErrorType.ACCESS_TOKEN_INVALID);
break;
case "CONNECTION_KILLED":
fatalErrors.onNext(FatalErrorType.CONNECTION_KILLED);
fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED);
break;
case "INVALID_UPDATE":
fatalErrors.onNext(FatalErrorType.INVALID_UPDATE);
fatalError.tryEmitValue(FatalErrorType.INVALID_UPDATE);
break;
case "PHONE_NUMBER_BANNED":
fatalErrors.onNext(FatalErrorType.PHONE_NUMBER_BANNED);
fatalError.tryEmitValue(FatalErrorType.PHONE_NUMBER_BANNED);
break;
}
}

View File

@ -12,7 +12,7 @@ public interface AsyncTdMiddle {
*
* @return Updates (or Error if received a fatal error. A fatal error means that the client is no longer working)
*/
Flux<TdApi.Object> getUpdates();
Flux<TdApi.Object> receive();
/**
* Sends request to TDLib. May be called from any thread.

View File

@ -305,7 +305,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
@Override
public Flux<TdApi.Object> getUpdates() {
public Flux<TdApi.Object> receive() {
return incomingUpdatesCo.filter(Objects::nonNull).flatMap(v -> v);
}

View File

@ -11,6 +11,7 @@ import it.tdlight.jni.TdApi.Object;
import it.tdlight.tdlibsession.td.ResponseError;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions;
import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.utils.MonoUtils;
@ -19,17 +20,17 @@ import org.slf4j.LoggerFactory;
import org.warp.commonutils.error.InitializationException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMiddle {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleDirect.class);
protected final ReplayProcessor<Boolean> tdClosed = ReplayProcessor.cacheLastOrDefault(false);
protected AsyncTdDirectImpl td;
private String botAddress;
private String botAlias;
private Flux<TdApi.Object> updatesFluxCo;
private Empty<Object> closeRequest = Sinks.empty();
public AsyncTdMiddleDirect() {
}
@ -67,44 +68,34 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
this.td = new AsyncTdDirectImpl(botAlias);
td.initializeClient().doOnSuccess(v -> {
updatesFluxCo = Mono.from(tdClosed).filter(closed -> !closed).flatMapMany(_x -> td.getUpdates(WAIT_DURATION, 1000).flatMap(result -> {
if (result.succeeded()) {
if (result.result().succeeded()) {
return Mono.just(result.result().result());
} else {
logger.error("Received an errored update",
ResponseError.newResponseError("incoming update", botAlias, result.result().cause())
);
return Mono.<TdApi.Object>empty();
}
} else {
logger.error("Received an errored update", result.cause());
return Mono.<TdApi.Object>empty();
}
})).publish().refCount(1);
startPromise.complete();
}).subscribe(success -> {
}, (ex) -> {
logger.error("Failure when starting bot " + botAlias + ", address " + botAddress, ex);
startPromise.fail(new InitializationException("Can't connect tdlib middle client to tdlib middle server!"));
}, () -> {});
startPromise.complete();
}
@Override
public void stop(Promise<Void> stopPromise) {
tdClosed.onNext(true);
td.destroyClient().onErrorResume(ex -> {
logger.error("Can't destroy client", ex);
return Mono.empty();
}).doOnTerminate(() -> {
logger.debug("TdMiddle verticle stopped");
}).subscribe(MonoUtils.toSubscriber(stopPromise));
closeRequest.tryEmitEmpty();
stopPromise.complete();
}
@Override
public Flux<TdApi.Object> getUpdates() {
return Flux.from(updatesFluxCo);
public Flux<TdApi.Object> receive() {
return td
.receive(new AsyncTdDirectOptions(WAIT_DURATION, 1000))
.takeUntilOther(closeRequest.asMono())
.doOnError(ex -> {
logger.info("TdMiddle verticle error", ex);
})
.doOnTerminate(() -> {
logger.debug("TdMiddle verticle stopped");
}).flatMap(result -> {
if (result.succeeded()) {
return Mono.just(result.result());
} else {
logger.error("Received an errored update",
ResponseError.newResponseError("incoming update", botAlias, result.cause()));
return Mono.<TdApi.Object>empty();
}
});
}
@Override

View File

@ -55,8 +55,8 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle {
}
@Override
public Flux<TdApi.Object> getUpdates() {
return cli.filter(Objects::nonNull).single().flatMapMany(AsyncTdMiddleEventBusClient::getUpdates);
public Flux<TdApi.Object> receive() {
return cli.filter(Objects::nonNull).single().flatMapMany(AsyncTdMiddleEventBusClient::receive);
}
@Override

View File

@ -11,11 +11,10 @@ import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import it.tdlight.common.ConstructorDetector;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions;
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.TdExecuteObjectMessageCodec;
@ -29,15 +28,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -52,17 +51,19 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private static final boolean ENABLE_MINIMUM_POLL_WAIT_INTERVAL = false;
private final TdClusterManager cluster;
private final AsyncTdDirectOptions tdOptions;
private String botAlias;
private String botAddress;
private boolean local;
protected final ReplayProcessor<Boolean> tdClosed = ReplayProcessor.cacheLastOrDefault(false);
protected AsyncTdDirectImpl td;
protected final LinkedBlockingQueue<AsyncResult<TdResult<TdApi.Object>>> queue = new LinkedBlockingQueue<>();
private final Scheduler tdSrvPoll;
private List<Consumer<Promise<Void>>> onBeforeStopListeners = new CopyOnWriteArrayList<>();
private List<Consumer<Promise<Void>>> onAfterStopListeners = new CopyOnWriteArrayList<>();
/**
* Value is not important, emits when a request is received
*/
private final List<Consumer<Promise<Void>>> onBeforeStopListeners = new CopyOnWriteArrayList<>();
private final List<Consumer<Promise<Void>>> onAfterStopListeners = new CopyOnWriteArrayList<>();
private MessageConsumer<?> startConsumer;
private MessageConsumer<byte[]> isWorkingConsumer;
private MessageConsumer<byte[]> getNextUpdatesBlockConsumer;
@ -71,6 +72,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
@SuppressWarnings({"unchecked", "rawtypes"})
public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) {
this.cluster = clusterManager;
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 1000);
this.tdSrvPoll = Schedulers.newSingle("TdSrvPoll");
if (cluster.registerDefaultCodec(TdOptionalList.class, new TdOptListMessageCodec())) {
cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec());
@ -103,25 +105,20 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
AtomicBoolean alreadyDeployed = new AtomicBoolean(false);
this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message<byte[]> msg) -> {
if (alreadyDeployed.compareAndSet(false, true)) {
td.initializeClient()
.then(this.listen())
.then(this.pipe())
.then(Mono.<Void>create(registrationSink -> {
this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message<byte[]> workingMsg) -> {
workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local));
});
this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
}))
.subscribe(v -> {}, ex -> {
logger.info(botAddress + " server deployed and started. succeeded: false");
logger.error(ex.getLocalizedMessage(), ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {
logger.info(botAddress + " server deployed and started. succeeded: true");
msg.reply(EMPTY);
});
this.listen().then(this.pipe()).then(Mono.<Void>create(registrationSink -> {
this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message<byte[]> workingMsg) -> {
workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local));
});
this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
})).subscribeOn(this.tdSrvPoll)
.subscribe(v -> {}, ex -> {
logger.info(botAddress + " server deployed and started. succeeded: false");
logger.error(ex.getLocalizedMessage(), ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {
logger.info(botAddress + " server deployed and started. succeeded: true");
msg.reply(EMPTY);
});
} else {
msg.reply(EMPTY);
}
@ -137,37 +134,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
logger.debug("Sending " + botAddress + ".readyToStart");
cluster.getEventBus().send(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000));
var clientDeadCheckThread = new Thread(() -> {
Throwable ex = null;
try {
while (!Thread.interrupted()) {
Thread.sleep(5000);
Promise<Void> promise = Promise.promise();
cluster
.getEventBus()
.request(botAddress + ".readyToStart",
EMPTY,
cluster.newDeliveryOpts().setSendTimeout(30000),
r -> promise.handle(r.mapEmpty())
);
promise.future().toCompletionStage().toCompletableFuture().join();
}
} catch (Throwable e) {
ex = e;
}
var closed = tdClosed.blockFirst();
if (closed == null || !closed) {
if (ex != null && !ex.getMessage().contains("NO_HANDLERS")) {
logger.error(ex.getLocalizedMessage(), ex);
}
logger.error("TDLib client disconnected unexpectedly! Closing the server...");
undeploy(() -> {});
}
});
clientDeadCheckThread.setName("Client " + botAddress + " dead check");
clientDeadCheckThread.setDaemon(true);
clientDeadCheckThread.start();
}
public void onBeforeStop(Consumer<Promise<Void>> r) {
@ -185,35 +151,26 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause());
}
td.destroyClient().onErrorResume(ex -> {
logger.error("Can't destroy client", ex);
return Mono.empty();
}).doOnError(err -> {
logger.error("TdMiddle verticle failed during stop", err);
}).then(Mono.create(sink -> {
this.isWorkingConsumer.unregister(result -> {
if (result.failed()) {
logger.error("Can't unregister consumer", result.cause());
Mono.create(sink -> this.isWorkingConsumer.unregister(result -> {
if (result.failed()) {
logger.error("Can't unregister consumer", result.cause());
}
this.startConsumer.unregister(result2 -> {
if (result2.failed()) {
logger.error("Can't unregister consumer", result2.cause());
}
this.startConsumer.unregister(result2 -> {
if (result2.failed()) {
logger.error("Can't unregister consumer", result2.cause());
this.getNextUpdatesBlockConsumer.unregister(result3 -> {
if (result3.failed()) {
logger.error("Can't unregister consumer", result3.cause());
}
tdClosed.onNext(true);
this.getNextUpdatesBlockConsumer.unregister(result3 -> {
if (result3.failed()) {
logger.error("Can't unregister consumer", result3.cause());
this.executeConsumer.unregister(result4 -> {
if (result4.failed()) {
logger.error("Can't unregister consumer", result4.cause());
}
this.executeConsumer.unregister(result4 -> {
if (result4.failed()) {
logger.error("Can't unregister consumer", result4.cause());
}
sink.success();
});
sink.success();
});
});
});
@ -227,7 +184,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
stopPromise.complete();
});
}).subscribe();
}).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> {
logger.error("Error when stopping", ex);
}, () -> {});
});
}
@ -250,99 +209,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private Mono<Void> listen() {
return Mono.<Void>create(registrationSink -> {
this.getNextUpdatesBlockConsumer = cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message<byte[]> msg) -> {
// Run only if tdlib is not closed
Mono.from(tdClosed).single().filter(tdClosedVal -> !tdClosedVal)
// Get a list of updates
.flatMap(_v -> Mono
.<List<AsyncResult<TdResult<TdApi.Object>>>>fromSupplier(() -> {
// When a request is asked, read up to 1000 available updates in the queue
long requestTime = System.currentTimeMillis();
ArrayList<AsyncResult<TdResult<TdApi.Object>>> updatesBatch = new ArrayList<>();
try {
// Block until an update is found or 5 seconds passed
var item = queue.poll(5, TimeUnit.SECONDS);
if (item != null) {
updatesBatch.add(item);
queue.drainTo(updatesBatch, local ? 999 : 998);
if (ENABLE_MINIMUM_POLL_WAIT_INTERVAL) {
if (!local) {
var item2 = queue.poll(100, TimeUnit.MILLISECONDS);
if (item2 != null) {
updatesBatch.add(item2);
queue.drainTo(updatesBatch, Math.max(0, 1000 - updatesBatch.size()));
}
}
}
}
} catch (InterruptedException ex) {
// polling cancelled, expected sometimes
}
// Return the updates found, can be an empty list
return updatesBatch;
})
// Subscribe on td server poll scheduler
.subscribeOn(tdSrvPoll)
// Filter out empty updates lists
.filter(updates -> !updates.isEmpty())
.repeatWhen(s -> s
// Take until an update is received
.takeWhile(n -> n == 0)
// Take until tdClosed is true
.takeUntilOther(tdClosed.filter(closed -> closed).take(1).single())
)
// Take only one update list
.take(1)
// If 5 seconds pass, return a list with 0 updates
.timeout(Duration.ofSeconds(5), Mono.just(List.of()))
// Return 1 list or 0 lists
.singleOrEmpty()
)
.flatMap(receivedList -> {
return Flux.fromIterable(receivedList).flatMap(result -> {
if (result.succeeded()) {
var received = result.result();
if (OUTPUT_REQUESTS) {
System.out.println("<=: " + received
.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "="));
}
return Mono.create(sink -> {
if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
var authState = (UpdateAuthorizationState) received.result();
if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
undeploy(sink::success);
} else {
sink.success();
}
} else {
sink.success();
}
}).then(Mono.<TdResult<TdApi.Object>>create(sink -> {
sink.success(received);
}));
} else {
logger.error("Received an error update", result.cause());
return Mono.empty();
}
}).collectList().map(list -> new TdOptionalList(true, list));
})
.defaultIfEmpty(new TdOptionalList(false, Collections.emptyList()))
.subscribeOn(tdSrvPoll)
.subscribe(v -> {
msg.reply(v);
}, ex -> {
logger.error("Error when processing a 'receiveUpdates' request", ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {});
});
getNextUpdatesBlockConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
}).then(Mono.<Void>create(registrationSink -> {
this.executeConsumer = cluster.getEventBus().<ExecuteObject>consumer(botAddress + ".execute", (Message<ExecuteObject> msg) -> {
try {
@ -370,7 +236,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} catch (Exception ex) {
sink.error(ex);
}
})
}).subscribeOn(this.tdSrvPoll)
.subscribe(response -> {}, ex -> {
logger.error("Error when processing a request", ex);
msg.fail(500, ex.getLocalizedMessage());
@ -382,7 +248,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
});
executeConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
}));
});
}
private void undeploy(Runnable whenUndeployed) {
@ -395,14 +261,66 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
private Mono<Void> pipe() {
return Mono.fromCallable(() -> {
td
.getUpdates(WAIT_DURATION, 1000)
.bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100))
.subscribe(nextItems -> {
queue.addAll(nextItems);
return Mono.create(registeredSink -> {
Many<Boolean> getNextUpdatesBlockTrigger = Sinks.many().replay().latestOrDefault(true);
Flux<Message<byte[]>> getNextUpdatesBlockFlux = Flux.create(sink -> {
this.getNextUpdatesBlockConsumer = cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message<byte[]> msg) -> {
getNextUpdatesBlockTrigger.tryEmitNext(true);
sink.next(msg);
});
registeredSink.success();
});
Empty<Boolean> needClose = Sinks.empty();
Flux<TdOptionalList> updatesFlux = Flux.mergeSequential(td.receive(tdOptions)
.doOnSubscribe(s -> {
// After 60 seconds of not receiving request for updates, dispose td flux assuming the middle client died.
getNextUpdatesBlockTrigger.asFlux().timeout(Duration.ofSeconds(30), timeout -> {
needClose.tryEmitEmpty();
s.cancel();
}).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> {
logger.error("Error when signalling that the next update block request has been received", ex);
}, () -> {
needClose.tryEmitEmpty();
});
})
.doOnNext(update -> {
if (OUTPUT_REQUESTS) {
System.out.println("<=: " + update
.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "="));
}
})
.bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100))
.windowTimeout(1, Duration.ofSeconds(5))
.flatMap(w -> w.defaultIfEmpty(Collections.emptyList()))
.map(updatesGroup -> new TdOptionalList(true, updatesGroup)),
Mono.fromSupplier(() -> {
return new TdOptionalList(false, Collections.emptyList());
})
);
Flux.zip(updatesFlux, getNextUpdatesBlockFlux)
.subscribeOn(this.tdSrvPoll)
.subscribe(tuple -> {
var results = tuple.getT1();
var messageHandle = tuple.getT2();
if (!results.isSet()) {
System.out.println("<=: end (1)");
}
messageHandle.reply(results, cluster.newDeliveryOpts().setLocalOnly(local));
}, error -> logger.error("Error when receiving or forwarding updates", error), () -> {
needClose.tryEmitEmpty();
});
return (Void) null;
needClose.asMono().subscribeOn(this.tdSrvPoll).subscribe(v_ -> {}, e -> {}, () -> {
this.undeploy(() -> {});
});
});
}
}