From a93d6d4e2473cb07815d4594c17bb7005b9c14ee Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 4 Oct 2022 12:43:24 +0200 Subject: [PATCH] Code cleanup --- pom.xml | 33 ++++++++++++++- .../reactiveapi/AtomixReactiveApi.java | 34 ++++++++------- .../BaseAtomixReactiveApiClient.java | 2 +- ...fkaChannelCodec.java => ChannelCodec.java} | 4 +- .../ChannelConsumerClientBoundEvent.java | 22 ++++++++++ .../ChannelConsumerTdlibRequest.java | 19 +++++++++ .../ChannelConsumerTdlibResponse.java | 18 ++++++++ .../tdlight/reactiveapi/ChannelFactory.java | 34 +++++++++++++++ .../ChannelProducerClientBoundEvent.java | 24 +++++++++++ .../ChannelProducerTdlibRequest.java | 17 ++++++++ .../ChannelProducerTdlibResponse.java | 18 ++++++++ ...ibClients.java => ClientsSharedTdlib.java} | 16 ++----- .../it/tdlight/reactiveapi/EventConsumer.java | 15 +++++++ .../it/tdlight/reactiveapi/EventProducer.java | 15 +++++++ .../reactiveapi/KafkaClientBoundConsumer.java | 36 ---------------- .../reactiveapi/KafkaClientBoundProducer.java | 29 ------------- .../KafkaTdlibClientsChannels.java | 14 ------- .../KafkaTdlibRequestConsumer.java | 27 ------------ .../KafkaTdlibRequestProducer.java | 20 --------- .../KafkaTdlibResponseConsumer.java | 27 ------------ .../KafkaTdlibResponseProducer.java | 21 ---------- .../KafkaTdlibServersChannels.java | 23 ---------- .../LiveAtomixReactiveApiClient.java | 4 +- .../reactiveapi/ReactiveApiPublisher.java | 12 +++--- .../reactiveapi/TdlibChannelsClients.java | 18 ++++++++ .../reactiveapi/TdlibChannelsServers.java | 24 +++++++++++ ...rs.java => TdlibChannelsSharedServer.java} | 12 ++---- .../{ => kafka}/KafkaConsumer.java | 42 ++++++++++++++----- .../{ => kafka}/KafkaProducer.java | 25 ++++++++--- 29 files changed, 345 insertions(+), 260 deletions(-) rename src/main/java/it/tdlight/reactiveapi/{KafkaChannelCodec.java => ChannelCodec.java} (92%) create mode 100644 src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java create mode 100644 src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java create mode 100644 src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java create mode 100644 src/main/java/it/tdlight/reactiveapi/ChannelFactory.java create mode 100644 src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java create mode 100644 src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java create mode 100644 src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java rename src/main/java/it/tdlight/reactiveapi/{KafkaSharedTdlibClients.java => ClientsSharedTdlib.java} (80%) create mode 100644 src/main/java/it/tdlight/reactiveapi/EventConsumer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/EventProducer.java delete mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java delete mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java delete mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java delete mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java delete mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestProducer.java delete mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java delete mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseProducer.java delete mode 100644 src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java create mode 100644 src/main/java/it/tdlight/reactiveapi/TdlibChannelsClients.java create mode 100644 src/main/java/it/tdlight/reactiveapi/TdlibChannelsServers.java rename src/main/java/it/tdlight/reactiveapi/{KafkaSharedTdlibServers.java => TdlibChannelsSharedServer.java} (82%) rename src/main/java/it/tdlight/reactiveapi/{ => kafka}/KafkaConsumer.java (81%) rename src/main/java/it/tdlight/reactiveapi/{ => kafka}/KafkaProducer.java (75%) diff --git a/pom.xml b/pom.xml index 4a803e2..8f2d48a 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,14 @@ io.projectreactor reactor-bom - 2020.0.22 + 2020.0.23 + pom + import + + + io.rsocket + rsocket-bom + 1.1.3 pom import @@ -72,7 +79,7 @@ reactor-tools original runtime - 3.4.22 + 3.4.23 org.jetbrains @@ -127,6 +134,22 @@ io.projectreactor.kafka reactor-kafka + + io.rsocket + rsocket-core + + + io.rsocket + rsocket-load-balancer + + + io.rsocket + rsocket-transport-local + + + io.rsocket + rsocket-transport-netty + org.apache.logging.log4j log4j-api @@ -151,6 +174,12 @@ net.minecrell terminalconsoleappender 1.3.0 + + + org.apache.logging.log4j + log4j-core + + diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index c7d2be8..fc2c5fc 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -2,9 +2,14 @@ package it.tdlight.reactiveapi; import static java.util.Objects.requireNonNull; +import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest; +import it.tdlight.reactiveapi.ChannelFactory.KafkaChannelFactory; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.Event.OnRequest; +import it.tdlight.reactiveapi.Event.OnResponse; import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; @@ -31,9 +36,9 @@ public class AtomixReactiveApi implements ReactiveApi { private final AtomixReactiveApiMode mode; - private final KafkaSharedTdlibClients kafkaSharedTdlibClients; + private final ClientsSharedTdlib kafkaSharedTdlibClients; @Nullable - private final KafkaSharedTdlibServers kafkaSharedTdlibServers; + private final TdlibChannelsSharedServer kafkaSharedTdlibServers; private final ReactiveApiMultiClient client; private final Set resultingEventTransformerSet; @@ -60,35 +65,36 @@ public class AtomixReactiveApi implements ReactiveApi { @Nullable DiskSessionsManager diskSessions, @NotNull Set resultingEventTransformerSet) { this.mode = mode; + ChannelFactory channelFactory = new KafkaChannelFactory(); if (mode != AtomixReactiveApiMode.SERVER) { - var kafkaTDLibRequestProducer = new KafkaTdlibRequestProducer(kafkaParameters); - var kafkaTDLibResponseConsumer = new KafkaTdlibResponseConsumer(kafkaParameters); - var kafkaClientBoundConsumers = new HashMap(); + EventProducer> kafkaTDLibRequestProducer = ChannelProducerTdlibRequest.create(channelFactory, kafkaParameters); + EventConsumer> kafkaTDLibResponseConsumer = ChannelConsumerTdlibResponse.create(channelFactory, kafkaParameters); + HashMap> kafkaClientBoundConsumers = new HashMap<>(); for (String lane : kafkaParameters.getAllLanes()) { - kafkaClientBoundConsumers.put(lane, new KafkaClientBoundConsumer(kafkaParameters, lane)); + kafkaClientBoundConsumers.put(lane, ChannelConsumerClientBoundEvent.create(channelFactory, kafkaParameters, lane)); } - var kafkaTdlibClientsChannels = new KafkaTdlibClientsChannels(kafkaTDLibRequestProducer, + var kafkaTdlibClientsChannels = new TdlibChannelsClients(kafkaTDLibRequestProducer, kafkaTDLibResponseConsumer, kafkaClientBoundConsumers ); - this.kafkaSharedTdlibClients = new KafkaSharedTdlibClients(kafkaTdlibClientsChannels); + this.kafkaSharedTdlibClients = new ClientsSharedTdlib(kafkaTdlibClientsChannels); this.client = new LiveAtomixReactiveApiClient(kafkaSharedTdlibClients); } else { this.kafkaSharedTdlibClients = null; this.client = null; } if (mode != AtomixReactiveApiMode.CLIENT) { - var kafkaTDLibRequestConsumer = new KafkaTdlibRequestConsumer(kafkaParameters); - var kafkaTDLibResponseProducer = new KafkaTdlibResponseProducer(kafkaParameters); - var kafkaClientBoundProducers = new HashMap(); + EventConsumer> kafkaTDLibRequestConsumer = ChannelConsumerTdlibRequest.create(channelFactory, kafkaParameters); + EventProducer> kafkaTDLibResponseProducer = ChannelProducerTdlibResponse.create(channelFactory, kafkaParameters); + var kafkaClientBoundProducers = new HashMap>(); for (String lane : kafkaParameters.getAllLanes()) { - kafkaClientBoundProducers.put(lane, new KafkaClientBoundProducer(kafkaParameters, lane)); + kafkaClientBoundProducers.put(lane, ChannelProducerClientBoundEvent.create(channelFactory, kafkaParameters, lane)); } - var kafkaTDLibServer = new KafkaTdlibServersChannels(kafkaTDLibRequestConsumer, + var kafkaTDLibServer = new TdlibChannelsServers(kafkaTDLibRequestConsumer, kafkaTDLibResponseProducer, kafkaClientBoundProducers ); - this.kafkaSharedTdlibServers = new KafkaSharedTdlibServers(kafkaTDLibServer); + this.kafkaSharedTdlibServers = new TdlibChannelsSharedServer(kafkaTDLibServer); } else { this.kafkaSharedTdlibServers = null; } diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index 1259aeb..3b5bdd0 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -52,7 +52,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { private final AtomicLong requestId = new AtomicLong(0); private final Disposable subscription; - public BaseAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients) { + public BaseAtomixReactiveApiClient(ClientsSharedTdlib kafkaSharedTdlibClients) { this.clientId = System.nanoTime(); this.requests = kafkaSharedTdlibClients.requests(); diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaChannelCodec.java b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java similarity index 92% rename from src/main/java/it/tdlight/reactiveapi/KafkaChannelCodec.java rename to src/main/java/it/tdlight/reactiveapi/ChannelCodec.java index 53344d4..2312651 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaChannelCodec.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java @@ -1,6 +1,6 @@ package it.tdlight.reactiveapi; -public enum KafkaChannelCodec { +public enum ChannelCodec { CLIENT_BOUND_EVENT("event", ClientBoundEventSerializer.class, ClientBoundEventDeserializer.class), TDLIB_REQUEST("request", TdlibRequestSerializer.class, TdlibRequestDeserializer.class), TDLIB_RESPONSE("response", TdlibResponseSerializer.class, TdlibResponseDeserializer.class); @@ -9,7 +9,7 @@ public enum KafkaChannelCodec { private final Class serializerClass; private final Class deserializerClass; - KafkaChannelCodec(String kafkaName, + ChannelCodec(String kafkaName, Class serializerClass, Class deserializerClass) { this.name = kafkaName; diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java new file mode 100644 index 0000000..7cb6c49 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java @@ -0,0 +1,22 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.Event.ClientBoundEvent; + +public class ChannelConsumerClientBoundEvent { + + private ChannelConsumerClientBoundEvent() { + } + + public static EventConsumer create(ChannelFactory channelFactory, KafkaParameters kafkaParameters, + String lane) { + var codec = ChannelCodec.CLIENT_BOUND_EVENT; + String name; + if (lane.isBlank()) { + name = codec.getKafkaName(); + } else { + name = codec.getKafkaName() + "-" + lane; + } + return channelFactory.newConsumer(kafkaParameters, false, codec, name); + } + +} diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java new file mode 100644 index 0000000..3af9244 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java @@ -0,0 +1,19 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi.Object; +import it.tdlight.reactiveapi.Event.OnRequest; + +public class ChannelConsumerTdlibRequest { + + private ChannelConsumerTdlibRequest() { + } + + public static EventConsumer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) { + return channelFactory.newConsumer(kafkaParameters, + true, + ChannelCodec.TDLIB_REQUEST, + ChannelCodec.TDLIB_REQUEST.getKafkaName() + ); + } + +} diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java new file mode 100644 index 0000000..d504f89 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java @@ -0,0 +1,18 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi.Object; +import it.tdlight.reactiveapi.Event.OnResponse; + +public class ChannelConsumerTdlibResponse { + + private ChannelConsumerTdlibResponse() { + } + + public static EventConsumer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) { + return channelFactory.newConsumer(kafkaParameters, + true, + ChannelCodec.TDLIB_RESPONSE, + ChannelCodec.TDLIB_RESPONSE.getKafkaName() + ); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java new file mode 100644 index 0000000..35855c6 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java @@ -0,0 +1,34 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.kafka.KafkaConsumer; +import it.tdlight.reactiveapi.kafka.KafkaProducer; + +public interface ChannelFactory { + + EventConsumer newConsumer(KafkaParameters kafkaParameters, + boolean quickResponse, + ChannelCodec channelCodec, + String channelName); + + EventProducer newProducer(KafkaParameters kafkaParameters, + ChannelCodec channelCodec, + String channelName); + + class KafkaChannelFactory implements ChannelFactory { + + @Override + public EventConsumer newConsumer(KafkaParameters kafkaParameters, + boolean quickResponse, + ChannelCodec channelCodec, + String channelName) { + return new KafkaConsumer<>(kafkaParameters, quickResponse, channelCodec, channelName); + } + + @Override + public EventProducer newProducer(KafkaParameters kafkaParameters, + ChannelCodec channelCodec, + String channelName) { + return new KafkaProducer<>(kafkaParameters, channelCodec, channelName); + } + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java new file mode 100644 index 0000000..88eea09 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java @@ -0,0 +1,24 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.Event.ClientBoundEvent; + +public class ChannelProducerClientBoundEvent { + + private static final ChannelCodec CODEC = ChannelCodec.CLIENT_BOUND_EVENT; + + private ChannelProducerClientBoundEvent() { + } + + public static EventProducer create(ChannelFactory channelFactory, KafkaParameters kafkaParameters, String lane) { + String name; + if (lane.isBlank()) { + name = CODEC.getKafkaName(); + } else { + name = CODEC.getKafkaName() + "-" + lane; + } + return channelFactory.newProducer(kafkaParameters, + CODEC, + name + ); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java new file mode 100644 index 0000000..58acad7 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java @@ -0,0 +1,17 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.Event.OnRequest; + +public class ChannelProducerTdlibRequest { + + private ChannelProducerTdlibRequest() { + } + + public static EventProducer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) { + return channelFactory.newProducer(kafkaParameters, + ChannelCodec.TDLIB_REQUEST, + ChannelCodec.TDLIB_REQUEST.getKafkaName() + ); + } + +} diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java new file mode 100644 index 0000000..7d914da --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java @@ -0,0 +1,18 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi.Object; +import it.tdlight.reactiveapi.Event.OnResponse; + +public class ChannelProducerTdlibResponse { + + private ChannelProducerTdlibResponse() { + } + + public static EventProducer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) { + return channelFactory.newProducer(kafkaParameters, + ChannelCodec.TDLIB_RESPONSE, + ChannelCodec.TDLIB_RESPONSE.getKafkaName() + ); + } + +} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java b/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java similarity index 80% rename from src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java rename to src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java index 498afc6..789af0d 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java +++ b/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java @@ -1,36 +1,28 @@ package it.tdlight.reactiveapi; -import static java.util.Objects.requireNonNullElse; - import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.OnRequest; import it.tdlight.reactiveapi.Event.OnResponse; import java.io.Closeable; -import java.time.Duration; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import java.util.logging.Level; import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.Disposable; -import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; -import reactor.core.publisher.GroupedFlux; -import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; import reactor.util.concurrent.Queues; -public class KafkaSharedTdlibClients implements Closeable { +public class ClientsSharedTdlib implements Closeable { - private static final Logger LOG = LogManager.getLogger(KafkaSharedTdlibClients.class); + private static final Logger LOG = LogManager.getLogger(ClientsSharedTdlib.class); - private final KafkaTdlibClientsChannels kafkaTdlibClientsChannels; + private final TdlibChannelsClients kafkaTdlibClientsChannels; private final AtomicReference responsesSub = new AtomicReference<>(); private final Disposable requestsSub; private final AtomicReference eventsSub = new AtomicReference<>(); @@ -39,7 +31,7 @@ public class KafkaSharedTdlibClients implements Closeable { private final Many> requests = Sinks.many().unicast() .onBackpressureBuffer(Queues.>get(65535).get()); - public KafkaSharedTdlibClients(KafkaTdlibClientsChannels kafkaTdlibClientsChannels) { + public ClientsSharedTdlib(TdlibChannelsClients kafkaTdlibClientsChannels) { this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels; this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses"); this.events = kafkaTdlibClientsChannels.events().entrySet().stream() diff --git a/src/main/java/it/tdlight/reactiveapi/EventConsumer.java b/src/main/java/it/tdlight/reactiveapi/EventConsumer.java new file mode 100644 index 0000000..409f2c4 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/EventConsumer.java @@ -0,0 +1,15 @@ +package it.tdlight.reactiveapi; + +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; + +public interface EventConsumer { + + boolean isQuickResponse(); + + ChannelCodec getChannelCodec(); + + String getChannelName(); + + Flux> consumeMessages(@NotNull String subGroupId); +} diff --git a/src/main/java/it/tdlight/reactiveapi/EventProducer.java b/src/main/java/it/tdlight/reactiveapi/EventProducer.java new file mode 100644 index 0000000..b1f91f5 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/EventProducer.java @@ -0,0 +1,15 @@ +package it.tdlight.reactiveapi; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface EventProducer { + + ChannelCodec getChannelCodec(); + + String getChannelName(); + + Mono sendMessages(Flux eventsFlux); + + void close(); +} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java deleted file mode 100644 index 240794a..0000000 --- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java +++ /dev/null @@ -1,36 +0,0 @@ -package it.tdlight.reactiveapi; - -import it.tdlight.reactiveapi.Event.ClientBoundEvent; - -public class KafkaClientBoundConsumer extends KafkaConsumer { - - private static final KafkaChannelCodec CODEC = KafkaChannelCodec.CLIENT_BOUND_EVENT; - private final String lane; - private final String name; - - public KafkaClientBoundConsumer(KafkaParameters kafkaParameters, String lane) { - super(kafkaParameters); - this.lane = lane; - if (lane.isBlank()) { - this.name = CODEC.getKafkaName(); - } else { - this.name = CODEC.getKafkaName() + "-" + lane; - } - } - - @Override - public KafkaChannelCodec getChannelCodec() { - return CODEC; - } - - @Override - public String getChannelName() { - return name; - } - - @Override - public boolean isQuickResponse() { - return false; - } - -} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java deleted file mode 100644 index fe20e06..0000000 --- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java +++ /dev/null @@ -1,29 +0,0 @@ -package it.tdlight.reactiveapi; - -import it.tdlight.reactiveapi.Event.ClientBoundEvent; - -public class KafkaClientBoundProducer extends KafkaProducer { - - private static final KafkaChannelCodec CODEC = KafkaChannelCodec.CLIENT_BOUND_EVENT; - - private final String name; - - public KafkaClientBoundProducer(KafkaParameters kafkaParameters, String lane) { - super(kafkaParameters); - if (lane.isBlank()) { - this.name = CODEC.getKafkaName(); - } else { - this.name = CODEC.getKafkaName() + "-" + lane; - } - } - - @Override - public KafkaChannelCodec getChannelCodec() { - return CODEC; - } - - @Override - public String getChannelName() { - return name; - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java deleted file mode 100644 index 626db35..0000000 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java +++ /dev/null @@ -1,14 +0,0 @@ -package it.tdlight.reactiveapi; - -import java.io.Closeable; -import java.util.Map; - -public record KafkaTdlibClientsChannels(KafkaTdlibRequestProducer request, - KafkaTdlibResponseConsumer response, - Map events) implements Closeable { - - @Override - public void close() { - request.close(); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java deleted file mode 100644 index 4770670..0000000 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java +++ /dev/null @@ -1,27 +0,0 @@ -package it.tdlight.reactiveapi; - -import it.tdlight.jni.TdApi; -import it.tdlight.reactiveapi.Event.OnRequest; - -public class KafkaTdlibRequestConsumer extends KafkaConsumer> { - - public KafkaTdlibRequestConsumer(KafkaParameters kafkaParameters) { - super(kafkaParameters); - } - - @Override - public KafkaChannelCodec getChannelCodec() { - return KafkaChannelCodec.TDLIB_REQUEST; - } - - @Override - public String getChannelName() { - return getChannelCodec().getKafkaName(); - } - - @Override - public boolean isQuickResponse() { - return true; - } - -} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestProducer.java deleted file mode 100644 index ead52d9..0000000 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestProducer.java +++ /dev/null @@ -1,20 +0,0 @@ -package it.tdlight.reactiveapi; - -import it.tdlight.reactiveapi.Event.OnRequest; - -public class KafkaTdlibRequestProducer extends KafkaProducer> { - - public KafkaTdlibRequestProducer(KafkaParameters kafkaParameters) { - super(kafkaParameters); - } - - @Override - public KafkaChannelCodec getChannelCodec() { - return KafkaChannelCodec.TDLIB_REQUEST; - } - - @Override - public String getChannelName() { - return getChannelCodec().getKafkaName(); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java deleted file mode 100644 index abb90d2..0000000 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java +++ /dev/null @@ -1,27 +0,0 @@ -package it.tdlight.reactiveapi; - -import it.tdlight.jni.TdApi; -import it.tdlight.reactiveapi.Event.OnResponse; - -public class KafkaTdlibResponseConsumer extends KafkaConsumer> { - - public KafkaTdlibResponseConsumer(KafkaParameters kafkaParameters) { - super(kafkaParameters); - } - - @Override - public KafkaChannelCodec getChannelCodec() { - return KafkaChannelCodec.TDLIB_RESPONSE; - } - - @Override - public String getChannelName() { - return getChannelCodec().getKafkaName(); - } - - @Override - public boolean isQuickResponse() { - return true; - } - -} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseProducer.java deleted file mode 100644 index bf9cb84..0000000 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseProducer.java +++ /dev/null @@ -1,21 +0,0 @@ -package it.tdlight.reactiveapi; - -import it.tdlight.jni.TdApi; -import it.tdlight.reactiveapi.Event.OnResponse; - -public class KafkaTdlibResponseProducer extends KafkaProducer> { - - public KafkaTdlibResponseProducer(KafkaParameters kafkaParameters) { - super(kafkaParameters); - } - - @Override - public KafkaChannelCodec getChannelCodec() { - return KafkaChannelCodec.TDLIB_RESPONSE; - } - - @Override - public String getChannelName() { - return getChannelCodec().getKafkaName(); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java deleted file mode 100644 index d729b53..0000000 --- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java +++ /dev/null @@ -1,23 +0,0 @@ -package it.tdlight.reactiveapi; - -import java.io.Closeable; -import java.util.Map; - -public record KafkaTdlibServersChannels(KafkaTdlibRequestConsumer request, - KafkaTdlibResponseProducer response, - Map events) implements Closeable { - - public KafkaClientBoundProducer events(String lane) { - var p = events.get(lane); - if (p == null) { - throw new IllegalArgumentException("No lane " + lane); - } - return p; - } - - @Override - public void close() { - response.close(); - events.values().forEach(KafkaProducer::close); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index b819826..2692dd6 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -8,9 +8,9 @@ import reactor.core.publisher.Flux; public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient { - private final KafkaSharedTdlibClients kafkaSharedTdlibClients; + private final ClientsSharedTdlib kafkaSharedTdlibClients; - LiveAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients) { + LiveAtomixReactiveApiClient(ClientsSharedTdlib kafkaSharedTdlibClients) { super(kafkaSharedTdlibClients); this.kafkaSharedTdlibClients = kafkaSharedTdlibClients; } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 51ed63e..7ca3274 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -71,7 +71,7 @@ public abstract class ReactiveApiPublisher { private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(5); private static final Duration HUNDRED_MS = Duration.ofMillis(100); - private final KafkaSharedTdlibServers kafkaSharedTdlibServers; + private final TdlibChannelsSharedServer kafkaSharedTdlibServers; private final Set resultingEventTransformerSet; private final ReactiveTelegramClient rawTelegramClient; private final Flux telegramClient; @@ -85,7 +85,7 @@ public abstract class ReactiveApiPublisher { private final AtomicReference disposable = new AtomicReference<>(); private final AtomicReference path = new AtomicReference<>(); - private ReactiveApiPublisher(KafkaSharedTdlibServers kafkaSharedTdlibServers, + private ReactiveApiPublisher(TdlibChannelsSharedServer kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, String lane) { this.kafkaSharedTdlibServers = kafkaSharedTdlibServers; @@ -114,7 +114,7 @@ public abstract class ReactiveApiPublisher { }); } - public static ReactiveApiPublisher fromToken(KafkaSharedTdlibServers kafkaSharedTdlibServers, + public static ReactiveApiPublisher fromToken(TdlibChannelsSharedServer kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, String token, @@ -122,7 +122,7 @@ public abstract class ReactiveApiPublisher { return new ReactiveApiPublisherToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, token, lane); } - public static ReactiveApiPublisher fromPhoneNumber(KafkaSharedTdlibServers kafkaSharedTdlibServers, + public static ReactiveApiPublisher fromPhoneNumber(TdlibChannelsSharedServer kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, long phoneNumber, @@ -551,7 +551,7 @@ public abstract class ReactiveApiPublisher { private final String botToken; - public ReactiveApiPublisherToken(KafkaSharedTdlibServers kafkaSharedTdlibServers, + public ReactiveApiPublisherToken(TdlibChannelsSharedServer kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, String botToken, @@ -583,7 +583,7 @@ public abstract class ReactiveApiPublisher { private final long phoneNumber; - public ReactiveApiPublisherPhoneNumber(KafkaSharedTdlibServers kafkaSharedTdlibServers, + public ReactiveApiPublisherPhoneNumber(TdlibChannelsSharedServer kafkaSharedTdlibServers, Set resultingEventTransformerSet, long userId, long phoneNumber, diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsClients.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsClients.java new file mode 100644 index 0000000..5835293 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsClients.java @@ -0,0 +1,18 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi.Object; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.Event.OnRequest; +import it.tdlight.reactiveapi.Event.OnResponse; +import java.io.Closeable; +import java.util.Map; + +public record TdlibChannelsClients(EventProducer> request, + EventConsumer> response, + Map> events) implements Closeable { + + @Override + public void close() { + request.close(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsServers.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsServers.java new file mode 100644 index 0000000..1dfa7ce --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsServers.java @@ -0,0 +1,24 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import java.io.Closeable; +import java.util.Map; + +public record TdlibChannelsServers(EventConsumer> request, + EventProducer> response, + Map> events) implements Closeable { + + public EventProducer events(String lane) { + var p = events.get(lane); + if (p == null) { + throw new IllegalArgumentException("No lane " + lane); + } + return p; + } + + @Override + public void close() { + response.close(); + events.values().forEach(EventProducer::close); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java similarity index 82% rename from src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java rename to src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java index 67cab9b..e561d82 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java @@ -1,37 +1,31 @@ package it.tdlight.reactiveapi; -import static java.util.Objects.requireNonNullElse; - import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.OnRequest; import it.tdlight.reactiveapi.Event.OnResponse; import java.io.Closeable; -import java.time.Duration; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; import java.util.logging.Level; import reactor.core.Disposable; -import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; -import reactor.core.publisher.GroupedFlux; import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; import reactor.util.concurrent.Queues; -public class KafkaSharedTdlibServers implements Closeable { +public class TdlibChannelsSharedServer implements Closeable { - private final KafkaTdlibServersChannels kafkaTdlibServersChannels; + private final TdlibChannelsServers kafkaTdlibServersChannels; private final Disposable responsesSub; private final AtomicReference requestsSub = new AtomicReference<>(); private final Many> responses = Sinks.many().unicast().onBackpressureBuffer( Queues.>get(65535).get()); private final Flux>> requests; - public KafkaSharedTdlibServers(KafkaTdlibServersChannels kafkaTdlibServersChannels) { + public TdlibChannelsSharedServer(TdlibChannelsServers kafkaTdlibServersChannels) { this.kafkaTdlibServersChannels = kafkaTdlibServersChannels; this.responsesSub = kafkaTdlibServersChannels.response() .sendMessages(responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT)) diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java similarity index 81% rename from src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java rename to src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java index e1bdebb..9928132 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java @@ -1,16 +1,20 @@ -package it.tdlight.reactiveapi; +package it.tdlight.reactiveapi.kafka; import static java.lang.Math.toIntExact; import it.tdlight.common.Init; import it.tdlight.common.utils.CantLoadLibrary; +import it.tdlight.reactiveapi.EventConsumer; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.KafkaParameters; +import it.tdlight.reactiveapi.ReactorUtils; +import it.tdlight.reactiveapi.Timestamped; import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.logging.Level; -import java.util.regex.Pattern; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.errors.RebalanceInProgressException; @@ -19,21 +23,29 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.SignalType; import reactor.kafka.receiver.KafkaReceiver; import reactor.kafka.receiver.ReceiverOptions; import reactor.util.retry.Retry; -public abstract class KafkaConsumer { +public final class KafkaConsumer implements EventConsumer { private static final Logger LOG = LogManager.getLogger(KafkaConsumer.class); private final KafkaParameters kafkaParameters; + private final boolean quickResponse; + private final ChannelCodec channelCodec; + private final String channelName; - public KafkaConsumer(KafkaParameters kafkaParameters) { + public KafkaConsumer(KafkaParameters kafkaParameters, + boolean quickResponse, + ChannelCodec channelCodec, + String channelName) { this.kafkaParameters = kafkaParameters; + this.quickResponse = quickResponse; + this.channelCodec = channelCodec; + this.channelName = channelName; } public KafkaReceiver createReceiver(@NotNull String kafkaGroupId) { @@ -69,13 +81,22 @@ public abstract class KafkaConsumer { return KafkaReceiver.create(options); } - public abstract KafkaChannelCodec getChannelCodec(); + @Override + public boolean isQuickResponse() { + return quickResponse; + } - public abstract String getChannelName(); + @Override + public ChannelCodec getChannelCodec() { + return channelCodec; + } - public abstract boolean isQuickResponse(); + @Override + public String getChannelName() { + return channelName; + } - protected Flux> retryIfCleanup(Flux> eventFlux) { + public Flux> retryIfCleanup(Flux> eventFlux) { return eventFlux.retryWhen(Retry .backoff(Long.MAX_VALUE, Duration.ofMillis(100)) .maxBackoff(Duration.ofSeconds(5)) @@ -84,7 +105,7 @@ public abstract class KafkaConsumer { .doBeforeRetry(s -> LOG.warn("Rebalancing in progress"))); } - protected Flux> retryIfCommitFailed(Flux> eventFlux) { + public Flux> retryIfCommitFailed(Flux> eventFlux) { return eventFlux.retryWhen(Retry .backoff(10, Duration.ofSeconds(1)) .maxBackoff(Duration.ofSeconds(5)) @@ -98,6 +119,7 @@ public abstract class KafkaConsumer { + " with max.poll.records."))); } + @Override public Flux> consumeMessages(@NotNull String subGroupId) { return consumeMessagesInternal(subGroupId); } diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java similarity index 75% rename from src/main/java/it/tdlight/reactiveapi/KafkaProducer.java rename to src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java index 028cc58..861e4d4 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java @@ -1,5 +1,8 @@ -package it.tdlight.reactiveapi; +package it.tdlight.reactiveapi.kafka; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.KafkaParameters; import java.util.HashMap; import java.util.Map; import java.util.logging.Level; @@ -15,13 +18,17 @@ import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderRecord; -public abstract class KafkaProducer { +public final class KafkaProducer implements EventProducer { private static final Logger LOG = LogManager.getLogger(KafkaProducer.class); private final KafkaSender sender; + private final ChannelCodec channelCodec; + private final String channelName; - public KafkaProducer(KafkaParameters kafkaParameters) { + public KafkaProducer(KafkaParameters kafkaParameters, ChannelCodec channelCodec, String channelName) { + this.channelCodec = channelCodec; + this.channelName = channelName; Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers()); props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId()); @@ -36,10 +43,17 @@ public abstract class KafkaProducer { sender = KafkaSender.create(senderOptions.maxInFlight(1024)); } - public abstract KafkaChannelCodec getChannelCodec(); + @Override + public ChannelCodec getChannelCodec() { + return channelCodec; + } - public abstract String getChannelName(); + @Override + public String getChannelName() { + return channelName; + } + @Override public Mono sendMessages(Flux eventsFlux) { var channelName = getChannelName(); return eventsFlux @@ -57,6 +71,7 @@ public abstract class KafkaProducer { .then(); } + @Override public void close() { sender.close(); }