Connection bugfixes and use tcnative

This commit is contained in:
Andrea Cavalli 2021-01-26 03:46:20 +01:00
parent cb3609586b
commit a6ae239836
9 changed files with 191 additions and 24 deletions

View File

@ -135,6 +135,11 @@
<artifactId>common-utils</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>2.0.30.Final</version>
</dependency>
<dependency>
<groupId>it.tdlight</groupId>

View File

@ -170,6 +170,10 @@ public class TDLibRemoteClient implements AutoCloseable {
.chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate())
.then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath, mediaPath))
.then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono))
.then(MonoUtils.fromBlockingMaybe(() -> {
msg.reply(new byte[0]);
return null;
}))
.publishOn(Schedulers.single())
.subscribe(
v -> {},
@ -177,7 +181,7 @@ public class TDLibRemoteClient implements AutoCloseable {
logger.error("Failed to deploy bot verticle", ex);
msg.fail(500, "Failed to deploy bot verticle: " + ex.getMessage());
},
() -> msg.reply(new byte[0])
() -> {}
);
});
});

View File

@ -106,7 +106,8 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
});
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.single())
);
.publishOn(Schedulers.boundedElastic())
)
.subscribeOn(Schedulers.boundedElastic());
}
}

View File

@ -154,21 +154,25 @@ public class TdClusterManager {
vertxOptions.getEventBusOptions().setTrustStoreOptions(trustStoreOptions);
vertxOptions.getEventBusOptions().setHost(masterHostname);
vertxOptions.getEventBusOptions().setPort(port + 1);
vertxOptions.getEventBusOptions().setSsl(true).setEnabledSecureTransportProtocols(Set.of("TLSv1.3", "TLSv1.2"));
vertxOptions
.getEventBusOptions()
.setUseAlpn(true)
.setSsl(true)
.setEnabledSecureTransportProtocols(Set.of("TLSv1.3", "TLSv1.2"));
vertxOptions.getEventBusOptions().setClientAuth(ClientAuth.REQUIRED);
} else {
mgr = null;
vertxOptions.setClusterManager(null);
}
vertxOptions.setPreferNativeTransport(false);
vertxOptions.setPreferNativeTransport(true);
vertxOptions.setMetricsOptions(new MetricsOptions().setEnabled(false));
// check for blocked threads every 5s
vertxOptions.setBlockedThreadCheckInterval(5);
vertxOptions.setBlockedThreadCheckIntervalUnit(TimeUnit.SECONDS);
// warn if an event loop thread handler took more than 100ms to execute
vertxOptions.setMaxEventLoopExecuteTime(100);
vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS);
// warn if an event loop thread handler took more than 10s to execute
vertxOptions.setMaxEventLoopExecuteTime(10);
vertxOptions.setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS);
// warn if an worker thread handler took more than 10s to execute
vertxOptions.setMaxWorkerExecuteTime(10);
vertxOptions.setMaxWorkerExecuteTimeUnit(TimeUnit.SECONDS);
@ -177,14 +181,14 @@ public class TdClusterManager {
vertxOptions.setWarningExceptionTimeUnit(TimeUnit.MILLISECONDS);
return Mono
.<Vertx>create(sink -> {
.defer(() -> {
if (mgr != null) {
Vertx.clusteredVertx(vertxOptions, MonoUtils.toHandler(sink));
return Vertx.rxClusteredVertx(vertxOptions).as(MonoUtils::toMono).subscribeOn(Schedulers.boundedElastic());
} else {
sink.success(Vertx.vertx(vertxOptions));
return Mono.just(Vertx.vertx(vertxOptions));
}
})
.publishOn(Schedulers.boundedElastic())
.publishOn(Schedulers.single())
.flatMap(vertx -> Mono
.fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx))
.publishOn(Schedulers.boundedElastic())

View File

@ -170,16 +170,15 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.timeout(Duration.ofSeconds(5))
.publishOn(Schedulers.single())
.flatMapMany(tdResultListFlux -> tdResultListFlux.publishOn(Schedulers.single()))
.startWith(MonoUtils
.castVoid(Mono.<Void>fromRunnable(() -> {
cluster.getEventBus().<byte[]>send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout);
}).subscribeOn(Schedulers.boundedElastic()))
)
.timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> {
var ex = new ConnectException("Server did not respond to 12 pings after 1 minute (5 seconds per ping)");
ex.setStackTrace(new StackTraceElement[0]);
throw ex;
}))
.doOnSubscribe(s -> cluster.getEventBus().<byte[]>send(botAddress + ".ready-to-receive",
EMPTY,
deliveryOptionsWithTimeout
))
.flatMapSequential(updates -> {
if (updates.succeeded()) {
return Flux.fromIterable(updates.value());

View File

@ -91,6 +91,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
.doOnNext(s -> logger.trace("Received update from tdlib: {}", s))
.doOnError(ex -> logger.info("TdMiddle verticle error", ex))
.doOnTerminate(() -> logger.debug("TdMiddle verticle stopped"))
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.single());
}

View File

@ -327,7 +327,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
return false;
})
.flatMapSequential(update -> Mono.fromCallable(() -> {
.flatMapSequential(update -> MonoUtils.fromBlockingSingle(() -> {
if (update.getConstructor() == TdApi.Error.CONSTRUCTOR) {
var error = (Error) update;
throw new TdError(error.code, error.message);

View File

@ -0,0 +1,154 @@
package it.tdlight.utils;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
import static reactor.core.Exceptions.addSuppressed;
import static reactor.core.publisher.Operators.cancelledSubscription;
import static reactor.core.publisher.Operators.onErrorDropped;
import static reactor.core.publisher.Operators.onOperatorError;
import static reactor.core.publisher.Operators.setOnce;
import static reactor.core.publisher.Operators.terminate;
import static reactor.core.scheduler.Schedulers.parallel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.scheduler.Scheduler;
public abstract class BatchSubscriber<T> implements CoreSubscriber<T> {
private static final Logger log = LoggerFactory.getLogger(BatchSubscriber.class);
private final Scheduler scheduler;
private final int batchSize;
private final Duration timeout;
private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();
private final AtomicInteger requests = new AtomicInteger(0);
private final AtomicReference<Disposable> flushTimer = new AtomicReference<>();
private final Runnable flushTask = () -> {
log.trace("timeout [{}] -> flush", buffer.size());
flush();
};
private volatile Subscription subscription;
private static AtomicReferenceFieldUpdater<BatchSubscriber, Subscription> S = newUpdater(BatchSubscriber.class, Subscription.class, "subscription");
public BatchSubscriber(int batchSize, Duration timeout) {
this.batchSize = batchSize;
this.timeout = timeout;
this.scheduler = parallel();
}
@Override
public void onSubscribe(Subscription s) {
setOnce(S, this, s);
}
@Override
public void onNext(T record) {
try {
buffer.add(record);
if (requests.get() > 0) {
if (buffer.size() >= batchSize) {
log.trace("+ value [{}] -> flush", buffer.size());
flush();
}
else {
log.trace("+ value [{}] -> flush in {}ms", buffer.size(), timeout.toMillis());
scheduleFlush();
}
}
else {
log.trace("+ value [{}] -> buffer", buffer.size());
}
}
catch (Throwable t) {
onError(onOperatorError(subscription, t, record, currentContext()));
}
}
@Override
public void onError(Throwable t) {
if (S.getAndSet(this, cancelledSubscription()) != cancelledSubscription()) {
try {
suspendFlush();
}
catch (Throwable e) {
t = addSuppressed(e, t);
}
}
onErrorDropped(t, currentContext());
}
@Override
public void onComplete() {
if (S.getAndSet(this, cancelledSubscription()) != cancelledSubscription()) {
try {
suspendFlush();
}
catch (Throwable e) { }
}
}
// Implement what to do with a batch (either full or partial due to timeout).
// Could be publish to another subscriber.
public abstract void flush(List<T> batch);
private void flush() {
suspendFlush();
List<T> batch = new ArrayList<>(batchSize);
buffer.drainTo(batch, batchSize);
flush(batch);
requests.decrementAndGet();
log.trace("- request [{}]", requests.get());
}
private void scheduleFlush() {
flushTimer.updateAndGet(current -> {
if (current != null) current.dispose();
return scheduler.schedule(flushTask, timeout.toMillis(), MILLISECONDS);
});
}
private void suspendFlush() {
flushTimer.updateAndGet(current -> {
if (current != null) current.dispose();
return null;
});
}
public void request() {
if (requests.get() == 0 && buffer.size() >= batchSize) {
log.trace(". request [{}] -> flush", buffer.size());
flush();
}
else {
int required = requests.incrementAndGet() == 1
? batchSize - buffer.size()
: batchSize;
log.trace("+ request [{}] -> request {} values", buffer.size(), required);
subscription.request(required);
if (!buffer.isEmpty()) scheduleFlush();
}
}
public void cancel() {
terminate(S, this);
}
}

View File

@ -100,7 +100,7 @@ public class MonoUtils {
}
public static <T> Mono<T> fromBlockingMaybe(Callable<T> callable) {
return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic()).publishOn(Schedulers.single());
return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic());
}
public static <T> Mono<T> fromBlockingSingle(Callable<T> callable) {
@ -230,15 +230,15 @@ public class MonoUtils {
}
public static <T> Mono<T> toMono(Single<T> single) {
return Mono.fromDirect(single.toFlowable());
return Mono.from(single.toFlowable());
}
public static <T> Mono<T> toMono(Maybe<T> single) {
return Mono.fromDirect(single.toFlowable());
return Mono.from(single.toFlowable());
}
public static <T> Mono<T> toMono(Completable completable) {
return Mono.fromDirect(completable.toFlowable());
return Mono.from(completable.toFlowable());
}
public static <T> Completable toCompletable(Mono<T> s) {
@ -348,8 +348,7 @@ public class MonoUtils {
sink.onDispose(messageConsumer::unregister);
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.single())
.flatMapSequential(msg -> Mono.fromCallable(msg::body).publishOn(Schedulers.boundedElastic()));
.flatMapSequential(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic()));
}
public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {