This commit is contained in:
Andrea Cavalli 2021-01-25 16:06:05 +01:00
parent 5074d985b3
commit e807771b53
9 changed files with 60 additions and 66 deletions

View File

@ -99,7 +99,7 @@ public class EventBusFlux {
var responseHandler = MonoUtils.toHandler(itemSink);
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, responseHandler);
}))
.subscribeOn(Schedulers.single())
.publishOn(Schedulers.single())
.subscribe(response -> {}, error -> {
if (error instanceof ReplyException) {
var errorMessageCode = ((ReplyException) error).failureCode();
@ -208,7 +208,7 @@ public class EventBusFlux {
sink.success();
}
});
}).subscribeOn(Schedulers.single()).share();
}).publishOn(Schedulers.single()).share();
return Tuples.of(servedMono, fatalErrorSink.asMono());
}
@ -311,7 +311,7 @@ public class EventBusFlux {
}
}
})))
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
.onBackpressureBuffer()
.publishOn(Schedulers.single())
.subscribe(v -> {}, emitter::error);

View File

@ -102,10 +102,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
result -> logger.warn("Close result: {}", result),
ex -> logger.error("Error when disposing td client", ex)
);
}).subscribeOn(Schedulers.single()).subscribe();
}).publishOn(Schedulers.single()).subscribe();
});
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
);
}
}

View File

@ -60,7 +60,7 @@ public class TestClient implements TelegramClient {
.asFlux()
.buffer(50)
.doOnNext(ub -> logger.trace("Received update block of size {}", ub.size()))
.subscribeOn(testClientScheduler)
.publishOn(testClientScheduler)
.subscribe(updatesHandler::onUpdates, updateExceptionHandler::onException);
for (String featureName : features) {
@ -70,7 +70,7 @@ public class TestClient implements TelegramClient {
.repeat()
.buffer(100)
.doOnNext(updatesHandler::onUpdates)
.subscribeOn(testClientScheduler)
.publishOn(testClientScheduler)
.subscribe();
break;
default:

View File

@ -94,7 +94,7 @@ public class AsyncTdEasy {
}
})
.doOnComplete(() -> {
authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> {
authState.asFlux().take(1).single().publishOn(scheduler).subscribe(authState -> {
onUpdatesTerminated();
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
logger.warn("Updates stream has closed while"
@ -104,7 +104,7 @@ public class AsyncTdEasy {
}
});
}).doOnError(ex -> {
authState.asFlux().take(1).single().subscribeOn(scheduler).subscribe(authState -> {
authState.asFlux().take(1).single().publishOn(scheduler).subscribe(authState -> {
onUpdatesTerminated();
if (authState.getConstructor() != AuthorizationStateClosed.CONSTRUCTOR) {
logger.warn("Updates stream has terminated with an error while"
@ -138,7 +138,7 @@ public class AsyncTdEasy {
return true;
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
.flatMap(_v -> {
this.settings.tryEmitNext(settings);
return Mono.empty();
@ -561,7 +561,7 @@ public class AsyncTdEasy {
))
.filterWhen(file -> Mono
.fromCallable(() -> Files.exists(file))
.subscribeOn(Schedulers.boundedElastic()))
.publishOn(Schedulers.boundedElastic()))
.doOnNext(directory -> {
try {
if (!Files.walk(directory)

View File

@ -42,6 +42,6 @@ public class ScannerParameterRequestHandler implements ParameterRequestHandler {
} else {
return result;
}
}).subscribeOn(Schedulers.boundedElastic());
}).publishOn(Schedulers.boundedElastic());
}
}

View File

@ -13,6 +13,7 @@ import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.http.ClientAuth;
import io.vertx.core.metrics.MetricsOptions;
import io.vertx.core.net.JksOptions;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.reactivex.core.Vertx;
@ -160,13 +161,14 @@ public class TdClusterManager {
vertxOptions.setClusterManager(null);
}
vertxOptions.setPreferNativeTransport(true);
vertxOptions.setPreferNativeTransport(false);
vertxOptions.setMetricsOptions(new MetricsOptions().setEnabled(false));
// check for blocked threads every 5s
vertxOptions.setBlockedThreadCheckInterval(5);
vertxOptions.setBlockedThreadCheckIntervalUnit(TimeUnit.SECONDS);
// warn if an event loop thread handler took more than 100ms to execute
vertxOptions.setMaxEventLoopExecuteTime(50);
vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS);
// warn if an event loop thread handler took more than 5s to execute
vertxOptions.setMaxEventLoopExecuteTime(5);
vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS);
// warn if an worker thread handler took more than 10s to execute
vertxOptions.setMaxWorkerExecuteTime(10);
vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS);
@ -185,7 +187,7 @@ public class TdClusterManager {
.publishOn(Schedulers.boundedElastic())
.flatMap(vertx -> Mono
.fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx))
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.boundedElastic())
);
}

View File

@ -3,7 +3,6 @@ package it.tdlight.tdlibsession.td.middle.client;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
@ -20,11 +19,9 @@ import it.tdlight.tdlibsession.td.middle.TdResultList;
import it.tdlight.utils.BinlogAsyncFile;
import it.tdlight.utils.BinlogUtils;
import it.tdlight.utils.MonoUtils;
import it.tdlight.utils.MonoUtils.SinkRWStream;
import java.net.ConnectException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@ -46,7 +43,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
private final One<BinlogAsyncFile> binlog = Sinks.one();
private final One<SinkRWStream<Message<TdResultList>>> updates = Sinks.one();
private final One<Flux<TdResultList>> updates = Sinks.one();
// This will only result in a successful completion, never completes in other ways
private final Empty<Void> updatesStreamEnd = Sinks.one();
// This will only result in a crash, never completes in other ways
@ -65,9 +62,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
}
private Mono<AsyncTdMiddleEventBusClient> initialize() {
return MonoUtils.<Message<TdResultList>>unicastBackpressureSinkStream()
.flatMap(updates -> MonoUtils.emitValue(this.updates, updates))
.thenReturn(this);
return Mono.just(this);
}
public static Mono<AsyncTdMiddle> getAndDeployInstance(TdClusterManager clusterManager,
@ -150,24 +145,16 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.fromBlockingMaybe(() -> {
MessageConsumer<TdResultList> updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().<TdResultList>consumer(
botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate());
updateConsumer.endHandler(h -> {
logger.error("<<<<<<<<<<<<<<<<EndHandler?>>>>>>>>>>>>>");
});
// Here the updates will be piped from the server to the client
updates
.asMono()
.timeout(Duration.ofSeconds(5))
.flatMap(updates -> updateConsumer
.rxPipeTo(updates.writeAsStream()).as(MonoUtils::toMono)
)
.publishOn(Schedulers.newSingle("td-client-updates-pipe"))
.subscribe();
var updateConsumerFlux = MonoUtils.fromConsumer(updateConsumer);
return updateConsumer;
// Return when the registration of all the consumers has been done across the cluster
return MonoUtils
.emitValue(updates, updateConsumerFlux)
.then(updateConsumer.rxCompletionHandler().as(MonoUtils::toMono));
})
// Return when the registration of all the consumers has been done across the cluster
.flatMap(updateConsumer -> updateConsumer.rxCompletionHandler().as(MonoUtils::toMono));
.then();
}
@Override
@ -179,10 +166,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.then()
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
.doOnSuccess(s -> logger.trace("About to read updates flux"))
.then(updates.asMono().timeout(Duration.ofSeconds(5)))
.flatMapMany(SinkRWStream::readAsFlux)
// Cast to fix bug of reactivex
.cast(io.vertx.core.eventbus.Message.class)
.then(updates.asMono())
.timeout(Duration.ofSeconds(5))
.flatMapMany(Flux::hide)
.timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to 12 pings after 1 minute (5 seconds per ping)");
ex.setStackTrace(new StackTraceElement[0]);
@ -191,13 +177,11 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.doOnSubscribe(s -> Schedulers.boundedElastic().schedule(() -> {
cluster.getEventBus().<byte[]>send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout);
}))
.flatMapSequential(updates -> Mono.fromCallable((Callable<Object>) updates::body).publishOn(Schedulers.boundedElastic()))
.flatMapSequential(updates -> {
var result = (TdResultList) updates;
if (result.succeeded()) {
return Flux.fromIterable(result.value());
if (updates.succeeded()) {
return Flux.fromIterable(updates.value());
} else {
return Mono.fromCallable(() -> TdResult.failed(result.error()).orElseThrow());
return Mono.fromCallable(() -> TdResult.failed(updates.error()).orElseThrow());
}
})
.flatMapSequential(this::interceptUpdate)

View File

@ -165,7 +165,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
})
.then()
.subscribeOn(Schedulers.single())
.publishOn(Schedulers.single())
.subscribe(v -> {},
ex -> logger.error("Error when processing an execute request", ex),
() -> logger.trace("Finished handling execute requests")
@ -178,7 +178,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
BinlogUtils
.readBinlogConsumer(vertx, readBinlogConsumer, botId, local)
.subscribeOn(Schedulers.single())
.publishOn(Schedulers.single())
.subscribe(v -> {}, ex -> logger.error("Error when processing a read-binlog request", ex));
MessageConsumer<byte[]> readyToReceiveConsumer = vertx.eventBus().consumer(botAddress + ".ready-to-receive");
@ -187,7 +187,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
return;
}
// Pipe the data
// Pipe the data
var pipeSubscription = Flux
.<Message<byte[]>>create(sink -> {
readyToReceiveConsumer.handler(sink::next);
@ -214,7 +214,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
})
.then()
.doOnSuccess(s -> logger.trace("Finished handling ready-to-receive requests (updates pipe ended)"))
.subscribeOn(Schedulers.single())
.publishOn(Schedulers.single())
// Don't handle errors here. Handle them in pipeFlux
.subscribe(v -> {});
@ -233,22 +233,21 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
msg.reply(EMPTY, opts);
})
.then()
.subscribeOn(Schedulers.single())
.publishOn(Schedulers.single())
.subscribe(v -> {},
ex -> logger.error("Error when processing a ping request", ex),
() -> logger.trace("Finished handling ping requests")
);
//noinspection ResultOfMethodCallIgnored
executeConsumer
.rxCompletionHandler()
.andThen(readBinlogConsumer.rxCompletionHandler())
.andThen(readyToReceiveConsumer.rxCompletionHandler())
.andThen(pingConsumer.rxCompletionHandler())
.subscribeOn(io.reactivex.schedulers.Schedulers.single())
.doOnComplete(() -> logger.trace("Finished preparing listeners"))
.subscribe(registrationSink::success, registrationSink::error);
.as(MonoUtils::toMono)
.doOnSuccess(s -> logger.trace("Finished preparing listeners"))
.publishOn(Schedulers.single())
.subscribe(v -> {}, registrationSink::error, registrationSink::success);
}));
}
@ -287,7 +286,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
// Since every consumer of ReadBinLog is identical, this should not pose a problem.
.delay(Duration.ofMinutes(30))
.then(ec.rxUnregister().as(MonoUtils::toMono))
.subscribeOn(Schedulers.single())
.publishOn(Schedulers.single())
.subscribe())
)
)

View File

@ -9,6 +9,8 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import io.vertx.reactivex.core.streams.Pipe;
import io.vertx.reactivex.core.streams.ReadStream;
import io.vertx.reactivex.core.streams.WriteStream;
@ -213,11 +215,7 @@ public class MonoUtils {
public static <T extends Object> CompletableFuture<T> toFuture(Mono<T> mono) {
var cf = new CompletableFuture<T>();
mono.subscribe(value -> {
cf.complete(value);
}, ex -> {
cf.completeExceptionally(ex);
}, () -> cf.complete(null));
mono.subscribe(cf::complete, cf::completeExceptionally, () -> cf.complete(null));
return cf;
}
@ -334,7 +332,7 @@ public class MonoUtils {
private static Future<Void> toVertxFuture(Mono<Void> toTransform) {
var promise = Promise.<Void>promise();
toTransform.subscribeOn(Schedulers.single()).subscribe(next -> {}, promise::fail, promise::complete);
toTransform.publishOn(Schedulers.single()).subscribe(next -> {}, promise::fail, promise::complete);
return promise.future();
}
@ -343,6 +341,16 @@ public class MonoUtils {
return (Mono) mono;
}
public static <T> Flux<T> fromConsumer(MessageConsumer<T> messageConsumer) {
return Flux.<Message<T>>create(sink -> {
Schedulers.boundedElastic().schedule(() -> {
messageConsumer.handler(sink::next);
messageConsumer.endHandler(e -> sink.complete());
sink.onDispose(messageConsumer::unregister);
});
}).flatMapSequential(msg -> Mono.fromCallable(msg::body).publishOn(Schedulers.boundedElastic()));
}
public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {
private final Many<T> sink;
@ -440,7 +448,7 @@ public class MonoUtils {
@Override
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
sink.asFlux().publishOn(Schedulers.boundedElastic()).subscribeWith(new CoreSubscriber<T>() {
sink.asFlux().publishOn(Schedulers.boundedElastic()).subscribe(new CoreSubscriber<T>() {
@Override
public void onSubscribe(@NotNull Subscription s) {
@ -575,9 +583,10 @@ public class MonoUtils {
private final AtomicBoolean fetchMode = new AtomicBoolean(false);
@SuppressWarnings("DuplicatedCode")
@Override
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
flux.publishOn(Schedulers.boundedElastic()).subscribeWith(new CoreSubscriber<T>() {
flux.publishOn(Schedulers.boundedElastic()).subscribe(new CoreSubscriber<T>() {
@Override
public void onSubscribe(@NotNull Subscription s) {