Unregister subscription from clusters during shutdown

This commit is contained in:
Andrea Cavalli 2021-03-31 04:34:53 +02:00
parent a0b49f8d15
commit 24f83b5190
6 changed files with 184 additions and 164 deletions

View File

@ -46,7 +46,7 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient {
*/ */
@Override @Override
public Mono<TdApi.Object> send(TdApi.Function query) { public Mono<TdApi.Object> send(TdApi.Function query) {
return Mono.from(reactiveTelegramClient.send(query)); return Mono.from(reactiveTelegramClient.send(query)).single();
} }
/** /**

View File

@ -11,6 +11,7 @@ import it.tdlight.tdlibsession.td.ReactorTelegramClient;
import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils;
import java.time.Duration;
import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -40,48 +41,60 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
@Override @Override
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean synchronous) { public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean synchronous) {
if (synchronous) { if (synchronous) {
return td.asMono().single().flatMap(td -> MonoUtils.fromBlockingSingle(() -> { return Mono
if (td != null) { .firstWithSignal(td.asMono(), Mono.empty())
return TdResult.of(td.execute(request)); .single()
} else { .timeout(Duration.ofSeconds(5))
if (request.getConstructor() == Close.CONSTRUCTOR) { .flatMap(td -> MonoUtils.fromBlockingSingle(() -> {
return TdResult.of(new Ok()); logger.trace("Sending execute to TDLib {}", request);
} TdResult<T> result = TdResult.of(td.execute(request));
throw new IllegalStateException("TDLib client is destroyed"); logger.trace("Received execute response from TDLib. Request was {}", request);
} return result;
})); }))
.single();
} else { } else {
return td.asMono().single().flatMap(td -> Mono.<TdResult<T>>create(sink -> { return Mono
if (td != null) { .firstWithSignal(td.asMono(), Mono.empty())
Mono .single()
.from(td.send(request)) .timeout(Duration.ofSeconds(5))
.subscribeOn(Schedulers.single()) .<TdResult<T>>flatMap(td -> {
.subscribe(v -> sink.success(TdResult.of(v)), sink::error); if (td != null) {
} else { return Mono
if (request.getConstructor() == Close.CONSTRUCTOR) { .fromRunnable(() -> logger.trace("Sending request to TDLib {}", request))
logger.trace("Sending close success to sink " + sink.toString()); .then(td.send(request))
sink.success(TdResult.of(new Ok())); .single()
} else { .<TdResult<T>>map(TdResult::of)
logger.trace("Sending close error to sink " + sink.toString()); .doOnSuccess(s -> logger.trace("Sent request to TDLib {}", request));
sink.error(new IllegalStateException("TDLib client is destroyed")); } else {
} return Mono.fromCallable(() -> {
} if (request.getConstructor() == Close.CONSTRUCTOR) {
})).single(); logger.trace("Sending close success to request {}", request);
return TdResult.of(new Ok());
} else {
logger.trace("Sending close error to request {} ", request);
throw new IllegalStateException("TDLib client is destroyed");
}
});
}
})
.single();
} }
} }
@Override @Override
public Mono<Void> initialize() { public Mono<Void> initialize() {
return telegramClientFactory return Mono
.create(implementationDetails) .fromRunnable(() -> logger.trace("Initializing"))
.then(telegramClientFactory.create(implementationDetails))
.flatMap(reactorTelegramClient -> reactorTelegramClient.initialize().thenReturn(reactorTelegramClient)) .flatMap(reactorTelegramClient -> reactorTelegramClient.initialize().thenReturn(reactorTelegramClient))
.doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1)))
.flatMap(client -> { .flatMap(client -> {
if (td.tryEmitValue(client).isFailure()) { if (td.tryEmitValue(client).isFailure()) {
return Mono.error(new TdError(500, "Failed to emit td client")); return Mono.error(new TdError(500, "Failed to emit td client"));
} }
return Mono.just(client); return Mono.just(client);
}) })
.doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1)))
.doOnSuccess(s -> logger.trace("Initialized"))
.then(); .then();
} }
@ -89,8 +102,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
public Flux<TdApi.Object> receive(AsyncTdDirectOptions options) { public Flux<TdApi.Object> receive(AsyncTdDirectOptions options) {
// If closed it will be either true or false // If closed it will be either true or false
final One<Boolean> closedFromTd = Sinks.one(); final One<Boolean> closedFromTd = Sinks.one();
return td return Mono
.asMono() .firstWithSignal(td.asMono(), Mono.empty())
.single()
.timeout(Duration.ofSeconds(5))
.flatMapMany(ReactorTelegramClient::receive) .flatMapMany(ReactorTelegramClient::receive)
.doOnNext(update -> { .doOnNext(update -> {
// Close the emitter if receive closed state // Close the emitter if receive closed state

View File

@ -1,11 +1,8 @@
package it.tdlight.tdlibsession.td.middle; package it.tdlight.tdlibsession.td.middle;
import com.hazelcast.config.Config; import com.hazelcast.config.Config;
import com.hazelcast.config.EvictionConfig;
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.MapConfig; import com.hazelcast.config.MapConfig;
import com.hazelcast.config.MaxSizePolicy; import com.hazelcast.config.MultiMapConfig;
import com.hazelcast.config.MergePolicyConfig;
import com.hazelcast.config.cp.SemaphoreConfig; import com.hazelcast.config.cp.SemaphoreConfig;
import io.vertx.core.DeploymentOptions; import io.vertx.core.DeploymentOptions;
import io.vertx.core.Handler; import io.vertx.core.Handler;
@ -120,17 +117,13 @@ public class TdClusterManager {
cfg.getNetworkConfig().setPort(port); cfg.getNetworkConfig().setPort(port);
cfg.getNetworkConfig().setPortAutoIncrement(false); cfg.getNetworkConfig().setPortAutoIncrement(false);
cfg.getPartitionGroupConfig().setEnabled(false); cfg.getPartitionGroupConfig().setEnabled(false);
cfg.addMapConfig(new MapConfig() cfg.addMapConfig(new MapConfig().setName("__vertx.haInfo").setBackupCount(1));
.setName("__vertx.subs") cfg.addMapConfig(new MapConfig().setName("__vertx.nodeInfo").setBackupCount(1));
.setBackupCount(1) cfg
.setTimeToLiveSeconds(0) .getCPSubsystemConfig()
.setMaxIdleSeconds(0) .setCPMemberCount(0)
.setEvictionConfig(new EvictionConfig() .setSemaphoreConfigs(Map.of("__vertx.*", new SemaphoreConfig().setInitialPermits(1).setJDKCompatible(false)));
.setMaxSizePolicy(MaxSizePolicy.PER_NODE) cfg.addMultiMapConfig(new MultiMapConfig().setName("__vertx.subs").setBackupCount(1).setValueCollectionType("SET"));
.setEvictionPolicy(EvictionPolicy.NONE)
.setSize(0))
.setMergePolicyConfig(new MergePolicyConfig().setPolicy("com.hazelcast.map.merge.LatestUpdateMapMergePolicy")));
cfg.getCPSubsystemConfig().setSemaphoreConfigs(Map.of("__vertx.*", new SemaphoreConfig().setInitialPermits(1)));
cfg.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); cfg.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
cfg.getNetworkConfig().getJoin().getAwsConfig().setEnabled(false); cfg.getNetworkConfig().getJoin().getAwsConfig().setEnabled(false);
cfg.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true); cfg.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true);

View File

@ -8,6 +8,7 @@ import io.vertx.reactivex.core.eventbus.MessageConsumer;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.td.ResponseError; import it.tdlight.tdlibsession.td.ResponseError;
import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdError;
@ -196,11 +197,12 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
private Mono<Void> setupUpdatesListener() { private Mono<Void> setupUpdatesListener() {
return Mono return Mono
.fromRunnable(() -> logger.trace("Setting up updates listener...")) .fromRunnable(() -> logger.trace("Setting up updates listener..."))
.then(MonoUtils.<MessageConsumer<TdResultList>>fromBlockingSingle(() -> { .then(MonoUtils.<MessageConsumer<TdResultList>>fromBlockingSingle(() -> MessageConsumer
return MessageConsumer.newInstance(cluster.getEventBus().<TdResultList>consumer(botAddress + ".updates") .newInstance(cluster.getEventBus().<TdResultList>consumer(botAddress + ".updates")
.setMaxBufferedMessages(5000) .setMaxBufferedMessages(5000)
.getDelegate()); .getDelegate()
})) ))
)
.flatMap(updateConsumer -> { .flatMap(updateConsumer -> {
// Return when the registration of all the consumers has been done across the cluster // Return when the registration of all the consumers has been done across the cluster
return Mono return Mono
@ -224,66 +226,68 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.then(updates.asMono()) .then(updates.asMono())
.publishOn(Schedulers.parallel()) .publishOn(Schedulers.parallel())
.timeout(Duration.ofSeconds(30)) .timeout(Duration.ofSeconds(30))
.flatMap(MonoUtils::fromMessageConsumer) .flatMapMany(updatesMessageConsumer -> MonoUtils
.flatMapMany(registration -> Mono .fromMessageConsumer(updatesMessageConsumer)
.fromRunnable(() -> logger.trace("Registering updates flux")) .flatMapMany(registration -> Mono
.then(registration.getT1()) .fromRunnable(() -> logger.trace("Registering updates flux"))
.doOnSuccess(s -> logger.trace("Registered updates flux")) .then(registration.getT1())
.doOnSuccess(s -> logger.trace("Sending ready-to-receive")) .doOnSuccess(s -> logger.trace("Registered updates flux"))
.then(cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ready-to-receive", .doOnSuccess(s -> logger.trace("Sending ready-to-receive"))
EMPTY, .then(cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ready-to-receive",
deliveryOptionsWithTimeout EMPTY,
).as(MonoUtils::toMono)) deliveryOptionsWithTimeout
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) ).as(MonoUtils::toMono))
.doOnSuccess(s -> logger.trace("About to read updates flux")) .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
.thenMany(registration.getT2()) .doOnSuccess(s -> logger.trace("About to read updates flux"))
) .thenMany(registration.getT2())
.takeUntilOther(Flux
.merge(
crash.asMono()
.onErrorResume(ex -> {
logger.error("TDLib crashed", ex);
return Mono.empty();
}),
pingFail.asMono()
.then(Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to ping");
ex.setStackTrace(new StackTraceElement[0]);
throw ex;
}).onErrorResume(ex -> MonoUtils.emitError(crash, ex)))
.takeUntilOther(Mono
.firstWithSignal(crash.asMono(), authStateClosing.asMono())
.onErrorResume(e -> Mono.empty())
)
) )
.doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end")) .takeUntilOther(Flux
) .merge(
.takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> { crash.asMono()
if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { .onErrorResume(ex -> {
return ((UpdateAuthorizationState) item).authorizationState.getConstructor() logger.error("TDLib crashed", ex);
== AuthorizationStateClosed.CONSTRUCTOR; return Mono.empty();
} }),
return false; pingFail.asMono()
})) .then(Mono.fromCallable(() -> {
.flatMapSequential(updates -> { var ex = new ConnectException("Server did not respond to ping");
if (updates.succeeded()) { ex.setStackTrace(new StackTraceElement[0]);
return Flux.fromIterable(updates.value()); throw ex;
} else { }).onErrorResume(ex -> MonoUtils.emitError(crash, ex)))
return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow()); .takeUntilOther(Mono
} .firstWithSignal(crash.asMono(), authStateClosing.asMono())
}) .onErrorResume(e -> Mono.empty())
.flatMapSequential(this::interceptUpdate) )
// Redirect errors to crash sink )
.doOnError(error -> crash.tryEmitError(error)) .doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end"))
.onErrorResume(ex -> { )
logger.trace("Absorbing the error, the error has been published using the crash sink", ex); .takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> {
return Mono.empty(); if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
}) return ((UpdateAuthorizationState) item).authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR;
}
return false;
}))
.flatMapSequential(updates -> {
if (updates.succeeded()) {
return Flux.fromIterable(updates.value());
} else {
return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow());
}
})
.flatMapSequential(update -> interceptUpdate(updatesMessageConsumer, update))
// Redirect errors to crash sink
.doOnError(error -> crash.tryEmitError(error))
.onErrorResume(ex -> {
logger.trace("Absorbing the error, the error has been published using the crash sink", ex);
return Mono.empty();
})
.doOnTerminate(updatesStreamEnd::tryEmitEmpty); .doOnTerminate(updatesStreamEnd::tryEmitEmpty)
);
} }
private Mono<TdApi.Object> interceptUpdate(TdApi.Object update) { private Mono<TdApi.Object> interceptUpdate(MessageConsumer<TdResultList> updatesMessageConsumer, Object update) {
logger.trace("Received update {}", update.getClass().getSimpleName()); logger.trace("Received update {}", update.getClass().getSimpleName());
switch (update.getConstructor()) { switch (update.getConstructor()) {
case TdApi.UpdateAuthorizationState.CONSTRUCTOR: case TdApi.UpdateAuthorizationState.CONSTRUCTOR:
@ -291,8 +295,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
switch (updateAuthorizationState.authorizationState.getConstructor()) { switch (updateAuthorizationState.authorizationState.getConstructor()) {
case TdApi.AuthorizationStateClosing.CONSTRUCTOR: case TdApi.AuthorizationStateClosing.CONSTRUCTOR:
authStateClosing.tryEmitEmpty(); authStateClosing.tryEmitEmpty();
break;
case TdApi.AuthorizationStateClosed.CONSTRUCTOR: case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib"))
.then(updatesMessageConsumer.rxUnregister().as(MonoUtils::toMono))
.then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) .then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono))
.flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel())) .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel()))
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length())))
@ -310,21 +316,25 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
var req = new ExecuteObject(executeDirectly, request); var req = new ExecuteObject(executeDirectly, request);
return Mono return Mono
.firstWithSignal( .firstWithSignal(
MonoUtils.castVoid(crash.asMono()), MonoUtils
.castVoid(crash
.asMono()
.doOnSuccess(s -> logger
.debug("Failed request {} because the TDLib session was already crashed", request))
),
Mono Mono
.fromRunnable(() -> logger.trace("Executing request {}", request)) .fromRunnable(() -> logger.trace("Executing request {}", request))
.then(cluster.getEventBus().<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono)) .then(cluster.getEventBus().<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono))
.onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex))
.<TdResult<T>>flatMap(resp -> Mono .<TdResult<T>>handle((resp, sink) -> {
.<TdResult<T>>fromCallable(() -> { if (resp.body() == null) {
if (resp.body() == null) { sink.error(ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty")));
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty")); } else {
} else { sink.next(resp.body().toTdResult());
return resp.body().toTdResult(); }
} })
}).subscribeOn(Schedulers.parallel()) .doOnSuccess(s -> logger.trace("Executed request {}", request))
) .doOnError(ex -> logger.debug("Failed request {}: {}", req, ex))
.doOnSuccess(s -> logger.trace("Executed request"))
) )
.switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> { .switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> {
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty")); throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty"));

View File

@ -113,9 +113,10 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} }
private Mono<Void> onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { private Mono<Void> onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {
return this return td
.listen(td, botAddress, botAlias, botId, local) .initialize()
.then(this.pipe(td, botAddress, botAlias, botId, local)) .then(this.pipe(td, botAddress, botAlias, botId, local))
.then(this.listen(td, botAddress, botAlias, botId, local))
.doOnSuccess(s -> { .doOnSuccess(s -> {
logger.info("Deploy and start of bot \"" + botAlias + "\": ✅ Succeeded"); logger.info("Deploy and start of bot \"" + botAlias + "\": ✅ Succeeded");
}) })
@ -138,39 +139,39 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
executeConsumer.handler(sink::next); executeConsumer.handler(sink::next);
executeConsumer.endHandler(h -> sink.complete()); executeConsumer.endHandler(h -> sink.complete());
}) })
.flatMapSequential(msg -> Mono .flatMap(msg -> {
.fromCallable(() -> Tuples.of(msg, msg.body())) var body = msg.body();
.subscribeOn(Schedulers.parallel())
)
.flatMapSequential(tuple -> {
var msg = tuple.getT1();
var body = tuple.getT2();
logger.trace("Received execute request {}", body.getRequest().getClass().getSimpleName());
var request = overrideRequest(body.getRequest(), botId); var request = overrideRequest(body.getRequest(), botId);
if (logger.isTraceEnabled()) {
logger.trace("Received execute request {}", request);
}
return td return td
.execute(request, body.isExecuteDirectly()) .execute(request, body.isExecuteDirectly())
.map(result -> Tuples.of(msg, result)) .single()
.doOnSuccess(s -> logger.trace("Executed successfully")); .timeout(Duration.ofSeconds(60 + 30))
.doOnSuccess(s -> logger.trace("Executed successfully. Request was {}", request))
.onErrorResume(ex -> Mono.fromRunnable(() -> {
msg.fail(500, ex.getLocalizedMessage());
}))
.flatMap(response -> Mono.fromCallable(() -> {
var replyOpts = new DeliveryOptions().setLocalOnly(local);
var replyValue = new TdResultMessage(response.result(), response.cause());
try {
logger.trace("Replying with success response. Request was {}", request);
msg.reply(replyValue, replyOpts);
return response;
} catch (Exception ex) {
logger.debug("Replying with error response: {}. Request was {}", ex.getLocalizedMessage(), request);
msg.fail(500, ex.getLocalizedMessage());
throw ex;
}
}).subscribeOn(Schedulers.boundedElastic()));
}) })
.flatMapSequential(tuple -> Mono.fromCallable(() -> {
var msg = tuple.getT1();
var response = tuple.getT2();
var replyOpts = new DeliveryOptions().setLocalOnly(local);
var replyValue = new TdResultMessage(response.result(), response.cause());
try {
logger.trace("Replying with success response");
msg.reply(replyValue, replyOpts);
return response;
} catch (Exception ex) {
logger.debug("Replying with error response: {}", ex.getLocalizedMessage());
msg.fail(500, ex.getLocalizedMessage());
throw ex;
}
}).subscribeOn(Schedulers.boundedElastic()))
.then() .then()
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(v -> {}, .subscribe(v -> {},
ex -> logger.error("Error when processing an execute request", ex), ex -> logger.error("Fatal error when processing an execute request."
+ " Can't process further requests since the subscription has been broken", ex),
() -> logger.trace("Finished handling execute requests") () -> logger.trace("Finished handling execute requests")
); );
@ -235,7 +236,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
pingConsumer.handler(sink::next); pingConsumer.handler(sink::next);
pingConsumer.endHandler(h -> sink.complete()); pingConsumer.endHandler(h -> sink.complete());
}) })
.flatMapSequential(msg -> Mono.fromCallable(() -> { .concatMap(msg -> Mono.fromCallable(() -> {
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
msg.reply(EMPTY, opts); msg.reply(EMPTY, opts);
return null; return null;
@ -286,7 +287,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.then(executeConsumer .then(executeConsumer
.asMono() .asMono()
.timeout(Duration.ofSeconds(5), Mono.empty()) .timeout(Duration.ofSeconds(5), Mono.empty())
.flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono))) .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono))
.doOnSuccess(s -> logger.trace("Unregistered execute consumer"))
)
.then(readBinlogConsumer .then(readBinlogConsumer
.asMono() .asMono()
.timeout(Duration.ofSeconds(10), Mono.empty()) .timeout(Duration.ofSeconds(10), Mono.empty())
@ -316,24 +319,20 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private Mono<Void> pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { private Mono<Void> pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {
logger.trace("Preparing to pipe requests"); logger.trace("Preparing to pipe requests");
Flux<TdResultList> updatesFlux = td Flux<TdResultList> updatesFlux = td.receive(tdOptions)
.initialize()
.thenMany(td.receive(tdOptions))
.takeUntil(item -> { .takeUntil(item -> {
if (item instanceof Update) { if (item instanceof Update) {
var tdUpdate = (Update) item; var tdUpdate = (Update) item;
if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
var updateAuthorizationState = (UpdateAuthorizationState) tdUpdate; var updateAuthorizationState = (UpdateAuthorizationState) tdUpdate;
if (updateAuthorizationState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { return updateAuthorizationState.authorizationState.getConstructor()
return true; == AuthorizationStateClosed.CONSTRUCTOR;
}
} }
} else if (item instanceof Error) { } else
return true; return item instanceof Error;
}
return false; return false;
}) })
.flatMapSequential(update -> MonoUtils.fromBlockingSingle(() -> { .flatMap(update -> Mono.fromCallable(() -> {
if (update.getConstructor() == TdApi.Error.CONSTRUCTOR) { if (update.getConstructor() == TdApi.Error.CONSTRUCTOR) {
var error = (Error) update; var error = (Error) update;
throw new TdError(error.code, error.message); throw new TdError(error.code, error.message);
@ -359,14 +358,14 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.sender(botAddress + ".updates", opts); .sender(botAddress + ".updates", opts);
var pipeFlux = updatesFlux var pipeFlux = updatesFlux
.flatMapSequential(updatesList -> updatesSender .concatMap(updatesList -> updatesSender
.rxWrite(updatesList) .rxWrite(updatesList)
.as(MonoUtils::toMono) .as(MonoUtils::toMono)
.thenReturn(updatesList) .thenReturn(updatesList)
) )
.flatMapSequential(updatesList -> Flux .concatMap(updatesList -> Flux
.fromIterable(updatesList.value()) .fromIterable(updatesList.value())
.flatMapSequential(item -> { .concatMap(item -> {
if (item instanceof Update) { if (item instanceof Update) {
var tdUpdate = (Update) item; var tdUpdate = (Update) item;
if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {

View File

@ -403,9 +403,12 @@ public class MonoUtils {
messageConsumer.<Message<T>>handler(messages::tryEmitNext); messageConsumer.<Message<T>>handler(messages::tryEmitNext);
Flux<Message<T>> dataFlux = Flux Flux<Message<T>> dataFlux = Flux
.<Message<T>>concatDelayError( .concatDelayError(
messages.asFlux(), messages.asFlux(),
messageConsumer.rxUnregister().as(MonoUtils::toMono) messageConsumer
.rxUnregister()
.as(MonoUtils::<Message<T>>toMono)
.doOnSuccess(s -> logger.trace("Unregistered message consumer"))
) )
.doOnSubscribe(s -> registrationRequested.tryEmitEmpty()); .doOnSubscribe(s -> registrationRequested.tryEmitEmpty());