Add test-client
This commit is contained in:
parent
8b71b9e2cb
commit
6bd8f6281b
@ -92,10 +92,11 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
|
|||||||
|
|
||||||
// Send close if the stream is disposed before tdlib is closed
|
// Send close if the stream is disposed before tdlib is closed
|
||||||
updatesSink.onDispose(() -> {
|
updatesSink.onDispose(() -> {
|
||||||
// Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false.
|
|
||||||
closedFromTd.tryEmitValue(false);
|
|
||||||
|
|
||||||
closedFromTd.asMono().filter(isClosedFromTd -> !isClosedFromTd).doOnNext(x -> {
|
Mono.firstWithValue(closedFromTd.asMono(), Mono.empty()).switchIfEmpty(Mono.defer(() -> Mono.fromRunnable(() -> {
|
||||||
|
// Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false.
|
||||||
|
closedFromTd.tryEmitValue(false);
|
||||||
|
}))).filter(isClosedFromTd -> !isClosedFromTd).doOnNext(x -> {
|
||||||
logger.warn("The stream has been disposed without closing tdlib. Sending TdApi.Close()...");
|
logger.warn("The stream has been disposed without closing tdlib. Sending TdApi.Close()...");
|
||||||
client.send(new Close(),
|
client.send(new Close(),
|
||||||
result -> logger.warn("Close result: {}", result),
|
result -> logger.warn("Close result: {}", result),
|
||||||
|
@ -19,7 +19,7 @@ public class TelegramClientFactory {
|
|||||||
case "native-client":
|
case "native-client":
|
||||||
return ClientManager.create();
|
return ClientManager.create();
|
||||||
case "test-client":
|
case "test-client":
|
||||||
//todo: create a noop test client with optional behaviours
|
return new TestClient(implementationDetails.getJsonObject("test-client-settings"));
|
||||||
default:
|
default:
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
106
src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java
Normal file
106
src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java
Normal file
@ -0,0 +1,106 @@
|
|||||||
|
package it.tdlight.tdlibsession.td.direct;
|
||||||
|
|
||||||
|
import io.vertx.core.json.JsonArray;
|
||||||
|
import io.vertx.core.json.JsonObject;
|
||||||
|
import it.tdlight.common.ExceptionHandler;
|
||||||
|
import it.tdlight.common.ResultHandler;
|
||||||
|
import it.tdlight.common.TelegramClient;
|
||||||
|
import it.tdlight.common.UpdatesHandler;
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
|
import it.tdlight.jni.TdApi.AuthorizationStateWaitTdlibParameters;
|
||||||
|
import it.tdlight.jni.TdApi.Error;
|
||||||
|
import it.tdlight.jni.TdApi.Function;
|
||||||
|
import it.tdlight.jni.TdApi.Object;
|
||||||
|
import it.tdlight.jni.TdApi.Ok;
|
||||||
|
import it.tdlight.jni.TdApi.SetLogTagVerbosityLevel;
|
||||||
|
import it.tdlight.jni.TdApi.SetLogVerbosityLevel;
|
||||||
|
import it.tdlight.jni.TdApi.SetOption;
|
||||||
|
import it.tdlight.jni.TdApi.SetTdlibParameters;
|
||||||
|
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
|
||||||
|
import it.tdlight.tdlibsession.td.TdError;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.publisher.Sinks;
|
||||||
|
import reactor.core.publisher.Sinks.Many;
|
||||||
|
import reactor.core.scheduler.Scheduler;
|
||||||
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
|
public class TestClient implements TelegramClient {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(TestClient.class);
|
||||||
|
|
||||||
|
private final Many<Object> updates = Sinks.many().unicast().onBackpressureError();
|
||||||
|
private final Scheduler testClientScheduler = Schedulers.newSingle("test-client", true);
|
||||||
|
private final List<String> features;
|
||||||
|
private UpdatesHandler updatesHandler;
|
||||||
|
private ExceptionHandler updateExceptionHandler;
|
||||||
|
private ExceptionHandler defaultExceptionHandler;
|
||||||
|
|
||||||
|
public TestClient(JsonObject testClientSettings) {
|
||||||
|
JsonArray features = testClientSettings.getJsonArray("features", new JsonArray());
|
||||||
|
this.features = new ArrayList<>();
|
||||||
|
for (java.lang.Object feature : features) {
|
||||||
|
var featureName = (String) feature;
|
||||||
|
this.features.add(featureName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initialize(UpdatesHandler updatesHandler,
|
||||||
|
ExceptionHandler updateExceptionHandler,
|
||||||
|
ExceptionHandler defaultExceptionHandler) {
|
||||||
|
this.updatesHandler = updatesHandler;
|
||||||
|
this.updateExceptionHandler = updateExceptionHandler;
|
||||||
|
this.defaultExceptionHandler = defaultExceptionHandler;
|
||||||
|
|
||||||
|
updates
|
||||||
|
.asFlux()
|
||||||
|
.buffer(50)
|
||||||
|
.doOnNext(ub -> logger.trace("Received update block of size {}", ub.size()))
|
||||||
|
.subscribeOn(testClientScheduler)
|
||||||
|
.subscribe(updatesHandler::onUpdates, updateExceptionHandler::onException);
|
||||||
|
|
||||||
|
for (String featureName : features) {
|
||||||
|
switch (featureName) {
|
||||||
|
case "infinite-status-update":
|
||||||
|
Mono.<TdApi.Object>just(new UpdateAuthorizationState(new AuthorizationStateWaitTdlibParameters()))
|
||||||
|
.repeat()
|
||||||
|
.buffer(100)
|
||||||
|
.doOnNext(updatesHandler::onUpdates)
|
||||||
|
.subscribeOn(testClientScheduler)
|
||||||
|
.subscribe();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("Unknown feature name: " + featureName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void send(Function query, ResultHandler resultHandler, ExceptionHandler exceptionHandler) {
|
||||||
|
switch (query.getConstructor()) {
|
||||||
|
case SetLogVerbosityLevel.CONSTRUCTOR:
|
||||||
|
case SetLogTagVerbosityLevel.CONSTRUCTOR:
|
||||||
|
case SetTdlibParameters.CONSTRUCTOR:
|
||||||
|
case SetOption.CONSTRUCTOR:
|
||||||
|
resultHandler.onResult(new Ok());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
exceptionHandler.onException(new TdError(500, "Unsupported"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object execute(Function query) {
|
||||||
|
switch (query.getConstructor()) {
|
||||||
|
case SetLogVerbosityLevel.CONSTRUCTOR:
|
||||||
|
case SetLogTagVerbosityLevel.CONSTRUCTOR:
|
||||||
|
case SetTdlibParameters.CONSTRUCTOR:
|
||||||
|
case SetOption.CONSTRUCTOR:
|
||||||
|
return new Ok();
|
||||||
|
}
|
||||||
|
return new Error(500, "Unsupported");
|
||||||
|
}
|
||||||
|
}
|
@ -46,11 +46,10 @@ import org.reactivestreams.Publisher;
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.warp.commonutils.error.InitializationException;
|
import org.warp.commonutils.error.InitializationException;
|
||||||
import reactor.core.publisher.EmitterProcessor;
|
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.publisher.ReplayProcessor;
|
|
||||||
import reactor.core.publisher.Sinks;
|
import reactor.core.publisher.Sinks;
|
||||||
|
import reactor.core.publisher.Sinks.Many;
|
||||||
import reactor.core.publisher.Sinks.One;
|
import reactor.core.publisher.Sinks.One;
|
||||||
import reactor.core.scheduler.Scheduler;
|
import reactor.core.scheduler.Scheduler;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
@ -59,11 +58,14 @@ public class AsyncTdEasy {
|
|||||||
|
|
||||||
private final Logger logger;
|
private final Logger logger;
|
||||||
|
|
||||||
private static final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy", false);
|
private static final Scheduler scheduler = Schedulers.newParallel("AsyncTdEasy",
|
||||||
private final ReplayProcessor<AuthorizationState> authState = ReplayProcessor.create(1);
|
Runtime.getRuntime().availableProcessors(),
|
||||||
private final ReplayProcessor<Boolean> requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false);
|
false
|
||||||
private final ReplayProcessor<TdEasySettings> settings = ReplayProcessor.cacheLast();
|
);
|
||||||
private final EmitterProcessor<Error> globalErrors = EmitterProcessor.create();
|
private final Many<AuthorizationState> authState = Sinks.many().replay().latest();
|
||||||
|
private final Many<Boolean> requestedDefinitiveExit = Sinks.many().replay().latestOrDefault(false);
|
||||||
|
private final Many<TdEasySettings> settings = Sinks.many().replay().latest();
|
||||||
|
private final Many<Error> globalErrors = Sinks.many().multicast().onBackpressureBuffer();
|
||||||
private final One<FatalErrorType> fatalError = Sinks.one();
|
private final One<FatalErrorType> fatalError = Sinks.one();
|
||||||
private final AsyncTdMiddle td;
|
private final AsyncTdMiddle td;
|
||||||
private final String logName;
|
private final String logName;
|
||||||
@ -78,7 +80,6 @@ public class AsyncTdEasy {
|
|||||||
this.incomingUpdates = td.receive()
|
this.incomingUpdates = td.receive()
|
||||||
.flatMap(this::preprocessUpdates)
|
.flatMap(this::preprocessUpdates)
|
||||||
.flatMap(update -> Mono.from(this.getState()).single().map(state -> new AsyncTdUpdateObj(state, update)))
|
.flatMap(update -> Mono.from(this.getState()).single().map(state -> new AsyncTdUpdateObj(state, update)))
|
||||||
.filter(upd -> upd.getState().getConstructor() == AuthorizationStateReady.CONSTRUCTOR)
|
|
||||||
.map(upd -> (TdApi.Update) upd.getUpdate())
|
.map(upd -> (TdApi.Update) upd.getUpdate())
|
||||||
.doOnError(ex -> {
|
.doOnError(ex -> {
|
||||||
if (ex instanceof TdError) {
|
if (ex instanceof TdError) {
|
||||||
@ -96,33 +97,31 @@ public class AsyncTdEasy {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
.doOnComplete(() -> {
|
.doOnComplete(() -> {
|
||||||
authState.asFlux().take(1).single().subscribeOn(Schedulers.single()).subscribe(authState -> {
|
authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> {
|
||||||
onUpdatesTerminated();
|
onUpdatesTerminated();
|
||||||
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
|
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
|
||||||
logger.warn("Updates stream has closed while"
|
logger.warn("Updates stream has closed while"
|
||||||
+ " the current authorization state is"
|
+ " the current authorization state is"
|
||||||
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
||||||
this.authState.onNext(new AuthorizationStateClosed());
|
this.authState.tryEmitNext(new AuthorizationStateClosed());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}).doOnError(ex -> {
|
}).doOnError(ex -> {
|
||||||
authState.asFlux().take(1).single().subscribeOn(Schedulers.single()).subscribe(authState -> {
|
authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> {
|
||||||
onUpdatesTerminated();
|
onUpdatesTerminated();
|
||||||
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
|
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
|
||||||
logger.warn("Updates stream has terminated with an error while"
|
logger.warn("Updates stream has terminated with an error while"
|
||||||
+ " the current authorization state is"
|
+ " the current authorization state is"
|
||||||
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
||||||
this.authState.onNext(new AuthorizationStateClosed());
|
this.authState.tryEmitNext(new AuthorizationStateClosed());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
})
|
});
|
||||||
.subscribeOn(scheduler)
|
|
||||||
.publish().refCount(1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onUpdatesTerminated() {
|
private void onUpdatesTerminated() {
|
||||||
logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true");
|
logger.debug("Incoming updates flux terminated. Setting requestedDefinitiveExit: true");
|
||||||
requestedDefinitiveExit.onNext(true);
|
requestedDefinitiveExit.tryEmitNext(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Mono<Void> create(TdEasySettings settings) {
|
public Mono<Void> create(TdEasySettings settings) {
|
||||||
@ -138,13 +137,13 @@ public class AsyncTdEasy {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Register fatal error handler
|
// Register fatal error handler
|
||||||
fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).subscribeOn(scheduler).subscribe();
|
fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).publishOn(scheduler).subscribe();
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
})
|
})
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
.subscribeOn(Schedulers.boundedElastic())
|
||||||
.flatMap(_v -> {
|
.flatMap(_v -> {
|
||||||
this.settings.onNext(settings);
|
this.settings.tryEmitNext(settings);
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -153,7 +152,7 @@ public class AsyncTdEasy {
|
|||||||
* Get TDLib state
|
* Get TDLib state
|
||||||
*/
|
*/
|
||||||
public Flux<AuthorizationState> getState() {
|
public Flux<AuthorizationState> getState() {
|
||||||
return authState.distinct().subscribeOn(scheduler);
|
return authState.asFlux().distinct().publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -164,21 +163,21 @@ public class AsyncTdEasy {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private Flux<TdApi.Update> getIncomingUpdates(boolean includePreAuthUpdates) {
|
private Flux<TdApi.Update> getIncomingUpdates(boolean includePreAuthUpdates) {
|
||||||
return incomingUpdates.subscribeOn(scheduler);
|
return incomingUpdates.publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get generic error updates from TDLib (When they are not linked to a precise request).
|
* Get generic error updates from TDLib (When they are not linked to a precise request).
|
||||||
*/
|
*/
|
||||||
public Flux<TdApi.Error> getIncomingErrors() {
|
public Flux<TdApi.Error> getIncomingErrors() {
|
||||||
return Flux.from(globalErrors).subscribeOn(scheduler);
|
return Flux.from(globalErrors.asFlux()).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Receives fatal errors from TDLib.
|
* Receives fatal errors from TDLib.
|
||||||
*/
|
*/
|
||||||
public Mono<FatalErrorType> getFatalErrors() {
|
public Mono<FatalErrorType> getFatalErrors() {
|
||||||
return Mono.from(fatalError.asMono()).subscribeOn(scheduler);
|
return Mono.from(fatalError.asMono()).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -190,7 +189,7 @@ public class AsyncTdEasy {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(TdApi.Function obj, boolean synchronous) {
|
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(TdApi.Function obj, boolean synchronous) {
|
||||||
return td.<T>execute(obj, synchronous).subscribeOn(scheduler);
|
return td.<T>execute(obj, synchronous).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -198,7 +197,7 @@ public class AsyncTdEasy {
|
|||||||
* @param i level
|
* @param i level
|
||||||
*/
|
*/
|
||||||
public Mono<Void> setVerbosityLevel(int i) {
|
public Mono<Void> setVerbosityLevel(int i) {
|
||||||
return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)).subscribeOn(scheduler);
|
return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -206,7 +205,7 @@ public class AsyncTdEasy {
|
|||||||
* @param name option name
|
* @param name option name
|
||||||
*/
|
*/
|
||||||
public Mono<Void> clearOption(String name) {
|
public Mono<Void> clearOption(String name) {
|
||||||
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)).subscribeOn(scheduler);
|
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -215,7 +214,7 @@ public class AsyncTdEasy {
|
|||||||
* @param value option value
|
* @param value option value
|
||||||
*/
|
*/
|
||||||
public Mono<Void> setOptionString(String name, String value) {
|
public Mono<Void> setOptionString(String name, String value) {
|
||||||
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)).subscribeOn(scheduler);
|
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -224,7 +223,7 @@ public class AsyncTdEasy {
|
|||||||
* @param value option value
|
* @param value option value
|
||||||
*/
|
*/
|
||||||
public Mono<Void> setOptionInteger(String name, long value) {
|
public Mono<Void> setOptionInteger(String name, long value) {
|
||||||
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)).subscribeOn(scheduler);
|
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -233,7 +232,7 @@ public class AsyncTdEasy {
|
|||||||
* @param value option value
|
* @param value option value
|
||||||
*/
|
*/
|
||||||
public Mono<Void> setOptionBoolean(String name, boolean value) {
|
public Mono<Void> setOptionBoolean(String name, boolean value) {
|
||||||
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)).subscribeOn(scheduler);
|
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -252,7 +251,7 @@ public class AsyncTdEasy {
|
|||||||
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
|
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
|
||||||
+ value.getClass().getSimpleName()));
|
+ value.getClass().getSimpleName()));
|
||||||
}
|
}
|
||||||
}).subscribeOn(scheduler);
|
}).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -271,7 +270,7 @@ public class AsyncTdEasy {
|
|||||||
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
|
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
|
||||||
+ value.getClass().getSimpleName()));
|
+ value.getClass().getSimpleName()));
|
||||||
}
|
}
|
||||||
}).subscribeOn(scheduler);
|
}).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -290,7 +289,7 @@ public class AsyncTdEasy {
|
|||||||
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
|
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
|
||||||
+ value.getClass().getSimpleName()));
|
+ value.getClass().getSimpleName()));
|
||||||
}
|
}
|
||||||
}).subscribeOn(scheduler);
|
}).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -301,7 +300,7 @@ public class AsyncTdEasy {
|
|||||||
* @return The request response.
|
* @return The request response.
|
||||||
*/
|
*/
|
||||||
public <T extends Object> Mono<TdResult<T>> execute(TdApi.Function request) {
|
public <T extends Object> Mono<TdResult<T>> execute(TdApi.Function request) {
|
||||||
return td.<T>execute(request, true).subscribeOn(scheduler);
|
return td.<T>execute(request, true).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -329,7 +328,7 @@ public class AsyncTdEasy {
|
|||||||
.filter(closeRequested -> !closeRequested)
|
.filter(closeRequested -> !closeRequested)
|
||||||
.doOnSuccess(s -> {
|
.doOnSuccess(s -> {
|
||||||
logger.debug("Setting requestedDefinitiveExit: true");
|
logger.debug("Setting requestedDefinitiveExit: true");
|
||||||
requestedDefinitiveExit.onNext(true);
|
requestedDefinitiveExit.tryEmitNext(true);
|
||||||
})
|
})
|
||||||
.then(td.execute(new TdApi.Close(), false).doOnSubscribe(s -> {
|
.then(td.execute(new TdApi.Close(), false).doOnSubscribe(s -> {
|
||||||
logger.debug("Sending TdApi.Close");
|
logger.debug("Sending TdApi.Close");
|
||||||
@ -337,7 +336,7 @@ public class AsyncTdEasy {
|
|||||||
.doOnNext(closeResponse -> logger.debug("TdApi.Close response is: \"{}\"",
|
.doOnNext(closeResponse -> logger.debug("TdApi.Close response is: \"{}\"",
|
||||||
closeResponse.toString().replace('\n', ' ')
|
closeResponse.toString().replace('\n', ' ')
|
||||||
))
|
))
|
||||||
.then(authState
|
.then(authState.asFlux()
|
||||||
.filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR)
|
.filter(authorizationState -> authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR)
|
||||||
.take(1)
|
.take(1)
|
||||||
.singleOrEmpty())
|
.singleOrEmpty())
|
||||||
@ -348,7 +347,7 @@ public class AsyncTdEasy {
|
|||||||
logger.info("AsyncTdEasy closed successfully");
|
logger.info("AsyncTdEasy closed successfully");
|
||||||
})
|
})
|
||||||
.then()
|
.then()
|
||||||
.subscribeOn(scheduler);
|
.publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -374,13 +373,13 @@ public class AsyncTdEasy {
|
|||||||
|
|
||||||
switch (error.message) {
|
switch (error.message) {
|
||||||
case "PHONE_CODE_INVALID":
|
case "PHONE_CODE_INVALID":
|
||||||
globalErrors.onNext(error);
|
globalErrors.tryEmitNext(error);
|
||||||
return new UpdateAuthorizationState(new AuthorizationStateWaitCode());
|
return new UpdateAuthorizationState(new AuthorizationStateWaitCode());
|
||||||
case "PASSWORD_HASH_INVALID":
|
case "PASSWORD_HASH_INVALID":
|
||||||
globalErrors.onNext(error);
|
globalErrors.tryEmitNext(error);
|
||||||
return new UpdateAuthorizationState(new AuthorizationStateWaitPassword());
|
return new UpdateAuthorizationState(new AuthorizationStateWaitPassword());
|
||||||
default:
|
default:
|
||||||
globalErrors.onNext(error);
|
globalErrors.tryEmitNext(error);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
analyzeFatalErrors(error);
|
analyzeFatalErrors(error);
|
||||||
@ -388,7 +387,7 @@ public class AsyncTdEasy {
|
|||||||
} else {
|
} else {
|
||||||
return (Update) obj;
|
return (Update) obj;
|
||||||
}
|
}
|
||||||
}).subscribeOn(scheduler);
|
}).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void analyzeFatalErrors(Object obj) {
|
private void analyzeFatalErrors(Object obj) {
|
||||||
@ -415,7 +414,7 @@ public class AsyncTdEasy {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public Mono<Boolean> isBot() {
|
public Mono<Boolean> isBot() {
|
||||||
return Mono.from(settings).single().map(TdEasySettings::isBotTokenSet).subscribeOn(scheduler);
|
return Mono.from(settings.asFlux()).single().map(TdEasySettings::isBotTokenSet).publishOn(scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Publisher<TdApi.Update> preprocessUpdates(TdApi.Object updateObj) {
|
private Publisher<TdApi.Update> preprocessUpdates(TdApi.Object updateObj) {
|
||||||
@ -427,7 +426,7 @@ public class AsyncTdEasy {
|
|||||||
.flatMap(obj -> {
|
.flatMap(obj -> {
|
||||||
switch (obj.getConstructor()) {
|
switch (obj.getConstructor()) {
|
||||||
case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR:
|
case AuthorizationStateWaitTdlibParameters.CONSTRUCTOR:
|
||||||
return thenOrFatalError(Mono.from(this.settings).map(settings -> {
|
return thenOrFatalError(Mono.from(this.settings.asFlux()).map(settings -> {
|
||||||
var parameters = new TdlibParameters();
|
var parameters = new TdlibParameters();
|
||||||
parameters.useTestDc = settings.useTestDc;
|
parameters.useTestDc = settings.useTestDc;
|
||||||
parameters.databaseDirectory = settings.databaseDirectory;
|
parameters.databaseDirectory = settings.databaseDirectory;
|
||||||
@ -453,7 +452,7 @@ public class AsyncTdEasy {
|
|||||||
return sendDirectly(new TdApi.Close(), false).then();
|
return sendDirectly(new TdApi.Close(), false).then();
|
||||||
});
|
});
|
||||||
case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR:
|
case AuthorizationStateWaitPhoneNumber.CONSTRUCTOR:
|
||||||
return thenOrFatalError(Mono.from(this.settings).flatMap(settings -> {
|
return thenOrFatalError(Mono.from(this.settings.asFlux()).flatMap(settings -> {
|
||||||
if (settings.isPhoneNumberSet()) {
|
if (settings.isPhoneNumberSet()) {
|
||||||
return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()),
|
return sendDirectly(new SetAuthenticationPhoneNumber(String.valueOf(settings.getPhoneNumber()),
|
||||||
new PhoneNumberAuthenticationSettings(false, false, false)
|
new PhoneNumberAuthenticationSettings(false, false, false)
|
||||||
@ -476,7 +475,7 @@ public class AsyncTdEasy {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return Mono
|
return Mono
|
||||||
.from(settings)
|
.from(settings.asFlux())
|
||||||
.map(TdEasySettings::getParameterRequestHandler)
|
.map(TdEasySettings::getParameterRequestHandler)
|
||||||
.flatMap(handler -> {
|
.flatMap(handler -> {
|
||||||
return MonoUtils.thenOrLogRepeatError(() -> handler
|
return MonoUtils.thenOrLogRepeatError(() -> handler
|
||||||
@ -502,7 +501,7 @@ public class AsyncTdEasy {
|
|||||||
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR:
|
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR:
|
||||||
var authorizationStateWaitOtherDeviceConfirmation = (AuthorizationStateWaitOtherDeviceConfirmation) obj;
|
var authorizationStateWaitOtherDeviceConfirmation = (AuthorizationStateWaitOtherDeviceConfirmation) obj;
|
||||||
return Mono
|
return Mono
|
||||||
.from(settings)
|
.from(settings.asFlux())
|
||||||
.map(TdEasySettings::getParameterRequestHandler)
|
.map(TdEasySettings::getParameterRequestHandler)
|
||||||
.flatMap(handler -> {
|
.flatMap(handler -> {
|
||||||
return handler.onParameterRequest(Parameter.NOTIFY_LINK,
|
return handler.onParameterRequest(Parameter.NOTIFY_LINK,
|
||||||
@ -512,7 +511,7 @@ public class AsyncTdEasy {
|
|||||||
case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR:
|
case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR:
|
||||||
var authorizationStateWaitCode = (AuthorizationStateWaitCode) obj;
|
var authorizationStateWaitCode = (AuthorizationStateWaitCode) obj;
|
||||||
return Mono
|
return Mono
|
||||||
.from(settings)
|
.from(settings.asFlux())
|
||||||
.map(TdEasySettings::getParameterRequestHandler)
|
.map(TdEasySettings::getParameterRequestHandler)
|
||||||
.flatMap(handler -> {
|
.flatMap(handler -> {
|
||||||
return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_CODE,
|
return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_CODE,
|
||||||
@ -526,7 +525,7 @@ public class AsyncTdEasy {
|
|||||||
case AuthorizationStateWaitPassword.CONSTRUCTOR:
|
case AuthorizationStateWaitPassword.CONSTRUCTOR:
|
||||||
var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj;
|
var authorizationStateWaitPassword = (AuthorizationStateWaitPassword) obj;
|
||||||
return Mono
|
return Mono
|
||||||
.from(settings)
|
.from(settings.asFlux())
|
||||||
.map(TdEasySettings::getParameterRequestHandler)
|
.map(TdEasySettings::getParameterRequestHandler)
|
||||||
.flatMap(handler -> {
|
.flatMap(handler -> {
|
||||||
return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_PASSWORD,
|
return MonoUtils.thenOrLogRepeatError(() -> handler.onParameterRequest(Parameter.ASK_PASSWORD,
|
||||||
@ -534,7 +533,7 @@ public class AsyncTdEasy {
|
|||||||
).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), false)));
|
).flatMap(password -> sendDirectly(new CheckAuthenticationPassword(password), false)));
|
||||||
});
|
});
|
||||||
case AuthorizationStateReady.CONSTRUCTOR: {
|
case AuthorizationStateReady.CONSTRUCTOR: {
|
||||||
this.authState.onNext(new AuthorizationStateReady());
|
this.authState.tryEmitNext(new AuthorizationStateReady());
|
||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
}
|
}
|
||||||
case AuthorizationStateClosing.CONSTRUCTOR:
|
case AuthorizationStateClosing.CONSTRUCTOR:
|
||||||
@ -542,17 +541,17 @@ public class AsyncTdEasy {
|
|||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
case AuthorizationStateClosed.CONSTRUCTOR:
|
case AuthorizationStateClosed.CONSTRUCTOR:
|
||||||
logger.debug("Received AuthorizationStateClosed from td");
|
logger.debug("Received AuthorizationStateClosed from td");
|
||||||
return Mono.from(requestedDefinitiveExit).doOnNext(closeRequested -> {
|
return Mono.from(requestedDefinitiveExit.asFlux()).doOnNext(closeRequested -> {
|
||||||
if (closeRequested) {
|
if (closeRequested) {
|
||||||
logger.debug("td closed successfully");
|
logger.debug("td closed successfully");
|
||||||
} else {
|
} else {
|
||||||
logger.warn("td closed unexpectedly: {}", logName);
|
logger.warn("td closed unexpectedly: {}", logName);
|
||||||
}
|
}
|
||||||
authState.onNext(obj);
|
authState.tryEmitNext(obj);
|
||||||
}).flatMap(closeRequested -> {
|
}).flatMap(closeRequested -> {
|
||||||
if (closeRequested) {
|
if (closeRequested) {
|
||||||
return Mono
|
return Mono
|
||||||
.from(settings)
|
.from(settings.asFlux())
|
||||||
.map(settings -> settings.databaseDirectory)
|
.map(settings -> settings.databaseDirectory)
|
||||||
.map(Path::of)
|
.map(Path::of)
|
||||||
.flatMapIterable(sessionPath -> Set.of(sessionPath.resolve("media"),
|
.flatMapIterable(sessionPath -> Set.of(sessionPath.resolve("media"),
|
||||||
@ -587,8 +586,7 @@ public class AsyncTdEasy {
|
|||||||
return Mono.empty();
|
return Mono.empty();
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj))
|
.then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj));
|
||||||
.subscribeOn(scheduler);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public <T extends TdApi.Object> Mono<Void> thenOrFatalError(Mono<TdResult<T>> optionalMono) {
|
public <T extends TdApi.Object> Mono<Void> thenOrFatalError(Mono<TdResult<T>> optionalMono) {
|
||||||
|
@ -21,6 +21,7 @@ import it.tdlight.utils.BinlogAsyncFile;
|
|||||||
import it.tdlight.utils.BinlogUtils;
|
import it.tdlight.utils.BinlogUtils;
|
||||||
import it.tdlight.utils.MonoUtils;
|
import it.tdlight.utils.MonoUtils;
|
||||||
import it.tdlight.utils.MonoUtils.SinkRWStream;
|
import it.tdlight.utils.MonoUtils.SinkRWStream;
|
||||||
|
import java.net.ConnectException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -44,7 +45,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
|
|
||||||
private final One<BinlogAsyncFile> binlog = Sinks.one();
|
private final One<BinlogAsyncFile> binlog = Sinks.one();
|
||||||
|
|
||||||
SinkRWStream<Message<TdResultList>> updates = MonoUtils.unicastBackpressureStream(10000);
|
SinkRWStream<Message<TdResultList>> updates = MonoUtils.unicastBackpressureSinkStreak();
|
||||||
// This will only result in a successful completion, never completes in other ways
|
// This will only result in a successful completion, never completes in other ways
|
||||||
private final Empty<Void> updatesStreamEnd = Sinks.one();
|
private final Empty<Void> updatesStreamEnd = Sinks.one();
|
||||||
// This will only result in a crash, never completes in other ways
|
// This will only result in a crash, never completes in other ways
|
||||||
@ -131,7 +132,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
|
|
||||||
@SuppressWarnings("CallingSubscribeInNonBlockingScope")
|
@SuppressWarnings("CallingSubscribeInNonBlockingScope")
|
||||||
private Mono<Void> setupUpdatesListener() {
|
private Mono<Void> setupUpdatesListener() {
|
||||||
MessageConsumer<TdResultList> updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().<TdResultList>consumer(botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate());
|
MessageConsumer<TdResultList> updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().<TdResultList>consumer(
|
||||||
|
botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate());
|
||||||
updateConsumer.endHandler(h -> {
|
updateConsumer.endHandler(h -> {
|
||||||
logger.error("<<<<<<<<<<<<<<<<EndHandler?>>>>>>>>>>>>>");
|
logger.error("<<<<<<<<<<<<<<<<EndHandler?>>>>>>>>>>>>>");
|
||||||
});
|
});
|
||||||
@ -152,14 +154,17 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
return Mono
|
return Mono
|
||||||
.fromRunnable(() -> logger.trace("Called receive() from parent"))
|
.fromRunnable(() -> logger.trace("Called receive() from parent"))
|
||||||
.doOnSuccess(s -> logger.trace("Sending ready-to-receive"))
|
.doOnSuccess(s -> logger.trace("Sending ready-to-receive"))
|
||||||
.then(cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout).as(MonoUtils::toMono))
|
.then()
|
||||||
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
|
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
|
||||||
.doOnSuccess(s -> logger.trace("About to read updates flux"))
|
.doOnSuccess(s -> logger.trace("About to read updates flux"))
|
||||||
.thenMany(updates.readAsFlux())
|
.thenMany(updates.readAsFlux())
|
||||||
.cast(io.vertx.core.eventbus.Message.class)
|
.cast(io.vertx.core.eventbus.Message.class)
|
||||||
.timeout(Duration.ofSeconds(20), Mono.fromCallable(() -> {
|
.timeout(Duration.ofSeconds(20), Mono.fromCallable(() -> {
|
||||||
throw new IllegalStateException("Server did not respond to 4 pings after 20 seconds (5 seconds per ping)");
|
var ex = new ConnectException("Server did not respond to 4 pings after 20 seconds (5 seconds per ping)");
|
||||||
|
ex.setStackTrace(new StackTraceElement[0]);
|
||||||
|
throw ex;
|
||||||
}))
|
}))
|
||||||
|
.doOnSubscribe(s -> cluster.getEventBus().<byte[]>send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout))
|
||||||
.flatMap(updates -> {
|
.flatMap(updates -> {
|
||||||
var result = (TdResultList) updates.body();
|
var result = (TdResultList) updates.body();
|
||||||
if (result.succeeded()) {
|
if (result.succeeded()) {
|
||||||
|
@ -2,9 +2,9 @@ package it.tdlight.tdlibsession.td.middle.direct;
|
|||||||
|
|
||||||
import static it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer.WAIT_DURATION;
|
import static it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer.WAIT_DURATION;
|
||||||
|
|
||||||
import io.vertx.core.AbstractVerticle;
|
import io.reactivex.Completable;
|
||||||
import io.vertx.core.Promise;
|
|
||||||
import io.vertx.core.json.JsonObject;
|
import io.vertx.core.json.JsonObject;
|
||||||
|
import io.vertx.reactivex.core.AbstractVerticle;
|
||||||
import it.tdlight.jni.TdApi;
|
import it.tdlight.jni.TdApi;
|
||||||
import it.tdlight.jni.TdApi.Function;
|
import it.tdlight.jni.TdApi.Function;
|
||||||
import it.tdlight.jni.TdApi.Object;
|
import it.tdlight.jni.TdApi.Object;
|
||||||
@ -22,6 +22,7 @@ import reactor.core.publisher.Flux;
|
|||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.publisher.Sinks;
|
import reactor.core.publisher.Sinks;
|
||||||
import reactor.core.publisher.Sinks.Empty;
|
import reactor.core.publisher.Sinks.Empty;
|
||||||
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMiddle {
|
public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMiddle {
|
||||||
|
|
||||||
@ -55,7 +56,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start(Promise<Void> startPromise) {
|
public Completable rxStart() {
|
||||||
var botAddress = config().getString("botAddress");
|
var botAddress = config().getString("botAddress");
|
||||||
if (botAddress == null || botAddress.isEmpty()) {
|
if (botAddress == null || botAddress.isEmpty()) {
|
||||||
throw new IllegalArgumentException("botAddress is not set!");
|
throw new IllegalArgumentException("botAddress is not set!");
|
||||||
@ -73,13 +74,13 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
|
|||||||
|
|
||||||
this.td = new AsyncTdDirectImpl(clientFactory, implementationDetails, botAlias);
|
this.td = new AsyncTdDirectImpl(clientFactory, implementationDetails, botAlias);
|
||||||
|
|
||||||
startPromise.complete();
|
return Completable.complete();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop(Promise<Void> stopPromise) {
|
public Completable rxStop() {
|
||||||
closeRequest.tryEmitEmpty();
|
closeRequest.tryEmitEmpty();
|
||||||
stopPromise.complete();
|
return Completable.complete();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -87,8 +88,10 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
|
|||||||
return td
|
return td
|
||||||
.receive(new AsyncTdDirectOptions(WAIT_DURATION, 100))
|
.receive(new AsyncTdDirectOptions(WAIT_DURATION, 100))
|
||||||
.takeUntilOther(closeRequest.asMono())
|
.takeUntilOther(closeRequest.asMono())
|
||||||
|
.doOnNext(s -> logger.trace("Received update from tdlib: {}", s))
|
||||||
.doOnError(ex -> logger.info("TdMiddle verticle error", ex))
|
.doOnError(ex -> logger.info("TdMiddle verticle error", ex))
|
||||||
.doOnTerminate(() -> logger.debug("TdMiddle verticle stopped"));
|
.doOnTerminate(() -> logger.debug("TdMiddle verticle stopped"))
|
||||||
|
.publishOn(Schedulers.single());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -2,6 +2,8 @@ package it.tdlight.tdlibsession.td.middle.server;
|
|||||||
|
|
||||||
import io.reactivex.Completable;
|
import io.reactivex.Completable;
|
||||||
import io.vertx.core.eventbus.DeliveryOptions;
|
import io.vertx.core.eventbus.DeliveryOptions;
|
||||||
|
import io.vertx.core.eventbus.ReplyException;
|
||||||
|
import io.vertx.core.eventbus.ReplyFailure;
|
||||||
import io.vertx.reactivex.core.AbstractVerticle;
|
import io.vertx.reactivex.core.AbstractVerticle;
|
||||||
import io.vertx.reactivex.core.eventbus.Message;
|
import io.vertx.reactivex.core.eventbus.Message;
|
||||||
import io.vertx.reactivex.core.eventbus.MessageConsumer;
|
import io.vertx.reactivex.core.eventbus.MessageConsumer;
|
||||||
@ -24,6 +26,7 @@ import it.tdlight.tdlibsession.td.middle.TdResultList;
|
|||||||
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
|
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
|
||||||
import it.tdlight.utils.BinlogUtils;
|
import it.tdlight.utils.BinlogUtils;
|
||||||
import it.tdlight.utils.MonoUtils;
|
import it.tdlight.utils.MonoUtils;
|
||||||
|
import java.net.ConnectException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -377,11 +380,26 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
|||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
.onErrorResume(ex -> {
|
.onErrorResume(ex -> {
|
||||||
logger.warn("Undeploying after a fatal error in a served flux", ex);
|
boolean printDefaultException = true;
|
||||||
|
if (ex instanceof ReplyException) {
|
||||||
|
ReplyException replyException = (ReplyException) ex;
|
||||||
|
if (replyException.failureCode() == -1 && replyException.failureType() == ReplyFailure.NO_HANDLERS) {
|
||||||
|
logger.warn("Undeploying, the flux has been terminated because no more handlers are available on the event bus. {}", replyException.getMessage());
|
||||||
|
printDefaultException = false;
|
||||||
|
}
|
||||||
|
} else if (ex instanceof ConnectException || ex instanceof java.nio.channels.ClosedChannelException) {
|
||||||
|
logger.warn("Undeploying, the flux has been terminated because the consumer disconnected from the event bus. {}", ex.getMessage());
|
||||||
|
printDefaultException = false;
|
||||||
|
}
|
||||||
|
if (printDefaultException) {
|
||||||
|
logger.warn("Undeploying after a fatal error in a served flux", ex);
|
||||||
|
}
|
||||||
return td.execute(new TdApi.Close(), false)
|
return td.execute(new TdApi.Close(), false)
|
||||||
.doOnError(ex2 -> logger.error("Unexpected error", ex2))
|
.doOnError(ex2 -> logger.error("Unexpected error", ex2))
|
||||||
.then();
|
.doOnSuccess(s -> logger.debug("Emergency Close() signal has been sent successfully"))
|
||||||
|
.then(rxStop().as(MonoUtils::toMono));
|
||||||
});
|
});
|
||||||
|
|
||||||
return MonoUtils.emitValue(this.pipeFlux, pipeFlux)
|
return MonoUtils.emitValue(this.pipeFlux, pipeFlux)
|
||||||
.doOnSuccess(s -> logger.trace("Prepared piping requests successfully"));
|
.doOnSuccess(s -> logger.trace("Prepared piping requests successfully"));
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user