diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java
index 92d413f..28a0a40 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java
@@ -13,4 +13,9 @@ public class KafkaClientBoundConsumer extends KafkaConsumer<ClientBoundEvent> {
 		return KafkaChannelName.CLIENT_BOUND_EVENT;
 	}
 
+	@Override
+	public boolean isQuickResponse() {
+		return false;
+	}
+
 }
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
index ec640b4..31f824d 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
@@ -49,15 +49,16 @@ public abstract class KafkaConsumer<K> {
 		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, getChannelName().getDeserializerClass());
 		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
 		props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, toIntExact(Duration.ofMinutes(5).toMillis()));
-		props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");
-		props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100");
-		props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
-		ReceiverOptions<Integer, K> receiverOptions = ReceiverOptions
-				.<Integer, K>create(props)
-				.commitInterval(Duration.ofSeconds(10))
-				.commitBatchSize(65535)
-				.maxCommitAttempts(100)
-				.maxDeferredCommits(100);
+		if (isQuickResponse()) {
+			props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
+			props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
+		} else {
+			props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
+			props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
+			props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576");
+			props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100");
+		}
+		ReceiverOptions<Integer, K> receiverOptions = ReceiverOptions.create(props);
 		Pattern pattern;
 		if (userId == null) {
 			pattern = Pattern.compile("tdlib\\." + getChannelName() + "\\.\\d+");
@@ -73,6 +74,8 @@ public abstract class KafkaConsumer<K> {
 
 	public abstract KafkaChannelName getChannelName();
 
+	public abstract boolean isQuickResponse();
+
 	protected Flux<ReceiverRecord<Integer, K>> retryIfCleanup(Flux<ReceiverRecord<Integer, K>> eventFlux) {
 		return eventFlux.retryWhen(Retry
 				.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
@@ -114,7 +117,7 @@ public abstract class KafkaConsumer<K> {
 						SignalType.ON_ERROR,
 						SignalType.ON_COMPLETE
 				)
-				.doOnNext(result -> result.receiverOffset().acknowledge())
+				//.doOnNext(result -> result.receiverOffset().acknowledge())
 				.map(record -> {
 					if (record.timestampType() == TimestampType.CREATE_TIME) {
 						return new Timestamped<>(record.timestamp(), record.value());
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java
index 0b8bba8..cedbc9f 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java
@@ -26,7 +26,8 @@ public abstract class KafkaProducer<K> {
 		Map<String, Object> props = new HashMap<>();
 		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers());
 		props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId());
-		props.put(ProducerConfig.ACKS_CONFIG, "1");
+		//props.put(ProducerConfig.ACKS_CONFIG, "1");
+		props.put(ProducerConfig.ACKS_CONFIG, "0");
 		props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
 		props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
 		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java
index 7a7b406..8d92ace 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java
@@ -15,6 +15,7 @@ import java.util.logging.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import reactor.core.Disposable;
+import reactor.core.publisher.BufferOverflowStrategy;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.GroupedFlux;
 import reactor.core.publisher.SignalType;
@@ -31,23 +32,18 @@ public class KafkaSharedTdlibClients implements Closeable {
 	private final AtomicReference<Disposable> responsesSub = new AtomicReference<>();
 	private final Disposable requestsSub;
 	private final AtomicReference<Disposable> eventsSub = new AtomicReference<>();
-	private final Flux<GroupedFlux<Long, Timestamped<OnResponse<Object>>>> responses;
-	private final Flux<GroupedFlux<Long, Timestamped<ClientBoundEvent>>> events;
+	private final Flux<Timestamped<OnResponse<Object>>> responses;
+	private final Flux<Timestamped<ClientBoundEvent>> events;
 	private final Many<OnRequest<?>> requests = Sinks.many().unicast()
 			.onBackpressureBuffer(Queues.<OnRequest<?>>get(65535).get());
 
 	public KafkaSharedTdlibClients(KafkaTdlibClientsChannels kafkaTdlibClientsChannels) {
 		this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels;
 		this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses")
-				.onBackpressureBuffer()
-				.groupBy(k1 -> k1.data().clientId(), 1)
-				.replay()
+				.publish()
 				.autoConnect(1, this.responsesSub::set);
 		this.events = kafkaTdlibClientsChannels.events().consumeMessages("td-handler")
-				.onBackpressureBuffer()
-				.groupBy(k -> k.data().userId(), 1)
-				.doOnNext(g -> LOG.info("Receiving updates of client: {}", g.key()))
-				.replay()
+				.publish()
 				.autoConnect(1, this.eventsSub::set);
 		this.requestsSub = kafkaTdlibClientsChannels.request()
 				.sendMessages(0L, requests.asFlux())
@@ -56,20 +52,18 @@
 	}
 
 	public Flux<Timestamped<OnResponse<Object>>> responses(long clientId) {
-		return responses.filter(group -> group.key() == clientId)
-				.take(1, true)
-				.singleOrEmpty()
-				.flatMapMany(Function.identity())
-				.log("req-" + clientId, Level.FINE, SignalType.REQUEST);
+		return responses
+				.filter(group -> group.data().clientId() == clientId)
+				//.onBackpressureBuffer(8192, BufferOverflowStrategy.DROP_OLDEST)
+				.log("req-" + clientId, Level.FINEST, SignalType.REQUEST);
 	}
 
 	public Flux<Timestamped<ClientBoundEvent>> events(long userId) {
-		return events.filter(group -> group.key() == userId)
-				.take(1, true)
-				.singleOrEmpty()
-				.flatMapMany(Function.identity())
-				.doOnSubscribe(s -> LOG.info("Reading updates of client: {}", userId));
-				//.log("event-" + userId, Level.FINE, SignalType.REQUEST);
+		return events
+				.filter(group -> group.data().userId() == userId)
+				//.onBackpressureBuffer(8192, BufferOverflowStrategy.DROP_OLDEST)
+				.doOnSubscribe(s -> LOG.info("Reading updates of client: {}", userId))
+				.log("event-" + userId, Level.FINEST, SignalType.REQUEST);
 	}
 
 	public Many<OnRequest<?>> requests() {
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java
index 44cf37c..c0be0e2 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java
@@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.logging.Level;
 import reactor.core.Disposable;
+import reactor.core.publisher.BufferOverflowStrategy;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.GroupedFlux;
 import reactor.core.publisher.SignalType;
@@ -29,28 +30,25 @@ public class KafkaSharedTdlibServers implements Closeable {
 	private final AtomicReference<Disposable> requestsSub = new AtomicReference<>();
 	private final Many<OnResponse<Object>> responses = Sinks.many().unicast().onBackpressureBuffer(
 			Queues.<OnResponse<Object>>get(65535).get());
-	private final Flux<GroupedFlux<Long, Timestamped<OnRequest<Object>>>> requests;
+	private final Flux<Timestamped<OnRequest<Object>>> requests;
 
 	public KafkaSharedTdlibServers(KafkaTdlibServersChannels kafkaTdlibServersChannels) {
 		this.kafkaTdlibServersChannels = kafkaTdlibServersChannels;
 		this.responsesSub = kafkaTdlibServersChannels.response()
-				.sendMessages(0L, responses.asFlux())
+				.sendMessages(0L, responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT))
 				.subscribeOn(Schedulers.parallel())
 				.subscribe();
 		this.requests = kafkaTdlibServersChannels.request()
 				.consumeMessages("td-requests")
-				.onBackpressureBuffer()
-				.groupBy(k -> k.data().userId(), 1)
-				.replay()
+				.publish()
 				.autoConnect(1, this.requestsSub::set);
 	}
 
 	public Flux<Timestamped<OnRequest<Object>>> requests(long userId) {
-		return requests.filter(group -> group.key() == userId)
-				.take(1, true)
-				.singleOrEmpty()
-				.flatMapMany(Function.identity())
-				.log("req-" + userId, Level.FINE, SignalType.REQUEST, SignalType.ON_NEXT);
+		return requests
+				.filter(group -> group.data().userId() == userId)
+				//.onBackpressureBuffer(8192, BufferOverflowStrategy.DROP_OLDEST)
+				.log("requests-" + userId, Level.FINEST, SignalType.REQUEST, SignalType.ON_NEXT);
 	}
 
 	public Disposable events(Flux<ClientBoundEvent> eventFlux) {
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java
index 398493a..8de0b75 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java
@@ -16,4 +16,9 @@ public class KafkaTdlibRequestConsumer extends KafkaConsumer
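
Note: the last hunk above is truncated, so KafkaTdlibRequestConsumer's body is not shown. As orientation only, here is a minimal sketch of how a latency-sensitive consumer would opt into the new quick-response branch of KafkaConsumer; the generic element type and the channel constant are assumptions, not content of this patch:

package it.tdlight.reactiveapi;

// Hypothetical sketch, not part of the patch: a request-path consumer that
// opts into the quick-response profile added to KafkaConsumer above. The
// element type OnRequest<Object> and the constant KafkaChannelName.TDLIB_REQUEST
// are assumed names; the real hunk is truncated.
public class KafkaTdlibRequestConsumer extends KafkaConsumer<OnRequest<Object>> {

	@Override
	public KafkaChannelName getChannelName() {
		return KafkaChannelName.TDLIB_REQUEST; // assumed constant name
	}

	@Override
	public boolean isQuickResponse() {
		// Requests sit on the RPC hot path, so this consumer takes the
		// 5000 ms heartbeat / 10000 ms session-timeout branch instead of the
		// 1 MiB FETCH_MIN_BYTES batching used by the bulk event consumers.
		return true;
	}
}

Together with ACKS_CONFIG = "0" on the producer side, both directions of this patch trade delivery guarantees for lower latency.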