Improve client

This commit is contained in:
Andrea Cavalli 2021-09-30 19:18:25 +02:00
parent 7c9f3e2879
commit f1c6fcf1a0

View File

@ -26,6 +26,7 @@ import it.tdlight.utils.MonoUtils;
import java.net.ConnectException; import java.net.ConnectException;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
import org.warp.commonutils.locks.LockUtils; import org.warp.commonutils.locks.LockUtils;
import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.Logger;
@ -56,6 +57,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
private final Empty<Void> updatesStreamEnd = Sinks.one(); private final Empty<Void> updatesStreamEnd = Sinks.one();
// This will only result in a crash, never completes in other ways // This will only result in a crash, never completes in other ways
private final Empty<Void> crash = Sinks.one(); private final Empty<Void> crash = Sinks.one();
// Crash exception
private final AtomicReference<Throwable> crashException = new AtomicReference<>(null);
// This will only result in a successful completion, never completes in other ways // This will only result in a successful completion, never completes in other ways
private final Empty<Void> pingFail = Sinks.one(); private final Empty<Void> pingFail = Sinks.one();
// This will only result in a successful completion, never completes in other ways. // This will only result in a successful completion, never completes in other ways.
@ -277,6 +280,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
ex.setStackTrace(new StackTraceElement[0]); ex.setStackTrace(new StackTraceElement[0]);
throw ex; throw ex;
})).onErrorResume(ex -> MonoUtils.fromBlockingSingle(() -> { })).onErrorResume(ex -> MonoUtils.fromBlockingSingle(() -> {
crashException.set(ex);
EmitResult result; EmitResult result;
while ((result = this.crash.tryEmitError(ex)) == EmitResult.FAIL_NON_SERIALIZED) { while ((result = this.crash.tryEmitError(ex)) == EmitResult.FAIL_NON_SERIALIZED) {
// 10ms // 10ms
@ -325,10 +329,18 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
authStateClosing.tryEmitEmpty(); authStateClosing.tryEmitEmpty();
break; break;
case TdApi.AuthorizationStateClosed.CONSTRUCTOR: case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) return Mono
.then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) .fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib"))
.flatMap(latestBinlogMsg -> Mono.fromCallable(latestBinlogMsg::body).subscribeOn(Schedulers.parallel())) .then(cluster.getEventBus()
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY)
.as(MonoUtils::toMono)
)
.flatMap(latestBinlogMsg -> Mono
.fromCallable(latestBinlogMsg::body)
.subscribeOn(Schedulers.parallel())
)
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: "
+ BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length())))
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog()))
.doOnSuccess(s -> logger.info("Overwritten binlog from server")) .doOnSuccess(s -> logger.info("Overwritten binlog from server"))
.thenReturn(update); .thenReturn(update);
@ -340,30 +352,32 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
@Override @Override
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean executeDirectly) { public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean executeDirectly) {
var req = new ExecuteObject(executeDirectly, request); var req = new ExecuteObject(executeDirectly, request);
return Mono
.firstWithSignal( var crashMono = crash.asMono()
MonoUtils .doOnSuccess(s -> logger.debug("Failed request {} because the TDLib session was already crashed", request))
.castVoid(crash .then(Mono.<TdResult<T>>empty());
.asMono()
.doOnSuccess(s -> logger var executionMono = Mono
.debug("Failed request {} because the TDLib session was already crashed", request))
),
Mono
.fromRunnable(() -> logger.trace("Executing request {}", request)) .fromRunnable(() -> logger.trace("Executing request {}", request))
.then(cluster.getEventBus().<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono)) .then(cluster.getEventBus()
.<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptions)
.as(MonoUtils::toMono)
)
.onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex))
.<TdResult<T>>handle((resp, sink) -> { .<TdResult<T>>handle((resp, sink) -> {
if (resp.body() == null) { if (resp.body() == null) {
sink.error(ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty"))); var tdError = new TdError(500, "Response is empty");
sink.error(ResponseError.newResponseError(request, botAlias, tdError));
} else { } else {
sink.next(resp.body().toTdResult()); sink.next(resp.body().toTdResult());
} }
}) })
.doOnSuccess(s -> logger.trace("Executed request {}", request)) .doOnSuccess(s -> logger.trace("Executed request {}", request))
.doOnError(ex -> logger.debug("Failed request {}: {}", req, ex)) .doOnError(ex -> logger.debug("Failed request {}: {}", req, ex));
)
.switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> { return Mono
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty")); .firstWithSignal(crashMono, executionMono)
}))); .switchIfEmpty(Mono.error(() -> ResponseError.newResponseError(request, botAlias,
new TdError(500, "Client is closed or response is empty"))));
} }
} }