From 41be43d7112d8fae294c2386095ac9459acafd84 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 22 Sep 2022 02:26:22 +0200 Subject: [PATCH] Bugfixes --- .../reactiveapi/AtomixReactiveApi.java | 5 ++-- .../BaseAtomixReactiveApiClient.java | 5 ++-- .../it/tdlight/reactiveapi/KafkaConsumer.java | 18 ++++++++----- .../reactiveapi/ReactiveApiPublisher.java | 7 +++-- .../it/tdlight/reactiveapi/ReactorUtils.java | 26 +++++++++++++++++++ 5 files changed, 48 insertions(+), 13 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/ReactorUtils.java diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index 2133e50..86bff6b 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -128,14 +128,15 @@ public class AtomixReactiveApi implements ReactiveApi { .then() .doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk")); - return loadSessions.then(Mono.fromRunnable(() -> { + return loadSessions.then(Mono.fromRunnable(() -> { if (kafkaSharedTdlibServers != null) { requestsSub = kafkaSharedTdlibServers.requests() + .onBackpressureError() .doOnNext(req -> localSessions.get(req.data().userId()).handleRequest(req.data())) .subscribeOn(Schedulers.parallel()) .subscribe(); } - })); + })).transform(ReactorUtils::subscribeOnce); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index b9fadc9..1259aeb 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -100,8 +100,9 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { } }) .doFinally(s -> this.responses.remove(requestId)); - requests.emitNext(new Request<>(userId, clientId, requestId, request, timeout), EmitFailureHandler.busyLooping( - HUNDRED_MS)); + synchronized (requests) { + requests.emitNext(new Request<>(userId, clientId, requestId, request, timeout), EmitFailureHandler.FAIL_FAST); + } return response; }); } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java index a8c05e1..a96013b 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java @@ -7,6 +7,7 @@ import it.tdlight.common.utils.CantLoadLibrary; import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.logging.Level; import java.util.regex.Pattern; @@ -19,6 +20,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.SignalType; import reactor.kafka.receiver.KafkaReceiver; @@ -58,8 +60,9 @@ public abstract class KafkaConsumer { } ReceiverOptions receiverOptions = ReceiverOptions.create(props) .commitInterval(Duration.ofSeconds(10)) - .commitBatchSize(65535) - .maxCommitAttempts(100); + .commitBatchSize(isQuickResponse() ? 64 : 1024) + .maxCommitAttempts(5) + .maxDeferredCommits((isQuickResponse() ? 64 : 1024) * 5); Pattern pattern; if (userId == null) { pattern = Pattern.compile("tdlib\\." + getChannelName() + "\\.\\d+"); @@ -110,9 +113,8 @@ public abstract class KafkaConsumer { private Flux> consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) { return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId) - .receiveAutoAck(isQuickResponse() ? null : 32) - .concatMap(Flux::collectList) - .flatMapIterable(Function.identity()) + .receiveAutoAck(isQuickResponse() ? 1 : 4) + .concatMap(Function.identity()) .log("consume-messages" + (userId != null ? "-" + userId : ""), Level.FINEST, SignalType.REQUEST, @@ -127,7 +129,9 @@ public abstract class KafkaConsumer { return new Timestamped<>(1, record.value()); } }) - .transform(this::retryIfCleanup) - .transform(this::retryIfCommitFailed); + .transform(ReactorUtils::subscribeOnce); + //todo: check if they must be re-enabled + //.transform(this::retryIfCleanup) + //.transform(this::retryIfCommitFailed); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 716e20b..640e750 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -471,8 +471,11 @@ public abstract class ReactiveApiPublisher { } public void handleRequest(OnRequest onRequestObj) { - handleRequestInternal(onRequestObj, - response -> this.responses.emitNext(response, EmitFailureHandler.busyLooping(HUNDRED_MS))); + handleRequestInternal(onRequestObj, response -> { + synchronized (this.responses) { + this.responses.emitNext(response, EmitFailureHandler.FAIL_FAST); + } + }); } private void handleRequestInternal(OnRequest onRequestObj, Consumer> r) { diff --git a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java new file mode 100644 index 0000000..62a8098 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java @@ -0,0 +1,26 @@ +package it.tdlight.reactiveapi; + +import java.util.concurrent.atomic.AtomicBoolean; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class ReactorUtils { + + public static Flux subscribeOnce(Flux f) { + AtomicBoolean subscribed = new AtomicBoolean(); + return f.doOnSubscribe(s -> { + if (!subscribed.compareAndSet(false, true)) { + throw new UnsupportedOperationException("Can't subscribe more than once!"); + } + }); + } + + public static Mono subscribeOnce(Mono f) { + AtomicBoolean subscribed = new AtomicBoolean(); + return f.doOnSubscribe(s -> { + if (!subscribed.compareAndSet(false, true)) { + throw new UnsupportedOperationException("Can't subscribe more than once!"); + } + }); + } +}