diff --git a/pom.xml b/pom.xml
index 4a803e2..8f2d48a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -60,7 +60,14 @@
io.projectreactor
reactor-bom
- 2020.0.22
+ 2020.0.23
+ pom
+ import
+
+
+ io.rsocket
+ rsocket-bom
+ 1.1.3
pom
import
@@ -72,7 +79,7 @@
reactor-tools
original
runtime
- 3.4.22
+ 3.4.23
org.jetbrains
@@ -127,6 +134,22 @@
io.projectreactor.kafka
reactor-kafka
+
+ io.rsocket
+ rsocket-core
+
+
+ io.rsocket
+ rsocket-load-balancer
+
+
+ io.rsocket
+ rsocket-transport-local
+
+
+ io.rsocket
+ rsocket-transport-netty
+
org.apache.logging.log4j
log4j-api
@@ -151,6 +174,12 @@
net.minecrell
terminalconsoleappender
1.3.0
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
index c7d2be8..fc2c5fc 100644
--- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
+++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
@@ -2,9 +2,14 @@ package it.tdlight.reactiveapi;
import static java.util.Objects.requireNonNull;
+import it.tdlight.jni.TdApi.Object;
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
+import it.tdlight.reactiveapi.ChannelFactory.KafkaChannelFactory;
+import it.tdlight.reactiveapi.Event.ClientBoundEvent;
+import it.tdlight.reactiveapi.Event.OnRequest;
+import it.tdlight.reactiveapi.Event.OnResponse;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -31,9 +36,9 @@ public class AtomixReactiveApi implements ReactiveApi {
private final AtomixReactiveApiMode mode;
- private final KafkaSharedTdlibClients kafkaSharedTdlibClients;
+ private final ClientsSharedTdlib kafkaSharedTdlibClients;
@Nullable
- private final KafkaSharedTdlibServers kafkaSharedTdlibServers;
+ private final TdlibChannelsSharedServer kafkaSharedTdlibServers;
private final ReactiveApiMultiClient client;
private final Set resultingEventTransformerSet;
@@ -60,35 +65,36 @@ public class AtomixReactiveApi implements ReactiveApi {
@Nullable DiskSessionsManager diskSessions,
@NotNull Set resultingEventTransformerSet) {
this.mode = mode;
+ ChannelFactory channelFactory = new KafkaChannelFactory();
if (mode != AtomixReactiveApiMode.SERVER) {
- var kafkaTDLibRequestProducer = new KafkaTdlibRequestProducer(kafkaParameters);
- var kafkaTDLibResponseConsumer = new KafkaTdlibResponseConsumer(kafkaParameters);
- var kafkaClientBoundConsumers = new HashMap();
+ EventProducer> kafkaTDLibRequestProducer = ChannelProducerTdlibRequest.create(channelFactory, kafkaParameters);
+ EventConsumer> kafkaTDLibResponseConsumer = ChannelConsumerTdlibResponse.create(channelFactory, kafkaParameters);
+ HashMap> kafkaClientBoundConsumers = new HashMap<>();
for (String lane : kafkaParameters.getAllLanes()) {
- kafkaClientBoundConsumers.put(lane, new KafkaClientBoundConsumer(kafkaParameters, lane));
+ kafkaClientBoundConsumers.put(lane, ChannelConsumerClientBoundEvent.create(channelFactory, kafkaParameters, lane));
}
- var kafkaTdlibClientsChannels = new KafkaTdlibClientsChannels(kafkaTDLibRequestProducer,
+ var kafkaTdlibClientsChannels = new TdlibChannelsClients(kafkaTDLibRequestProducer,
kafkaTDLibResponseConsumer,
kafkaClientBoundConsumers
);
- this.kafkaSharedTdlibClients = new KafkaSharedTdlibClients(kafkaTdlibClientsChannels);
+ this.kafkaSharedTdlibClients = new ClientsSharedTdlib(kafkaTdlibClientsChannels);
this.client = new LiveAtomixReactiveApiClient(kafkaSharedTdlibClients);
} else {
this.kafkaSharedTdlibClients = null;
this.client = null;
}
if (mode != AtomixReactiveApiMode.CLIENT) {
- var kafkaTDLibRequestConsumer = new KafkaTdlibRequestConsumer(kafkaParameters);
- var kafkaTDLibResponseProducer = new KafkaTdlibResponseProducer(kafkaParameters);
- var kafkaClientBoundProducers = new HashMap();
+ EventConsumer> kafkaTDLibRequestConsumer = ChannelConsumerTdlibRequest.create(channelFactory, kafkaParameters);
+ EventProducer> kafkaTDLibResponseProducer = ChannelProducerTdlibResponse.create(channelFactory, kafkaParameters);
+ var kafkaClientBoundProducers = new HashMap>();
for (String lane : kafkaParameters.getAllLanes()) {
- kafkaClientBoundProducers.put(lane, new KafkaClientBoundProducer(kafkaParameters, lane));
+ kafkaClientBoundProducers.put(lane, ChannelProducerClientBoundEvent.create(channelFactory, kafkaParameters, lane));
}
- var kafkaTDLibServer = new KafkaTdlibServersChannels(kafkaTDLibRequestConsumer,
+ var kafkaTDLibServer = new TdlibChannelsServers(kafkaTDLibRequestConsumer,
kafkaTDLibResponseProducer,
kafkaClientBoundProducers
);
- this.kafkaSharedTdlibServers = new KafkaSharedTdlibServers(kafkaTDLibServer);
+ this.kafkaSharedTdlibServers = new TdlibChannelsSharedServer(kafkaTDLibServer);
} else {
this.kafkaSharedTdlibServers = null;
}
diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java
index 1259aeb..3b5bdd0 100644
--- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java
@@ -52,7 +52,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient {
private final AtomicLong requestId = new AtomicLong(0);
private final Disposable subscription;
- public BaseAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients) {
+ public BaseAtomixReactiveApiClient(ClientsSharedTdlib kafkaSharedTdlibClients) {
this.clientId = System.nanoTime();
this.requests = kafkaSharedTdlibClients.requests();
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaChannelCodec.java b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java
similarity index 92%
rename from src/main/java/it/tdlight/reactiveapi/KafkaChannelCodec.java
rename to src/main/java/it/tdlight/reactiveapi/ChannelCodec.java
index 53344d4..2312651 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaChannelCodec.java
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java
@@ -1,6 +1,6 @@
package it.tdlight.reactiveapi;
-public enum KafkaChannelCodec {
+public enum ChannelCodec {
CLIENT_BOUND_EVENT("event", ClientBoundEventSerializer.class, ClientBoundEventDeserializer.class),
TDLIB_REQUEST("request", TdlibRequestSerializer.class, TdlibRequestDeserializer.class),
TDLIB_RESPONSE("response", TdlibResponseSerializer.class, TdlibResponseDeserializer.class);
@@ -9,7 +9,7 @@ public enum KafkaChannelCodec {
private final Class> serializerClass;
private final Class> deserializerClass;
- KafkaChannelCodec(String kafkaName,
+ ChannelCodec(String kafkaName,
Class> serializerClass,
Class> deserializerClass) {
this.name = kafkaName;
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java
new file mode 100644
index 0000000..7cb6c49
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java
@@ -0,0 +1,22 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.reactiveapi.Event.ClientBoundEvent;
+
+public class ChannelConsumerClientBoundEvent {
+
+ private ChannelConsumerClientBoundEvent() {
+ }
+
+ public static EventConsumer create(ChannelFactory channelFactory, KafkaParameters kafkaParameters,
+ String lane) {
+ var codec = ChannelCodec.CLIENT_BOUND_EVENT;
+ String name;
+ if (lane.isBlank()) {
+ name = codec.getKafkaName();
+ } else {
+ name = codec.getKafkaName() + "-" + lane;
+ }
+ return channelFactory.newConsumer(kafkaParameters, false, codec, name);
+ }
+
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java
new file mode 100644
index 0000000..3af9244
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java
@@ -0,0 +1,19 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.jni.TdApi.Object;
+import it.tdlight.reactiveapi.Event.OnRequest;
+
+public class ChannelConsumerTdlibRequest {
+
+ private ChannelConsumerTdlibRequest() {
+ }
+
+ public static EventConsumer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) {
+ return channelFactory.newConsumer(kafkaParameters,
+ true,
+ ChannelCodec.TDLIB_REQUEST,
+ ChannelCodec.TDLIB_REQUEST.getKafkaName()
+ );
+ }
+
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java
new file mode 100644
index 0000000..d504f89
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java
@@ -0,0 +1,18 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.jni.TdApi.Object;
+import it.tdlight.reactiveapi.Event.OnResponse;
+
+public class ChannelConsumerTdlibResponse {
+
+ private ChannelConsumerTdlibResponse() {
+ }
+
+ public static EventConsumer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) {
+ return channelFactory.newConsumer(kafkaParameters,
+ true,
+ ChannelCodec.TDLIB_RESPONSE,
+ ChannelCodec.TDLIB_RESPONSE.getKafkaName()
+ );
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java
new file mode 100644
index 0000000..35855c6
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java
@@ -0,0 +1,34 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.reactiveapi.kafka.KafkaConsumer;
+import it.tdlight.reactiveapi.kafka.KafkaProducer;
+
+public interface ChannelFactory {
+
+ EventConsumer newConsumer(KafkaParameters kafkaParameters,
+ boolean quickResponse,
+ ChannelCodec channelCodec,
+ String channelName);
+
+ EventProducer newProducer(KafkaParameters kafkaParameters,
+ ChannelCodec channelCodec,
+ String channelName);
+
+ class KafkaChannelFactory implements ChannelFactory {
+
+ @Override
+ public EventConsumer newConsumer(KafkaParameters kafkaParameters,
+ boolean quickResponse,
+ ChannelCodec channelCodec,
+ String channelName) {
+ return new KafkaConsumer<>(kafkaParameters, quickResponse, channelCodec, channelName);
+ }
+
+ @Override
+ public EventProducer newProducer(KafkaParameters kafkaParameters,
+ ChannelCodec channelCodec,
+ String channelName) {
+ return new KafkaProducer<>(kafkaParameters, channelCodec, channelName);
+ }
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java
new file mode 100644
index 0000000..88eea09
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java
@@ -0,0 +1,24 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.reactiveapi.Event.ClientBoundEvent;
+
+public class ChannelProducerClientBoundEvent {
+
+ private static final ChannelCodec CODEC = ChannelCodec.CLIENT_BOUND_EVENT;
+
+ private ChannelProducerClientBoundEvent() {
+ }
+
+ public static EventProducer create(ChannelFactory channelFactory, KafkaParameters kafkaParameters, String lane) {
+ String name;
+ if (lane.isBlank()) {
+ name = CODEC.getKafkaName();
+ } else {
+ name = CODEC.getKafkaName() + "-" + lane;
+ }
+ return channelFactory.newProducer(kafkaParameters,
+ CODEC,
+ name
+ );
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java
new file mode 100644
index 0000000..58acad7
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java
@@ -0,0 +1,17 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.reactiveapi.Event.OnRequest;
+
+public class ChannelProducerTdlibRequest {
+
+ private ChannelProducerTdlibRequest() {
+ }
+
+ public static EventProducer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) {
+ return channelFactory.newProducer(kafkaParameters,
+ ChannelCodec.TDLIB_REQUEST,
+ ChannelCodec.TDLIB_REQUEST.getKafkaName()
+ );
+ }
+
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java
new file mode 100644
index 0000000..7d914da
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java
@@ -0,0 +1,18 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.jni.TdApi.Object;
+import it.tdlight.reactiveapi.Event.OnResponse;
+
+public class ChannelProducerTdlibResponse {
+
+ private ChannelProducerTdlibResponse() {
+ }
+
+ public static EventProducer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) {
+ return channelFactory.newProducer(kafkaParameters,
+ ChannelCodec.TDLIB_RESPONSE,
+ ChannelCodec.TDLIB_RESPONSE.getKafkaName()
+ );
+ }
+
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java b/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java
similarity index 80%
rename from src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java
rename to src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java
index 498afc6..789af0d 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibClients.java
+++ b/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java
@@ -1,36 +1,28 @@
package it.tdlight.reactiveapi;
-import static java.util.Objects.requireNonNullElse;
-
import it.tdlight.jni.TdApi.Object;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnRequest;
import it.tdlight.reactiveapi.Event.OnResponse;
import java.io.Closeable;
-import java.time.Duration;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
-import java.util.logging.Level;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.Disposable;
-import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.GroupedFlux;
-import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
-public class KafkaSharedTdlibClients implements Closeable {
+public class ClientsSharedTdlib implements Closeable {
- private static final Logger LOG = LogManager.getLogger(KafkaSharedTdlibClients.class);
+ private static final Logger LOG = LogManager.getLogger(ClientsSharedTdlib.class);
- private final KafkaTdlibClientsChannels kafkaTdlibClientsChannels;
+ private final TdlibChannelsClients kafkaTdlibClientsChannels;
private final AtomicReference responsesSub = new AtomicReference<>();
private final Disposable requestsSub;
private final AtomicReference eventsSub = new AtomicReference<>();
@@ -39,7 +31,7 @@ public class KafkaSharedTdlibClients implements Closeable {
private final Many> requests = Sinks.many().unicast()
.onBackpressureBuffer(Queues.>get(65535).get());
- public KafkaSharedTdlibClients(KafkaTdlibClientsChannels kafkaTdlibClientsChannels) {
+ public ClientsSharedTdlib(TdlibChannelsClients kafkaTdlibClientsChannels) {
this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels;
this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses");
this.events = kafkaTdlibClientsChannels.events().entrySet().stream()
diff --git a/src/main/java/it/tdlight/reactiveapi/EventConsumer.java b/src/main/java/it/tdlight/reactiveapi/EventConsumer.java
new file mode 100644
index 0000000..409f2c4
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/EventConsumer.java
@@ -0,0 +1,15 @@
+package it.tdlight.reactiveapi;
+
+import org.jetbrains.annotations.NotNull;
+import reactor.core.publisher.Flux;
+
+public interface EventConsumer {
+
+ boolean isQuickResponse();
+
+ ChannelCodec getChannelCodec();
+
+ String getChannelName();
+
+ Flux> consumeMessages(@NotNull String subGroupId);
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/EventProducer.java b/src/main/java/it/tdlight/reactiveapi/EventProducer.java
new file mode 100644
index 0000000..b1f91f5
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/EventProducer.java
@@ -0,0 +1,15 @@
+package it.tdlight.reactiveapi;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface EventProducer {
+
+ ChannelCodec getChannelCodec();
+
+ String getChannelName();
+
+ Mono sendMessages(Flux eventsFlux);
+
+ void close();
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java
deleted file mode 100644
index 240794a..0000000
--- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundConsumer.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package it.tdlight.reactiveapi;
-
-import it.tdlight.reactiveapi.Event.ClientBoundEvent;
-
-public class KafkaClientBoundConsumer extends KafkaConsumer {
-
- private static final KafkaChannelCodec CODEC = KafkaChannelCodec.CLIENT_BOUND_EVENT;
- private final String lane;
- private final String name;
-
- public KafkaClientBoundConsumer(KafkaParameters kafkaParameters, String lane) {
- super(kafkaParameters);
- this.lane = lane;
- if (lane.isBlank()) {
- this.name = CODEC.getKafkaName();
- } else {
- this.name = CODEC.getKafkaName() + "-" + lane;
- }
- }
-
- @Override
- public KafkaChannelCodec getChannelCodec() {
- return CODEC;
- }
-
- @Override
- public String getChannelName() {
- return name;
- }
-
- @Override
- public boolean isQuickResponse() {
- return false;
- }
-
-}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java
deleted file mode 100644
index fe20e06..0000000
--- a/src/main/java/it/tdlight/reactiveapi/KafkaClientBoundProducer.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package it.tdlight.reactiveapi;
-
-import it.tdlight.reactiveapi.Event.ClientBoundEvent;
-
-public class KafkaClientBoundProducer extends KafkaProducer {
-
- private static final KafkaChannelCodec CODEC = KafkaChannelCodec.CLIENT_BOUND_EVENT;
-
- private final String name;
-
- public KafkaClientBoundProducer(KafkaParameters kafkaParameters, String lane) {
- super(kafkaParameters);
- if (lane.isBlank()) {
- this.name = CODEC.getKafkaName();
- } else {
- this.name = CODEC.getKafkaName() + "-" + lane;
- }
- }
-
- @Override
- public KafkaChannelCodec getChannelCodec() {
- return CODEC;
- }
-
- @Override
- public String getChannelName() {
- return name;
- }
-}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java
deleted file mode 100644
index 626db35..0000000
--- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibClientsChannels.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package it.tdlight.reactiveapi;
-
-import java.io.Closeable;
-import java.util.Map;
-
-public record KafkaTdlibClientsChannels(KafkaTdlibRequestProducer request,
- KafkaTdlibResponseConsumer response,
- Map events) implements Closeable {
-
- @Override
- public void close() {
- request.close();
- }
-}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java
deleted file mode 100644
index 4770670..0000000
--- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestConsumer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package it.tdlight.reactiveapi;
-
-import it.tdlight.jni.TdApi;
-import it.tdlight.reactiveapi.Event.OnRequest;
-
-public class KafkaTdlibRequestConsumer extends KafkaConsumer> {
-
- public KafkaTdlibRequestConsumer(KafkaParameters kafkaParameters) {
- super(kafkaParameters);
- }
-
- @Override
- public KafkaChannelCodec getChannelCodec() {
- return KafkaChannelCodec.TDLIB_REQUEST;
- }
-
- @Override
- public String getChannelName() {
- return getChannelCodec().getKafkaName();
- }
-
- @Override
- public boolean isQuickResponse() {
- return true;
- }
-
-}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestProducer.java
deleted file mode 100644
index ead52d9..0000000
--- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibRequestProducer.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package it.tdlight.reactiveapi;
-
-import it.tdlight.reactiveapi.Event.OnRequest;
-
-public class KafkaTdlibRequestProducer extends KafkaProducer> {
-
- public KafkaTdlibRequestProducer(KafkaParameters kafkaParameters) {
- super(kafkaParameters);
- }
-
- @Override
- public KafkaChannelCodec getChannelCodec() {
- return KafkaChannelCodec.TDLIB_REQUEST;
- }
-
- @Override
- public String getChannelName() {
- return getChannelCodec().getKafkaName();
- }
-}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java
deleted file mode 100644
index abb90d2..0000000
--- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseConsumer.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package it.tdlight.reactiveapi;
-
-import it.tdlight.jni.TdApi;
-import it.tdlight.reactiveapi.Event.OnResponse;
-
-public class KafkaTdlibResponseConsumer extends KafkaConsumer> {
-
- public KafkaTdlibResponseConsumer(KafkaParameters kafkaParameters) {
- super(kafkaParameters);
- }
-
- @Override
- public KafkaChannelCodec getChannelCodec() {
- return KafkaChannelCodec.TDLIB_RESPONSE;
- }
-
- @Override
- public String getChannelName() {
- return getChannelCodec().getKafkaName();
- }
-
- @Override
- public boolean isQuickResponse() {
- return true;
- }
-
-}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseProducer.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseProducer.java
deleted file mode 100644
index bf9cb84..0000000
--- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibResponseProducer.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package it.tdlight.reactiveapi;
-
-import it.tdlight.jni.TdApi;
-import it.tdlight.reactiveapi.Event.OnResponse;
-
-public class KafkaTdlibResponseProducer extends KafkaProducer> {
-
- public KafkaTdlibResponseProducer(KafkaParameters kafkaParameters) {
- super(kafkaParameters);
- }
-
- @Override
- public KafkaChannelCodec getChannelCodec() {
- return KafkaChannelCodec.TDLIB_RESPONSE;
- }
-
- @Override
- public String getChannelName() {
- return getChannelCodec().getKafkaName();
- }
-}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java b/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java
deleted file mode 100644
index d729b53..0000000
--- a/src/main/java/it/tdlight/reactiveapi/KafkaTdlibServersChannels.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package it.tdlight.reactiveapi;
-
-import java.io.Closeable;
-import java.util.Map;
-
-public record KafkaTdlibServersChannels(KafkaTdlibRequestConsumer request,
- KafkaTdlibResponseProducer response,
- Map events) implements Closeable {
-
- public KafkaClientBoundProducer events(String lane) {
- var p = events.get(lane);
- if (p == null) {
- throw new IllegalArgumentException("No lane " + lane);
- }
- return p;
- }
-
- @Override
- public void close() {
- response.close();
- events.values().forEach(KafkaProducer::close);
- }
-}
diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
index b819826..2692dd6 100644
--- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
@@ -8,9 +8,9 @@ import reactor.core.publisher.Flux;
public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient {
- private final KafkaSharedTdlibClients kafkaSharedTdlibClients;
+ private final ClientsSharedTdlib kafkaSharedTdlibClients;
- LiveAtomixReactiveApiClient(KafkaSharedTdlibClients kafkaSharedTdlibClients) {
+ LiveAtomixReactiveApiClient(ClientsSharedTdlib kafkaSharedTdlibClients) {
super(kafkaSharedTdlibClients);
this.kafkaSharedTdlibClients = kafkaSharedTdlibClients;
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
index 51ed63e..7ca3274 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
@@ -71,7 +71,7 @@ public abstract class ReactiveApiPublisher {
private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(5);
private static final Duration HUNDRED_MS = Duration.ofMillis(100);
- private final KafkaSharedTdlibServers kafkaSharedTdlibServers;
+ private final TdlibChannelsSharedServer kafkaSharedTdlibServers;
private final Set resultingEventTransformerSet;
private final ReactiveTelegramClient rawTelegramClient;
private final Flux telegramClient;
@@ -85,7 +85,7 @@ public abstract class ReactiveApiPublisher {
private final AtomicReference disposable = new AtomicReference<>();
private final AtomicReference path = new AtomicReference<>();
- private ReactiveApiPublisher(KafkaSharedTdlibServers kafkaSharedTdlibServers,
+ private ReactiveApiPublisher(TdlibChannelsSharedServer kafkaSharedTdlibServers,
Set resultingEventTransformerSet,
long userId, String lane) {
this.kafkaSharedTdlibServers = kafkaSharedTdlibServers;
@@ -114,7 +114,7 @@ public abstract class ReactiveApiPublisher {
});
}
- public static ReactiveApiPublisher fromToken(KafkaSharedTdlibServers kafkaSharedTdlibServers,
+ public static ReactiveApiPublisher fromToken(TdlibChannelsSharedServer kafkaSharedTdlibServers,
Set resultingEventTransformerSet,
long userId,
String token,
@@ -122,7 +122,7 @@ public abstract class ReactiveApiPublisher {
return new ReactiveApiPublisherToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, token, lane);
}
- public static ReactiveApiPublisher fromPhoneNumber(KafkaSharedTdlibServers kafkaSharedTdlibServers,
+ public static ReactiveApiPublisher fromPhoneNumber(TdlibChannelsSharedServer kafkaSharedTdlibServers,
Set resultingEventTransformerSet,
long userId,
long phoneNumber,
@@ -551,7 +551,7 @@ public abstract class ReactiveApiPublisher {
private final String botToken;
- public ReactiveApiPublisherToken(KafkaSharedTdlibServers kafkaSharedTdlibServers,
+ public ReactiveApiPublisherToken(TdlibChannelsSharedServer kafkaSharedTdlibServers,
Set resultingEventTransformerSet,
long userId,
String botToken,
@@ -583,7 +583,7 @@ public abstract class ReactiveApiPublisher {
private final long phoneNumber;
- public ReactiveApiPublisherPhoneNumber(KafkaSharedTdlibServers kafkaSharedTdlibServers,
+ public ReactiveApiPublisherPhoneNumber(TdlibChannelsSharedServer kafkaSharedTdlibServers,
Set resultingEventTransformerSet,
long userId,
long phoneNumber,
diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsClients.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsClients.java
new file mode 100644
index 0000000..5835293
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsClients.java
@@ -0,0 +1,18 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.jni.TdApi.Object;
+import it.tdlight.reactiveapi.Event.ClientBoundEvent;
+import it.tdlight.reactiveapi.Event.OnRequest;
+import it.tdlight.reactiveapi.Event.OnResponse;
+import java.io.Closeable;
+import java.util.Map;
+
+public record TdlibChannelsClients(EventProducer> request,
+ EventConsumer> response,
+ Map> events) implements Closeable {
+
+ @Override
+ public void close() {
+ request.close();
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsServers.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsServers.java
new file mode 100644
index 0000000..1dfa7ce
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsServers.java
@@ -0,0 +1,24 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.reactiveapi.Event.ClientBoundEvent;
+import java.io.Closeable;
+import java.util.Map;
+
+public record TdlibChannelsServers(EventConsumer> request,
+ EventProducer> response,
+ Map> events) implements Closeable {
+
+ public EventProducer events(String lane) {
+ var p = events.get(lane);
+ if (p == null) {
+ throw new IllegalArgumentException("No lane " + lane);
+ }
+ return p;
+ }
+
+ @Override
+ public void close() {
+ response.close();
+ events.values().forEach(EventProducer::close);
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java
similarity index 82%
rename from src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java
rename to src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java
index 67cab9b..e561d82 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaSharedTdlibServers.java
+++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java
@@ -1,37 +1,31 @@
package it.tdlight.reactiveapi;
-import static java.util.Objects.requireNonNullElse;
-
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnRequest;
import it.tdlight.reactiveapi.Event.OnResponse;
import java.io.Closeable;
-import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
import java.util.logging.Level;
import reactor.core.Disposable;
-import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
-public class KafkaSharedTdlibServers implements Closeable {
+public class TdlibChannelsSharedServer implements Closeable {
- private final KafkaTdlibServersChannels kafkaTdlibServersChannels;
+ private final TdlibChannelsServers kafkaTdlibServersChannels;
private final Disposable responsesSub;
private final AtomicReference requestsSub = new AtomicReference<>();
private final Many> responses = Sinks.many().unicast().onBackpressureBuffer(
Queues.>get(65535).get());
private final Flux>> requests;
- public KafkaSharedTdlibServers(KafkaTdlibServersChannels kafkaTdlibServersChannels) {
+ public TdlibChannelsSharedServer(TdlibChannelsServers kafkaTdlibServersChannels) {
this.kafkaTdlibServersChannels = kafkaTdlibServersChannels;
this.responsesSub = kafkaTdlibServersChannels.response()
.sendMessages(responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT))
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java
similarity index 81%
rename from src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
rename to src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java
index e1bdebb..9928132 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaConsumer.java
+++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java
@@ -1,16 +1,20 @@
-package it.tdlight.reactiveapi;
+package it.tdlight.reactiveapi.kafka;
import static java.lang.Math.toIntExact;
import it.tdlight.common.Init;
import it.tdlight.common.utils.CantLoadLibrary;
+import it.tdlight.reactiveapi.EventConsumer;
+import it.tdlight.reactiveapi.ChannelCodec;
+import it.tdlight.reactiveapi.KafkaParameters;
+import it.tdlight.reactiveapi.ReactorUtils;
+import it.tdlight.reactiveapi.Timestamped;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Level;
-import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -19,21 +23,29 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.util.retry.Retry;
-public abstract class KafkaConsumer {
+public final class KafkaConsumer implements EventConsumer {
private static final Logger LOG = LogManager.getLogger(KafkaConsumer.class);
private final KafkaParameters kafkaParameters;
+ private final boolean quickResponse;
+ private final ChannelCodec channelCodec;
+ private final String channelName;
- public KafkaConsumer(KafkaParameters kafkaParameters) {
+ public KafkaConsumer(KafkaParameters kafkaParameters,
+ boolean quickResponse,
+ ChannelCodec channelCodec,
+ String channelName) {
this.kafkaParameters = kafkaParameters;
+ this.quickResponse = quickResponse;
+ this.channelCodec = channelCodec;
+ this.channelName = channelName;
}
public KafkaReceiver createReceiver(@NotNull String kafkaGroupId) {
@@ -69,13 +81,22 @@ public abstract class KafkaConsumer {
return KafkaReceiver.create(options);
}
- public abstract KafkaChannelCodec getChannelCodec();
+ @Override
+ public boolean isQuickResponse() {
+ return quickResponse;
+ }
- public abstract String getChannelName();
+ @Override
+ public ChannelCodec getChannelCodec() {
+ return channelCodec;
+ }
- public abstract boolean isQuickResponse();
+ @Override
+ public String getChannelName() {
+ return channelName;
+ }
- protected Flux> retryIfCleanup(Flux> eventFlux) {
+ public Flux> retryIfCleanup(Flux> eventFlux) {
return eventFlux.retryWhen(Retry
.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
.maxBackoff(Duration.ofSeconds(5))
@@ -84,7 +105,7 @@ public abstract class KafkaConsumer {
.doBeforeRetry(s -> LOG.warn("Rebalancing in progress")));
}
- protected Flux> retryIfCommitFailed(Flux> eventFlux) {
+ public Flux> retryIfCommitFailed(Flux> eventFlux) {
return eventFlux.retryWhen(Retry
.backoff(10, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(5))
@@ -98,6 +119,7 @@ public abstract class KafkaConsumer {
+ " with max.poll.records.")));
}
+ @Override
public Flux> consumeMessages(@NotNull String subGroupId) {
return consumeMessagesInternal(subGroupId);
}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java
similarity index 75%
rename from src/main/java/it/tdlight/reactiveapi/KafkaProducer.java
rename to src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java
index 028cc58..861e4d4 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaProducer.java
+++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java
@@ -1,5 +1,8 @@
-package it.tdlight.reactiveapi;
+package it.tdlight.reactiveapi.kafka;
+import it.tdlight.reactiveapi.ChannelCodec;
+import it.tdlight.reactiveapi.EventProducer;
+import it.tdlight.reactiveapi.KafkaParameters;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
@@ -15,13 +18,17 @@ import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
-public abstract class KafkaProducer {
+public final class KafkaProducer implements EventProducer {
private static final Logger LOG = LogManager.getLogger(KafkaProducer.class);
private final KafkaSender sender;
+ private final ChannelCodec channelCodec;
+ private final String channelName;
- public KafkaProducer(KafkaParameters kafkaParameters) {
+ public KafkaProducer(KafkaParameters kafkaParameters, ChannelCodec channelCodec, String channelName) {
+ this.channelCodec = channelCodec;
+ this.channelName = channelName;
Map props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId());
@@ -36,10 +43,17 @@ public abstract class KafkaProducer {
sender = KafkaSender.create(senderOptions.maxInFlight(1024));
}
- public abstract KafkaChannelCodec getChannelCodec();
+ @Override
+ public ChannelCodec getChannelCodec() {
+ return channelCodec;
+ }
- public abstract String getChannelName();
+ @Override
+ public String getChannelName() {
+ return channelName;
+ }
+ @Override
public Mono sendMessages(Flux eventsFlux) {
var channelName = getChannelName();
return eventsFlux
@@ -57,6 +71,7 @@ public abstract class KafkaProducer {
.then();
}
+ @Override
public void close() {
sender.close();
}