diff --git a/pom.xml b/pom.xml index 91973e0..1a0e254 100644 --- a/pom.xml +++ b/pom.xml @@ -135,6 +135,11 @@ common-utils 1.1.2 + + io.netty + netty-tcnative-boringssl-static + 2.0.30.Final + it.tdlight diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index b6879dd..aa32f81 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -170,6 +170,10 @@ public class TDLibRemoteClient implements AutoCloseable { .chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate()) .then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath, mediaPath)) .then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono)) + .then(MonoUtils.fromBlockingMaybe(() -> { + msg.reply(new byte[0]); + return null; + })) .publishOn(Schedulers.single()) .subscribe( v -> {}, @@ -177,7 +181,7 @@ public class TDLibRemoteClient implements AutoCloseable { logger.error("Failed to deploy bot verticle", ex); msg.fail(500, "Failed to deploy bot verticle: " + ex.getMessage()); }, - () -> msg.reply(new byte[0]) + () -> {} ); }); }); diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index c3580de..c24e856 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -106,7 +106,8 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { }); }) .subscribeOn(Schedulers.boundedElastic()) - .publishOn(Schedulers.single()) - ); + .publishOn(Schedulers.boundedElastic()) + ) + .subscribeOn(Schedulers.boundedElastic()); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index 32f7909..af2be5a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -154,21 +154,25 @@ public class TdClusterManager { vertxOptions.getEventBusOptions().setTrustStoreOptions(trustStoreOptions); vertxOptions.getEventBusOptions().setHost(masterHostname); vertxOptions.getEventBusOptions().setPort(port + 1); - vertxOptions.getEventBusOptions().setSsl(true).setEnabledSecureTransportProtocols(Set.of("TLSv1.3", "TLSv1.2")); + vertxOptions + .getEventBusOptions() + .setUseAlpn(true) + .setSsl(true) + .setEnabledSecureTransportProtocols(Set.of("TLSv1.3", "TLSv1.2")); vertxOptions.getEventBusOptions().setClientAuth(ClientAuth.REQUIRED); } else { mgr = null; vertxOptions.setClusterManager(null); } - vertxOptions.setPreferNativeTransport(false); + vertxOptions.setPreferNativeTransport(true); vertxOptions.setMetricsOptions(new MetricsOptions().setEnabled(false)); // check for blocked threads every 5s vertxOptions.setBlockedThreadCheckInterval(5); vertxOptions.setBlockedThreadCheckIntervalUnit(TimeUnit.SECONDS); - // warn if an event loop thread handler took more than 100ms to execute - vertxOptions.setMaxEventLoopExecuteTime(100); - vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS); + // warn if an event loop thread handler took more than 10s to execute + vertxOptions.setMaxEventLoopExecuteTime(10); + vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS); // warn if an worker thread handler took more than 10s to execute vertxOptions.setMaxWorkerExecuteTime(10); vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS); @@ -177,14 +181,14 @@ public class TdClusterManager { vertxOptions.setWarningExceptionTimeUnit(TimeUnit.MILLISECONDS); return Mono - .create(sink -> { + .defer(() -> { if (mgr != null) { - Vertx.clusteredVertx(vertxOptions, MonoUtils.toHandler(sink)); + return Vertx.rxClusteredVertx(vertxOptions).as(MonoUtils::toMono).subscribeOn(Schedulers.boundedElastic()); } else { - sink.success(Vertx.vertx(vertxOptions)); + return Mono.just(Vertx.vertx(vertxOptions)); } }) - .publishOn(Schedulers.boundedElastic()) + .publishOn(Schedulers.single()) .flatMap(vertx -> Mono .fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx)) .publishOn(Schedulers.boundedElastic()) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index c72649b..b197813 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -170,16 +170,15 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .timeout(Duration.ofSeconds(5)) .publishOn(Schedulers.single()) .flatMapMany(tdResultListFlux -> tdResultListFlux.publishOn(Schedulers.single())) - .startWith(MonoUtils - .castVoid(Mono.fromRunnable(() -> { - cluster.getEventBus().send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout); - }).subscribeOn(Schedulers.boundedElastic())) - ) .timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> { var ex = new ConnectException("Server did not respond to 12 pings after 1 minute (5 seconds per ping)"); ex.setStackTrace(new StackTraceElement[0]); throw ex; })) + .doOnSubscribe(s -> cluster.getEventBus().send(botAddress + ".ready-to-receive", + EMPTY, + deliveryOptionsWithTimeout + )) .flatMapSequential(updates -> { if (updates.succeeded()) { return Flux.fromIterable(updates.value()); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index e4cc7da..7a0dc9e 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -91,6 +91,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd .doOnNext(s -> logger.trace("Received update from tdlib: {}", s)) .doOnError(ex -> logger.info("TdMiddle verticle error", ex)) .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")) + .subscribeOn(Schedulers.boundedElastic()) .publishOn(Schedulers.single()); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index cda42d2..d048376 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -327,7 +327,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } return false; }) - .flatMapSequential(update -> Mono.fromCallable(() -> { + .flatMapSequential(update -> MonoUtils.fromBlockingSingle(() -> { if (update.getConstructor() == TdApi.Error.CONSTRUCTOR) { var error = (Error) update; throw new TdError(error.code, error.message); diff --git a/src/main/java/it/tdlight/utils/BatchSubscriber.java b/src/main/java/it/tdlight/utils/BatchSubscriber.java new file mode 100644 index 0000000..82a6876 --- /dev/null +++ b/src/main/java/it/tdlight/utils/BatchSubscriber.java @@ -0,0 +1,154 @@ +package it.tdlight.utils; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; +import static reactor.core.Exceptions.addSuppressed; +import static reactor.core.publisher.Operators.cancelledSubscription; +import static reactor.core.publisher.Operators.onErrorDropped; +import static reactor.core.publisher.Operators.onOperatorError; +import static reactor.core.publisher.Operators.setOnce; +import static reactor.core.publisher.Operators.terminate; +import static reactor.core.scheduler.Schedulers.parallel; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.CoreSubscriber; +import reactor.core.Disposable; +import reactor.core.scheduler.Scheduler; + +public abstract class BatchSubscriber implements CoreSubscriber { + + private static final Logger log = LoggerFactory.getLogger(BatchSubscriber.class); + + private final Scheduler scheduler; + private final int batchSize; + private final Duration timeout; + + private final BlockingQueue buffer = new LinkedBlockingQueue<>(); + private final AtomicInteger requests = new AtomicInteger(0); + + private final AtomicReference flushTimer = new AtomicReference<>(); + private final Runnable flushTask = () -> { + log.trace("timeout [{}] -> flush", buffer.size()); + flush(); + }; + + private volatile Subscription subscription; + private static AtomicReferenceFieldUpdater S = newUpdater(BatchSubscriber.class, Subscription.class, "subscription"); + + public BatchSubscriber(int batchSize, Duration timeout) { + this.batchSize = batchSize; + this.timeout = timeout; + this.scheduler = parallel(); + } + + @Override + public void onSubscribe(Subscription s) { + setOnce(S, this, s); + } + + @Override + public void onNext(T record) { + try { + buffer.add(record); + if (requests.get() > 0) { + if (buffer.size() >= batchSize) { + log.trace("+ value [{}] -> flush", buffer.size()); + flush(); + } + else { + log.trace("+ value [{}] -> flush in {}ms", buffer.size(), timeout.toMillis()); + scheduleFlush(); + } + } + else { + log.trace("+ value [{}] -> buffer", buffer.size()); + } + } + catch (Throwable t) { + onError(onOperatorError(subscription, t, record, currentContext())); + } + } + + @Override + public void onError(Throwable t) { + if (S.getAndSet(this, cancelledSubscription()) != cancelledSubscription()) { + try { + suspendFlush(); + } + catch (Throwable e) { + t = addSuppressed(e, t); + } + } + + onErrorDropped(t, currentContext()); + } + + @Override + public void onComplete() { + if (S.getAndSet(this, cancelledSubscription()) != cancelledSubscription()) { + try { + suspendFlush(); + } + catch (Throwable e) { } + } + } + + // Implement what to do with a batch (either full or partial due to timeout). + // Could be publish to another subscriber. + public abstract void flush(List batch); + + private void flush() { + suspendFlush(); + + List batch = new ArrayList<>(batchSize); + buffer.drainTo(batch, batchSize); + flush(batch); + + requests.decrementAndGet(); + log.trace("- request [{}]", requests.get()); + } + + private void scheduleFlush() { + flushTimer.updateAndGet(current -> { + if (current != null) current.dispose(); + return scheduler.schedule(flushTask, timeout.toMillis(), MILLISECONDS); + }); + } + + private void suspendFlush() { + flushTimer.updateAndGet(current -> { + if (current != null) current.dispose(); + return null; + }); + } + + public void request() { + if (requests.get() == 0 && buffer.size() >= batchSize) { + log.trace(". request [{}] -> flush", buffer.size()); + flush(); + } + else { + int required = requests.incrementAndGet() == 1 + ? batchSize - buffer.size() + : batchSize; + log.trace("+ request [{}] -> request {} values", buffer.size(), required); + subscription.request(required); + + if (!buffer.isEmpty()) scheduleFlush(); + } + } + + public void cancel() { + terminate(S, this); + } +} \ No newline at end of file diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 27b5d98..8492f6b 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -100,7 +100,7 @@ public class MonoUtils { } public static Mono fromBlockingMaybe(Callable callable) { - return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single()); + return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic()); } public static Mono fromBlockingSingle(Callable callable) { @@ -230,15 +230,15 @@ public class MonoUtils { } public static Mono toMono(Single single) { - return Mono.fromDirect(single.toFlowable()); + return Mono.from(single.toFlowable()); } public static Mono toMono(Maybe single) { - return Mono.fromDirect(single.toFlowable()); + return Mono.from(single.toFlowable()); } public static Mono toMono(Completable completable) { - return Mono.fromDirect(completable.toFlowable()); + return Mono.from(completable.toFlowable()); } public static Completable toCompletable(Mono s) { @@ -348,8 +348,7 @@ public class MonoUtils { sink.onDispose(messageConsumer::unregister); }) .subscribeOn(Schedulers.boundedElastic()) - .publishOn(Schedulers.single()) - .flatMapSequential(msg -> Mono.fromCallable(msg::body).publishOn(Schedulers.boundedElastic())); + .flatMapSequential(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())); } public static class SinkRWStream implements io.vertx.core.streams.WriteStream, io.vertx.core.streams.ReadStream {