This commit is contained in:
Andrea Cavalli 2022-09-22 02:26:22 +02:00
parent c36812e052
commit 41be43d711
5 changed files with 48 additions and 13 deletions

View File

@ -128,14 +128,15 @@ public class AtomixReactiveApi implements ReactiveApi {
.then()
.doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk"));
return loadSessions.then(Mono.fromRunnable(() -> {
return loadSessions.<Void>then(Mono.fromRunnable(() -> {
if (kafkaSharedTdlibServers != null) {
requestsSub = kafkaSharedTdlibServers.requests()
.onBackpressureError()
.doOnNext(req -> localSessions.get(req.data().userId()).handleRequest(req.data()))
.subscribeOn(Schedulers.parallel())
.subscribe();
}
}));
})).transform(ReactorUtils::subscribeOnce);
}
@Override

View File

@ -100,8 +100,9 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient {
}
})
.doFinally(s -> this.responses.remove(requestId));
requests.emitNext(new Request<>(userId, clientId, requestId, request, timeout), EmitFailureHandler.busyLooping(
HUNDRED_MS));
synchronized (requests) {
requests.emitNext(new Request<>(userId, clientId, requestId, request, timeout), EmitFailureHandler.FAIL_FAST);
}
return response;
});
}

View File

@ -7,6 +7,7 @@ import it.tdlight.common.utils.CantLoadLibrary;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.regex.Pattern;
@ -19,6 +20,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.kafka.receiver.KafkaReceiver;
@ -58,8 +60,9 @@ public abstract class KafkaConsumer<K> {
}
ReceiverOptions<Integer, K> receiverOptions = ReceiverOptions.<Integer, K>create(props)
.commitInterval(Duration.ofSeconds(10))
.commitBatchSize(65535)
.maxCommitAttempts(100);
.commitBatchSize(isQuickResponse() ? 64 : 1024)
.maxCommitAttempts(5)
.maxDeferredCommits((isQuickResponse() ? 64 : 1024) * 5);
Pattern pattern;
if (userId == null) {
pattern = Pattern.compile("tdlib\\." + getChannelName() + "\\.\\d+");
@ -110,9 +113,8 @@ public abstract class KafkaConsumer<K> {
private Flux<Timestamped<K>> consumeMessagesInternal(@NotNull String subGroupId, @Nullable Long userId) {
return createReceiver(kafkaParameters.groupId() + "-" + subGroupId, userId)
.receiveAutoAck(isQuickResponse() ? null : 32)
.concatMap(Flux::collectList)
.flatMapIterable(Function.identity())
.receiveAutoAck(isQuickResponse() ? 1 : 4)
.concatMap(Function.identity())
.log("consume-messages" + (userId != null ? "-" + userId : ""),
Level.FINEST,
SignalType.REQUEST,
@ -127,7 +129,9 @@ public abstract class KafkaConsumer<K> {
return new Timestamped<>(1, record.value());
}
})
.transform(this::retryIfCleanup)
.transform(this::retryIfCommitFailed);
.transform(ReactorUtils::subscribeOnce);
//todo: check if they must be re-enabled
//.transform(this::retryIfCleanup)
//.transform(this::retryIfCommitFailed);
}
}

View File

@ -471,8 +471,11 @@ public abstract class ReactiveApiPublisher {
}
public void handleRequest(OnRequest<TdApi.Object> onRequestObj) {
handleRequestInternal(onRequestObj,
response -> this.responses.emitNext(response, EmitFailureHandler.busyLooping(HUNDRED_MS)));
handleRequestInternal(onRequestObj, response -> {
synchronized (this.responses) {
this.responses.emitNext(response, EmitFailureHandler.FAIL_FAST);
}
});
}
private void handleRequestInternal(OnRequest<TdApi.Object> onRequestObj, Consumer<Event.OnResponse.Response<TdApi.Object>> r) {

View File

@ -0,0 +1,26 @@
package it.tdlight.reactiveapi;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class ReactorUtils {
public static <V> Flux<V> subscribeOnce(Flux<V> f) {
AtomicBoolean subscribed = new AtomicBoolean();
return f.doOnSubscribe(s -> {
if (!subscribed.compareAndSet(false, true)) {
throw new UnsupportedOperationException("Can't subscribe more than once!");
}
});
}
public static <V> Mono<V> subscribeOnce(Mono<V> f) {
AtomicBoolean subscribed = new AtomicBoolean();
return f.doOnSubscribe(s -> {
if (!subscribed.compareAndSet(false, true)) {
throw new UnsupportedOperationException("Can't subscribe more than once!");
}
});
}
}