From 1b2fc7e4c165d63c9aac2a3cdb9d2472fcd5f023 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 26 Jan 2021 16:29:45 +0100 Subject: [PATCH] Bugfixes --- .../remoteclient/TDLibRemoteClient.java | 21 +++-- .../tdlibsession/td/TdResultMessage.java | 33 ------- .../tdlibsession/td/middle/ExecuteObject.java | 32 ++++++- .../LazyTdExecuteObjectMessageCodec.java | 41 +++++++++ .../middle/LazyTdResultListMessageCodec.java | 44 ++++++++++ .../td/middle/LazyTdResultMessageCodec.java | 44 ++++++++++ .../td/middle/TdClusterManager.java | 7 +- .../tdlibsession/td/middle/TdResultList.java | 31 ++++++- .../td/middle/TdResultMessage.java | 57 ++++++++++++ .../td/middle/TdResultMessageCodec.java | 1 - .../client/AsyncTdMiddleEventBusClient.java | 42 +++++---- .../server/AsyncTdMiddleEventBusServer.java | 8 +- src/main/java/it/tdlight/utils/MonoUtils.java | 86 +++++++++++++++---- 13 files changed, 362 insertions(+), 85 deletions(-) delete mode 100644 src/main/java/it/tdlight/tdlibsession/td/TdResultMessage.java create mode 100644 src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdExecuteObjectMessageCodec.java create mode 100644 src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdResultListMessageCodec.java create mode 100644 src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdResultMessageCodec.java create mode 100644 src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessage.java diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index f7d2c91..1a0dfef 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -53,16 +53,19 @@ public class TDLibRemoteClient implements AutoCloseable { String netInterface, int port, Set membersAddresses, - boolean enableStacktraces) { + boolean enableAsyncStacktraces, + boolean enableFullAsyncStacktraces) { this.securityInfo = securityInfo; this.masterHostname = masterHostname; this.netInterface = netInterface; this.port = port; this.membersAddresses = membersAddresses; - if (enableStacktraces && !runningFromIntelliJ()) { + if (enableAsyncStacktraces && !runningFromIntelliJ()) { ReactorDebugAgent.init(); - RxJava2Debug.enableRxJava2AssemblyTracking(new String[] {"it.tdlight.utils", "it.tdlight.tdlibsession"}); + } + if (enableAsyncStacktraces && enableFullAsyncStacktraces) { + RxJava2Debug.enableRxJava2AssemblyTracking(new String[]{"it.tdlight.utils", "it.tdlight.tdlibsession"}); } try { @@ -91,14 +94,22 @@ public class TDLibRemoteClient implements AutoCloseable { Path keyStorePasswordPath = Paths.get(args[4]); Path trustStorePath = Paths.get(args[5]); Path trustStorePasswordPath = Paths.get(args[6]); - boolean enableStacktraces = Boolean.parseBoolean(args[7]); + boolean enableAsyncStacktraces = Boolean.parseBoolean(args[7]); + boolean enableFullAsyncStacktraces = Boolean.parseBoolean(args[8]); var loggerContext = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false); loggerContext.setConfigLocation(TDLibRemoteClient.class.getResource("/tdlib-session-container-log4j2.xml").toURI()); var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath); - var client = new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses, enableStacktraces); + var client = new TDLibRemoteClient(securityInfo, + masterHostname, + netInterface, + port, + membersAddresses, + enableAsyncStacktraces, + enableFullAsyncStacktraces + ); client .start() diff --git a/src/main/java/it/tdlight/tdlibsession/td/TdResultMessage.java b/src/main/java/it/tdlight/tdlibsession/td/TdResultMessage.java deleted file mode 100644 index c7af65b..0000000 --- a/src/main/java/it/tdlight/tdlibsession/td/TdResultMessage.java +++ /dev/null @@ -1,33 +0,0 @@ -package it.tdlight.tdlibsession.td; - -import it.tdlight.jni.TdApi; -import it.tdlight.jni.TdApi.Error; -import it.tdlight.jni.TdApi.Object; -import java.util.StringJoiner; - -public class TdResultMessage { - public final TdApi.Object value; - public final TdApi.Error cause; - - public TdResultMessage(Object value, Error cause) { - this.value = value; - this.cause = cause; - } - - public TdResult toTdResult() { - if (value != null) { - //noinspection unchecked - return TdResult.succeeded((T) value); - } else { - return TdResult.failed(cause); - } - } - - @Override - public String toString() { - return new StringJoiner(", ", TdResultMessage.class.getSimpleName() + "[", "]") - .add("value=" + value) - .add("cause=" + cause) - .toString(); - } -} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java b/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java index 9766758..3b62466 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java @@ -1,28 +1,53 @@ package it.tdlight.tdlibsession.td.middle; +import io.vertx.core.buffer.Buffer; import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Function; import java.util.Objects; import java.util.StringJoiner; public class ExecuteObject { - private final boolean executeDirectly; - private final TdApi.Function request; - public ExecuteObject(boolean executeDirectly, TdApi.Function request) { + private static final TdExecuteObjectMessageCodec realCodec = new TdExecuteObjectMessageCodec(); + + private boolean executeDirectly; + private TdApi.Function request; + private int pos; + private Buffer buffer; + + public ExecuteObject(boolean executeDirectly, Function request) { this.executeDirectly = executeDirectly; this.request = request; + if (request == null) throw new NullPointerException(); + } + + public ExecuteObject(int pos, Buffer buffer) { + this.pos = pos; + this.buffer = buffer; + } + + private void tryDecode() { + if (request == null) { + var data = realCodec.decodeFromWire(pos, buffer); + this.executeDirectly = data.executeDirectly; + this.request = data.request; + this.buffer = null; + } } public boolean isExecuteDirectly() { + tryDecode(); return executeDirectly; } public TdApi.Function getRequest() { + tryDecode(); return request; } @Override public boolean equals(Object o) { + tryDecode(); if (this == o) { return true; } @@ -40,6 +65,7 @@ public class ExecuteObject { @Override public int hashCode() { + tryDecode(); int result = (executeDirectly ? 1 : 0); result = 31 * result + (request != null ? request.hashCode() : 0); return result; diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdExecuteObjectMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdExecuteObjectMessageCodec.java new file mode 100644 index 0000000..cfcad7c --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdExecuteObjectMessageCodec.java @@ -0,0 +1,41 @@ +package it.tdlight.tdlibsession.td.middle; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; + +public class LazyTdExecuteObjectMessageCodec implements MessageCodec { + + private static final TdExecuteObjectMessageCodec realCodec = new TdExecuteObjectMessageCodec(); + + public LazyTdExecuteObjectMessageCodec() { + super(); + } + + @Override + public void encodeToWire(Buffer buffer, ExecuteObject t) { + realCodec.encodeToWire(buffer, t); + } + + @Override + public ExecuteObject decodeFromWire(int pos, Buffer buffer) { + return new ExecuteObject(pos, buffer); + } + + @Override + public ExecuteObject transform(ExecuteObject t) { + // If a message is sent *locally* across the event bus. + // This sends message just as is + return t; + } + + @Override + public String name() { + return "ExecuteObjectCodec"; + } + + @Override + public byte systemCodecID() { + // Always -1 + return -1; + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdResultListMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdResultListMessageCodec.java new file mode 100644 index 0000000..400000e --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdResultListMessageCodec.java @@ -0,0 +1,44 @@ +package it.tdlight.tdlibsession.td.middle; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; + +@SuppressWarnings("rawtypes") +public class LazyTdResultListMessageCodec implements MessageCodec { + + private final String codecName; + private static final TdResultListMessageCodec realCodec = new TdResultListMessageCodec(); + + public LazyTdResultListMessageCodec() { + super(); + this.codecName = "TdOptListCodec"; + } + + @Override + public void encodeToWire(Buffer buffer, TdResultList t) { + realCodec.encodeToWire(buffer, t); + } + + @Override + public TdResultList decodeFromWire(int pos, Buffer buffer) { + return new TdResultList(pos, buffer); + } + + @Override + public TdResultList transform(TdResultList t) { + // If a message is sent *locally* across the event bus. + // This sends message just as is + return t; + } + + @Override + public String name() { + return codecName; + } + + @Override + public byte systemCodecID() { + // Always -1 + return -1; + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdResultMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdResultMessageCodec.java new file mode 100644 index 0000000..c48b191 --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdResultMessageCodec.java @@ -0,0 +1,44 @@ +package it.tdlight.tdlibsession.td.middle; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.MessageCodec; + +@SuppressWarnings("rawtypes") +public class LazyTdResultMessageCodec implements MessageCodec { + + private final String codecName; + private static final TdResultMessageCodec realCodec = new TdResultMessageCodec(); + + public LazyTdResultMessageCodec() { + super(); + this.codecName = "TdResultCodec"; + } + + @Override + public void encodeToWire(Buffer buffer, TdResultMessage t) { + realCodec.encodeToWire(buffer, t); + } + + @Override + public TdResultMessage decodeFromWire(int pos, Buffer buffer) { + return new TdResultMessage(pos, buffer); + } + + @Override + public TdResultMessage transform(TdResultMessage t) { + // If a message is sent *locally* across the event bus. + // This sends message just as is + return t; + } + + @Override + public String name() { + return codecName; + } + + @Override + public byte systemCodecID() { + // Always -1 + return -1; + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index af2be5a..a7ab54e 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -23,7 +23,6 @@ import io.vertx.reactivex.core.eventbus.MessageConsumer; import io.vertx.reactivex.core.shareddata.SharedData; import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager; import it.tdlight.common.ConstructorDetector; -import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.utils.MonoUtils; import java.nio.channels.AlreadyBoundException; import java.util.ArrayList; @@ -55,9 +54,9 @@ public class TdClusterManager { vertx .eventBus() .getDelegate() - .registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec()) - .registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()) - .registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()) + .registerDefaultCodec(TdResultList.class, new LazyTdResultListMessageCodec()) + .registerDefaultCodec(ExecuteObject.class, new LazyTdExecuteObjectMessageCodec()) + .registerDefaultCodec(TdResultMessage.class, new LazyTdResultMessageCodec()) .registerDefaultCodec(StartSessionMessage.class, new StartSessionMessageCodec()) .registerDefaultCodec(EndSessionMessage.class, new EndSessionMessageCodec()); for (Class value : ConstructorDetector.getTDConstructorsUnsafe().values()) { diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java index 8a4abab..2ab589d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java @@ -1,5 +1,6 @@ package it.tdlight.tdlibsession.td.middle; +import io.vertx.core.buffer.Buffer; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Error; import java.util.List; @@ -7,33 +8,58 @@ import java.util.Objects; import java.util.StringJoiner; public class TdResultList { - private final List values; - private final Error error; + + private static final TdResultListMessageCodec realCodec = new TdResultListMessageCodec(); + + private List values; + private Error error; + private int pos; + private Buffer buffer; public TdResultList(List values) { this.values = values; this.error = null; + if (values == null) throw new NullPointerException("Null message"); } public TdResultList(TdApi.Error error) { this.values = null; this.error = error; + if (error == null) throw new NullPointerException("Null message"); + } + + public TdResultList(int pos, Buffer buffer) { + this.pos = pos; + this.buffer = buffer; + } + + private void tryDecode() { + if (error == null && values == null) { + var value = realCodec.decodeFromWire(pos, buffer); + this.values = value.values; + this.error = value.error; + this.buffer = null; + } } public List value() { + tryDecode(); return values; } public TdApi.Error error() { + tryDecode(); return error; } public boolean succeeded() { + tryDecode(); return error == null && values != null; } @Override public boolean equals(Object o) { + tryDecode(); if (this == o) { return true; } @@ -51,6 +77,7 @@ public class TdResultList { @Override public int hashCode() { + tryDecode(); int result = values != null ? values.hashCode() : 0; result = 31 * result + (error != null ? error.hashCode() : 0); return result; diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessage.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessage.java new file mode 100644 index 0000000..8d6147c --- /dev/null +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessage.java @@ -0,0 +1,57 @@ +package it.tdlight.tdlibsession.td.middle; + +import io.vertx.core.buffer.Buffer; +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Error; +import it.tdlight.jni.TdApi.Object; +import it.tdlight.tdlibsession.td.TdResult; +import java.util.StringJoiner; + +public class TdResultMessage { + + private static final TdResultMessageCodec realCodec = new TdResultMessageCodec(); + + public TdApi.Object value; + public TdApi.Error cause; + private int pos; + private Buffer buffer; + + public TdResultMessage(Object value, Error cause) { + this.value = value; + this.cause = cause; + if (value == null && cause == null) throw new NullPointerException("Null message"); + } + + public TdResultMessage(int pos, Buffer buffer) { + this.pos = pos; + this.buffer = buffer; + } + + public TdResult toTdResult() { + if (value != null) { + //noinspection unchecked + return TdResult.succeeded((T) value); + } else if (cause != null) { + return TdResult.failed(cause); + } else { + var data = realCodec.decodeFromWire(pos, buffer); + this.value = data.value; + this.cause = data.cause; + this.buffer = null; + if (value != null) { + //noinspection unchecked + return TdResult.succeeded((T) value); + } else { + return TdResult.failed(cause); + } + } + } + + @Override + public String toString() { + return new StringJoiner(", ", TdResultMessage.class.getSimpleName() + "[", "]") + .add("value=" + value) + .add("cause=" + cause) + .toString(); + } +} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java index b562123..5cf568b 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultMessageCodec.java @@ -3,7 +3,6 @@ package it.tdlight.tdlibsession.td.middle; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; import it.tdlight.jni.TdApi; -import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.utils.VertxBufferInputStream; import it.tdlight.utils.VertxBufferOutputStream; import java.io.IOException; diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 2af2e44..8b96b51 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -9,7 +9,7 @@ import it.tdlight.jni.TdApi.Function; import it.tdlight.tdlibsession.td.ResponseError; import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; -import it.tdlight.tdlibsession.td.TdResultMessage; +import it.tdlight.tdlibsession.td.middle.TdResultMessage; import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle; import it.tdlight.tdlibsession.td.middle.EndSessionMessage; import it.tdlight.tdlibsession.td.middle.ExecuteObject; @@ -43,7 +43,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final One binlog = Sinks.one(); - private final One> updates = Sinks.one(); + private final One> updates = Sinks.one(); // This will only result in a successful completion, never completes in other ways private final Empty updatesStreamEnd = Sinks.one(); // This will only result in a crash, never completes in other ways @@ -157,7 +157,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { deliveryOptionsWithTimeout ).as(MonoUtils::toMono); }) - .flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) + .flatMap(msg -> Mono.fromCallable(() -> msg.body()).subscribeOn(Schedulers.boundedElastic())) .repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true)) .takeUntilOther(Mono.firstWithSignal(this.updatesStreamEnd.asMono().doOnTerminate(() -> { logger.trace("About to kill pinger because updates stream ended"); @@ -188,16 +188,12 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .getDelegate()); })) .flatMap(updateConsumer -> { - // Here the updates will be piped from the server to the client - var updateConsumerFlux = MonoUtils.fromConsumer(updateConsumer); - // Return when the registration of all the consumers has been done across the cluster return Mono .fromRunnable(() -> logger.trace("Emitting updates flux to sink")) - .then(MonoUtils.emitValue(updates, updateConsumerFlux)) + .then(MonoUtils.emitValue(updates, updateConsumer)) .doOnSuccess(s -> logger.trace("Emitted updates flux to sink")) .doOnSuccess(s -> logger.trace("Waiting to register update consumer across the cluster")) - .then(updateConsumer.rxCompletionHandler().as(MonoUtils::toMono)) .doOnSuccess(s -> logger.trace("Registered update consumer across the cluster")); }) .doOnSuccess(s ->logger.trace("Set up updates listener")) @@ -207,15 +203,23 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { @Override public Flux receive() { // Here the updates will be received + return Mono .fromRunnable(() -> logger.trace("Called receive() from parent")) - .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) - .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) - .doOnSuccess(s -> logger.trace("About to read updates flux")) .then(updates.asMono().publishOn(Schedulers.single())) .timeout(Duration.ofSeconds(5)) .publishOn(Schedulers.single()) - .flatMapMany(updatesFlux -> updatesFlux.publishOn(Schedulers.single())) + .flatMap(MonoUtils::fromConsumerAdvanced) + .flatMapMany(registration -> Mono + .fromRunnable(() -> logger.trace("Registering updates flux")) + .then(registration.getT1()) + .doOnSuccess(s -> logger.trace("Registered updates flux")) + .doOnSuccess(s -> logger.trace("Sending ready-to-receive")) + .then(cluster.getEventBus().rxRequest(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout).as(MonoUtils::toMono)) + .doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply")) + .doOnSuccess(s -> logger.trace("About to read updates flux")) + .thenMany(registration.getT2()) + ) .takeUntilOther(Flux .merge( crash.asMono() @@ -227,6 +231,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { throw ex; }).onErrorResume(ex -> MonoUtils.emitError(crash, ex))) ) + .doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end")) ) .flatMapSequential(updates -> { if (updates.succeeded()) { @@ -239,13 +244,17 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .flatMapSequential(this::interceptUpdate) // Redirect errors to crash sink .doOnError(crash::tryEmitError) - .onErrorResume(ex -> Mono.empty()) + .onErrorResume(ex -> { + logger.trace("Absorbing the error, the error has been published using the crash sink", ex); + return Mono.empty(); + }) .doOnTerminate(updatesStreamEnd::tryEmitEmpty) .publishOn(Schedulers.single()); } private Mono interceptUpdate(TdApi.Object update) { + logger.trace("Received update {}", update.getClass().getSimpleName()); switch (update.getConstructor()) { case TdApi.UpdateAuthorizationState.CONSTRUCTOR: var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; @@ -253,8 +262,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { case TdApi.AuthorizationStateClosed.CONSTRUCTOR: return Mono.fromRunnable(() -> logger.trace("Received AuthorizationStateClosed from tdlib")) .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) - .doOnNext(l -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(l.body().binlog().length))) - .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.body().binlog())) + .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.boundedElastic())) + .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length))) + .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) .doOnSuccess(s -> logger.info("Overwritten binlog from server")) .publishOn(Schedulers.boundedElastic()) .thenReturn(update); @@ -281,7 +291,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { } else { return resp.body().toTdResult(); } - }).publishOn(Schedulers.boundedElastic()) + }).subscribeOn(Schedulers.boundedElastic()) ) .doOnSuccess(s -> logger.trace("Executed request")) ) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 3c077ee..8245279 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -17,7 +17,7 @@ import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient; import it.tdlight.tdlibsession.td.TdError; -import it.tdlight.tdlibsession.td.TdResultMessage; +import it.tdlight.tdlibsession.td.middle.TdResultMessage; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions; import it.tdlight.tdlibsession.td.direct.TelegramClientFactory; @@ -148,7 +148,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .flatMapSequential(tuple -> { var msg = tuple.getT1(); var body = tuple.getT2(); - logger.trace("Received execute request {}", body); + logger.trace("Received execute request {}", body.getRequest().getClass().getSimpleName()); var request = overrideRequest(body.getRequest(), botId); return td .execute(request, body.isExecuteDirectly()) @@ -202,17 +202,21 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .take(1) .limitRequest(1) .single() + .doOnNext(s -> logger.trace("Received ready-to-receive request from client")) .flatMap(msg -> this.pipeFlux .asMono() .timeout(Duration.ofSeconds(5)) .map(pipeFlux -> Tuples.of(msg, pipeFlux))) .doOnError(ex -> logger.error("Error when processing a ready-to-receive request", ex)) + .doOnNext(s -> logger.trace("Replying to ready-to-receive request")) .flatMapMany(tuple -> { var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); tuple.getT1().reply(EMPTY, opts); logger.trace("Replied to ready-to-receive"); + logger.trace("Start piping data"); + // Start piping the data return tuple.getT2().doOnSubscribe(s -> { logger.trace("Subscribed to updates pipe"); diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 2fcc832..420c69e 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -43,9 +43,12 @@ import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.One; import reactor.core.publisher.SynchronousSink; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.concurrent.Queues; import reactor.util.context.Context; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; public class MonoUtils { @@ -300,34 +303,35 @@ public class MonoUtils { return fromEmitResultFuture(sink.tryEmitEmpty()); } - public static Mono> unicastBackpressureSinkStream() { + public static Mono> unicastBackpressureSinkStream(Scheduler scheduler) { Many sink = Sinks.many().unicast().onBackpressureBuffer(); - return asStream(sink, null, null, 1); + return asStream(sink, scheduler, null, null, 1); } /** * Create a sink that can be written from a writeStream */ - public static Mono> unicastBackpressureStream(int maxBackpressureQueueSize) { + public static Mono> unicastBackpressureStream(Scheduler scheduler, int maxBackpressureQueueSize) { Queue boundedQueue = Queues.get(maxBackpressureQueueSize).get(); var queueSize = Flux .interval(Duration.ZERO, Duration.ofMillis(500)) .map(n -> boundedQueue.size()); Empty termination = Sinks.empty(); Many sink = Sinks.many().unicast().onBackpressureBuffer(boundedQueue, termination::tryEmitEmpty); - return asStream(sink, queueSize, termination, maxBackpressureQueueSize); + return asStream(sink, scheduler, queueSize, termination, maxBackpressureQueueSize); } - public static Mono> unicastBackpressureErrorStream() { + public static Mono> unicastBackpressureErrorStream(Scheduler scheduler) { Many sink = Sinks.many().unicast().onBackpressureError(); - return asStream(sink, null, null, 1); + return asStream(sink, scheduler, null, null, 1); } public static Mono> asStream(Many sink, + Scheduler scheduler, @Nullable Flux backpressureSize, @Nullable Empty termination, int maxBackpressureQueueSize) { - return SinkRWStream.create(sink, backpressureSize, termination, maxBackpressureQueueSize); + return SinkRWStream.create(sink, scheduler, backpressureSize, termination, maxBackpressureQueueSize); } private static Future toVertxFuture(Mono toTransform) { @@ -341,20 +345,59 @@ public class MonoUtils { return (Mono) mono; } + /** + * This method fails to guarantee that the consumer gets registered on all clusters before returning. + * Use fromConsumerAdvanced if you want better stability. + */ + @Deprecated public static Flux fromConsumer(MessageConsumer messageConsumer) { return Flux.>create(sink -> { - messageConsumer.handler(sink::next); - messageConsumer.endHandler(e -> sink.complete()); - sink.onDispose(messageConsumer::unregister); - }) - .startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono))) + messageConsumer.endHandler(e -> sink.complete()); + sink.onDispose(messageConsumer::unregister); + }) + //.startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono))) .flatMapSequential(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) .subscribeOn(Schedulers.boundedElastic()); } + public static Mono, Flux>> fromConsumerAdvanced(MessageConsumer messageConsumer) { + return Mono., Flux>>fromCallable(() -> { + Many> messages = Sinks.many().unicast().onBackpressureError(); + Empty registrationRequested = Sinks.empty(); + Empty registrationCompletion = Sinks.empty(); + messageConsumer.endHandler(e -> { + messages.tryEmitComplete(); + registrationCompletion.tryEmitEmpty(); + }); + messageConsumer.handler(messages::tryEmitNext); + + Flux dataFlux = Flux + .concatDelayError( + messages.asFlux() + .flatMap(msg -> Mono + .fromCallable(msg::body) + .subscribeOn(Schedulers.boundedElastic()) + ), + messageConsumer.rxUnregister().as(MonoUtils::toMono) + ) + .doOnSubscribe(s -> registrationRequested.tryEmitEmpty()); + + Mono registrationCompletionMono = Mono.empty() + .doOnSubscribe(s -> registrationRequested.tryEmitEmpty()) + .then(registrationRequested.asMono()) + .doOnSuccess(s -> logger.trace("Subscribed to registration completion mono")) + .doOnSuccess(s -> logger.trace("Waiting for consumer registration completion...")) + .then(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)) + .doOnSuccess(s -> logger.trace("Consumer registered")) + .share(); + return Tuples.of(registrationCompletionMono, dataFlux); + }).subscribeOn(Schedulers.boundedElastic()); + } + public static class SinkRWStream implements io.vertx.core.streams.WriteStream, io.vertx.core.streams.ReadStream { private final Many sink; + private final Scheduler scheduler; private final Flux backpressureSize; private final Empty termination; private Handler exceptionHandler = e -> {}; @@ -364,6 +407,7 @@ public class MonoUtils { private volatile boolean writeQueueFull = false; private SinkRWStream(Many sink, + Scheduler scheduler, @Nullable Flux backpressureSize, @Nullable Empty termination, int maxBackpressureQueueSize) { @@ -372,6 +416,7 @@ public class MonoUtils { this.backpressureSize = backpressureSize; this.termination = termination; this.sink = sink; + this.scheduler = scheduler; } public Mono> initialize() { @@ -409,14 +454,15 @@ public class MonoUtils { } public static Mono> create(Many sink, + Scheduler scheduler, @Nullable Flux backpressureSize, @Nullable Empty termination, int maxBackpressureQueueSize) { - return new SinkRWStream(sink, backpressureSize, termination, maxBackpressureQueueSize).initialize(); + return new SinkRWStream(sink, scheduler, backpressureSize, termination, maxBackpressureQueueSize).initialize(); } public Flux readAsFlux() { - return sink.asFlux().publishOn(Schedulers.parallel()); + return sink.asFlux(); } public ReactiveReactorReadStream readAsStream() { @@ -449,7 +495,7 @@ public class MonoUtils { @Override public io.vertx.core.streams.ReadStream handler(@io.vertx.codegen.annotations.Nullable Handler handler) { - sink.asFlux().publishOn(Schedulers.boundedElastic()).subscribe(new CoreSubscriber() { + sink.asFlux().publishOn(scheduler).subscribe(new CoreSubscriber() { @Override public void onSubscribe(@NotNull Subscription s) { @@ -554,10 +600,12 @@ public class MonoUtils { public static class FluxReadStream implements io.vertx.core.streams.ReadStream { private final Flux flux; + private final Scheduler scheduler; private Handler exceptionHandler = e -> {}; - public FluxReadStream(Flux flux) { + public FluxReadStream(Flux flux, Scheduler scheduler) { this.flux = flux; + this.scheduler = scheduler; } public Flux readAsFlux() { @@ -587,7 +635,7 @@ public class MonoUtils { @SuppressWarnings("DuplicatedCode") @Override public io.vertx.core.streams.ReadStream handler(@io.vertx.codegen.annotations.Nullable Handler handler) { - flux.publishOn(Schedulers.boundedElastic()).subscribe(new CoreSubscriber() { + flux.publishOn(scheduler).subscribe(new CoreSubscriber() { @Override public void onSubscribe(@NotNull Subscription s) { @@ -740,8 +788,8 @@ public class MonoUtils { this.rs = ReadStream.newInstance(rs); } - public ReactiveReactorReadStream(Flux s) { - this.rs = ReadStream.newInstance(new FluxReadStream<>(s)); + public ReactiveReactorReadStream(Flux s, Scheduler scheduler) { + this.rs = ReadStream.newInstance(new FluxReadStream<>(s, scheduler)); } @Override