This commit is contained in:
Andrea Cavalli 2021-01-26 16:29:45 +01:00
parent 9cea2cc5e0
commit 1b2fc7e4c1
13 changed files with 362 additions and 85 deletions

View File

@ -53,16 +53,19 @@ public class TDLibRemoteClient implements AutoCloseable {
String netInterface,
int port,
Set<String> membersAddresses,
boolean enableStacktraces) {
boolean enableAsyncStacktraces,
boolean enableFullAsyncStacktraces) {
this.securityInfo = securityInfo;
this.masterHostname = masterHostname;
this.netInterface = netInterface;
this.port = port;
this.membersAddresses = membersAddresses;
if (enableStacktraces && !runningFromIntelliJ()) {
if (enableAsyncStacktraces && !runningFromIntelliJ()) {
ReactorDebugAgent.init();
RxJava2Debug.enableRxJava2AssemblyTracking(new String[] {"it.tdlight.utils", "it.tdlight.tdlibsession"});
}
if (enableAsyncStacktraces && enableFullAsyncStacktraces) {
RxJava2Debug.enableRxJava2AssemblyTracking(new String[]{"it.tdlight.utils", "it.tdlight.tdlibsession"});
}
try {
@ -91,14 +94,22 @@ public class TDLibRemoteClient implements AutoCloseable {
Path keyStorePasswordPath = Paths.get(args[4]);
Path trustStorePath = Paths.get(args[5]);
Path trustStorePasswordPath = Paths.get(args[6]);
boolean enableStacktraces = Boolean.parseBoolean(args[7]);
boolean enableAsyncStacktraces = Boolean.parseBoolean(args[7]);
boolean enableFullAsyncStacktraces = Boolean.parseBoolean(args[8]);
var loggerContext = (org.apache.logging.log4j.core.LoggerContext) LogManager.getContext(false);
loggerContext.setConfigLocation(TDLibRemoteClient.class.getResource("/tdlib-session-container-log4j2.xml").toURI());
var securityInfo = new SecurityInfo(keyStorePath, keyStorePasswordPath, trustStorePath, trustStorePasswordPath);
var client = new TDLibRemoteClient(securityInfo, masterHostname, netInterface, port, membersAddresses, enableStacktraces);
var client = new TDLibRemoteClient(securityInfo,
masterHostname,
netInterface,
port,
membersAddresses,
enableAsyncStacktraces,
enableFullAsyncStacktraces
);
client
.start()

View File

@ -1,33 +0,0 @@
package it.tdlight.tdlibsession.td;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Object;
import java.util.StringJoiner;
public class TdResultMessage {
public final TdApi.Object value;
public final TdApi.Error cause;
public TdResultMessage(Object value, Error cause) {
this.value = value;
this.cause = cause;
}
public <T extends Object> TdResult<T> toTdResult() {
if (value != null) {
//noinspection unchecked
return TdResult.succeeded((T) value);
} else {
return TdResult.failed(cause);
}
}
@Override
public String toString() {
return new StringJoiner(", ", TdResultMessage.class.getSimpleName() + "[", "]")
.add("value=" + value)
.add("cause=" + cause)
.toString();
}
}

View File

@ -1,28 +1,53 @@
package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import java.util.Objects;
import java.util.StringJoiner;
public class ExecuteObject {
private final boolean executeDirectly;
private final TdApi.Function request;
public ExecuteObject(boolean executeDirectly, TdApi.Function request) {
private static final TdExecuteObjectMessageCodec realCodec = new TdExecuteObjectMessageCodec();
private boolean executeDirectly;
private TdApi.Function request;
private int pos;
private Buffer buffer;
public ExecuteObject(boolean executeDirectly, Function request) {
this.executeDirectly = executeDirectly;
this.request = request;
if (request == null) throw new NullPointerException();
}
public ExecuteObject(int pos, Buffer buffer) {
this.pos = pos;
this.buffer = buffer;
}
private void tryDecode() {
if (request == null) {
var data = realCodec.decodeFromWire(pos, buffer);
this.executeDirectly = data.executeDirectly;
this.request = data.request;
this.buffer = null;
}
}
public boolean isExecuteDirectly() {
tryDecode();
return executeDirectly;
}
public TdApi.Function getRequest() {
tryDecode();
return request;
}
@Override
public boolean equals(Object o) {
tryDecode();
if (this == o) {
return true;
}
@ -40,6 +65,7 @@ public class ExecuteObject {
@Override
public int hashCode() {
tryDecode();
int result = (executeDirectly ? 1 : 0);
result = 31 * result + (request != null ? request.hashCode() : 0);
return result;

View File

@ -0,0 +1,41 @@
package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
public class LazyTdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject, ExecuteObject> {
private static final TdExecuteObjectMessageCodec realCodec = new TdExecuteObjectMessageCodec();
public LazyTdExecuteObjectMessageCodec() {
super();
}
@Override
public void encodeToWire(Buffer buffer, ExecuteObject t) {
realCodec.encodeToWire(buffer, t);
}
@Override
public ExecuteObject decodeFromWire(int pos, Buffer buffer) {
return new ExecuteObject(pos, buffer);
}
@Override
public ExecuteObject transform(ExecuteObject t) {
// If a message is sent *locally* across the event bus.
// This sends message just as is
return t;
}
@Override
public String name() {
return "ExecuteObjectCodec";
}
@Override
public byte systemCodecID() {
// Always -1
return -1;
}
}

View File

@ -0,0 +1,44 @@
package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
@SuppressWarnings("rawtypes")
public class LazyTdResultListMessageCodec implements MessageCodec<TdResultList, TdResultList> {
private final String codecName;
private static final TdResultListMessageCodec realCodec = new TdResultListMessageCodec();
public LazyTdResultListMessageCodec() {
super();
this.codecName = "TdOptListCodec";
}
@Override
public void encodeToWire(Buffer buffer, TdResultList t) {
realCodec.encodeToWire(buffer, t);
}
@Override
public TdResultList decodeFromWire(int pos, Buffer buffer) {
return new TdResultList(pos, buffer);
}
@Override
public TdResultList transform(TdResultList t) {
// If a message is sent *locally* across the event bus.
// This sends message just as is
return t;
}
@Override
public String name() {
return codecName;
}
@Override
public byte systemCodecID() {
// Always -1
return -1;
}
}

View File

@ -0,0 +1,44 @@
package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
@SuppressWarnings("rawtypes")
public class LazyTdResultMessageCodec implements MessageCodec<TdResultMessage, TdResultMessage> {
private final String codecName;
private static final TdResultMessageCodec realCodec = new TdResultMessageCodec();
public LazyTdResultMessageCodec() {
super();
this.codecName = "TdResultCodec";
}
@Override
public void encodeToWire(Buffer buffer, TdResultMessage t) {
realCodec.encodeToWire(buffer, t);
}
@Override
public TdResultMessage decodeFromWire(int pos, Buffer buffer) {
return new TdResultMessage(pos, buffer);
}
@Override
public TdResultMessage transform(TdResultMessage t) {
// If a message is sent *locally* across the event bus.
// This sends message just as is
return t;
}
@Override
public String name() {
return codecName;
}
@Override
public byte systemCodecID() {
// Always -1
return -1;
}
}

View File

@ -23,7 +23,6 @@ import io.vertx.reactivex.core.eventbus.MessageConsumer;
import io.vertx.reactivex.core.shareddata.SharedData;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
import it.tdlight.common.ConstructorDetector;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.utils.MonoUtils;
import java.nio.channels.AlreadyBoundException;
import java.util.ArrayList;
@ -55,9 +54,9 @@ public class TdClusterManager {
vertx
.eventBus()
.getDelegate()
.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())
.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec())
.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec())
.registerDefaultCodec(TdResultList.class, new LazyTdResultListMessageCodec())
.registerDefaultCodec(ExecuteObject.class, new LazyTdExecuteObjectMessageCodec())
.registerDefaultCodec(TdResultMessage.class, new LazyTdResultMessageCodec())
.registerDefaultCodec(StartSessionMessage.class, new StartSessionMessageCodec())
.registerDefaultCodec(EndSessionMessage.class, new EndSessionMessageCodec());
for (Class<?> value : ConstructorDetector.getTDConstructorsUnsafe().values()) {

View File

@ -1,5 +1,6 @@
package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error;
import java.util.List;
@ -7,33 +8,58 @@ import java.util.Objects;
import java.util.StringJoiner;
public class TdResultList {
private final List<TdApi.Object> values;
private final Error error;
private static final TdResultListMessageCodec realCodec = new TdResultListMessageCodec();
private List<TdApi.Object> values;
private Error error;
private int pos;
private Buffer buffer;
public TdResultList(List<TdApi.Object> values) {
this.values = values;
this.error = null;
if (values == null) throw new NullPointerException("Null message");
}
public TdResultList(TdApi.Error error) {
this.values = null;
this.error = error;
if (error == null) throw new NullPointerException("Null message");
}
public TdResultList(int pos, Buffer buffer) {
this.pos = pos;
this.buffer = buffer;
}
private void tryDecode() {
if (error == null && values == null) {
var value = realCodec.decodeFromWire(pos, buffer);
this.values = value.values;
this.error = value.error;
this.buffer = null;
}
}
public List<TdApi.Object> value() {
tryDecode();
return values;
}
public TdApi.Error error() {
tryDecode();
return error;
}
public boolean succeeded() {
tryDecode();
return error == null && values != null;
}
@Override
public boolean equals(Object o) {
tryDecode();
if (this == o) {
return true;
}
@ -51,6 +77,7 @@ public class TdResultList {
@Override
public int hashCode() {
tryDecode();
int result = values != null ? values.hashCode() : 0;
result = 31 * result + (error != null ? error.hashCode() : 0);
return result;

View File

@ -0,0 +1,57 @@
package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.tdlibsession.td.TdResult;
import java.util.StringJoiner;
public class TdResultMessage {
private static final TdResultMessageCodec realCodec = new TdResultMessageCodec();
public TdApi.Object value;
public TdApi.Error cause;
private int pos;
private Buffer buffer;
public TdResultMessage(Object value, Error cause) {
this.value = value;
this.cause = cause;
if (value == null && cause == null) throw new NullPointerException("Null message");
}
public TdResultMessage(int pos, Buffer buffer) {
this.pos = pos;
this.buffer = buffer;
}
public <T extends Object> TdResult<T> toTdResult() {
if (value != null) {
//noinspection unchecked
return TdResult.succeeded((T) value);
} else if (cause != null) {
return TdResult.failed(cause);
} else {
var data = realCodec.decodeFromWire(pos, buffer);
this.value = data.value;
this.cause = data.cause;
this.buffer = null;
if (value != null) {
//noinspection unchecked
return TdResult.succeeded((T) value);
} else {
return TdResult.failed(cause);
}
}
}
@Override
public String toString() {
return new StringJoiner(", ", TdResultMessage.class.getSimpleName() + "[", "]")
.add("value=" + value)
.add("cause=" + cause)
.toString();
}
}

View File

@ -3,7 +3,6 @@ package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import java.io.IOException;

View File

@ -9,7 +9,7 @@ import it.tdlight.jni.TdApi.Function;
import it.tdlight.tdlibsession.td.ResponseError;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.tdlibsession.td.middle.TdResultMessage;
import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.tdlibsession.td.middle.EndSessionMessage;
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
@ -43,7 +43,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
private final One<BinlogAsyncFile> binlog = Sinks.one();
private final One<Flux<TdResultList>> updates = Sinks.one();
private final One<MessageConsumer<TdResultList>> updates = Sinks.one();
// This will only result in a successful completion, never completes in other ways
private final Empty<Void> updatesStreamEnd = Sinks.one();
// This will only result in a crash, never completes in other ways
@ -157,7 +157,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
deliveryOptionsWithTimeout
).as(MonoUtils::toMono);
})
.flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic()))
.flatMap(msg -> Mono.fromCallable(() -> msg.body()).subscribeOn(Schedulers.boundedElastic()))
.repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true))
.takeUntilOther(Mono.firstWithSignal(this.updatesStreamEnd.asMono().doOnTerminate(() -> {
logger.trace("About to kill pinger because updates stream ended");
@ -188,16 +188,12 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.getDelegate());
}))
.flatMap(updateConsumer -> {
// Here the updates will be piped from the server to the client
var updateConsumerFlux = MonoUtils.fromConsumer(updateConsumer);
// Return when the registration of all the consumers has been done across the cluster
return Mono
.fromRunnable(() -> logger.trace("Emitting updates flux to sink"))
.then(MonoUtils.emitValue(updates, updateConsumerFlux))
.then(MonoUtils.emitValue(updates, updateConsumer))
.doOnSuccess(s -> logger.trace("Emitted updates flux to sink"))
.doOnSuccess(s -> logger.trace("Waiting to register update consumer across the cluster"))
.then(updateConsumer.rxCompletionHandler().as(MonoUtils::toMono))
.doOnSuccess(s -> logger.trace("Registered update consumer across the cluster"));
})
.doOnSuccess(s ->logger.trace("Set up updates listener"))
@ -207,15 +203,23 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
@Override
public Flux<TdApi.Object> receive() {
// Here the updates will be received
return Mono
.fromRunnable(() -> logger.trace("Called receive() from parent"))
.doOnSuccess(s -> logger.trace("Sending ready-to-receive"))
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
.doOnSuccess(s -> logger.trace("About to read updates flux"))
.then(updates.asMono().publishOn(Schedulers.single()))
.timeout(Duration.ofSeconds(5))
.publishOn(Schedulers.single())
.flatMapMany(updatesFlux -> updatesFlux.publishOn(Schedulers.single()))
.flatMap(MonoUtils::fromConsumerAdvanced)
.flatMapMany(registration -> Mono
.fromRunnable(() -> logger.trace("Registering updates flux"))
.then(registration.getT1())
.doOnSuccess(s -> logger.trace("Registered updates flux"))
.doOnSuccess(s -> logger.trace("Sending ready-to-receive"))
.then(cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout).as(MonoUtils::toMono))
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
.doOnSuccess(s -> logger.trace("About to read updates flux"))
.thenMany(registration.getT2())
)
.takeUntilOther(Flux
.merge(
crash.asMono()
@ -227,6 +231,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
throw ex;
}).onErrorResume(ex -> MonoUtils.emitError(crash, ex)))
)
.doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end"))
)
.flatMapSequential(updates -> {
if (updates.succeeded()) {
@ -239,13 +244,17 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.flatMapSequential(this::interceptUpdate)
// Redirect errors to crash sink
.doOnError(crash::tryEmitError)
.onErrorResume(ex -> Mono.empty())
.onErrorResume(ex -> {
logger.trace("Absorbing the error, the error has been published using the crash sink", ex);
return Mono.empty();
})
.doOnTerminate(updatesStreamEnd::tryEmitEmpty)
.publishOn(Schedulers.single());
}
private Mono<TdApi.Object> interceptUpdate(TdApi.Object update) {
logger.trace("Received update {}", update.getClass().getSimpleName());
switch (update.getConstructor()) {
case TdApi.UpdateAuthorizationState.CONSTRUCTOR:
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
@ -253,8 +262,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
return Mono.fromRunnable(() -> logger.trace("Received AuthorizationStateClosed from tdlib"))
.then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono))
.doOnNext(l -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(l.body().binlog().length)))
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.body().binlog()))
.flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.boundedElastic()))
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length)))
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog()))
.doOnSuccess(s -> logger.info("Overwritten binlog from server"))
.publishOn(Schedulers.boundedElastic())
.thenReturn(update);
@ -281,7 +291,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
} else {
return resp.body().toTdResult();
}
}).publishOn(Schedulers.boundedElastic())
}).subscribeOn(Schedulers.boundedElastic())
)
.doOnSuccess(s -> logger.trace("Executed request"))
)

View File

@ -17,7 +17,7 @@ import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.tdlibsession.td.middle.TdResultMessage;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions;
import it.tdlight.tdlibsession.td.direct.TelegramClientFactory;
@ -148,7 +148,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.flatMapSequential(tuple -> {
var msg = tuple.getT1();
var body = tuple.getT2();
logger.trace("Received execute request {}", body);
logger.trace("Received execute request {}", body.getRequest().getClass().getSimpleName());
var request = overrideRequest(body.getRequest(), botId);
return td
.execute(request, body.isExecuteDirectly())
@ -202,17 +202,21 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.take(1)
.limitRequest(1)
.single()
.doOnNext(s -> logger.trace("Received ready-to-receive request from client"))
.flatMap(msg -> this.pipeFlux
.asMono()
.timeout(Duration.ofSeconds(5))
.map(pipeFlux -> Tuples.of(msg, pipeFlux)))
.doOnError(ex -> logger.error("Error when processing a ready-to-receive request", ex))
.doOnNext(s -> logger.trace("Replying to ready-to-receive request"))
.flatMapMany(tuple -> {
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
tuple.getT1().reply(EMPTY, opts);
logger.trace("Replied to ready-to-receive");
logger.trace("Start piping data");
// Start piping the data
return tuple.getT2().doOnSubscribe(s -> {
logger.trace("Subscribed to updates pipe");

View File

@ -43,9 +43,12 @@ import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
public class MonoUtils {
@ -300,34 +303,35 @@ public class MonoUtils {
return fromEmitResultFuture(sink.tryEmitEmpty());
}
public static <T> Mono<SinkRWStream<T>> unicastBackpressureSinkStream() {
public static <T> Mono<SinkRWStream<T>> unicastBackpressureSinkStream(Scheduler scheduler) {
Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
return asStream(sink, null, null, 1);
return asStream(sink, scheduler, null, null, 1);
}
/**
* Create a sink that can be written from a writeStream
*/
public static <T> Mono<SinkRWStream<T>> unicastBackpressureStream(int maxBackpressureQueueSize) {
public static <T> Mono<SinkRWStream<T>> unicastBackpressureStream(Scheduler scheduler, int maxBackpressureQueueSize) {
Queue<T> boundedQueue = Queues.<T>get(maxBackpressureQueueSize).get();
var queueSize = Flux
.interval(Duration.ZERO, Duration.ofMillis(500))
.map(n -> boundedQueue.size());
Empty<Void> termination = Sinks.empty();
Many<T> sink = Sinks.many().unicast().onBackpressureBuffer(boundedQueue, termination::tryEmitEmpty);
return asStream(sink, queueSize, termination, maxBackpressureQueueSize);
return asStream(sink, scheduler, queueSize, termination, maxBackpressureQueueSize);
}
public static <T> Mono<SinkRWStream<T>> unicastBackpressureErrorStream() {
public static <T> Mono<SinkRWStream<T>> unicastBackpressureErrorStream(Scheduler scheduler) {
Many<T> sink = Sinks.many().unicast().onBackpressureError();
return asStream(sink, null, null, 1);
return asStream(sink, scheduler, null, null, 1);
}
public static <T> Mono<SinkRWStream<T>> asStream(Many<T> sink,
Scheduler scheduler,
@Nullable Flux<Integer> backpressureSize,
@Nullable Empty<Void> termination,
int maxBackpressureQueueSize) {
return SinkRWStream.create(sink, backpressureSize, termination, maxBackpressureQueueSize);
return SinkRWStream.create(sink, scheduler, backpressureSize, termination, maxBackpressureQueueSize);
}
private static Future<Void> toVertxFuture(Mono<Void> toTransform) {
@ -341,20 +345,59 @@ public class MonoUtils {
return (Mono) mono;
}
/**
* This method fails to guarantee that the consumer gets registered on all clusters before returning.
* Use fromConsumerAdvanced if you want better stability.
*/
@Deprecated
public static <T> Flux<T> fromConsumer(MessageConsumer<T> messageConsumer) {
return Flux.<Message<T>>create(sink -> {
messageConsumer.handler(sink::next);
messageConsumer.endHandler(e -> sink.complete());
sink.onDispose(messageConsumer::unregister);
})
.startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)))
messageConsumer.endHandler(e -> sink.complete());
sink.onDispose(messageConsumer::unregister);
})
//.startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)))
.flatMapSequential(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic()))
.subscribeOn(Schedulers.boundedElastic());
}
public static <T> Mono<Tuple2<Mono<Void>, Flux<T>>> fromConsumerAdvanced(MessageConsumer<T> messageConsumer) {
return Mono.<Tuple2<Mono<Void>, Flux<T>>>fromCallable(() -> {
Many<Message<T>> messages = Sinks.many().unicast().onBackpressureError();
Empty<Void> registrationRequested = Sinks.empty();
Empty<Void> registrationCompletion = Sinks.empty();
messageConsumer.endHandler(e -> {
messages.tryEmitComplete();
registrationCompletion.tryEmitEmpty();
});
messageConsumer.<T>handler(messages::tryEmitNext);
Flux<T> dataFlux = Flux
.<T>concatDelayError(
messages.asFlux()
.<T>flatMap(msg -> Mono
.fromCallable(msg::body)
.subscribeOn(Schedulers.boundedElastic())
),
messageConsumer.rxUnregister().as(MonoUtils::toMono)
)
.doOnSubscribe(s -> registrationRequested.tryEmitEmpty());
Mono<Void> registrationCompletionMono = Mono.empty()
.doOnSubscribe(s -> registrationRequested.tryEmitEmpty())
.then(registrationRequested.asMono())
.doOnSuccess(s -> logger.trace("Subscribed to registration completion mono"))
.doOnSuccess(s -> logger.trace("Waiting for consumer registration completion..."))
.<Void>then(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono))
.doOnSuccess(s -> logger.trace("Consumer registered"))
.share();
return Tuples.of(registrationCompletionMono, dataFlux);
}).subscribeOn(Schedulers.boundedElastic());
}
public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {
private final Many<T> sink;
private final Scheduler scheduler;
private final Flux<Integer> backpressureSize;
private final Empty<Void> termination;
private Handler<Throwable> exceptionHandler = e -> {};
@ -364,6 +407,7 @@ public class MonoUtils {
private volatile boolean writeQueueFull = false;
private SinkRWStream(Many<T> sink,
Scheduler scheduler,
@Nullable Flux<Integer> backpressureSize,
@Nullable Empty<Void> termination,
int maxBackpressureQueueSize) {
@ -372,6 +416,7 @@ public class MonoUtils {
this.backpressureSize = backpressureSize;
this.termination = termination;
this.sink = sink;
this.scheduler = scheduler;
}
public Mono<SinkRWStream<T>> initialize() {
@ -409,14 +454,15 @@ public class MonoUtils {
}
public static <T> Mono<SinkRWStream<T>> create(Many<T> sink,
Scheduler scheduler,
@Nullable Flux<Integer> backpressureSize,
@Nullable Empty<Void> termination,
int maxBackpressureQueueSize) {
return new SinkRWStream<T>(sink, backpressureSize, termination, maxBackpressureQueueSize).initialize();
return new SinkRWStream<T>(sink, scheduler, backpressureSize, termination, maxBackpressureQueueSize).initialize();
}
public Flux<T> readAsFlux() {
return sink.asFlux().publishOn(Schedulers.parallel());
return sink.asFlux();
}
public ReactiveReactorReadStream<T> readAsStream() {
@ -449,7 +495,7 @@ public class MonoUtils {
@Override
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
sink.asFlux().publishOn(Schedulers.boundedElastic()).subscribe(new CoreSubscriber<T>() {
sink.asFlux().publishOn(scheduler).subscribe(new CoreSubscriber<T>() {
@Override
public void onSubscribe(@NotNull Subscription s) {
@ -554,10 +600,12 @@ public class MonoUtils {
public static class FluxReadStream<T> implements io.vertx.core.streams.ReadStream<T> {
private final Flux<T> flux;
private final Scheduler scheduler;
private Handler<Throwable> exceptionHandler = e -> {};
public FluxReadStream(Flux<T> flux) {
public FluxReadStream(Flux<T> flux, Scheduler scheduler) {
this.flux = flux;
this.scheduler = scheduler;
}
public Flux<T> readAsFlux() {
@ -587,7 +635,7 @@ public class MonoUtils {
@SuppressWarnings("DuplicatedCode")
@Override
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
flux.publishOn(Schedulers.boundedElastic()).subscribe(new CoreSubscriber<T>() {
flux.publishOn(scheduler).subscribe(new CoreSubscriber<T>() {
@Override
public void onSubscribe(@NotNull Subscription s) {
@ -740,8 +788,8 @@ public class MonoUtils {
this.rs = ReadStream.newInstance(rs);
}
public ReactiveReactorReadStream(Flux<T> s) {
this.rs = ReadStream.newInstance(new FluxReadStream<>(s));
public ReactiveReactorReadStream(Flux<T> s, Scheduler scheduler) {
this.rs = ReadStream.newInstance(new FluxReadStream<>(s, scheduler));
}
@Override