From 24f83b51907871f4cef9c341a56074cf21b16e10 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 31 Mar 2021 04:34:53 +0200 Subject: [PATCH] Unregister subscription from clusters during shutdown --- .../td/WrappedReactorTelegramClient.java | 2 +- .../td/direct/AsyncTdDirectImpl.java | 77 +++++---- .../td/middle/TdClusterManager.java | 23 +-- .../client/AsyncTdMiddleEventBusClient.java | 154 ++++++++++-------- .../server/AsyncTdMiddleEventBusServer.java | 85 +++++----- src/main/java/it/tdlight/utils/MonoUtils.java | 7 +- 6 files changed, 184 insertions(+), 164 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java index 417fb35..7170f67 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java @@ -46,7 +46,7 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient { */ @Override public Mono send(TdApi.Function query) { - return Mono.from(reactiveTelegramClient.send(query)); + return Mono.from(reactiveTelegramClient.send(query)).single(); } /** diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 7ce67dc..d4eba7a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -11,6 +11,7 @@ import it.tdlight.tdlibsession.td.ReactorTelegramClient; import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.utils.MonoUtils; +import java.time.Duration; import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.LoggerFactory; import reactor.core.publisher.Flux; @@ -40,48 +41,60 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { @Override public Mono> execute(Function request, boolean synchronous) { if (synchronous) { - return td.asMono().single().flatMap(td -> MonoUtils.fromBlockingSingle(() -> { - if (td != null) { - return TdResult.of(td.execute(request)); - } else { - if (request.getConstructor() == Close.CONSTRUCTOR) { - return TdResult.of(new Ok()); - } - throw new IllegalStateException("TDLib client is destroyed"); - } - })); + return Mono + .firstWithSignal(td.asMono(), Mono.empty()) + .single() + .timeout(Duration.ofSeconds(5)) + .flatMap(td -> MonoUtils.fromBlockingSingle(() -> { + logger.trace("Sending execute to TDLib {}", request); + TdResult result = TdResult.of(td.execute(request)); + logger.trace("Received execute response from TDLib. Request was {}", request); + return result; + })) + .single(); } else { - return td.asMono().single().flatMap(td -> Mono.>create(sink -> { - if (td != null) { - Mono - .from(td.send(request)) - .subscribeOn(Schedulers.single()) - .subscribe(v -> sink.success(TdResult.of(v)), sink::error); - } else { - if (request.getConstructor() == Close.CONSTRUCTOR) { - logger.trace("Sending close success to sink " + sink.toString()); - sink.success(TdResult.of(new Ok())); - } else { - logger.trace("Sending close error to sink " + sink.toString()); - sink.error(new IllegalStateException("TDLib client is destroyed")); - } - } - })).single(); + return Mono + .firstWithSignal(td.asMono(), Mono.empty()) + .single() + .timeout(Duration.ofSeconds(5)) + .>flatMap(td -> { + if (td != null) { + return Mono + .fromRunnable(() -> logger.trace("Sending request to TDLib {}", request)) + .then(td.send(request)) + .single() + .>map(TdResult::of) + .doOnSuccess(s -> logger.trace("Sent request to TDLib {}", request)); + } else { + return Mono.fromCallable(() -> { + if (request.getConstructor() == Close.CONSTRUCTOR) { + logger.trace("Sending close success to request {}", request); + return TdResult.of(new Ok()); + } else { + logger.trace("Sending close error to request {} ", request); + throw new IllegalStateException("TDLib client is destroyed"); + } + }); + } + }) + .single(); } } @Override public Mono initialize() { - return telegramClientFactory - .create(implementationDetails) + return Mono + .fromRunnable(() -> logger.trace("Initializing")) + .then(telegramClientFactory.create(implementationDetails)) .flatMap(reactorTelegramClient -> reactorTelegramClient.initialize().thenReturn(reactorTelegramClient)) - .doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1))) .flatMap(client -> { if (td.tryEmitValue(client).isFailure()) { return Mono.error(new TdError(500, "Failed to emit td client")); } return Mono.just(client); }) + .doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1))) + .doOnSuccess(s -> logger.trace("Initialized")) .then(); } @@ -89,8 +102,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { public Flux receive(AsyncTdDirectOptions options) { // If closed it will be either true or false final One closedFromTd = Sinks.one(); - return td - .asMono() + return Mono + .firstWithSignal(td.asMono(), Mono.empty()) + .single() + .timeout(Duration.ofSeconds(5)) .flatMapMany(ReactorTelegramClient::receive) .doOnNext(update -> { // Close the emitter if receive closed state diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index 7342a11..b611171 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -1,11 +1,8 @@ package it.tdlight.tdlibsession.td.middle; import com.hazelcast.config.Config; -import com.hazelcast.config.EvictionConfig; -import com.hazelcast.config.EvictionPolicy; import com.hazelcast.config.MapConfig; -import com.hazelcast.config.MaxSizePolicy; -import com.hazelcast.config.MergePolicyConfig; +import com.hazelcast.config.MultiMapConfig; import com.hazelcast.config.cp.SemaphoreConfig; import io.vertx.core.DeploymentOptions; import io.vertx.core.Handler; @@ -120,17 +117,13 @@ public class TdClusterManager { cfg.getNetworkConfig().setPort(port); cfg.getNetworkConfig().setPortAutoIncrement(false); cfg.getPartitionGroupConfig().setEnabled(false); - cfg.addMapConfig(new MapConfig() - .setName("__vertx.subs") - .setBackupCount(1) - .setTimeToLiveSeconds(0) - .setMaxIdleSeconds(0) - .setEvictionConfig(new EvictionConfig() - .setMaxSizePolicy(MaxSizePolicy.PER_NODE) - .setEvictionPolicy(EvictionPolicy.NONE) - .setSize(0)) - .setMergePolicyConfig(new MergePolicyConfig().setPolicy("com.hazelcast.map.merge.LatestUpdateMapMergePolicy"))); - cfg.getCPSubsystemConfig().setSemaphoreConfigs(Map.of("__vertx.*", new SemaphoreConfig().setInitialPermits(1))); + cfg.addMapConfig(new MapConfig().setName("__vertx.haInfo").setBackupCount(1)); + cfg.addMapConfig(new MapConfig().setName("__vertx.nodeInfo").setBackupCount(1)); + cfg + .getCPSubsystemConfig() + .setCPMemberCount(0) + .setSemaphoreConfigs(Map.of("__vertx.*", new SemaphoreConfig().setInitialPermits(1).setJDKCompatible(false))); + cfg.addMultiMapConfig(new MultiMapConfig().setName("__vertx.subs").setBackupCount(1).setValueCollectionType("SET")); cfg.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false); cfg.getNetworkConfig().getJoin().getAwsConfig().setEnabled(false); cfg.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 9b3131e..1c0e745 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -8,6 +8,7 @@ import io.vertx.reactivex.core.eventbus.MessageConsumer; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.Object; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.tdlibsession.td.ResponseError; import it.tdlight.tdlibsession.td.TdError; @@ -196,11 +197,12 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private Mono setupUpdatesListener() { return Mono .fromRunnable(() -> logger.trace("Setting up updates listener...")) - .then(MonoUtils.>fromBlockingSingle(() -> { - return MessageConsumer.newInstance(cluster.getEventBus().consumer(botAddress + ".updates") - .setMaxBufferedMessages(5000) - .getDelegate()); - })) + .then(MonoUtils.>fromBlockingSingle(() -> MessageConsumer + .newInstance(cluster.getEventBus().consumer(botAddress + ".updates") + .setMaxBufferedMessages(5000) + .getDelegate() + )) + ) .flatMap(updateConsumer -> { // Return when the registration of all the consumers has been done across the cluster return Mono @@ -224,66 +226,68 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .then(updates.asMono()) .publishOn(Schedulers.parallel()) .timeout(Duration.ofSeconds(30)) - .flatMap(MonoUtils::fromMessageConsumer) - .flatMapMany(registration -> Mono - .fromRunnable(() -> logger.trace("Registering updates flux")) - .then(registration.getT1()) - .doOnSuccess(s -> logger.trace("Registered updates flux")) - .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) - .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", - EMPTY, - deliveryOptionsWithTimeout - ).as(MonoUtils::toMono)) - .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) - .doOnSuccess(s -> logger.trace("About to read updates flux")) - .thenMany(registration.getT2()) - ) - .takeUntilOther(Flux - .merge( - crash.asMono() - .onErrorResume(ex -> { - logger.error("TDLib crashed", ex); - return Mono.empty(); - }), - pingFail.asMono() - .then(Mono.fromCallable(() -> { - var ex = new ConnectException("Server did not respond to ping"); - ex.setStackTrace(new StackTraceElement[0]); - throw ex; - }).onErrorResume(ex -> MonoUtils.emitError(crash, ex))) - .takeUntilOther(Mono - .firstWithSignal(crash.asMono(), authStateClosing.asMono()) - .onErrorResume(e -> Mono.empty()) - ) + .flatMapMany(updatesMessageConsumer -> MonoUtils + .fromMessageConsumer(updatesMessageConsumer) + .flatMapMany(registration -> Mono + .fromRunnable(() -> logger.trace("Registering updates flux")) + .then(registration.getT1()) + .doOnSuccess(s -> logger.trace("Registered updates flux")) + .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) + .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", + EMPTY, + deliveryOptionsWithTimeout + ).as(MonoUtils::toMono)) + .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) + .doOnSuccess(s -> logger.trace("About to read updates flux")) + .thenMany(registration.getT2()) ) - .doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end")) - ) - .takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> { - if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { - return ((UpdateAuthorizationState) item).authorizationState.getConstructor() - == AuthorizationStateClosed.CONSTRUCTOR; - } - return false; - })) - .flatMapSequential(updates -> { - if (updates.succeeded()) { - return Flux.fromIterable(updates.value()); - } else { - return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow()); - } - }) - .flatMapSequential(this::interceptUpdate) - // Redirect errors to crash sink - .doOnError(error -> crash.tryEmitError(error)) - .onErrorResume(ex -> { - logger.trace("Absorbing the error, the error has been published using the crash sink", ex); - return Mono.empty(); - }) + .takeUntilOther(Flux + .merge( + crash.asMono() + .onErrorResume(ex -> { + logger.error("TDLib crashed", ex); + return Mono.empty(); + }), + pingFail.asMono() + .then(Mono.fromCallable(() -> { + var ex = new ConnectException("Server did not respond to ping"); + ex.setStackTrace(new StackTraceElement[0]); + throw ex; + }).onErrorResume(ex -> MonoUtils.emitError(crash, ex))) + .takeUntilOther(Mono + .firstWithSignal(crash.asMono(), authStateClosing.asMono()) + .onErrorResume(e -> Mono.empty()) + ) + ) + .doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end")) + ) + .takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> { + if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + return ((UpdateAuthorizationState) item).authorizationState.getConstructor() + == AuthorizationStateClosed.CONSTRUCTOR; + } + return false; + })) + .flatMapSequential(updates -> { + if (updates.succeeded()) { + return Flux.fromIterable(updates.value()); + } else { + return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow()); + } + }) + .flatMapSequential(update -> interceptUpdate(updatesMessageConsumer, update)) + // Redirect errors to crash sink + .doOnError(error -> crash.tryEmitError(error)) + .onErrorResume(ex -> { + logger.trace("Absorbing the error, the error has been published using the crash sink", ex); + return Mono.empty(); + }) - .doOnTerminate(updatesStreamEnd::tryEmitEmpty); + .doOnTerminate(updatesStreamEnd::tryEmitEmpty) + ); } - private Mono interceptUpdate(TdApi.Object update) { + private Mono interceptUpdate(MessageConsumer updatesMessageConsumer, Object update) { logger.trace("Received update {}", update.getClass().getSimpleName()); switch (update.getConstructor()) { case TdApi.UpdateAuthorizationState.CONSTRUCTOR: @@ -291,8 +295,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { switch (updateAuthorizationState.authorizationState.getConstructor()) { case TdApi.AuthorizationStateClosing.CONSTRUCTOR: authStateClosing.tryEmitEmpty(); + break; case TdApi.AuthorizationStateClosed.CONSTRUCTOR: return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) + .then(updatesMessageConsumer.rxUnregister().as(MonoUtils::toMono)) .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel())) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) @@ -310,21 +316,25 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { var req = new ExecuteObject(executeDirectly, request); return Mono .firstWithSignal( - MonoUtils.castVoid(crash.asMono()), + MonoUtils + .castVoid(crash + .asMono() + .doOnSuccess(s -> logger + .debug("Failed request {} because the TDLib session was already crashed", request)) + ), Mono .fromRunnable(() -> logger.trace("Executing request {}", request)) .then(cluster.getEventBus().rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono)) .onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex)) - .>flatMap(resp -> Mono - .>fromCallable(() -> { - if (resp.body() == null) { - throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty")); - } else { - return resp.body().toTdResult(); - } - }).subscribeOn(Schedulers.parallel()) - ) - .doOnSuccess(s -> logger.trace("Executed request")) + .>handle((resp, sink) -> { + if (resp.body() == null) { + sink.error(ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty"))); + } else { + sink.next(resp.body().toTdResult()); + } + }) + .doOnSuccess(s -> logger.trace("Executed request {}", request)) + .doOnError(ex -> logger.debug("Failed request {}: {}", req, ex)) ) .switchIfEmpty(Mono.defer(() -> Mono.fromCallable(() -> { throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Client is closed or response is empty")); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index a5429c2..1b3655c 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -113,9 +113,10 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } private Mono onSuccessfulStartRequest(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { - return this - .listen(td, botAddress, botAlias, botId, local) + return td + .initialize() .then(this.pipe(td, botAddress, botAlias, botId, local)) + .then(this.listen(td, botAddress, botAlias, botId, local)) .doOnSuccess(s -> { logger.info("Deploy and start of bot \"" + botAlias + "\": ✅ Succeeded"); }) @@ -138,39 +139,39 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { executeConsumer.handler(sink::next); executeConsumer.endHandler(h -> sink.complete()); }) - .flatMapSequential(msg -> Mono - .fromCallable(() -> Tuples.of(msg, msg.body())) - .subscribeOn(Schedulers.parallel()) - ) - .flatMapSequential(tuple -> { - var msg = tuple.getT1(); - var body = tuple.getT2(); - logger.trace("Received execute request {}", body.getRequest().getClass().getSimpleName()); + .flatMap(msg -> { + var body = msg.body(); var request = overrideRequest(body.getRequest(), botId); + if (logger.isTraceEnabled()) { + logger.trace("Received execute request {}", request); + } return td .execute(request, body.isExecuteDirectly()) - .map(result -> Tuples.of(msg, result)) - .doOnSuccess(s -> logger.trace("Executed successfully")); + .single() + .timeout(Duration.ofSeconds(60 + 30)) + .doOnSuccess(s -> logger.trace("Executed successfully. Request was {}", request)) + .onErrorResume(ex -> Mono.fromRunnable(() -> { + msg.fail(500, ex.getLocalizedMessage()); + })) + .flatMap(response -> Mono.fromCallable(() -> { + var replyOpts = new DeliveryOptions().setLocalOnly(local); + var replyValue = new TdResultMessage(response.result(), response.cause()); + try { + logger.trace("Replying with success response. Request was {}", request); + msg.reply(replyValue, replyOpts); + return response; + } catch (Exception ex) { + logger.debug("Replying with error response: {}. Request was {}", ex.getLocalizedMessage(), request); + msg.fail(500, ex.getLocalizedMessage()); + throw ex; + } + }).subscribeOn(Schedulers.boundedElastic())); }) - .flatMapSequential(tuple -> Mono.fromCallable(() -> { - var msg = tuple.getT1(); - var response = tuple.getT2(); - var replyOpts = new DeliveryOptions().setLocalOnly(local); - var replyValue = new TdResultMessage(response.result(), response.cause()); - try { - logger.trace("Replying with success response"); - msg.reply(replyValue, replyOpts); - return response; - } catch (Exception ex) { - logger.debug("Replying with error response: {}", ex.getLocalizedMessage()); - msg.fail(500, ex.getLocalizedMessage()); - throw ex; - } - }).subscribeOn(Schedulers.boundedElastic())) .then() .subscribeOn(Schedulers.parallel()) .subscribe(v -> {}, - ex -> logger.error("Error when processing an execute request", ex), + ex -> logger.error("Fatal error when processing an execute request." + + " Can't process further requests since the subscription has been broken", ex), () -> logger.trace("Finished handling execute requests") ); @@ -235,7 +236,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { pingConsumer.handler(sink::next); pingConsumer.endHandler(h -> sink.complete()); }) - .flatMapSequential(msg -> Mono.fromCallable(() -> { + .concatMap(msg -> Mono.fromCallable(() -> { var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); msg.reply(EMPTY, opts); return null; @@ -286,7 +287,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .then(executeConsumer .asMono() .timeout(Duration.ofSeconds(5), Mono.empty()) - .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono))) + .flatMap(ec -> ec.rxUnregister().as(MonoUtils::toMono)) + .doOnSuccess(s -> logger.trace("Unregistered execute consumer")) + ) .then(readBinlogConsumer .asMono() .timeout(Duration.ofSeconds(10), Mono.empty()) @@ -316,24 +319,20 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private Mono pipe(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) { logger.trace("Preparing to pipe requests"); - Flux updatesFlux = td - .initialize() - .thenMany(td.receive(tdOptions)) + Flux updatesFlux = td.receive(tdOptions) .takeUntil(item -> { if (item instanceof Update) { var tdUpdate = (Update) item; if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { var updateAuthorizationState = (UpdateAuthorizationState) tdUpdate; - if (updateAuthorizationState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { - return true; - } + return updateAuthorizationState.authorizationState.getConstructor() + == AuthorizationStateClosed.CONSTRUCTOR; } - } else if (item instanceof Error) { - return true; - } + } else + return item instanceof Error; return false; }) - .flatMapSequential(update -> MonoUtils.fromBlockingSingle(() -> { + .flatMap(update -> Mono.fromCallable(() -> { if (update.getConstructor() == TdApi.Error.CONSTRUCTOR) { var error = (Error) update; throw new TdError(error.code, error.message); @@ -359,14 +358,14 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .sender(botAddress + ".updates", opts); var pipeFlux = updatesFlux - .flatMapSequential(updatesList -> updatesSender + .concatMap(updatesList -> updatesSender .rxWrite(updatesList) .as(MonoUtils::toMono) .thenReturn(updatesList) ) - .flatMapSequential(updatesList -> Flux + .concatMap(updatesList -> Flux .fromIterable(updatesList.value()) - .flatMapSequential(item -> { + .concatMap(item -> { if (item instanceof Update) { var tdUpdate = (Update) item; if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index d149d17..a14fdde 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -403,9 +403,12 @@ public class MonoUtils { messageConsumer.>handler(messages::tryEmitNext); Flux> dataFlux = Flux - .>concatDelayError( + .concatDelayError( messages.asFlux(), - messageConsumer.rxUnregister().as(MonoUtils::toMono) + messageConsumer + .rxUnregister() + .as(MonoUtils::>toMono) + .doOnSuccess(s -> logger.trace("Unregistered message consumer")) ) .doOnSubscribe(s -> registrationRequested.tryEmitEmpty());