Use AtomicReference when needed
This commit is contained in:
parent
b4d53393fe
commit
c30ac9bec6
@ -12,6 +12,8 @@ import it.tdlight.tdlibsession.td.TdError;
|
|||||||
import it.tdlight.tdlibsession.td.TdResult;
|
import it.tdlight.tdlibsession.td.TdResult;
|
||||||
import it.tdlight.utils.MonoUtils;
|
import it.tdlight.utils.MonoUtils;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.warp.commonutils.log.Logger;
|
import org.warp.commonutils.log.Logger;
|
||||||
import org.warp.commonutils.log.LoggerFactory;
|
import org.warp.commonutils.log.LoggerFactory;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
@ -28,7 +30,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
|
|||||||
private final JsonObject implementationDetails;
|
private final JsonObject implementationDetails;
|
||||||
private final String botAlias;
|
private final String botAlias;
|
||||||
|
|
||||||
private final One<ReactorTelegramClient> td = Sinks.one();
|
private final AtomicReference<ReactorTelegramClient> td = new AtomicReference<>(null);
|
||||||
|
|
||||||
public AsyncTdDirectImpl(TelegramClientFactory telegramClientFactory,
|
public AsyncTdDirectImpl(TelegramClientFactory telegramClientFactory,
|
||||||
JsonObject implementationDetails,
|
JsonObject implementationDetails,
|
||||||
@ -41,43 +43,38 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
|
|||||||
@Override
|
@Override
|
||||||
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, Duration timeout, boolean synchronous) {
|
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, Duration timeout, boolean synchronous) {
|
||||||
if (synchronous) {
|
if (synchronous) {
|
||||||
return Mono
|
return MonoUtils.fromBlockingSingle(() -> {
|
||||||
.firstWithSignal(td.asMono(), Mono.empty())
|
var td = this.td.get();
|
||||||
.single()
|
|
||||||
.timeout(Duration.ofSeconds(5))
|
|
||||||
.flatMap(td -> MonoUtils.fromBlockingSingle(() -> {
|
|
||||||
logger.trace("Sending execute to TDLib {}", request);
|
logger.trace("Sending execute to TDLib {}", request);
|
||||||
|
Objects.requireNonNull(td, "td is null");
|
||||||
TdResult<T> result = TdResult.of(td.execute(request));
|
TdResult<T> result = TdResult.of(td.execute(request));
|
||||||
logger.trace("Received execute response from TDLib. Request was {}", request);
|
logger.trace("Received execute response from TDLib. Request was {}", request);
|
||||||
return result;
|
return result;
|
||||||
}))
|
|
||||||
.single();
|
|
||||||
} else {
|
|
||||||
return Mono
|
|
||||||
.firstWithSignal(td.asMono(), Mono.empty())
|
|
||||||
.single()
|
|
||||||
.timeout(Duration.ofSeconds(5))
|
|
||||||
.<TdResult<T>>flatMap(td -> {
|
|
||||||
if (td != null) {
|
|
||||||
return Mono
|
|
||||||
.fromRunnable(() -> logger.trace("Sending request to TDLib {}", request))
|
|
||||||
.then(td.send(request, timeout))
|
|
||||||
.single()
|
|
||||||
.<TdResult<T>>map(TdResult::of)
|
|
||||||
.doOnSuccess(s -> logger.trace("Sent request to TDLib {}", request));
|
|
||||||
} else {
|
|
||||||
return Mono.fromCallable(() -> {
|
|
||||||
if (request.getConstructor() == Close.CONSTRUCTOR) {
|
|
||||||
logger.trace("Sending close success to request {}", request);
|
|
||||||
return TdResult.of(new Ok());
|
|
||||||
} else {
|
|
||||||
logger.trace("Sending close error to request {} ", request);
|
|
||||||
throw new IllegalStateException("TDLib client is destroyed");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
.single();
|
.single();
|
||||||
|
} else {
|
||||||
|
return Mono.defer(() -> {
|
||||||
|
var td = this.td.get();
|
||||||
|
|
||||||
|
if (td != null) {
|
||||||
|
return Mono
|
||||||
|
.fromRunnable(() -> logger.trace("Sending request to TDLib {}", request))
|
||||||
|
.then(td.send(request, timeout))
|
||||||
|
.single()
|
||||||
|
.<TdResult<T>>map(TdResult::of)
|
||||||
|
.doOnSuccess(s -> logger.trace("Sent request to TDLib {}", request));
|
||||||
|
} else {
|
||||||
|
return Mono.fromCallable(() -> {
|
||||||
|
if (request.getConstructor() == Close.CONSTRUCTOR) {
|
||||||
|
logger.trace("Sending close success to request {}", request);
|
||||||
|
return TdResult.of(new Ok());
|
||||||
|
} else {
|
||||||
|
logger.trace("Sending close error to request {} ", request);
|
||||||
|
throw new IllegalStateException("TDLib client is destroyed");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -87,12 +84,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
|
|||||||
.fromRunnable(() -> logger.trace("Initializing"))
|
.fromRunnable(() -> logger.trace("Initializing"))
|
||||||
.then(telegramClientFactory.create(implementationDetails))
|
.then(telegramClientFactory.create(implementationDetails))
|
||||||
.flatMap(reactorTelegramClient -> reactorTelegramClient.initialize().thenReturn(reactorTelegramClient))
|
.flatMap(reactorTelegramClient -> reactorTelegramClient.initialize().thenReturn(reactorTelegramClient))
|
||||||
.flatMap(client -> {
|
.doOnNext(td::set)
|
||||||
if (td.tryEmitValue(client).isFailure()) {
|
|
||||||
return Mono.error(new TdError(500, "Failed to emit td client"));
|
|
||||||
}
|
|
||||||
return Mono.just(client);
|
|
||||||
})
|
|
||||||
.doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1)))
|
.doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1)))
|
||||||
.doOnSuccess(s -> logger.trace("Initialized"))
|
.doOnSuccess(s -> logger.trace("Initialized"))
|
||||||
.then();
|
.then();
|
||||||
@ -103,9 +95,8 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
|
|||||||
// If closed it will be either true or false
|
// If closed it will be either true or false
|
||||||
final One<Boolean> closedFromTd = Sinks.one();
|
final One<Boolean> closedFromTd = Sinks.one();
|
||||||
return Mono
|
return Mono
|
||||||
.firstWithSignal(td.asMono(), Mono.empty())
|
.fromCallable(td::get)
|
||||||
.single()
|
.single()
|
||||||
.timeout(Duration.ofSeconds(5))
|
|
||||||
.flatMapMany(ReactorTelegramClient::receive)
|
.flatMapMany(ReactorTelegramClient::receive)
|
||||||
.doOnNext(update -> {
|
.doOnNext(update -> {
|
||||||
// Close the emitter if receive closed state
|
// Close the emitter if receive closed state
|
||||||
|
@ -17,8 +17,8 @@ import it.tdlight.jni.TdApi.CheckAuthenticationCode;
|
|||||||
import it.tdlight.jni.TdApi.CheckAuthenticationPassword;
|
import it.tdlight.jni.TdApi.CheckAuthenticationPassword;
|
||||||
import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey;
|
import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey;
|
||||||
import it.tdlight.jni.TdApi.Error;
|
import it.tdlight.jni.TdApi.Error;
|
||||||
|
import it.tdlight.jni.TdApi.Function;
|
||||||
import it.tdlight.jni.TdApi.Object;
|
import it.tdlight.jni.TdApi.Object;
|
||||||
import it.tdlight.jni.TdApi.OptionValue;
|
|
||||||
import it.tdlight.jni.TdApi.OptionValueBoolean;
|
import it.tdlight.jni.TdApi.OptionValueBoolean;
|
||||||
import it.tdlight.jni.TdApi.OptionValueEmpty;
|
import it.tdlight.jni.TdApi.OptionValueEmpty;
|
||||||
import it.tdlight.jni.TdApi.OptionValueInteger;
|
import it.tdlight.jni.TdApi.OptionValueInteger;
|
||||||
@ -43,6 +43,8 @@ import java.time.Duration;
|
|||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.reactivestreams.Publisher;
|
import org.reactivestreams.Publisher;
|
||||||
import org.warp.commonutils.log.Logger;
|
import org.warp.commonutils.log.Logger;
|
||||||
import org.warp.commonutils.log.LoggerFactory;
|
import org.warp.commonutils.log.LoggerFactory;
|
||||||
@ -55,14 +57,15 @@ import reactor.core.publisher.Sinks.One;
|
|||||||
import reactor.core.scheduler.Scheduler;
|
import reactor.core.scheduler.Scheduler;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
public class AsyncTdEasy {
|
public class AsyncTdEasy {
|
||||||
|
|
||||||
private final Logger logger;
|
private final Logger logger;
|
||||||
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
|
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
|
||||||
|
|
||||||
private final Many<AuthorizationState> authState = Sinks.many().replay().latest();
|
private final Many<AuthorizationState> authState = Sinks.many().replay().latest();
|
||||||
private final Many<Boolean> requestedDefinitiveExit = Sinks.many().replay().latestOrDefault(false);
|
private final AtomicBoolean requestedDefinitiveExit = new AtomicBoolean();
|
||||||
private final Many<TdEasySettings> settings = Sinks.many().replay().latest();
|
private final AtomicReference<TdEasySettings> settings = new AtomicReference<>(null);
|
||||||
private final Many<Error> globalErrors = Sinks.many().multicast().onBackpressureBuffer();
|
private final Many<Error> globalErrors = Sinks.many().multicast().onBackpressureBuffer();
|
||||||
private final One<FatalErrorType> fatalError = Sinks.one();
|
private final One<FatalErrorType> fatalError = Sinks.one();
|
||||||
private final AsyncTdMiddle td;
|
private final AsyncTdMiddle td;
|
||||||
@ -94,34 +97,35 @@ public class AsyncTdEasy {
|
|||||||
logger.error(ex.getLocalizedMessage(), ex);
|
logger.error(ex.getLocalizedMessage(), ex);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.doOnComplete(() -> {
|
.doOnComplete(() -> authState.asFlux().take(1, true).single().subscribeOn(scheduler).subscribe(authState -> {
|
||||||
authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> {
|
onUpdatesTerminated();
|
||||||
onUpdatesTerminated();
|
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
|
||||||
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
|
logger.warn("Updates stream has closed while"
|
||||||
logger.warn("Updates stream has closed while"
|
+ " the current authorization state is"
|
||||||
+ " the current authorization state is"
|
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
||||||
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED);
|
||||||
this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED);
|
this.authState.tryEmitNext(new AuthorizationStateClosed());
|
||||||
this.authState.tryEmitNext(new AuthorizationStateClosed());
|
}
|
||||||
}
|
})).doOnError(ex -> authState.asFlux()
|
||||||
});
|
.take(1, true)
|
||||||
}).doOnError(ex -> {
|
.single()
|
||||||
authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> {
|
.subscribeOn(scheduler)
|
||||||
onUpdatesTerminated();
|
.subscribe(authState -> {
|
||||||
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
|
onUpdatesTerminated();
|
||||||
logger.warn("Updates stream has terminated with an error while"
|
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
|
||||||
+ " the current authorization state is"
|
logger.warn("Updates stream has terminated with an error while"
|
||||||
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
+ " the current authorization state is"
|
||||||
this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED);
|
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
||||||
this.authState.tryEmitNext(new AuthorizationStateClosed());
|
this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED);
|
||||||
}
|
this.authState.tryEmitNext(new AuthorizationStateClosed());
|
||||||
});
|
}
|
||||||
});
|
})
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onUpdatesTerminated() {
|
private void onUpdatesTerminated() {
|
||||||
logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true");
|
logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true");
|
||||||
requestedDefinitiveExit.tryEmitNext(true);
|
requestedDefinitiveExit.set(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Mono<Void> create(TdEasySettings settings) {
|
public Mono<Void> create(TdEasySettings settings) {
|
||||||
@ -143,7 +147,7 @@ public class AsyncTdEasy {
|
|||||||
})
|
})
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
.subscribeOn(Schedulers.boundedElastic())
|
||||||
.flatMap(_v -> {
|
.flatMap(_v -> {
|
||||||
this.settings.tryEmitNext(settings);
|
this.settings.set(settings);
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
})
|
})
|
||||||
.then(td.initialize());
|
.then(td.initialize());
|
||||||
@ -160,10 +164,6 @@ public class AsyncTdEasy {
|
|||||||
* Get incoming updates from TDLib.
|
* Get incoming updates from TDLib.
|
||||||
*/
|
*/
|
||||||
public Flux<TdApi.Update> getIncomingUpdates() {
|
public Flux<TdApi.Update> getIncomingUpdates() {
|
||||||
return getIncomingUpdates(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Flux<TdApi.Update> getIncomingUpdates(boolean includePreAuthUpdates) {
|
|
||||||
return incomingUpdates;
|
return incomingUpdates;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -190,10 +190,8 @@ public class AsyncTdEasy {
|
|||||||
return td.execute(request, timeout, false);
|
return td.execute(request, timeout, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(TdApi.Function obj,
|
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(Function obj, boolean synchronous) {
|
||||||
Duration timeout,
|
return td.execute(obj, AsyncTdEasy.DEFAULT_TIMEOUT, synchronous);
|
||||||
boolean synchronous) {
|
|
||||||
return td.execute(obj, timeout, synchronous);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -201,7 +199,7 @@ public class AsyncTdEasy {
|
|||||||
* @param i level
|
* @param i level
|
||||||
*/
|
*/
|
||||||
public Mono<Void> setVerbosityLevel(int i) {
|
public Mono<Void> setVerbosityLevel(int i) {
|
||||||
return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), DEFAULT_TIMEOUT, true));
|
return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -209,9 +207,7 @@ public class AsyncTdEasy {
|
|||||||
* @param name option name
|
* @param name option name
|
||||||
*/
|
*/
|
||||||
public Mono<Void> clearOption(String name) {
|
public Mono<Void> clearOption(String name) {
|
||||||
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()),
|
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false
|
||||||
DEFAULT_TIMEOUT,
|
|
||||||
false
|
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -221,9 +217,7 @@ public class AsyncTdEasy {
|
|||||||
* @param value option value
|
* @param value option value
|
||||||
*/
|
*/
|
||||||
public Mono<Void> setOptionString(String name, String value) {
|
public Mono<Void> setOptionString(String name, String value) {
|
||||||
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)),
|
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false
|
||||||
DEFAULT_TIMEOUT,
|
|
||||||
false
|
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,9 +227,7 @@ public class AsyncTdEasy {
|
|||||||
* @param value option value
|
* @param value option value
|
||||||
*/
|
*/
|
||||||
public Mono<Void> setOptionInteger(String name, long value) {
|
public Mono<Void> setOptionInteger(String name, long value) {
|
||||||
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)),
|
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false
|
||||||
DEFAULT_TIMEOUT,
|
|
||||||
false
|
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -245,9 +237,7 @@ public class AsyncTdEasy {
|
|||||||
* @param value option value
|
* @param value option value
|
||||||
*/
|
*/
|
||||||
public Mono<Void> setOptionBoolean(String name, boolean value) {
|
public Mono<Void> setOptionBoolean(String name, boolean value) {
|
||||||
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)),
|
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false
|
||||||
DEFAULT_TIMEOUT,
|
|
||||||
false
|
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -258,7 +248,7 @@ public class AsyncTdEasy {
|
|||||||
*/
|
*/
|
||||||
public Mono<String> getOptionString(String name) {
|
public Mono<String> getOptionString(String name) {
|
||||||
return this
|
return this
|
||||||
.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name), DEFAULT_TIMEOUT, false)
|
.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name), false)
|
||||||
.<TdApi.OptionValue>handle(MonoUtils::orElseThrow)
|
.<TdApi.OptionValue>handle(MonoUtils::orElseThrow)
|
||||||
.flatMap(value -> {
|
.flatMap(value -> {
|
||||||
switch (value.getConstructor()) {
|
switch (value.getConstructor()) {
|
||||||
@ -280,7 +270,7 @@ public class AsyncTdEasy {
|
|||||||
*/
|
*/
|
||||||
public Mono<Long> getOptionInteger(String name) {
|
public Mono<Long> getOptionInteger(String name) {
|
||||||
return this
|
return this
|
||||||
.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name), DEFAULT_TIMEOUT, false)
|
.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name), false)
|
||||||
.<TdApi.OptionValue>handle(MonoUtils::orElseThrow)
|
.<TdApi.OptionValue>handle(MonoUtils::orElseThrow)
|
||||||
.flatMap(value -> {
|
.flatMap(value -> {
|
||||||
switch (value.getConstructor()) {
|
switch (value.getConstructor()) {
|
||||||
@ -302,7 +292,7 @@ public class AsyncTdEasy {
|
|||||||
*/
|
*/
|
||||||
public Mono<Boolean> getOptionBoolean(String name) {
|
public Mono<Boolean> getOptionBoolean(String name) {
|
||||||
return this
|
return this
|
||||||
.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name), DEFAULT_TIMEOUT, false)
|
.<TdApi.OptionValue>sendDirectly(new TdApi.GetOption(name), false)
|
||||||
.<TdApi.OptionValue>handle(MonoUtils::orElseThrow)
|
.<TdApi.OptionValue>handle(MonoUtils::orElseThrow)
|
||||||
.flatMap(value -> {
|
.flatMap(value -> {
|
||||||
switch (value.getConstructor()) {
|
switch (value.getConstructor()) {
|
||||||
@ -343,15 +333,14 @@ public class AsyncTdEasy {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.then(requestedDefinitiveExit.asFlux().take(1).single())
|
.then(Mono.fromCallable(requestedDefinitiveExit::get).single())
|
||||||
.filter(closeRequested -> !closeRequested)
|
.filter(closeRequested -> !closeRequested)
|
||||||
.doOnSuccess(s -> {
|
.doOnSuccess(s -> {
|
||||||
logger.debug("Setting requestedDefinitiveExit: true");
|
logger.debug("Setting requestedDefinitiveExit: true");
|
||||||
requestedDefinitiveExit.tryEmitNext(true);
|
requestedDefinitiveExit.set(true);
|
||||||
})
|
})
|
||||||
.then(td.execute(new TdApi.Close(), DEFAULT_TIMEOUT, false).doOnSubscribe(s -> {
|
.doOnSuccess(s -> logger.debug("Sending TdApi.Close"))
|
||||||
logger.debug("Sending TdApi.Close");
|
.then(td.execute(new TdApi.Close(), DEFAULT_TIMEOUT, false))
|
||||||
}))
|
|
||||||
.doOnNext(closeResponse -> logger.debug("TdApi.Close response is: \"{}\"",
|
.doOnNext(closeResponse -> logger.debug("TdApi.Close response is: \"{}\"",
|
||||||
closeResponse.toString().replace('\n', ' ')
|
closeResponse.toString().replace('\n', ' ')
|
||||||
))
|
))
|
||||||
@ -359,12 +348,8 @@ public class AsyncTdEasy {
|
|||||||
.filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR)
|
.filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR)
|
||||||
.take(1)
|
.take(1)
|
||||||
.singleOrEmpty())
|
.singleOrEmpty())
|
||||||
.doOnNext(ok -> {
|
.doOnNext(ok -> logger.debug("Received AuthorizationStateClosed after TdApi.Close"))
|
||||||
logger.debug("Received AuthorizationStateClosed after TdApi.Close");
|
.doOnSuccess(s -> logger.info("AsyncTdEasy closed successfully"))
|
||||||
})
|
|
||||||
.doOnSuccess(s -> {
|
|
||||||
logger.info("AsyncTdEasy closed successfully");
|
|
||||||
})
|
|
||||||
.then();
|
.then();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -415,10 +400,6 @@ public class AsyncTdEasy {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Mono<Boolean> isBot() {
|
|
||||||
return Mono.from(settings.asFlux()).single().map(TdEasySettings::isBotTokenSet);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Publisher<TdApi.Update> preprocessUpdates(TdApi.Object updateObj) {
|
private Publisher<TdApi.Update> preprocessUpdates(TdApi.Object updateObj) {
|
||||||
return Mono
|
return Mono
|
||||||
.just(updateObj)
|
.just(updateObj)
|
||||||
@ -428,7 +409,7 @@ public class AsyncTdEasy {
|
|||||||
.flatMap(obj -> {
|
.flatMap(obj -> {
|
||||||
switch (obj.getConstructor()) {
|
switch (obj.getConstructor()) {
|
||||||
case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR:
|
case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR:
|
||||||
return thenOrFatalError(Mono.from(this.settings.asFlux()).map(settings -> {
|
return thenOrFatalError(Mono.fromCallable(this.settings::get).single().map(settings -> {
|
||||||
var parameters = new TdlibParameters();
|
var parameters = new TdlibParameters();
|
||||||
parameters.useTestDc = settings.useTestDc;
|
parameters.useTestDc = settings.useTestDc;
|
||||||
parameters.databaseDirectory = settings.databaseDirectory;
|
parameters.databaseDirectory = settings.databaseDirectory;
|
||||||
@ -446,27 +427,27 @@ public class AsyncTdEasy {
|
|||||||
parameters.enableStorageOptimizer = settings.enableStorageOptimizer;
|
parameters.enableStorageOptimizer = settings.enableStorageOptimizer;
|
||||||
parameters.ignoreFileNames = settings.ignoreFileNames;
|
parameters.ignoreFileNames = settings.ignoreFileNames;
|
||||||
return new SetTdlibParameters(parameters);
|
return new SetTdlibParameters(parameters);
|
||||||
}).flatMap((SetTdlibParameters obj1) -> sendDirectly(obj1, DEFAULT_TIMEOUT, false)));
|
}).flatMap((SetTdlibParameters obj1) -> sendDirectly(obj1, false)));
|
||||||
case AuthorizationStateWaitEncryptionKey.CONSTRUCTOR:
|
case AuthorizationStateWaitEncryptionKey.CONSTRUCTOR:
|
||||||
return thenOrFatalError(sendDirectly(new CheckDatabaseEncryptionKey(), DEFAULT_TIMEOUT, false))
|
return thenOrFatalError(sendDirectly(new CheckDatabaseEncryptionKey(), false))
|
||||||
.onErrorResume((error) -> {
|
.onErrorResume((error) -> {
|
||||||
logger.error("Error while checking TDLib encryption key", error);
|
logger.error("Error while checking TDLib encryption key", error);
|
||||||
return sendDirectly(new TdApi.Close(), DEFAULT_TIMEOUT, false).then();
|
return sendDirectly(new TdApi.Close(), false).then();
|
||||||
});
|
});
|
||||||
case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR:
|
case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR:
|
||||||
return thenOrFatalError(Mono.from(this.settings.asFlux()).flatMap(settings -> {
|
return thenOrFatalError(Mono.fromCallable(this.settings::get).single().flatMap(settings -> {
|
||||||
if (settings.isPhoneNumberSet()) {
|
if (settings.isPhoneNumberSet()) {
|
||||||
return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()),
|
return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()),
|
||||||
new PhoneNumberAuthenticationSettings(false, false, false)
|
new PhoneNumberAuthenticationSettings(false, false, false)
|
||||||
), DEFAULT_TIMEOUT, false);
|
), false);
|
||||||
} else if (settings.isBotTokenSet()) {
|
} else if (settings.isBotTokenSet()) {
|
||||||
return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()), DEFAULT_TIMEOUT, false);
|
return sendDirectly(new CheckAuthenticationBotToken(settings.getBotToken()), false);
|
||||||
} else {
|
} else {
|
||||||
return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot"));
|
return Mono.error(new IllegalArgumentException("A bot is neither an user or a bot"));
|
||||||
}
|
}
|
||||||
})).onErrorResume((error) -> {
|
})).onErrorResume((error) -> {
|
||||||
logger.error("Error while waiting for phone number", error);
|
logger.error("Error while waiting for phone number", error);
|
||||||
return sendDirectly(new TdApi.Close(), DEFAULT_TIMEOUT, false).then();
|
return sendDirectly(new TdApi.Close(), false).then();
|
||||||
});
|
});
|
||||||
case AuthorizationStateWaitRegistration.CONSTRUCTOR:
|
case AuthorizationStateWaitRegistration.CONSTRUCTOR:
|
||||||
var authorizationStateWaitRegistration = (AuthorizationStateWaitRegistration) obj;
|
var authorizationStateWaitRegistration = (AuthorizationStateWaitRegistration) obj;
|
||||||
@ -477,63 +458,62 @@ public class AsyncTdEasy {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return Mono
|
return Mono
|
||||||
.from(settings.asFlux())
|
.fromCallable(this.settings::get)
|
||||||
|
.single()
|
||||||
.map(TdEasySettings::getParameterRequestHandler)
|
.map(TdEasySettings::getParameterRequestHandler)
|
||||||
.flatMap(handler -> {
|
.flatMap(handler -> MonoUtils.thenOrLogRepeatError(() -> handler
|
||||||
return MonoUtils.thenOrLogRepeatError(() -> handler
|
.onParameterRequest(Parameter.ASK_FIRST_NAME, new ParameterInfoEmpty())
|
||||||
.onParameterRequest(Parameter.ASK_FIRST_NAME, new ParameterInfoEmpty())
|
.filter(Objects::nonNull)
|
||||||
.filter(Objects::nonNull)
|
.map(String::trim)
|
||||||
.map(String::trim)
|
.filter(firstName -> !firstName.isBlank() && firstName.length() <= 64 && firstName.length() > 0)
|
||||||
.filter(firstName -> !firstName.isBlank() && firstName.length() <= 64 && firstName.length() > 0)
|
.repeatWhen(s -> s.takeWhile(n -> n == 0))
|
||||||
.repeatWhen(s -> s.takeWhile(n -> n == 0))
|
.last()
|
||||||
.last()
|
.doOnNext(firstName -> registerUser.firstName = firstName)
|
||||||
.doOnNext(firstName -> registerUser.firstName = firstName)
|
.then(handler
|
||||||
.then(handler
|
.onParameterRequest(Parameter.ASK_LAST_NAME, new ParameterInfoEmpty())
|
||||||
.onParameterRequest(Parameter.ASK_LAST_NAME, new ParameterInfoEmpty())
|
.filter(Objects::nonNull)
|
||||||
.filter(Objects::nonNull)
|
.map(String::trim)
|
||||||
.map(String::trim)
|
.filter(lastName -> lastName.length() <= 64)
|
||||||
.filter(lastName -> lastName.length() <= 64)
|
.repeatWhen(s -> s.takeWhile(n -> n == 0))
|
||||||
.repeatWhen(s -> s.takeWhile(n -> n == 0))
|
.last()
|
||||||
.last()
|
.defaultIfEmpty("")
|
||||||
.defaultIfEmpty("")
|
.doOnNext(lastName -> registerUser.lastName = lastName)
|
||||||
.doOnNext(lastName -> registerUser.lastName = lastName)
|
)
|
||||||
)
|
.then(sendDirectly(registerUser, false))
|
||||||
.then(sendDirectly(registerUser, DEFAULT_TIMEOUT, false)));
|
));
|
||||||
});
|
|
||||||
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR:
|
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR:
|
||||||
var authorizationStateWaitOtherDeviceConfirmation = (AuthorizationStateWaitOtherDeviceConfirmation) obj;
|
var authorizationStateWaitOtherDeviceConfirmation = (AuthorizationStateWaitOtherDeviceConfirmation) obj;
|
||||||
return Mono
|
return Mono
|
||||||
.from(settings.asFlux())
|
.fromCallable(this.settings::get)
|
||||||
|
.single()
|
||||||
.map(TdEasySettings::getParameterRequestHandler)
|
.map(TdEasySettings::getParameterRequestHandler)
|
||||||
.flatMap(handler -> {
|
.flatMap(handler -> handler.onParameterRequest(Parameter.NOTIFY_LINK,
|
||||||
return handler.onParameterRequest(Parameter.NOTIFY_LINK,
|
new ParameterInfoNotifyLink(authorizationStateWaitOtherDeviceConfirmation.link)
|
||||||
new ParameterInfoNotifyLink(authorizationStateWaitOtherDeviceConfirmation.link)
|
));
|
||||||
);
|
|
||||||
});
|
|
||||||
case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR:
|
case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR:
|
||||||
var authorizationStateWaitCode = (AuthorizationStateWaitCode) obj;
|
var authorizationStateWaitCode = (AuthorizationStateWaitCode) obj;
|
||||||
return Mono
|
return Mono
|
||||||
.from(settings.asFlux())
|
.fromCallable(this.settings::get)
|
||||||
|
.single()
|
||||||
.map(TdEasySettings::getParameterRequestHandler)
|
.map(TdEasySettings::getParameterRequestHandler)
|
||||||
.flatMap(handler -> {
|
.flatMap(handler -> MonoUtils.thenOrLogRepeatError(() -> handler
|
||||||
return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_CODE,
|
.onParameterRequest(Parameter.ASK_CODE, new ParameterInfoCode(authorizationStateWaitCode.codeInfo.phoneNumber,
|
||||||
new ParameterInfoCode(authorizationStateWaitCode.codeInfo.phoneNumber,
|
authorizationStateWaitCode.codeInfo.nextType,
|
||||||
authorizationStateWaitCode.codeInfo.nextType,
|
authorizationStateWaitCode.codeInfo.timeout,
|
||||||
authorizationStateWaitCode.codeInfo.timeout,
|
authorizationStateWaitCode.codeInfo.type))
|
||||||
authorizationStateWaitCode.codeInfo.type
|
.flatMap(code -> sendDirectly(new CheckAuthenticationCode(code), false))
|
||||||
)
|
));
|
||||||
).flatMap(code -> sendDirectly(new CheckAuthenticationCode(code), DEFAULT_TIMEOUT, false)));
|
|
||||||
});
|
|
||||||
case AuthorizationStateWaitPassword.CONSTRUCTOR:
|
case AuthorizationStateWaitPassword.CONSTRUCTOR:
|
||||||
var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj;
|
var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj;
|
||||||
return Mono
|
return Mono
|
||||||
.from(settings.asFlux())
|
.fromCallable(this.settings::get)
|
||||||
|
.single()
|
||||||
.map(TdEasySettings::getParameterRequestHandler)
|
.map(TdEasySettings::getParameterRequestHandler)
|
||||||
.flatMap(handler -> {
|
.flatMap(handler -> MonoUtils.thenOrLogRepeatError(() -> handler
|
||||||
return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_PASSWORD,
|
.onParameterRequest(Parameter.ASK_PASSWORD, new ParameterInfoPasswordHint(
|
||||||
new ParameterInfoPasswordHint(authorizationStateWaitPassword.passwordHint)
|
authorizationStateWaitPassword.passwordHint))
|
||||||
).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), DEFAULT_TIMEOUT, false)));
|
.flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), false))
|
||||||
});
|
));
|
||||||
case AuthorizationStateReady.CONSTRUCTOR: {
|
case AuthorizationStateReady.CONSTRUCTOR: {
|
||||||
this.authState.tryEmitNext(new AuthorizationStateReady());
|
this.authState.tryEmitNext(new AuthorizationStateReady());
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
@ -543,17 +523,20 @@ public class AsyncTdEasy {
|
|||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
case AuthorizationStateClosed.CONSTRUCTOR:
|
case AuthorizationStateClosed.CONSTRUCTOR:
|
||||||
logger.debug("Received AuthorizationStateClosed from td");
|
logger.debug("Received AuthorizationStateClosed from td");
|
||||||
return Mono.from(requestedDefinitiveExit.asFlux()).doOnNext(closeRequested -> {
|
return Mono.fromCallable(() -> {
|
||||||
|
var closeRequested = this.requestedDefinitiveExit.get();
|
||||||
if (closeRequested) {
|
if (closeRequested) {
|
||||||
logger.debug("td closed successfully");
|
logger.debug("td closed successfully");
|
||||||
} else {
|
} else {
|
||||||
logger.warn("td closed unexpectedly: {}", logName);
|
logger.warn("td closed unexpectedly: {}", logName);
|
||||||
}
|
}
|
||||||
authState.tryEmitNext(obj);
|
authState.tryEmitNext(obj);
|
||||||
|
return closeRequested;
|
||||||
}).flatMap(closeRequested -> {
|
}).flatMap(closeRequested -> {
|
||||||
if (closeRequested) {
|
if (closeRequested) {
|
||||||
return Mono
|
return Mono
|
||||||
.from(settings.asFlux())
|
.fromCallable(settings::get)
|
||||||
|
.single()
|
||||||
.map(settings -> settings.databaseDirectory)
|
.map(settings -> settings.databaseDirectory)
|
||||||
.map(Path::of)
|
.map(Path::of)
|
||||||
.flatMapIterable(sessionPath -> Set.of(sessionPath.resolve("media"),
|
.flatMapIterable(sessionPath -> Set.of(sessionPath.resolve("media"),
|
||||||
@ -579,9 +562,9 @@ public class AsyncTdEasy {
|
|||||||
logger.error("Can't delete temporary session subdirectory", e);
|
logger.error("Can't delete temporary session subdirectory", e);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.then(Mono.just(closeRequested));
|
.then(Mono.just(true));
|
||||||
} else {
|
} else {
|
||||||
return Mono.just(closeRequested);
|
return Mono.just(false);
|
||||||
}
|
}
|
||||||
}).then();
|
}).then();
|
||||||
default:
|
default:
|
||||||
|
@ -14,6 +14,7 @@ import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer;
|
|||||||
import it.tdlight.utils.MonoUtils;
|
import it.tdlight.utils.MonoUtils;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.warp.commonutils.error.InitializationException;
|
import org.warp.commonutils.error.InitializationException;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
@ -29,7 +30,8 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle {
|
|||||||
private final Vertx vertx;
|
private final Vertx vertx;
|
||||||
|
|
||||||
private final AsyncTdMiddleEventBusServer srv;
|
private final AsyncTdMiddleEventBusServer srv;
|
||||||
private final One<AsyncTdMiddle> cli = Sinks.one();
|
private final AtomicReference<AsyncTdMiddle> cli = new AtomicReference<>(null);
|
||||||
|
private final AtomicReference<Throwable> startError = new AtomicReference<>(null);
|
||||||
private final JsonObject implementationDetails;
|
private final JsonObject implementationDetails;
|
||||||
|
|
||||||
|
|
||||||
@ -64,23 +66,35 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle {
|
|||||||
.start(botId, botAlias, true, implementationDetails, tuple.getT2())
|
.start(botId, botAlias, true, implementationDetails, tuple.getT2())
|
||||||
.thenReturn(tuple.getT1()))
|
.thenReturn(tuple.getT1()))
|
||||||
.onErrorMap(InitializationException::new)
|
.onErrorMap(InitializationException::new)
|
||||||
.doOnNext(this.cli::tryEmitValue)
|
.doOnNext(this.cli::set)
|
||||||
.doOnError(this.cli::tryEmitError)
|
.doOnError(this.startError::set)
|
||||||
.thenReturn(this);
|
.thenReturn(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Mono<Void> initialize() {
|
public Mono<Void> initialize() {
|
||||||
return cli.asMono().single().flatMap(AsyncTdMiddle::initialize);
|
var startError = this.startError.get();
|
||||||
|
if (startError != null) {
|
||||||
|
return Mono.error(startError);
|
||||||
|
}
|
||||||
|
return Mono.fromCallable(cli::get).single().flatMap(AsyncTdMiddle::initialize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<TdApi.Object> receive() {
|
public Flux<TdApi.Object> receive() {
|
||||||
return cli.asMono().single().flatMapMany(AsyncTdMiddle::receive);
|
var startError = this.startError.get();
|
||||||
|
if (startError != null) {
|
||||||
|
return Flux.error(startError);
|
||||||
|
}
|
||||||
|
return Mono.fromCallable(cli::get).single().flatMapMany(AsyncTdMiddle::receive);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T extends Object> Mono<TdResult<T>> execute(Function request, Duration timeout, boolean executeDirectly) {
|
public <T extends Object> Mono<TdResult<T>> execute(Function request, Duration timeout, boolean executeDirectly) {
|
||||||
return cli.asMono().single().flatMap(c -> c.execute(request, timeout, executeDirectly));
|
var startError = this.startError.get();
|
||||||
|
if (startError != null) {
|
||||||
|
return Mono.error(startError);
|
||||||
|
}
|
||||||
|
return Mono.fromCallable(cli::get).single().flatMap(c -> c.execute(request, timeout, executeDirectly));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user