From d3f813d8cb4a1613c99a6887a2101ab8263621fd Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 13 Jan 2021 17:22:14 +0100 Subject: [PATCH] Update EventBusFlux.java, SignalMessage.java, and 11 more files... --- .../it/tdlight/tdlibsession/EventBusFlux.java | 164 ++++++++++++ .../tdlight/tdlibsession/SignalMessage.java | 82 ++++++ .../tdlibsession/SignalMessageCodec.java | 99 +++++++ .../it/tdlight/tdlibsession/SignalType.java | 5 + .../remoteclient/TDLibRemoteClient.java | 73 +++--- .../tdlibsession/td/easy/AsyncTdEasy.java | 82 +++--- .../td/middle/TdClusterManager.java | 18 +- ...{TdOptionalList.java => TdResultList.java} | 24 +- ...dec.java => TdResultListMessageCodec.java} | 52 ++-- .../client/AsyncTdMiddleEventBusClient.java | 167 ++++-------- .../server/AsyncTdMiddleEventBusServer.java | 246 ++++++++---------- 11 files changed, 620 insertions(+), 392 deletions(-) create mode 100644 src/main/java/it/tdlight/tdlibsession/EventBusFlux.java create mode 100644 src/main/java/it/tdlight/tdlibsession/SignalMessage.java create mode 100644 src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java create mode 100644 src/main/java/it/tdlight/tdlibsession/SignalType.java rename src/main/java/it/tdlight/tdlibsession/td/middle/{TdOptionalList.java => TdResultList.java} (53%) rename src/main/java/it/tdlight/tdlibsession/td/middle/{TdOptListMessageCodec.java => TdResultListMessageCodec.java} (50%) diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java new file mode 100644 index 0000000..a26a63c --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -0,0 +1,164 @@ +package it.tdlight.tdlibsession; + +import io.vertx.core.eventbus.DeliveryOptions; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.MessageCodec; +import io.vertx.core.eventbus.MessageConsumer; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class EventBusFlux { + private static final Logger logger = LoggerFactory.getLogger(EventBusFlux.class); + + private static final byte[] EMPTY = new byte[0]; + + public static void registerFluxCodec(EventBus eventBus, MessageCodec itemsCodec) { + var signalsCodec = new SignalMessageCodec(itemsCodec); + try { + eventBus.registerCodec(signalsCodec); + } catch (IllegalStateException ex) { + if (!ex.getMessage().contains("Already a codec registered with name")) { + throw ex; + } + } + } + + public static Mono serve(Flux flux, + EventBus eventBus, + String fluxAddress, + DeliveryOptions baseDeliveryOptions, + MessageCodec itemsCodec, + Duration connectionTimeout) { + var signalsCodec = new SignalMessageCodec(itemsCodec); + var deliveryOptions = new DeliveryOptions(baseDeliveryOptions) + .setSendTimeout(connectionTimeout.toMillis()); + var signalDeliveryOptions = new DeliveryOptions(deliveryOptions) + .setCodecName(signalsCodec.name()); + AtomicInteger subscriptionsCount = new AtomicInteger(); + return Mono.create(sink -> { + MessageConsumer subscribe = eventBus.consumer(fluxAddress + ".subscribe"); + + subscribe.handler(msg -> { + if (subscriptionsCount.incrementAndGet() > 1) { + subscriptionsCount.decrementAndGet(); + logger.error("Another client tried to connect to the same flux. Rejecting the request."); + msg.fail(500, "This flux is already in use!"); + return; + } + long subscriptionId = 0; + var subscriptionAddress = fluxAddress + "." + subscriptionId; + + MessageConsumer dispose = eventBus.consumer(subscriptionAddress + ".dispose"); + MessageConsumer cancel = eventBus.consumer(subscriptionAddress + ".cancel"); + + var subscription = flux.subscribe(item -> { + eventBus.send(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions); + }, error -> { + eventBus.send(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions); + }, () -> { + eventBus.send(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions); + }); + + cancel.handler(msg3 -> { + if (!subscription.isDisposed()) { + subscription.dispose(); + } + msg3.reply(EMPTY, deliveryOptions); + }); + dispose.handler(msg2 -> { + if (!subscription.isDisposed()) { + subscription.dispose(); + } + cancel.unregister(v -> { + if (v.failed()) { + logger.error("Failed to unregister cancel", v.cause()); + } + dispose.unregister(v2 -> { + if (v.failed()) { + logger.error("Failed to unregister dispose", v2.cause()); + } + subscribe.unregister(v3 -> { + if (v2.failed()) { + logger.error("Failed to unregister subscribe", v3.cause()); + } + msg2.reply(EMPTY); + }); + }); + }); + }); + + cancel.completionHandler(h -> { + if (h.succeeded()) { + dispose.completionHandler(h2 -> { + if (h2.succeeded()) { + sink.success(); + } else { + sink.error(h.cause()); + } + }); + } else { + sink.error(h.cause()); + } + }); + + msg.reply((Long) subscriptionId); + }); + + subscribe.completionHandler(h -> { + if (h.failed()) { + sink.error(h.cause()); + } + }); + }); + } + + public static Flux connect(EventBus eventBus, + String fluxAddress, + DeliveryOptions baseDeliveryOptions, + MessageCodec itemsCodec, + Duration connectionTimeout) { + return Flux.create(emitter -> { + var deliveryOptions = new DeliveryOptions(baseDeliveryOptions) + .setSendTimeout(connectionTimeout.toMillis()); + eventBus.request(fluxAddress + ".subscribe", EMPTY, deliveryOptions, msg -> { + if (msg.succeeded()) { + long subscriptionId = msg.result().body(); + var subscriptionAddress = fluxAddress + "." + subscriptionId; + + var signalConsumer = eventBus.>consumer(subscriptionAddress + ".signal"); + signalConsumer.handler(msg2 -> { + var signal = msg2.body(); + switch (signal.getSignalType()) { + case ITEM: + emitter.next(signal.getItem()); + break; + case ERROR: + emitter.error(new Exception(signal.getErrorMessage())); + break; + case COMPLETE: + emitter.complete(); + break; + } + msg2.reply(EMPTY); + }); + signalConsumer.completionHandler(h -> { + if (h.failed()) { + emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause())); + } + }); + + emitter.onDispose(() -> eventBus.send(subscriptionAddress + ".dispose", EMPTY, deliveryOptions)); + + emitter.onCancel(() -> eventBus.send(subscriptionAddress + ".cancel", EMPTY, deliveryOptions)); + } else { + emitter.error(new IllegalStateException("Subscription failed", msg.cause())); + } + }); + }); + } + +} diff --git a/src/main/java/it/tdlight/tdlibsession/SignalMessage.java b/src/main/java/it/tdlight/tdlibsession/SignalMessage.java new file mode 100644 index 0000000..78d31b9 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/SignalMessage.java @@ -0,0 +1,82 @@ +package it.tdlight.tdlibsession; + +import java.util.Objects; +import java.util.StringJoiner; + +class SignalMessage { + + private final SignalType signalType; + private final T item; + private final String errorMessage; + + private SignalMessage(SignalType signalType, T item, String errorMessage) { + this.signalType = signalType; + this.item = item; + this.errorMessage = errorMessage; + } + + public static SignalMessage onNext(T item) { + return new SignalMessage<>(SignalType.ITEM, Objects.requireNonNull(item), null); + } + + public static SignalMessage onError(Throwable throwable) { + return new SignalMessage(SignalType.ERROR, null, Objects.requireNonNull(throwable.getMessage())); + } + + static SignalMessage onDecodedError(String throwable) { + return new SignalMessage(SignalType.ERROR, null, Objects.requireNonNull(throwable)); + } + + public static SignalMessage onComplete() { + return new SignalMessage(SignalType.COMPLETE, null, null); + } + + public SignalType getSignalType() { + return signalType; + } + + public String getErrorMessage() { + return Objects.requireNonNull(errorMessage); + } + + public T getItem() { + return Objects.requireNonNull(item); + } + + @Override + public String toString() { + return new StringJoiner(", ", SignalMessage.class.getSimpleName() + "[", "]") + .add("signalType=" + signalType) + .add("item=" + item) + .add("errorMessage='" + errorMessage + "'") + .toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SignalMessage that = (SignalMessage) o; + + if (signalType != that.signalType) { + return false; + } + if (!Objects.equals(item, that.item)) { + return false; + } + return Objects.equals(errorMessage, that.errorMessage); + } + + @Override + public int hashCode() { + int result = signalType != null ? signalType.hashCode() : 0; + result = 31 * result + (item != null ? item.hashCode() : 0); + result = 31 * result + (errorMessage != null ? errorMessage.hashCode() : 0); + return result; + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java new file mode 100644 index 0000000..2feba1c --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/SignalMessageCodec.java @@ -0,0 +1,99 @@ +package it.tdlight.tdlibsession; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; +import it.unimi.dsi.fastutil.io.FastByteArrayInputStream; +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +public class SignalMessageCodec implements MessageCodec, SignalMessage> { + + private final String codecName; + private final MessageCodec typeCodec; + + public SignalMessageCodec(MessageCodec typeCodec) { + super(); + this.codecName = "SignalCodec-" + typeCodec.name(); + this.typeCodec = typeCodec; + } + + @Override + public void encodeToWire(Buffer buffer, SignalMessage t) { + try (var bos = new FastByteArrayOutputStream()) { + try (var dos = new DataOutputStream(bos)) { + switch (t.getSignalType()) { + case ITEM: + dos.writeByte(0x01); + break; + case ERROR: + dos.writeByte(0x02); + break; + case COMPLETE: + dos.writeByte(0x03); + break; + default: + throw new UnsupportedOperationException(); + } + } + bos.trim(); + buffer.appendBytes(bos.array); + switch (t.getSignalType()) { + case ITEM: + typeCodec.encodeToWire(buffer, t.getItem()); + break; + case ERROR: + var stringBytes = t.getErrorMessage().getBytes(StandardCharsets.UTF_8); + buffer.appendInt(stringBytes.length); + buffer.appendBytes(stringBytes); + break; + } + } catch (IOException ex) { + ex.printStackTrace(); + } + } + + @Override + public SignalMessage decodeFromWire(int pos, Buffer buffer) { + try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) { + try (var dis = new DataInputStream(fis)) { + switch (dis.readByte()) { + case 0x01: + return SignalMessage.onNext(typeCodec.decodeFromWire(pos + 1, buffer)); + case 0x02: + var size = buffer.getInt(pos + 1); + return SignalMessage.onDecodedError(new String(buffer.getBytes(pos + 2, pos + 2 + size), + StandardCharsets.UTF_8 + )); + case 0x03: + return SignalMessage.onComplete(); + default: + throw new UnsupportedOperationException(); + } + } + } catch (IOException ex) { + ex.printStackTrace(); + } + return null; + } + + @Override + public SignalMessage transform(SignalMessage t) { + // If a message is sent *locally* across the event bus. + // This sends message just as is + return t; + } + + @Override + public String name() { + return codecName; + } + + @Override + public byte systemCodecID() { + // Always -1 + return -1; + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/SignalType.java b/src/main/java/it/tdlight/tdlibsession/SignalType.java new file mode 100644 index 0000000..3492115 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/SignalType.java @@ -0,0 +1,5 @@ +package it.tdlight.tdlibsession; + +enum SignalType { + COMPLETE, ERROR, ITEM +} diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index e9026c7..87acbe6 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -7,7 +7,6 @@ import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import io.vertx.core.net.JksOptions; import io.vertx.core.shareddata.AsyncMap; -import io.vertx.core.shareddata.Lock; import it.tdlight.common.Init; import it.tdlight.common.utils.CantLoadLibrary; import it.tdlight.tdlibsession.td.middle.TdClusterManager; @@ -19,13 +18,14 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Many; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class TDLibRemoteClient implements AutoCloseable { @@ -37,6 +37,7 @@ public class TDLibRemoteClient implements AutoCloseable { private final int port; private final Set membersAddresses; private final Many clusterManager = Sinks.many().replay().latest(); + private final Scheduler vertxStatusScheduler = Schedulers.newSingle("VertxStatus", false); public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set membersAddresses) { this.securityInfo = securityInfo; @@ -120,12 +121,11 @@ public class TDLibRemoteClient implements AutoCloseable { .doOnError(clusterManager::tryEmitError) .flatMapMany(clusterManager -> { return Flux.create(sink -> { - var sharedData = clusterManager.getSharedData(); - sharedData.getClusterWideMap("deployableBotAddresses", mapResult -> { + clusterManager.getSharedData().getClusterWideMap("deployableBotAddresses", mapResult -> { if (mapResult.succeeded()) { var deployableBotAddresses = mapResult.result(); - sharedData.getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> { + clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> { if (lockAcquisitionResult.succeeded()) { var deploymentLock = lockAcquisitionResult.result(); putAllAsync(deployableBotAddresses, botAddresses.values(), (AsyncResult putAllResult) -> { @@ -229,41 +229,41 @@ public class TDLibRemoteClient implements AutoCloseable { private void deployBot(TdClusterManager clusterManager, String botAddress, Handler> deploymentHandler) { AsyncTdMiddleEventBusServer verticle = new AsyncTdMiddleEventBusServer(clusterManager); - AtomicReference deploymentLock = new AtomicReference<>(); verticle.onBeforeStop(handler -> { - clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> { - if (lockAcquisitionResult.succeeded()) { - deploymentLock.set(lockAcquisitionResult.result()); - var sharedData = clusterManager.getSharedData(); - sharedData.getClusterWideMap("runningBotAddresses", (AsyncResult> mapResult) -> { - if (mapResult.succeeded()) { - var runningBotAddresses = mapResult.result(); - runningBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> { - if (putResult.succeeded()) { - if (putResult.result() != null) { - handler.complete(); - } else { - handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed"); - } - } else { - handler.fail(putResult.cause()); - } + vertxStatusScheduler.schedule(() -> { + clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> { + if (lockAcquisitionResult.succeeded()) { + var deploymentLock = lockAcquisitionResult.result(); + verticle.onAfterStop(handler2 -> { + vertxStatusScheduler.schedule(() -> { + deploymentLock.release(); + handler2.complete(); }); - } else { - handler.fail(mapResult.cause()); - } - }); - } else { - handler.fail(lockAcquisitionResult.cause()); - } + }); + clusterManager.getSharedData().getClusterWideMap("runningBotAddresses", (AsyncResult> mapResult) -> { + if (mapResult.succeeded()) { + var runningBotAddresses = mapResult.result(); + runningBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> { + if (putResult.succeeded()) { + if (putResult.result() != null) { + handler.complete(); + } else { + handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed"); + } + } else { + handler.fail(putResult.cause()); + } + }); + } else { + handler.fail(mapResult.cause()); + } + }); + } else { + handler.fail(lockAcquisitionResult.cause()); + } + }); }); }); - verticle.onAfterStop(handler -> { - if (deploymentLock.get() != null) { - deploymentLock.get().release(); - } - handler.complete(); - }); clusterManager .getVertx() .deployVerticle(verticle, @@ -307,5 +307,6 @@ public class TDLibRemoteClient implements AutoCloseable { @Override public void close() { clusterManager.asFlux().blockFirst(); + vertxStatusScheduler.dispose(); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index ad1d941..e276d30 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -51,12 +51,14 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.One; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; public class AsyncTdEasy { private static final Logger logger = LoggerFactory.getLogger(AsyncTdEasy.class); + private final Scheduler scheduler = Schedulers.newSingle("TdEasyUpdates"); private final ReplayProcessor authState = ReplayProcessor.create(1); private final ReplayProcessor requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false); private final ReplayProcessor settings = ReplayProcessor.cacheLast(); @@ -70,13 +72,9 @@ public class AsyncTdEasy { this.td = td; this.logName = logName; - var sch = Schedulers.newSingle("TdEasyUpdates"); - // todo: use Duration.ZERO instead of 10ms interval this.incomingUpdatesCo = td.receive() .filterWhen(update -> Mono.from(requestedDefinitiveExit).map(requestedDefinitiveExit -> !requestedDefinitiveExit)) - .subscribeOn(sch) - .publishOn(sch) .flatMap(this::preprocessUpdates) .flatMap(update -> Mono.from(this.getState()).single().map(state -> new AsyncTdUpdateObj(state, update))) .filter(upd -> upd.getState().getConstructor() == AuthorizationStateReady.CONSTRUCTOR) @@ -88,6 +86,7 @@ public class AsyncTdEasy { }).doOnComplete(() -> { authState.onNext(new AuthorizationStateClosed()); }) + .subscribeOn(scheduler) .publish().refCount(1); } @@ -104,11 +103,12 @@ public class AsyncTdEasy { } // Register fatal error handler - fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).subscribe(); + fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).subscribeOn(scheduler).subscribe(); return true; }) .subscribeOn(Schedulers.boundedElastic()) + .publishOn(scheduler) .flatMap(_v -> { this.settings.onNext(settings); return Mono.empty(); @@ -119,7 +119,7 @@ public class AsyncTdEasy { * Get TDLib state */ public Flux getState() { - return Flux.from(authState); + return Flux.from(authState).subscribeOn(scheduler); } /** @@ -130,21 +130,21 @@ public class AsyncTdEasy { } private Flux getIncomingUpdates(boolean includePreAuthUpdates) { - return Flux.from(incomingUpdatesCo); + return Flux.from(incomingUpdatesCo).subscribeOn(scheduler); } /** * Get generic error updates from TDLib (When they are not linked to a precise request). */ public Flux getIncomingErrors() { - return Flux.from(globalErrors); + return Flux.from(globalErrors).subscribeOn(scheduler); } /** * Receives fatal errors from TDLib. */ public Mono getFatalErrors() { - return fatalError.asMono(); + return Mono.from(fatalError.asMono()).subscribeOn(scheduler); } /** @@ -156,7 +156,7 @@ public class AsyncTdEasy { } private Mono> sendDirectly(TdApi.Function obj, boolean synchronous) { - return td.execute(obj, synchronous); + return td.execute(obj, synchronous).subscribeOn(scheduler); } /** @@ -164,7 +164,7 @@ public class AsyncTdEasy { * @param i level */ public Mono setVerbosityLevel(int i) { - return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)); + return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)).subscribeOn(scheduler); } /** @@ -172,7 +172,7 @@ public class AsyncTdEasy { * @param name option name */ public Mono clearOption(String name) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)).subscribeOn(scheduler); } /** @@ -181,7 +181,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionString(String name, String value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)).subscribeOn(scheduler); } /** @@ -190,7 +190,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionInteger(String name, long value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)).subscribeOn(scheduler); } /** @@ -199,7 +199,7 @@ public class AsyncTdEasy { * @param value option value */ public Mono setOptionBoolean(String name, boolean value) { - return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)); + return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)).subscribeOn(scheduler); } /** @@ -218,7 +218,7 @@ public class AsyncTdEasy { return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + value.getClass().getSimpleName())); } - }); + }).subscribeOn(scheduler); } /** @@ -237,7 +237,7 @@ public class AsyncTdEasy { return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + value.getClass().getSimpleName())); } - }); + }).subscribeOn(scheduler); } /** @@ -256,7 +256,7 @@ public class AsyncTdEasy { return Mono.error(new UnsupportedOperationException("The option " + name + " is of type " + value.getClass().getSimpleName())); } - }); + }).subscribeOn(scheduler); } /** @@ -267,7 +267,7 @@ public class AsyncTdEasy { * @return The request response. */ public Mono> execute(TdApi.Function request) { - return td.execute(request, true); + return td.execute(request, true).subscribeOn(scheduler); } /** @@ -305,7 +305,8 @@ public class AsyncTdEasy { .doOnNext(ok -> { logger.info("Received AuthorizationStateClosed after TdApi.Close"); }) - .then(); + .then() + .subscribeOn(scheduler); } /** @@ -325,25 +326,27 @@ public class AsyncTdEasy { } private Mono catchErrors(Object obj) { - if (obj.getConstructor() == Error.CONSTRUCTOR) { - var error = (Error) obj; + return Mono.fromCallable(() -> { + if (obj.getConstructor() == Error.CONSTRUCTOR) { + var error = (Error) obj; - switch (error.message) { - case "PHONE_CODE_INVALID": - globalErrors.onNext(error); - return Mono.just(new UpdateAuthorizationState(new AuthorizationStateWaitCode())); - case "PASSWORD_HASH_INVALID": - globalErrors.onNext(error); - return Mono.just(new UpdateAuthorizationState(new AuthorizationStateWaitPassword())); - default: - globalErrors.onNext(error); - break; + switch (error.message) { + case "PHONE_CODE_INVALID": + globalErrors.onNext(error); + return new UpdateAuthorizationState(new AuthorizationStateWaitCode()); + case "PASSWORD_HASH_INVALID": + globalErrors.onNext(error); + return new UpdateAuthorizationState(new AuthorizationStateWaitPassword()); + default: + globalErrors.onNext(error); + break; + } + analyzeFatalErrors(error); + return null; + } else { + return (Update) obj; } - analyzeFatalErrors(error); - return Mono.empty(); - } else { - return Mono.just((Update) obj); - } + }).subscribeOn(scheduler); } private void analyzeFatalErrors(Object obj) { @@ -370,7 +373,7 @@ public class AsyncTdEasy { } public Mono isBot() { - return Mono.from(settings).single().map(TdEasySettings::isBotTokenSet); + return Mono.from(settings).single().map(TdEasySettings::isBotTokenSet).subscribeOn(scheduler); } private Publisher preprocessUpdates(TdApi.Object updateObj) { @@ -535,7 +538,8 @@ public class AsyncTdEasy { return Mono.empty(); } }) - .then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj)); + .then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj)) + .subscribeOn(scheduler); } public Mono thenOrFatalError(Mono> optionalMono) { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index e5068ba..787b2b4 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -40,13 +40,11 @@ public class TdClusterManager { private final ClusterManager mgr; private final VertxOptions vertxOptions; private final Vertx vertx; - private final EventBus eb; - public TdClusterManager(ClusterManager mgr, VertxOptions vertxOptions, Vertx vertx, EventBus eventBus) { + public TdClusterManager(ClusterManager mgr, VertxOptions vertxOptions, Vertx vertx) { this.mgr = mgr; this.vertxOptions = vertxOptions; this.vertx = vertx; - this.eb = eventBus; } public static Mono ofMaster(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set nodesAddresses) { @@ -150,7 +148,7 @@ public class TdClusterManager { sink.success(Vertx.vertx(vertxOptions)); } }) - .map(vertx -> new TdClusterManager(mgr, vertxOptions, vertx, vertx.eventBus())); + .map(vertx -> new TdClusterManager(mgr, vertxOptions, vertx)); } public Vertx getVertx() { @@ -158,7 +156,7 @@ public class TdClusterManager { } public EventBus getEventBus() { - return eb; + return vertx.eventBus(); } public VertxOptions getVertxOptions() { @@ -178,7 +176,7 @@ public class TdClusterManager { */ public boolean registerDefaultCodec(Class objectClass, MessageCodec messageCodec) { try { - eb.registerDefaultCodec(objectClass, messageCodec); + vertx.eventBus().registerDefaultCodec(objectClass, messageCodec); return true; } catch (IllegalStateException ex) { if (ex.getMessage().startsWith("Already a default codec registered for class")) { @@ -204,9 +202,9 @@ public class TdClusterManager { */ public MessageConsumer consumer(String address, boolean localOnly) { if (localOnly) { - return eb.localConsumer(address); + return vertx.eventBus().localConsumer(address); } else { - return eb.consumer(address); + return vertx.eventBus().consumer(address); } } @@ -221,9 +219,9 @@ public class TdClusterManager { */ public MessageConsumer consumer(String address, boolean localOnly, Handler> handler) { if (localOnly) { - return eb.localConsumer(address, handler); + return vertx.eventBus().localConsumer(address, handler); } else { - return eb.consumer(address, handler); + return vertx.eventBus().consumer(address, handler); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptionalList.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java similarity index 53% rename from src/main/java/it/tdlight/tdlibsession/td/middle/TdOptionalList.java rename to src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java index 260bf26..00e6689 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptionalList.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java @@ -6,19 +6,13 @@ import java.util.List; import java.util.Objects; import java.util.StringJoiner; -public class TdOptionalList { - private final boolean isSet; +public class TdResultList { private final List> values; - public TdOptionalList(boolean isSet, List> values) { - this.isSet = isSet; + public TdResultList(List> values) { this.values = values; } - public boolean isSet() { - return isSet; - } - public List> getValues() { return values; } @@ -32,26 +26,18 @@ public class TdOptionalList { return false; } - TdOptionalList that = (TdOptionalList) o; + TdResultList that = (TdResultList) o; - if (isSet != that.isSet) { - return false; - } return Objects.equals(values, that.values); } @Override public int hashCode() { - int result = (isSet ? 1 : 0); - result = 31 * result + (values != null ? values.hashCode() : 0); - return result; + return values != null ? values.hashCode() : 0; } @Override public String toString() { - return new StringJoiner(", ", TdOptionalList.class.getSimpleName() + "[", "]") - .add("isSet=" + isSet) - .add("values=" + values) - .toString(); + return new StringJoiner(", ", TdResultList.class.getSimpleName() + "[", "]").add("values=" + values).toString(); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptListMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java similarity index 50% rename from src/main/java/it/tdlight/tdlibsession/td/middle/TdOptListMessageCodec.java rename to src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java index 3bf8fc6..8ad11ea 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdOptListMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java @@ -13,30 +13,26 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -public class TdOptListMessageCodec implements MessageCodec { +public class TdResultListMessageCodec implements MessageCodec { - public TdOptListMessageCodec() { + public TdResultListMessageCodec() { super(); } @Override - public void encodeToWire(Buffer buffer, TdOptionalList ts) { + public void encodeToWire(Buffer buffer, TdResultList ts) { try (var bos = new FastByteArrayOutputStream()) { try (var dos = new DataOutputStream(bos)) { - if (ts.isSet()) { - var t = ts.getValues(); - dos.writeInt(t.size()); - for (TdResult t1 : t) { - if (t1.succeeded()) { - dos.writeBoolean(true); - t1.result().serialize(dos); - } else { - dos.writeBoolean(false); - t1.cause().serialize(dos); - } + var t = ts.getValues(); + dos.writeInt(t.size()); + for (TdResult t1 : t) { + if (t1.succeeded()) { + dos.writeBoolean(true); + t1.result().serialize(dos); + } else { + dos.writeBoolean(false); + t1.cause().serialize(dos); } - } else { - dos.writeInt(-1); } } bos.trim(); @@ -47,32 +43,28 @@ public class TdOptListMessageCodec implements MessageCodec> list = new ArrayList<>(); - for (int i = 0; i < size; i++) { - if (dis.readBoolean()) { - list.add(TdResult.succeeded((TdApi.Object) TdApi.Deserializer.deserialize(dis))); - } else { - list.add(TdResult.failed((Error) TdApi.Deserializer.deserialize(dis))); - } + ArrayList> list = new ArrayList<>(); + for (int i = 0; i < size; i++) { + if (dis.readBoolean()) { + list.add(TdResult.succeeded((TdApi.Object) TdApi.Deserializer.deserialize(dis))); + } else { + list.add(TdResult.failed((Error) TdApi.Deserializer.deserialize(dis))); } - return new TdOptionalList(true, list); } + return new TdResultList(list); } } catch (IOException | UnsupportedOperationException ex) { ex.printStackTrace(); - return new TdOptionalList(false, Collections.emptyList()); + return new TdResultList(Collections.emptyList()); } } @Override - public TdOptionalList transform(TdOptionalList ts) { + public TdResultList transform(TdResultList ts) { return ts; } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index fdb0719..76cc6cb 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -15,6 +15,7 @@ import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Error; import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.UpdateAuthorizationState; +import it.tdlight.tdlibsession.EventBusFlux; import it.tdlight.tdlibsession.td.ResponseError; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.TdResultMessage; @@ -23,22 +24,21 @@ import it.tdlight.tdlibsession.td.middle.ExecuteObject; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.tdlibsession.td.middle.TdExecuteObjectMessageCodec; import it.tdlight.tdlibsession.td.middle.TdMessageCodec; -import it.tdlight.tdlibsession.td.middle.TdOptListMessageCodec; -import it.tdlight.tdlibsession.td.middle.TdOptionalList; +import it.tdlight.tdlibsession.td.middle.TdResultList; +import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; import it.tdlight.utils.MonoUtils; import java.time.Duration; -import java.util.List; import java.util.Objects; import java.util.StringJoiner; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.warp.commonutils.error.InitializationException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.ReplayProcessor; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle { @@ -47,12 +47,10 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy public static final boolean OUTPUT_REQUESTS = false; public static final byte[] EMPTY = new byte[0]; - private final ReplayProcessor tdClosed = ReplayProcessor.cacheLastOrDefault(false); + private final Many tdClosed = Sinks.many().replay().latestOrDefault(false); private final DeliveryOptions deliveryOptions; private final DeliveryOptions deliveryOptionsWithTimeout; - private ReplayProcessor> incomingUpdatesCo = ReplayProcessor.cacheLast(); - private TdClusterManager cluster; private String botAddress; @@ -64,7 +62,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy @SuppressWarnings({"unchecked", "rawtypes"}) public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) { cluster = clusterManager; - if (cluster.registerDefaultCodec(TdOptionalList.class, new TdOptListMessageCodec())) { + if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) { cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()); cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()); for (Class value : ConstructorDetector.getTDConstructorsUnsafe().values()) { @@ -146,7 +144,9 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .getEventBus() .request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> { if (msg.succeeded()) { - this.listen().then(this.pipe()).timeout(Duration.ofSeconds(30)).subscribe(v -> {}, future::fail, future::complete); + this.listen() + .timeout(Duration.ofSeconds(30)) + .subscribe(v -> {}, future::fail, future::complete); } else { future.fail(msg.cause()); } @@ -173,7 +173,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy @Override public void stop(Promise stopPromise) { readyToStartConsumer.unregister(result -> { - tdClosed.onNext(true); + tdClosed.tryEmitNext(true); stopPromise.complete(); }); } @@ -183,55 +183,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy return Mono.empty(); } - private Mono pipe() { - var updates = this.requestUpdatesBatchFromNetwork() - .repeatWhen(nFlux -> nFlux.takeWhile(n -> n > 0)) // Repeat when there is one batch with a flux of updates - .takeUntilOther(tdClosed.distinct().filter(tdClosed -> tdClosed)) // Stop when closed - .flatMap(batch -> batch) - .onErrorResume(error -> { - logger.error("Bot updates request failed! Marking as closed.", error); - if (error.getMessage().contains("Timed out")) { - return Flux.just(new Error(444, "CONNECTION_KILLED")); - } else { - return Flux.just(new Error(406, "INVALID_UPDATE")); - } - }) - .flatMap(update -> { - return Mono.create(sink -> { - if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { - var state = (UpdateAuthorizationState) update; - if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { - - // Send tdClosed early to avoid errors - tdClosed.onNext(true); - - this.getVertx().undeploy(this.deploymentID(), undeployed -> { - if (undeployed.failed()) { - logger.error("Error when undeploying td verticle", undeployed.cause()); - } - sink.success(update); - }); - } else { - sink.success(update); - } - } else { - sink.success(update); - } - }); - }) - .log("TdMiddle", Level.FINEST) - .takeUntilOther(tdClosed.distinct().filter(tdClosed -> tdClosed)) // Stop when closed - .publish() - .autoConnect(1); - - updates.subscribe(t -> incomingUpdatesCo.onNext(Flux.just(t)), - incomingUpdatesCo::onError, - incomingUpdatesCo::onComplete - ); - - return Mono.empty(); - } - private static class UpdatesBatchResult { public final Flux updatesFlux; public final boolean completed; @@ -250,63 +201,47 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy } } - private Mono> requestUpdatesBatchFromNetwork() { - return Mono - .from(tdClosed.distinct()) - .single() - .filter(tdClosed -> !tdClosed) - .flatMap(_x -> Mono.>create(sink -> { - cluster.getEventBus().request(botAddress + ".getNextUpdatesBlock", - EMPTY, - deliveryOptionsWithTimeout, - msg -> { - if (msg.failed()) { - //if (System.currentTimeMillis() - initTime <= 30000) { - // // The serve has not been started - // sink.success(Flux.empty()); - //} else { - // // Timeout - sink.error(msg.cause()); - //} - } else { - var result = msg.result(); - if (result.body() == null) { - sink.success(); - } else { - var resultBody = msg.result().body(); - if (resultBody.isSet()) { - List> updates = resultBody.getValues(); - for (TdResult updateObj : updates) { - if (updateObj.succeeded()) { - if (OUTPUT_REQUESTS) { - System.out.println(" <- " + updateObj.result() - .toString() - .replace("\n", " ") - .replace("\t", "") - .replace(" ", "") - .replace(" = ", "=")); - } - } else { - logger.error("Received an errored update", - ResponseError.newResponseError("incoming update", botAlias, updateObj.cause()) - ); - } - } - sink.success(Flux.fromIterable(updates).filter(TdResult::succeeded).map(TdResult::result)); - } else { - // the stream has ended - sink.success(); - } - } - } - } - ); - })); - } - @Override public Flux receive() { - return incomingUpdatesCo.filter(Objects::nonNull).flatMap(v -> v); + var fluxCodec = new TdResultListMessageCodec(); + EventBusFlux.registerFluxCodec(cluster.getEventBus(), fluxCodec); + return Mono.from(tdClosed.asFlux()).single().filter(tdClosed -> !tdClosed).flatMapMany(_closed -> EventBusFlux + .connect(cluster.getEventBus(), + botAddress + ".updates", + deliveryOptions, + fluxCodec, + Duration.ofMillis(deliveryOptionsWithTimeout.getSendTimeout()) + ) + .filter(Objects::nonNull) + .flatMap(block -> Flux.fromIterable(block.getValues())) + .filter(Objects::nonNull) + .onErrorResume(error -> { + logger.error("Bot updates request failed! Marking as closed.", error); + if (error.getMessage().contains("Timed out")) { + return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED"))); + } else { + return Flux.just(TdResult.failed(new Error(406, "INVALID_UPDATE"))); + } + }).flatMap(item -> Mono.fromCallable(item::orElseThrow)) + .filter(Objects::nonNull) + .doOnNext(item -> { + if (OUTPUT_REQUESTS) { + System.out.println(" <- " + item.toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=") + ); + } + if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + var state = (UpdateAuthorizationState) item; + if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { + + // Send tdClosed early to avoid errors + tdClosed.tryEmitNext(true); + } + } + })); } @Override @@ -321,7 +256,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy .replace(" = ", "=")); } - return Mono.from(tdClosed.distinct()).single().filter(tdClosed -> !tdClosed).>flatMap((_x) -> Mono.create(sink -> { + return Mono.from(tdClosed.asFlux()).single().filter(tdClosed -> !tdClosed).>flatMap((_x) -> Mono.create(sink -> { try { cluster .getEventBus() diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 593c4b5..e46c490 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -11,6 +11,7 @@ import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageConsumer; import it.tdlight.common.ConstructorDetector; import it.tdlight.jni.TdApi; +import it.tdlight.tdlibsession.EventBusFlux; import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; @@ -19,8 +20,8 @@ import it.tdlight.tdlibsession.td.middle.ExecuteObject; import it.tdlight.tdlibsession.td.middle.TdClusterManager; import it.tdlight.tdlibsession.td.middle.TdExecuteObjectMessageCodec; import it.tdlight.tdlibsession.td.middle.TdMessageCodec; -import it.tdlight.tdlibsession.td.middle.TdOptListMessageCodec; -import it.tdlight.tdlibsession.td.middle.TdOptionalList; +import it.tdlight.tdlibsession.td.middle.TdResultList; +import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; import it.tdlight.utils.MonoUtils; import java.time.Duration; @@ -32,11 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.Empty; -import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -59,6 +56,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { protected AsyncTdDirectImpl td; private final Scheduler tdSrvPoll; + private final Scheduler vertxStatusScheduler = Schedulers.newSingle("VertxStatus", false); /** * Value is not important, emits when a request is received */ @@ -66,7 +64,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private final List>> onAfterStopListeners = new CopyOnWriteArrayList<>(); private MessageConsumer startConsumer; private MessageConsumer isWorkingConsumer; - private MessageConsumer getNextUpdatesBlockConsumer; private MessageConsumer executeConsumer; @SuppressWarnings({"unchecked", "rawtypes"}) @@ -74,7 +71,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { this.cluster = clusterManager; this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 1000); this.tdSrvPoll = Schedulers.newSingle("TdSrvPoll"); - if (cluster.registerDefaultCodec(TdOptionalList.class, new TdOptListMessageCodec())) { + if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) { cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()); cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()); for (Class value : ConstructorDetector.getTDConstructorsUnsafe().values()) { @@ -85,55 +82,57 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { @Override public void start(Promise startPromise) { - var botAddress = config().getString("botAddress"); - if (botAddress == null || botAddress.isEmpty()) { - throw new IllegalArgumentException("botAddress is not set!"); - } - this.botAddress = botAddress; - var botAlias = config().getString("botAlias"); - if (botAlias == null || botAlias.isEmpty()) { - throw new IllegalArgumentException("botAlias is not set!"); - } - this.botAlias = botAlias; - var local = config().getBoolean("local"); - if (local == null) { - throw new IllegalArgumentException("local is not set!"); - } - this.local = local; - this.td = new AsyncTdDirectImpl(botAlias); + vertxStatusScheduler.schedule(() -> { + var botAddress = config().getString("botAddress"); + if (botAddress == null || botAddress.isEmpty()) { + throw new IllegalArgumentException("botAddress is not set!"); + } + this.botAddress = botAddress; + var botAlias = config().getString("botAlias"); + if (botAlias == null || botAlias.isEmpty()) { + throw new IllegalArgumentException("botAlias is not set!"); + } + this.botAlias = botAlias; + var local = config().getBoolean("local"); + if (local == null) { + throw new IllegalArgumentException("local is not set!"); + } + this.local = local; + this.td = new AsyncTdDirectImpl(botAlias); - AtomicBoolean alreadyDeployed = new AtomicBoolean(false); - this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message msg) -> { - if (alreadyDeployed.compareAndSet(false, true)) { - this.listen().then(this.pipe()).then(Mono.create(registrationSink -> { - this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message workingMsg) -> { - workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local)); - }); - this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); - })).subscribeOn(this.tdSrvPoll) - .subscribe(v -> {}, ex -> { - logger.info(botAddress + " server deployed and started. succeeded: false"); - logger.error(ex.getLocalizedMessage(), ex); - msg.fail(500, ex.getLocalizedMessage()); - }, () -> { - logger.info(botAddress + " server deployed and started. succeeded: true"); + AtomicBoolean alreadyDeployed = new AtomicBoolean(false); + this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message msg) -> { + if (alreadyDeployed.compareAndSet(false, true)) { + this.listen().then(this.pipe()).then(Mono.create(registrationSink -> { + this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message workingMsg) -> { + workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local)); + }); + this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink)); + })).subscribeOn(this.tdSrvPoll) + .subscribe(v -> {}, ex -> { + logger.info(botAddress + " server deployed and started. succeeded: false"); + logger.error(ex.getLocalizedMessage(), ex); + msg.fail(500, ex.getLocalizedMessage()); + }, () -> { + logger.info(botAddress + " server deployed and started. succeeded: true"); + msg.reply(EMPTY); + }); + } else { msg.reply(EMPTY); - }); - } else { - msg.reply(EMPTY); - } + } + }); + startConsumer.completionHandler(h -> { + logger.info(botAddress + " server deployed. succeeded: " + h.succeeded()); + if (h.succeeded()) { + logger.debug("Sending " + botAddress + ".readyToStart"); + cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000), msg -> { + startPromise.complete(h.result()); + }); + } else { + startPromise.fail(h.cause()); + } + }); }); - startConsumer.completionHandler(h -> { - logger.info(botAddress + " server deployed. succeeded: " + h.succeeded()); - if (h.succeeded()) { - startPromise.complete(h.result()); - } else { - startPromise.fail(h.cause()); - } - }); - - logger.debug("Sending " + botAddress + ".readyToStart"); - cluster.getEventBus().send(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000)); } public void onBeforeStop(Consumer> r) { @@ -146,23 +145,19 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { @Override public void stop(Promise stopPromise) { - runAll(onBeforeStopListeners, onBeforeStopHandler -> { - if (onBeforeStopHandler.failed()) { - logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause()); - } - - Mono.create(sink -> this.isWorkingConsumer.unregister(result -> { - if (result.failed()) { - logger.error("Can't unregister consumer", result.cause()); + vertxStatusScheduler.schedule(() -> { + runAll(onBeforeStopListeners, onBeforeStopHandler -> { + if (onBeforeStopHandler.failed()) { + logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause()); } - this.startConsumer.unregister(result2 -> { - if (result2.failed()) { - logger.error("Can't unregister consumer", result2.cause()); - } - this.getNextUpdatesBlockConsumer.unregister(result3 -> { - if (result3.failed()) { - logger.error("Can't unregister consumer", result3.cause()); + Mono.create(sink -> this.isWorkingConsumer.unregister(result -> { + if (result.failed()) { + logger.error("Can't unregister consumer", result.cause()); + } + this.startConsumer.unregister(result2 -> { + if (result2.failed()) { + logger.error("Can't unregister consumer", result2.cause()); } this.executeConsumer.unregister(result4 -> { @@ -173,20 +168,19 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { sink.success(); }); }); - }); - })).doFinally(signalType -> { - logger.info("TdMiddle verticle \"" + botAddress + "\" stopped"); + })).doFinally(signalType -> { + logger.info("TdMiddle verticle \"" + botAddress + "\" stopped"); - runAll(onAfterStopListeners, onAfterStopHandler -> { - if (onAfterStopHandler.failed()) { - logger.error("An afterStop listener failed: " + onAfterStopHandler.cause()); - } - - stopPromise.complete(); - }); - }).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> { - logger.error("Error when stopping", ex); - }, () -> {}); + runAll(onAfterStopListeners, onAfterStopHandler -> { + if (onAfterStopHandler.failed()) { + logger.error("An afterStop listener failed: " + onAfterStopHandler.cause()); + } + stopPromise.complete(); + }); + }).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> { + logger.error("Error when stopping", ex); + }, () -> {}); + }); }); } @@ -261,66 +255,34 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } private Mono pipe() { - return Mono.create(registeredSink -> { - Many getNextUpdatesBlockTrigger = Sinks.many().replay().latestOrDefault(true); - Flux> getNextUpdatesBlockFlux = Flux.create(sink -> { - this.getNextUpdatesBlockConsumer = cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message msg) -> { - getNextUpdatesBlockTrigger.tryEmitNext(true); - sink.next(msg); + var updatesFlux = td.receive(tdOptions).doOnNext(update -> { + if (OUTPUT_REQUESTS) { + System.out.println("<=: " + update + .toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=")); + } + }).bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100)) + .windowTimeout(1, Duration.ofSeconds(5)) + .flatMap(w -> w.defaultIfEmpty(Collections.emptyList())) + .map(TdResultList::new).doOnTerminate(() -> { + System.out.println("<=: end (1)"); + }).doOnComplete(() -> { + System.out.println("<=: end (2)"); + }).doFinally(s -> { + System.out.println("<=: end (3)"); + this.undeploy(() -> {}); }); - registeredSink.success(); - }); - - Empty needClose = Sinks.empty(); - Flux updatesFlux = Flux.mergeSequential(td.receive(tdOptions) - .doOnSubscribe(s -> { - // After 60 seconds of not receiving request for updates, dispose td flux assuming the middle client died. - getNextUpdatesBlockTrigger.asFlux().timeout(Duration.ofSeconds(30), timeout -> { - needClose.tryEmitEmpty(); - s.cancel(); - }).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> { - logger.error("Error when signalling that the next update block request has been received", ex); - }, () -> { - needClose.tryEmitEmpty(); - }); - }) - .doOnNext(update -> { - if (OUTPUT_REQUESTS) { - System.out.println("<=: " + update - .toString() - .replace("\n", " ") - .replace("\t", "") - .replace(" ", "") - .replace(" = ", "=")); - } - }) - .bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100)) - .windowTimeout(1, Duration.ofSeconds(5)) - .flatMap(w -> w.defaultIfEmpty(Collections.emptyList())) - .map(updatesGroup -> new TdOptionalList(true, updatesGroup)), - Mono.fromSupplier(() -> { - return new TdOptionalList(false, Collections.emptyList()); - }) - ); - - Flux.zip(updatesFlux, getNextUpdatesBlockFlux) - .subscribeOn(this.tdSrvPoll) - .subscribe(tuple -> { - var results = tuple.getT1(); - var messageHandle = tuple.getT2(); - - if (!results.isSet()) { - System.out.println("<=: end (1)"); - } - - messageHandle.reply(results, cluster.newDeliveryOpts().setLocalOnly(local)); - }, error -> logger.error("Error when receiving or forwarding updates", error), () -> { - needClose.tryEmitEmpty(); - }); - needClose.asMono().subscribeOn(this.tdSrvPoll).subscribe(v_ -> {}, e -> {}, () -> { - this.undeploy(() -> {}); - }); - }); - + var fluxCodec = new TdResultListMessageCodec(); + EventBusFlux.registerFluxCodec(cluster.getEventBus(), fluxCodec); + return EventBusFlux.serve(updatesFlux, + cluster.getEventBus(), + botAddress + ".updates", + cluster.newDeliveryOpts().setLocalOnly(local), + fluxCodec, + Duration.ofSeconds(30) + ).subscribeOn(tdSrvPoll); } }