diff --git a/pom.xml b/pom.xml
index 8f2d48a..6586969 100644
--- a/pom.xml
+++ b/pom.xml
@@ -155,6 +155,18 @@
log4j-api
2.19.0
+
+ org.apache.logging.log4j
+ log4j-core
+ 2.19.0
+ test
+
+
+ org.apache.logging.log4j
+ log4j-slf4j2-impl
+ 2.19.0
+ test
+
com.lmax
disruptor
diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
index fc2c5fc..6564681 100644
--- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
+++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
@@ -6,7 +6,6 @@ import it.tdlight.jni.TdApi.Object;
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
-import it.tdlight.reactiveapi.ChannelFactory.KafkaChannelFactory;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnRequest;
import it.tdlight.reactiveapi.Event.OnResponse;
@@ -36,9 +35,9 @@ public class AtomixReactiveApi implements ReactiveApi {
private final AtomixReactiveApiMode mode;
- private final ClientsSharedTdlib kafkaSharedTdlibClients;
+ private final ClientsSharedTdlib sharedTdlibClients;
@Nullable
- private final TdlibChannelsSharedServer kafkaSharedTdlibServers;
+ private final TdlibChannelsSharedServer sharedTdlibServers;
private final ReactiveApiMultiClient client;
private final Set resultingEventTransformerSet;
@@ -61,42 +60,42 @@ public class AtomixReactiveApi implements ReactiveApi {
}
public AtomixReactiveApi(AtomixReactiveApiMode mode,
- KafkaParameters kafkaParameters,
+ ChannelsParameters channelsParameters,
@Nullable DiskSessionsManager diskSessions,
@NotNull Set resultingEventTransformerSet) {
this.mode = mode;
- ChannelFactory channelFactory = new KafkaChannelFactory();
+ ChannelFactory channelFactory = ChannelFactory.getFactoryFromParameters(channelsParameters);
if (mode != AtomixReactiveApiMode.SERVER) {
- EventProducer> kafkaTDLibRequestProducer = ChannelProducerTdlibRequest.create(channelFactory, kafkaParameters);
- EventConsumer> kafkaTDLibResponseConsumer = ChannelConsumerTdlibResponse.create(channelFactory, kafkaParameters);
- HashMap> kafkaClientBoundConsumers = new HashMap<>();
- for (String lane : kafkaParameters.getAllLanes()) {
- kafkaClientBoundConsumers.put(lane, ChannelConsumerClientBoundEvent.create(channelFactory, kafkaParameters, lane));
+ EventProducer> tdRequestProducer = ChannelProducerTdlibRequest.create(channelFactory, channelsParameters);
+ EventConsumer> tdResponseConsumer = ChannelConsumerTdlibResponse.create(channelFactory, channelsParameters);
+ HashMap> clientBoundConsumers = new HashMap<>();
+ for (String lane : channelsParameters.getAllLanes()) {
+ clientBoundConsumers.put(lane, ChannelConsumerClientBoundEvent.create(channelFactory, channelsParameters, lane));
}
- var kafkaTdlibClientsChannels = new TdlibChannelsClients(kafkaTDLibRequestProducer,
- kafkaTDLibResponseConsumer,
- kafkaClientBoundConsumers
+ var tdClientsChannels = new TdlibChannelsClients(tdRequestProducer,
+ tdResponseConsumer,
+ clientBoundConsumers
);
- this.kafkaSharedTdlibClients = new ClientsSharedTdlib(kafkaTdlibClientsChannels);
- this.client = new LiveAtomixReactiveApiClient(kafkaSharedTdlibClients);
+ this.sharedTdlibClients = new ClientsSharedTdlib(tdClientsChannels);
+ this.client = new LiveAtomixReactiveApiClient(sharedTdlibClients);
} else {
- this.kafkaSharedTdlibClients = null;
+ this.sharedTdlibClients = null;
this.client = null;
}
if (mode != AtomixReactiveApiMode.CLIENT) {
- EventConsumer> kafkaTDLibRequestConsumer = ChannelConsumerTdlibRequest.create(channelFactory, kafkaParameters);
- EventProducer> kafkaTDLibResponseProducer = ChannelProducerTdlibResponse.create(channelFactory, kafkaParameters);
- var kafkaClientBoundProducers = new HashMap>();
- for (String lane : kafkaParameters.getAllLanes()) {
- kafkaClientBoundProducers.put(lane, ChannelProducerClientBoundEvent.create(channelFactory, kafkaParameters, lane));
+ EventConsumer> tdRequestConsumer = ChannelConsumerTdlibRequest.create(channelFactory, channelsParameters);
+ EventProducer> tdResponseProducer = ChannelProducerTdlibResponse.create(channelFactory, channelsParameters);
+ var clientBoundProducers = new HashMap>();
+ for (String lane : channelsParameters.getAllLanes()) {
+ clientBoundProducers.put(lane, ChannelProducerClientBoundEvent.create(channelFactory, channelsParameters, lane));
}
- var kafkaTDLibServer = new TdlibChannelsServers(kafkaTDLibRequestConsumer,
- kafkaTDLibResponseProducer,
- kafkaClientBoundProducers
+ var tdServer = new TdlibChannelsServers(tdRequestConsumer,
+ tdResponseProducer,
+ clientBoundProducers
);
- this.kafkaSharedTdlibServers = new TdlibChannelsSharedServer(kafkaTDLibServer);
+ this.sharedTdlibServers = new TdlibChannelsSharedServer(tdServer);
} else {
- this.kafkaSharedTdlibServers = null;
+ this.sharedTdlibServers = null;
}
this.resultingEventTransformerSet = resultingEventTransformerSet;
@@ -142,8 +141,8 @@ public class AtomixReactiveApi implements ReactiveApi {
.doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk"));
return loadSessions.then(Mono.fromRunnable(() -> {
- if (kafkaSharedTdlibServers != null) {
- requestsSub = kafkaSharedTdlibServers.requests()
+ if (sharedTdlibServers != null) {
+ requestsSub = sharedTdlibServers.requests()
.onBackpressureError()
.doOnNext(req -> localSessions.get(req.data().userId()).handleRequest(req.data()))
.subscribeOn(Schedulers.parallel())
@@ -173,7 +172,7 @@ public class AtomixReactiveApi implements ReactiveApi {
botToken = createBotSessionRequest.token();
phoneNumber = null;
lane = createBotSessionRequest.lane();
- reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaSharedTdlibServers, resultingEventTransformerSet,
+ reactiveApiPublisher = ReactiveApiPublisher.fromToken(sharedTdlibServers, resultingEventTransformerSet,
userId,
botToken,
lane
@@ -184,7 +183,7 @@ public class AtomixReactiveApi implements ReactiveApi {
botToken = null;
phoneNumber = createUserSessionRequest.phoneNumber();
lane = createUserSessionRequest.lane();
- reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaSharedTdlibServers, resultingEventTransformerSet,
+ reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(sharedTdlibServers, resultingEventTransformerSet,
userId,
phoneNumber,
lane
@@ -196,14 +195,14 @@ public class AtomixReactiveApi implements ReactiveApi {
phoneNumber = loadSessionFromDiskRequest.phoneNumber();
lane = loadSessionFromDiskRequest.lane();
if (loadSessionFromDiskRequest.phoneNumber() != null) {
- reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaSharedTdlibServers,
+ reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(sharedTdlibServers,
resultingEventTransformerSet,
userId,
phoneNumber,
lane
);
} else {
- reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaSharedTdlibServers,
+ reactiveApiPublisher = ReactiveApiPublisher.fromToken(sharedTdlibServers,
resultingEventTransformerSet,
userId,
botToken,
@@ -279,24 +278,24 @@ public class AtomixReactiveApi implements ReactiveApi {
@Override
public Mono close() {
closeRequested = true;
- Mono> kafkaServerProducersStopper;
- if (kafkaSharedTdlibServers != null) {
- kafkaServerProducersStopper = Mono.fromRunnable(kafkaSharedTdlibServers::close).subscribeOn(Schedulers.boundedElastic());
+ Mono> serverProducersStopper;
+ if (sharedTdlibServers != null) {
+ serverProducersStopper = Mono.fromRunnable(sharedTdlibServers::close).subscribeOn(Schedulers.boundedElastic());
} else {
- kafkaServerProducersStopper = Mono.empty();
+ serverProducersStopper = Mono.empty();
}
- Mono> kafkaClientProducersStopper;
- if (kafkaSharedTdlibClients != null) {
- kafkaClientProducersStopper = Mono
- .fromRunnable(kafkaSharedTdlibClients::close)
+ Mono> clientProducersStopper;
+ if (sharedTdlibClients != null) {
+ clientProducersStopper = Mono
+ .fromRunnable(sharedTdlibClients::close)
.subscribeOn(Schedulers.boundedElastic());
} else {
- kafkaClientProducersStopper = Mono.empty();
+ clientProducersStopper = Mono.empty();
}
if (requestsSub != null) {
requestsSub.dispose();
}
- return Mono.when(kafkaServerProducersStopper, kafkaClientProducersStopper);
+ return Mono.when(serverProducersStopper, clientProducersStopper);
}
@Override
diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java
index 3b5bdd0..9e71705 100644
--- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java
@@ -52,11 +52,11 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient {
private final AtomicLong requestId = new AtomicLong(0);
private final Disposable subscription;
- public BaseAtomixReactiveApiClient(ClientsSharedTdlib kafkaSharedTdlibClients) {
+ public BaseAtomixReactiveApiClient(ClientsSharedTdlib sharedTdlibClients) {
this.clientId = System.nanoTime();
- this.requests = kafkaSharedTdlibClients.requests();
+ this.requests = sharedTdlibClients.requests();
- this.subscription = kafkaSharedTdlibClients.responses().doOnNext(response -> {
+ this.subscription = sharedTdlibClients.responses().doOnNext(response -> {
var responseSink = responses.get(response.data().requestId());
if (responseSink == null) {
LOG.debug("Bot received a response for an unknown request id: {}", response.data().requestId());
diff --git a/src/main/java/it/tdlight/reactiveapi/Channel.java b/src/main/java/it/tdlight/reactiveapi/Channel.java
new file mode 100644
index 0000000..912a862
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/Channel.java
@@ -0,0 +1,22 @@
+package it.tdlight.reactiveapi;
+
+public enum Channel {
+ CLIENT_BOUND_EVENT("event"),
+ TDLIB_REQUEST("request"),
+ TDLIB_RESPONSE("response");
+
+ private final String name;
+
+ Channel(String name) {
+ this.name = name;
+ }
+
+ public String getChannelName() {
+ return name;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java
index 2312651..da307fe 100644
--- a/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java
@@ -1,26 +1,24 @@
package it.tdlight.reactiveapi;
-public enum ChannelCodec {
- CLIENT_BOUND_EVENT("event", ClientBoundEventSerializer.class, ClientBoundEventDeserializer.class),
- TDLIB_REQUEST("request", TdlibRequestSerializer.class, TdlibRequestDeserializer.class),
- TDLIB_RESPONSE("response", TdlibResponseSerializer.class, TdlibResponseDeserializer.class);
+import java.lang.reflect.InvocationTargetException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class ChannelCodec {
+ public static final ChannelCodec CLIENT_BOUND_EVENT = new ChannelCodec(ClientBoundEventSerializer.class, ClientBoundEventDeserializer.class);
+ public static final ChannelCodec TDLIB_REQUEST = new ChannelCodec(TdlibRequestSerializer.class, TdlibRequestDeserializer.class);
+ public static final ChannelCodec TDLIB_RESPONSE = new ChannelCodec(TdlibResponseSerializer.class, TdlibResponseDeserializer.class);
+ public static final ChannelCodec UTF8_TEST = new ChannelCodec(UtfCodec.class, UtfCodec.class);
- private final String name;
private final Class> serializerClass;
private final Class> deserializerClass;
- ChannelCodec(String kafkaName,
- Class> serializerClass,
+ public ChannelCodec(Class> serializerClass,
Class> deserializerClass) {
- this.name = kafkaName;
this.serializerClass = serializerClass;
this.deserializerClass = deserializerClass;
}
- public String getKafkaName() {
- return name;
- }
-
public Class> getSerializerClass() {
return serializerClass;
}
@@ -29,8 +27,23 @@ public enum ChannelCodec {
return deserializerClass;
}
- @Override
- public String toString() {
- return name;
+ public Deserializer getNewDeserializer() {
+ try {
+ //noinspection unchecked
+ return (Deserializer) deserializerClass.getDeclaredConstructor().newInstance();
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException |
+ ClassCastException e) {
+ throw new IllegalStateException("Can't instantiate the codec deserializer", e);
+ }
+ }
+
+ public Serializer getNewSerializer() {
+ try {
+ //noinspection unchecked
+ return (Serializer) serializerClass.getDeclaredConstructor().newInstance();
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException |
+ ClassCastException e) {
+ throw new IllegalStateException("Can't instantiate the codec serializer", e);
+ }
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java
index 7cb6c49..ec492fc 100644
--- a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java
@@ -1,22 +1,22 @@
package it.tdlight.reactiveapi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
+import org.jetbrains.annotations.NotNull;
public class ChannelConsumerClientBoundEvent {
private ChannelConsumerClientBoundEvent() {
}
- public static EventConsumer create(ChannelFactory channelFactory, KafkaParameters kafkaParameters,
- String lane) {
- var codec = ChannelCodec.CLIENT_BOUND_EVENT;
+ public static EventConsumer create(ChannelFactory channelFactory, ChannelsParameters channelsParameters,
+ @NotNull String lane) {
String name;
- if (lane.isBlank()) {
- name = codec.getKafkaName();
+ if (lane.isEmpty()) {
+ name = Channel.CLIENT_BOUND_EVENT.getChannelName();
} else {
- name = codec.getKafkaName() + "-" + lane;
+ name = Channel.CLIENT_BOUND_EVENT.getChannelName() + "-" + lane;
}
- return channelFactory.newConsumer(kafkaParameters, false, codec, name);
+ return channelFactory.newConsumer(channelsParameters, false, ChannelCodec.CLIENT_BOUND_EVENT, name);
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java
index 3af9244..2bcc837 100644
--- a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java
@@ -8,11 +8,11 @@ public class ChannelConsumerTdlibRequest {
private ChannelConsumerTdlibRequest() {
}
- public static EventConsumer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) {
- return channelFactory.newConsumer(kafkaParameters,
+ public static EventConsumer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) {
+ return channelFactory.newConsumer(channelsParameters,
true,
ChannelCodec.TDLIB_REQUEST,
- ChannelCodec.TDLIB_REQUEST.getKafkaName()
+ Channel.TDLIB_REQUEST.getChannelName()
);
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java
index d504f89..1353e19 100644
--- a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java
@@ -8,11 +8,11 @@ public class ChannelConsumerTdlibResponse {
private ChannelConsumerTdlibResponse() {
}
- public static EventConsumer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) {
- return channelFactory.newConsumer(kafkaParameters,
+ public static EventConsumer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) {
+ return channelFactory.newConsumer(channelsParameters,
true,
ChannelCodec.TDLIB_RESPONSE,
- ChannelCodec.TDLIB_RESPONSE.getKafkaName()
+ Channel.TDLIB_RESPONSE.getChannelName()
);
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java
index 35855c6..cc77481 100644
--- a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java
@@ -2,33 +2,73 @@ package it.tdlight.reactiveapi;
import it.tdlight.reactiveapi.kafka.KafkaConsumer;
import it.tdlight.reactiveapi.kafka.KafkaProducer;
+import it.tdlight.reactiveapi.rsocket.RSocketConsumeAsClient;
+import it.tdlight.reactiveapi.rsocket.RSocketProduceAsServer;
+import it.tdlight.reactiveapi.rsocket.RSocketConsumeAsServer;
+import it.tdlight.reactiveapi.rsocket.RSocketProduceAsClient;
public interface ChannelFactory {
- EventConsumer newConsumer(KafkaParameters kafkaParameters,
+ static ChannelFactory getFactoryFromParameters(ChannelsParameters channelsParameters) {
+ if (channelsParameters instanceof KafkaParameters) {
+ return new KafkaChannelFactory();
+ } else {
+ return new RsocketChannelFactory();
+ }
+ }
+
+ EventConsumer newConsumer(ChannelsParameters channelsParameters,
boolean quickResponse,
ChannelCodec channelCodec,
String channelName);
- EventProducer newProducer(KafkaParameters kafkaParameters,
+ EventProducer newProducer(ChannelsParameters channelsParameters,
ChannelCodec channelCodec,
String channelName);
class KafkaChannelFactory implements ChannelFactory {
@Override
- public EventConsumer newConsumer(KafkaParameters kafkaParameters,
+ public EventConsumer newConsumer(ChannelsParameters channelsParameters,
boolean quickResponse,
ChannelCodec channelCodec,
String channelName) {
- return new KafkaConsumer<>(kafkaParameters, quickResponse, channelCodec, channelName);
+ return new KafkaConsumer<>((KafkaParameters) channelsParameters, quickResponse, channelCodec, channelName);
}
@Override
- public EventProducer newProducer(KafkaParameters kafkaParameters,
+ public EventProducer newProducer(ChannelsParameters channelsParameters,
ChannelCodec channelCodec,
String channelName) {
- return new KafkaProducer<>(kafkaParameters, channelCodec, channelName);
+ return new KafkaProducer<>((KafkaParameters) channelsParameters, channelCodec, channelName);
+ }
+ }
+
+ class RsocketChannelFactory implements ChannelFactory {
+
+ @Override
+ public EventConsumer newConsumer(ChannelsParameters channelsParameters,
+ boolean quickResponse,
+ ChannelCodec channelCodec,
+ String channelName) {
+ var socketParameters = (RSocketParameters) channelsParameters;
+ if (socketParameters.isClient()) {
+ return new RSocketConsumeAsClient<>(socketParameters.host(), channelCodec, channelName);
+ } else {
+ return new RSocketConsumeAsServer<>(socketParameters.host(), channelCodec, channelName);
+ }
+ }
+
+ @Override
+ public EventProducer newProducer(ChannelsParameters channelsParameters,
+ ChannelCodec channelCodec,
+ String channelName) {
+ var socketParameters = (RSocketParameters) channelsParameters;
+ if (socketParameters.isClient()) {
+ return new RSocketProduceAsServer<>(socketParameters.host(), channelCodec, channelName);
+ } else {
+ return new RSocketProduceAsClient<>(socketParameters.host(), channelCodec, channelName);
+ }
}
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java
index 88eea09..d22c9ce 100644
--- a/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java
@@ -4,21 +4,16 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent;
public class ChannelProducerClientBoundEvent {
- private static final ChannelCodec CODEC = ChannelCodec.CLIENT_BOUND_EVENT;
-
private ChannelProducerClientBoundEvent() {
}
- public static EventProducer create(ChannelFactory channelFactory, KafkaParameters kafkaParameters, String lane) {
+ public static EventProducer create(ChannelFactory channelFactory, ChannelsParameters channelsParameters, String lane) {
String name;
if (lane.isBlank()) {
- name = CODEC.getKafkaName();
+ name = Channel.CLIENT_BOUND_EVENT.getChannelName();
} else {
- name = CODEC.getKafkaName() + "-" + lane;
+ name = Channel.CLIENT_BOUND_EVENT.getChannelName() + "-" + lane;
}
- return channelFactory.newProducer(kafkaParameters,
- CODEC,
- name
- );
+ return channelFactory.newProducer(channelsParameters, ChannelCodec.CLIENT_BOUND_EVENT, name);
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java
index 58acad7..9dad97d 100644
--- a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java
@@ -7,10 +7,10 @@ public class ChannelProducerTdlibRequest {
private ChannelProducerTdlibRequest() {
}
- public static EventProducer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) {
- return channelFactory.newProducer(kafkaParameters,
+ public static EventProducer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) {
+ return channelFactory.newProducer(channelsParameters,
ChannelCodec.TDLIB_REQUEST,
- ChannelCodec.TDLIB_REQUEST.getKafkaName()
+ Channel.TDLIB_REQUEST.getChannelName()
);
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java
index 7d914da..92be3df 100644
--- a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java
@@ -8,10 +8,10 @@ public class ChannelProducerTdlibResponse {
private ChannelProducerTdlibResponse() {
}
- public static EventProducer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) {
- return channelFactory.newProducer(kafkaParameters,
+ public static EventProducer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) {
+ return channelFactory.newProducer(channelsParameters,
ChannelCodec.TDLIB_RESPONSE,
- ChannelCodec.TDLIB_RESPONSE.getKafkaName()
+ Channel.TDLIB_RESPONSE.getChannelName()
);
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelsParameters.java b/src/main/java/it/tdlight/reactiveapi/ChannelsParameters.java
new file mode 100644
index 0000000..dbd8561
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ChannelsParameters.java
@@ -0,0 +1,8 @@
+package it.tdlight.reactiveapi;
+
+import java.util.Set;
+
+public interface ChannelsParameters {
+
+ Set getAllLanes();
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java b/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java
index 789af0d..a4578bd 100644
--- a/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java
+++ b/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java
@@ -22,7 +22,7 @@ public class ClientsSharedTdlib implements Closeable {
private static final Logger LOG = LogManager.getLogger(ClientsSharedTdlib.class);
- private final TdlibChannelsClients kafkaTdlibClientsChannels;
+ private final TdlibChannelsClients tdClientsChannels;
private final AtomicReference responsesSub = new AtomicReference<>();
private final Disposable requestsSub;
private final AtomicReference eventsSub = new AtomicReference<>();
@@ -31,12 +31,12 @@ public class ClientsSharedTdlib implements Closeable {
private final Many> requests = Sinks.many().unicast()
.onBackpressureBuffer(Queues.>get(65535).get());
- public ClientsSharedTdlib(TdlibChannelsClients kafkaTdlibClientsChannels) {
- this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels;
- this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses");
- this.events = kafkaTdlibClientsChannels.events().entrySet().stream()
- .collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().consumeMessages(e.getKey())));
- this.requestsSub = kafkaTdlibClientsChannels.request()
+ public ClientsSharedTdlib(TdlibChannelsClients tdClientsChannels) {
+ this.tdClientsChannels = tdClientsChannels;
+ this.responses = tdClientsChannels.response().consumeMessages();
+ this.events = tdClientsChannels.events().entrySet().stream()
+ .collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().consumeMessages()));
+ this.requestsSub = tdClientsChannels.request()
.sendMessages(requests.asFlux())
.subscribeOn(Schedulers.parallel())
.subscribe();
@@ -73,6 +73,6 @@ public class ClientsSharedTdlib implements Closeable {
if (eventsSub != null) {
eventsSub.dispose();
}
- kafkaTdlibClientsChannels.close();
+ tdClientsChannels.close();
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java b/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java
index 61672e4..23affcb 100644
--- a/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java
+++ b/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java
@@ -3,6 +3,7 @@ package it.tdlight.reactiveapi;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
+import org.jetbrains.annotations.Nullable;
/**
* Define the cluster structure
@@ -11,14 +12,28 @@ public class ClusterSettings {
public String id;
public List kafkaBootstrapServers;
+ public String rsocketHost;
public List lanes;
@JsonCreator
public ClusterSettings(@JsonProperty(required = true, value = "id") String id,
- @JsonProperty(required = true, value = "kafkaBootstrapServers") List kafkaBootstrapServers,
+ @JsonProperty(value = "kafkaBootstrapServers") List kafkaBootstrapServers,
+ @JsonProperty(value = "rsocketHost") String rsocketHost,
@JsonProperty(required = true, value = "lanes") List lanes) {
this.id = id;
this.kafkaBootstrapServers = kafkaBootstrapServers;
+ this.rsocketHost = rsocketHost;
this.lanes = lanes;
+ if ((rsocketHost == null) == (kafkaBootstrapServers == null || kafkaBootstrapServers.isEmpty())) {
+ throw new IllegalArgumentException("Please configure exactly one of RSocket or Kafka, not both and not neither");
+ }
+ }
+
+ public ChannelsParameters toParameters(String clientId, InstanceType instanceType) {
+ if (rsocketHost != null) {
+ return new RSocketParameters(instanceType, rsocketHost, lanes);
+ } else {
+ return new KafkaParameters(clientId, clientId, kafkaBootstrapServers, List.copyOf(lanes));
+ }
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java
index c69b027..2b907f1 100644
--- a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java
+++ b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java
@@ -9,11 +9,8 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
-import java.util.concurrent.ThreadLocalRandom;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,11 +49,10 @@ public class Entrypoint {
String diskSessionsConfigPath = args.diskSessionsPath;
clusterSettings = mapper.readValue(Paths.get(clusterConfigPath).toFile(), ClusterSettings.class);
instanceSettings = mapper.readValue(Paths.get(instanceConfigPath).toFile(), InstanceSettings.class);
- if (instanceSettings.client) {
- diskSessions = null;
- } else {
- diskSessions = new DiskSessionsManager(mapper, diskSessionsConfigPath);
- }
+ diskSessions = switch (instanceSettings.instanceType) {
+ case TDLIB -> new DiskSessionsManager(mapper, diskSessionsConfigPath);
+ case UPDATES_CONSUMER -> null;
+ };
}
return start(clusterSettings, instanceSettings, diskSessions);
@@ -68,43 +64,47 @@ public class Entrypoint {
Set resultingEventTransformerSet;
AtomixReactiveApiMode mode = AtomixReactiveApiMode.SERVER;
- if (instanceSettings.client) {
- if (diskSessions != null) {
- throw new IllegalArgumentException("A client instance can't have a session manager!");
- }
- if (instanceSettings.clientAddress == null) {
- throw new IllegalArgumentException("A client instance must have an address (host:port)");
- }
- mode = AtomixReactiveApiMode.CLIENT;
- resultingEventTransformerSet = Set.of();
- } else {
- if (diskSessions == null) {
- throw new IllegalArgumentException("A full instance must have a session manager!");
+ switch (instanceSettings.instanceType) {
+ case UPDATES_CONSUMER -> {
+ if (diskSessions != null) {
+ throw new IllegalArgumentException("An updates-consumer instance can't have a session manager!");
+ }
+ if (instanceSettings.listenAddress == null) {
+ throw new IllegalArgumentException("An updates-consumer instance must have an address (host:port)");
+ }
+ mode = AtomixReactiveApiMode.CLIENT;
+ resultingEventTransformerSet = Set.of();
}
+ case TDLIB -> {
+ if (diskSessions == null) {
+ throw new IllegalArgumentException("A tdlib instance must have a session manager!");
+ }
- resultingEventTransformerSet = new HashSet<>();
- if (instanceSettings.resultingEventTransformers != null) {
- for (var resultingEventTransformer: instanceSettings.resultingEventTransformers) {
- try {
- var instance = resultingEventTransformer.getConstructor().newInstance();
- resultingEventTransformerSet.add(instance);
- LOG.info("Loaded and applied resulting event transformer: " + resultingEventTransformer.getName());
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
- throw new IllegalArgumentException("Failed to load resulting event transformer: "
- + resultingEventTransformer.getName());
- } catch (NoSuchMethodException e) {
- throw new IllegalArgumentException("The client transformer must declare an empty constructor: "
- + resultingEventTransformer.getName());
+ resultingEventTransformerSet = new HashSet<>();
+ if (instanceSettings.resultingEventTransformers != null) {
+ for (var resultingEventTransformer: instanceSettings.resultingEventTransformers) {
+ try {
+ var instance = resultingEventTransformer.getConstructor().newInstance();
+ resultingEventTransformerSet.add(instance);
+ LOG.info("Loaded and applied resulting event transformer: " + resultingEventTransformer.getName());
+ } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+ throw new IllegalArgumentException("Failed to load resulting event transformer: "
+ + resultingEventTransformer.getName());
+ } catch (NoSuchMethodException e) {
+ throw new IllegalArgumentException("The client transformer must declare an empty constructor: "
+ + resultingEventTransformer.getName());
+ }
}
}
- }
- resultingEventTransformerSet = unmodifiableSet(resultingEventTransformerSet);
+ resultingEventTransformerSet = unmodifiableSet(resultingEventTransformerSet);
+ }
+ default -> throw new UnsupportedOperationException("Unsupported instance type: " + instanceSettings.instanceType);
}
- var kafkaParameters = new KafkaParameters(clusterSettings, instanceSettings.id);
+ ChannelsParameters channelsParameters = clusterSettings.toParameters(instanceSettings.id, instanceSettings.instanceType);
- var api = new AtomixReactiveApi(mode, kafkaParameters, diskSessions, resultingEventTransformerSet);
+ var api = new AtomixReactiveApi(mode, channelsParameters, diskSessions, resultingEventTransformerSet);
LOG.info("Starting ReactiveApi...");
diff --git a/src/main/java/it/tdlight/reactiveapi/EventConsumer.java b/src/main/java/it/tdlight/reactiveapi/EventConsumer.java
index 409f2c4..4a6d689 100644
--- a/src/main/java/it/tdlight/reactiveapi/EventConsumer.java
+++ b/src/main/java/it/tdlight/reactiveapi/EventConsumer.java
@@ -5,11 +5,9 @@ import reactor.core.publisher.Flux;
public interface EventConsumer {
- boolean isQuickResponse();
-
ChannelCodec getChannelCodec();
String getChannelName();
- Flux> consumeMessages(@NotNull String subGroupId);
+ Flux> consumeMessages();
}
diff --git a/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java b/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java
index abd86c3..bff1b21 100644
--- a/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java
+++ b/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java
@@ -3,7 +3,7 @@ package it.tdlight.reactiveapi;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
-import java.util.Set;
+import java.util.Objects;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -12,30 +12,47 @@ public class InstanceSettings {
@NotNull
public String id;
- /**
- * True if this is just a client, false if this is a server
- */
- public boolean client;
+ public InstanceType instanceType;
/**
- * If {@link #client} is true, this will be the address of this client
+ * If {@link #instanceType} is {@link InstanceType#UPDATES_CONSUMER}, this will be the listen address of this instance
*/
- public @Nullable String clientAddress;
+ public @Nullable String listenAddress;
/**
- * If {@link #client} is false, this will transform resulting events before being sent
+ * If {@link #instanceType} is {@link InstanceType#TDLIB}, these transformers will be applied to resulting events before they are sent
*/
public @Nullable List> resultingEventTransformers;
+ public InstanceSettings(@NotNull String id,
+ @NotNull InstanceType instanceType,
+ @Nullable String listenAddress,
+ @Nullable List> resultingEventTransformers) {
+ this.id = id;
+ this.instanceType = instanceType;
+ this.listenAddress = listenAddress;
+ this.resultingEventTransformers = resultingEventTransformers;
+ }
+
@JsonCreator
public InstanceSettings(@JsonProperty(required = true, value = "id") @NotNull String id,
- @JsonProperty(required = true, value = "client") boolean client,
- @JsonProperty("clientAddress") @Nullable String clientAddress,
- @JsonProperty("resultingEventTransformers") @Nullable
- List> resultingEventTransformers) {
+ @Deprecated @JsonProperty(value = "client", defaultValue = "null") Boolean deprecatedIsClient,
+ @JsonProperty(value = "instanceType", defaultValue = "null") String instanceType,
+ @Deprecated @JsonProperty(value = "clientAddress", defaultValue = "null") @Nullable String deprecatedClientAddress,
+ @JsonProperty(value = "listenAddress", defaultValue = "null") @Nullable String listenAddress,
+ @JsonProperty("resultingEventTransformers")
+ @Nullable List> resultingEventTransformers) {
this.id = id;
- this.client = client;
- this.clientAddress = clientAddress;
+ if (deprecatedIsClient != null) {
+ this.instanceType = deprecatedIsClient ? InstanceType.UPDATES_CONSUMER : InstanceType.TDLIB;
+ } else {
+ this.instanceType = InstanceType.valueOf(instanceType.toUpperCase());
+ }
+ if (deprecatedClientAddress != null) {
+ this.listenAddress = deprecatedClientAddress;
+ } else {
+ this.listenAddress = listenAddress;
+ }
this.resultingEventTransformers = resultingEventTransformers;
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/InstanceType.java b/src/main/java/it/tdlight/reactiveapi/InstanceType.java
new file mode 100644
index 0000000..2610571
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/InstanceType.java
@@ -0,0 +1,6 @@
+package it.tdlight.reactiveapi;
+
+public enum InstanceType {
+ UPDATES_CONSUMER,
+ TDLIB
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java
index 0bd5378..10bf279 100644
--- a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java
+++ b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java
@@ -4,16 +4,14 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
-public record KafkaParameters(String groupId, String clientId, String bootstrapServers, List lanes) {
+public record KafkaParameters(String groupId, String clientId, List bootstrapServers,
+ List lanes) implements ChannelsParameters {
- public KafkaParameters(ClusterSettings clusterSettings, String clientId) {
- this(clientId,
- clientId,
- String.join(",", clusterSettings.kafkaBootstrapServers),
- List.copyOf(clusterSettings.lanes)
- );
+ public String getBootstrapServersString() {
+ return String.join(",", bootstrapServers);
}
+ @Override
public Set getAllLanes() {
var lanes = new LinkedHashSet(this.lanes.size() + 1);
lanes.add("main");
diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
index 2692dd6..cb53f63 100644
--- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
+++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java
@@ -8,21 +8,21 @@ import reactor.core.publisher.Flux;
public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient {
- private final ClientsSharedTdlib kafkaSharedTdlibClients;
+ private final ClientsSharedTdlib sharedTdlibClients;
- LiveAtomixReactiveApiClient(ClientsSharedTdlib kafkaSharedTdlibClients) {
- super(kafkaSharedTdlibClients);
- this.kafkaSharedTdlibClients = kafkaSharedTdlibClients;
+ LiveAtomixReactiveApiClient(ClientsSharedTdlib sharedTdlibClients) {
+ super(sharedTdlibClients);
+ this.sharedTdlibClients = sharedTdlibClients;
}
@Override
public Flux clientBoundEvents(String lane) {
- return kafkaSharedTdlibClients.events(lane).map(Timestamped::data);
+ return sharedTdlibClients.events(lane).map(Timestamped::data);
}
@Override
public Map> clientBoundEvents() {
- return kafkaSharedTdlibClients.events().entrySet().stream()
+ return sharedTdlibClients.events().entrySet().stream()
.collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().map(Timestamped::data)));
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java b/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java
new file mode 100644
index 0000000..94bcd3e
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java
@@ -0,0 +1,63 @@
+package it.tdlight.reactiveapi;
+
+import com.google.common.net.HostAndPort;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import org.jetbrains.annotations.Nullable;
+
+public final class RSocketParameters implements ChannelsParameters {
+
+ private final boolean client;
+ private final HostAndPort host;
+ private final List lanes;
+
+ public RSocketParameters(InstanceType instanceType, String host, List lanes) {
+ this.client = instanceType != InstanceType.UPDATES_CONSUMER;
+ this.host = HostAndPort.fromString(host);
+ this.lanes = lanes;
+ }
+
+ @Override
+ public Set getAllLanes() {
+ var lanes = new LinkedHashSet(this.lanes.size() + 1);
+ lanes.add("main");
+ lanes.addAll(this.lanes);
+ return lanes;
+ }
+
+ public boolean isClient() {
+ return client;
+ }
+
+ public HostAndPort host() {
+ return host;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null || obj.getClass() != this.getClass()) {
+ return false;
+ }
+ var that = (RSocketParameters) obj;
+ return Objects.equals(this.client, that.client) && Objects.equals(
+ this.host,
+ that.host) && Objects.equals(this.lanes, that.lanes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(client, host, lanes);
+ }
+
+ @Override
+ public String toString() {
+ return "RSocketParameters[client=" + client + ", " + "host=" + host + ", "
+ + "lanes=" + lanes + ']';
+ }
+
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
index 7ca3274..94ada35 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
@@ -71,7 +71,7 @@ public abstract class ReactiveApiPublisher {
private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(5);
private static final Duration HUNDRED_MS = Duration.ofMillis(100);
- private final TdlibChannelsSharedServer kafkaSharedTdlibServers;
+ private final TdlibChannelsSharedServer sharedTdlibServers;
private final Set resultingEventTransformerSet;
private final ReactiveTelegramClient rawTelegramClient;
private final Flux telegramClient;
@@ -85,14 +85,14 @@ public abstract class ReactiveApiPublisher {
private final AtomicReference disposable = new AtomicReference<>();
private final AtomicReference path = new AtomicReference<>();
- private ReactiveApiPublisher(TdlibChannelsSharedServer kafkaSharedTdlibServers,
+ private ReactiveApiPublisher(TdlibChannelsSharedServer sharedTdlibServers,
Set resultingEventTransformerSet,
long userId, String lane) {
- this.kafkaSharedTdlibServers = kafkaSharedTdlibServers;
+ this.sharedTdlibServers = sharedTdlibServers;
this.resultingEventTransformerSet = resultingEventTransformerSet;
this.userId = userId;
this.lane = Objects.requireNonNull(lane);
- this.responses = this.kafkaSharedTdlibServers.responses();
+ this.responses = this.sharedTdlibServers.responses();
this.rawTelegramClient = ClientManager.createReactive();
try {
Init.start();
@@ -114,20 +114,20 @@ public abstract class ReactiveApiPublisher {
});
}
- public static ReactiveApiPublisher fromToken(TdlibChannelsSharedServer kafkaSharedTdlibServers,
+ public static ReactiveApiPublisher fromToken(TdlibChannelsSharedServer sharedTdlibServers,
Set resultingEventTransformerSet,
long userId,
String token,
String lane) {
- return new ReactiveApiPublisherToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, token, lane);
+ return new ReactiveApiPublisherToken(sharedTdlibServers, resultingEventTransformerSet, userId, token, lane);
}
- public static ReactiveApiPublisher fromPhoneNumber(TdlibChannelsSharedServer kafkaSharedTdlibServers,
+ public static ReactiveApiPublisher fromPhoneNumber(TdlibChannelsSharedServer sharedTdlibServers,
Set resultingEventTransformerSet,
long userId,
long phoneNumber,
String lane) {
- return new ReactiveApiPublisherPhoneNumber(kafkaSharedTdlibServers,
+ return new ReactiveApiPublisherPhoneNumber(sharedTdlibServers,
resultingEventTransformerSet,
userId,
phoneNumber,
@@ -209,7 +209,7 @@ public abstract class ReactiveApiPublisher {
// Buffer requests to avoid halting the event loop
.onBackpressureBuffer();
- kafkaSharedTdlibServers.events(lane, messagesToSend);
+ sharedTdlibServers.events(lane, messagesToSend);
publishedResultingEvents
// Obtain only cluster-bound events
@@ -551,12 +551,12 @@ public abstract class ReactiveApiPublisher {
private final String botToken;
- public ReactiveApiPublisherToken(TdlibChannelsSharedServer kafkaSharedTdlibServers,
+ public ReactiveApiPublisherToken(TdlibChannelsSharedServer sharedTdlibServers,
Set resultingEventTransformerSet,
long userId,
String botToken,
String lane) {
- super(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, lane);
+ super(sharedTdlibServers, resultingEventTransformerSet, userId, lane);
this.botToken = botToken;
}
@@ -583,12 +583,12 @@ public abstract class ReactiveApiPublisher {
private final long phoneNumber;
- public ReactiveApiPublisherPhoneNumber(TdlibChannelsSharedServer kafkaSharedTdlibServers,
+ public ReactiveApiPublisherPhoneNumber(TdlibChannelsSharedServer sharedTdlibServers,
Set resultingEventTransformerSet,
long userId,
long phoneNumber,
String lane) {
- super(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, lane);
+ super(sharedTdlibServers, resultingEventTransformerSet, userId, lane);
this.phoneNumber = phoneNumber;
}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java
index 62a8098..c94ebcb 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java
@@ -1,11 +1,32 @@
package it.tdlight.reactiveapi;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.LongConsumer;
+import org.jetbrains.annotations.NotNull;
+import org.reactivestreams.Subscription;
+import reactor.core.CoreSubscriber;
+import reactor.core.Disposable;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.Signal;
+import reactor.util.context.Context;
public class ReactorUtils {
+ @SuppressWarnings("rawtypes")
+ private static final WaitingSink WAITING_SINK = new WaitingSink<>();
+
public static Flux subscribeOnce(Flux f) {
AtomicBoolean subscribed = new AtomicBoolean();
return f.doOnSubscribe(s -> {
@@ -23,4 +44,144 @@ public class ReactorUtils {
}
});
}
+
+ public static Flux createLastestSubscriptionFlux(Flux upstream, int maxBufferSize) {
+ return upstream.transform(parent -> {
+ AtomicReference subscriptionAtomicReference = new AtomicReference<>();
+ AtomicReference> prevEmitterRef = new AtomicReference<>();
+ Deque> queue = new ArrayDeque<>(maxBufferSize);
+
+ return Flux.create(emitter -> {
+ var prevEmitter = prevEmitterRef.getAndSet(emitter);
+
+ if (prevEmitter != null) {
+ if (prevEmitter != WAITING_SINK) {
+ prevEmitter.error(new CancellationException());
+ }
+ synchronized (queue) {
+ Signal next;
+ while (!emitter.isCancelled() && (next = queue.peek()) != null) {
+ if (next.isOnNext()) {
+ queue.poll();
+ var nextVal = next.get();
+ assert nextVal != null;
+ emitter.next(nextVal);
+ } else if (next.isOnError()) {
+ var throwable = next.getThrowable();
+ assert throwable != null;
+ emitter.error(throwable);
+ break;
+ } else if (next.isOnComplete()) {
+ emitter.complete();
+ break;
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+ }
+ } else {
+ parent.subscribe(new CoreSubscriber<>() {
+ @Override
+ public void onSubscribe(@NotNull Subscription s) {
+ subscriptionAtomicReference.set(s);
+ }
+
+ @Override
+ public void onNext(K payload) {
+ FluxSink prevEmitter = prevEmitterRef.get();
+ if (prevEmitter != WAITING_SINK) {
+ prevEmitter.next(payload);
+ } else {
+ synchronized (queue) {
+ queue.add(Signal.next(payload));
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ FluxSink prevEmitter = prevEmitterRef.get();
+ synchronized (queue) {
+ queue.add(Signal.error(throwable));
+ }
+ if (prevEmitter != WAITING_SINK) {
+ prevEmitter.error(throwable);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ FluxSink prevEmitter = prevEmitterRef.get();
+ synchronized (queue) {
+ queue.add(Signal.complete());
+ }
+ if (prevEmitter != WAITING_SINK) {
+ prevEmitter.complete();
+ }
+ }
+ });
+ }
+ var s = subscriptionAtomicReference.get();
+ emitter.onRequest(n -> {
+ if (n > maxBufferSize) {
+ emitter.error(new UnsupportedOperationException("Requests count is bigger than max buffer size! " + n + " > " + maxBufferSize));
+ } else {
+ s.request(n);
+ }
+ });
+ //noinspection unchecked
+ emitter.onCancel(() -> prevEmitterRef.compareAndSet(emitter, WAITING_SINK));
+ //noinspection unchecked
+ emitter.onDispose(() -> prevEmitterRef.compareAndSet(emitter, WAITING_SINK));
+ }, OverflowStrategy.BUFFER);
+ });
+ }
+
+ private static class WaitingSink implements FluxSink {
+
+ @Override
+ public @NotNull FluxSink next(@NotNull T t) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void complete() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void error(@NotNull Throwable e) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public @NotNull Context currentContext() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long requestedFromDownstream() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isCancelled() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public @NotNull FluxSink onRequest(@NotNull LongConsumer consumer) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public @NotNull FluxSink onCancel(@NotNull Disposable d) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public @NotNull FluxSink onDispose(@NotNull Disposable d) {
+ throw new UnsupportedOperationException();
+ }
+ }
}
diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java
index e561d82..7f62399 100644
--- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java
+++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java
@@ -18,21 +18,20 @@ import reactor.util.concurrent.Queues;
public class TdlibChannelsSharedServer implements Closeable {
- private final TdlibChannelsServers kafkaTdlibServersChannels;
+ private final TdlibChannelsServers tdServersChannels;
private final Disposable responsesSub;
private final AtomicReference requestsSub = new AtomicReference<>();
private final Many> responses = Sinks.many().unicast().onBackpressureBuffer(
Queues.>get(65535).get());
private final Flux>> requests;
- public TdlibChannelsSharedServer(TdlibChannelsServers kafkaTdlibServersChannels) {
- this.kafkaTdlibServersChannels = kafkaTdlibServersChannels;
- this.responsesSub = kafkaTdlibServersChannels.response()
+ public TdlibChannelsSharedServer(TdlibChannelsServers tdServersChannels) {
+ this.tdServersChannels = tdServersChannels;
+ this.responsesSub = tdServersChannels.response()
.sendMessages(responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT))
.subscribeOn(Schedulers.parallel())
.subscribe();
- this.requests = kafkaTdlibServersChannels.request()
- .consumeMessages("td-requests");
+ this.requests = tdServersChannels.request().consumeMessages();
}
public Flux>> requests() {
@@ -42,7 +41,7 @@ public class TdlibChannelsSharedServer implements Closeable {
}
public Disposable events(String lane, Flux eventFlux) {
- return kafkaTdlibServersChannels.events(lane)
+ return tdServersChannels.events(lane)
.sendMessages(eventFlux)
.subscribeOn(Schedulers.parallel())
.subscribe();
@@ -59,6 +58,6 @@ public class TdlibChannelsSharedServer implements Closeable {
if (requestsSub != null) {
requestsSub.dispose();
}
- kafkaTdlibServersChannels.close();
+ tdServersChannels.close();
}
}
diff --git a/src/main/java/it/tdlight/reactiveapi/UtfCodec.java b/src/main/java/it/tdlight/reactiveapi/UtfCodec.java
new file mode 100644
index 0000000..5485655
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/UtfCodec.java
@@ -0,0 +1,28 @@
+package it.tdlight.reactiveapi;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+
+public class UtfCodec implements Serializer, Deserializer {
+
+ @Override
+ public void configure(Map configs, boolean isKey) {
+ }
+
+ @Override
+ public String deserialize(String topic, byte[] data) {
+ return new String(data, StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public byte[] serialize(String topic, String data) {
+ return data.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java
index 9928132..b4b6361 100644
--- a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java
+++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java
@@ -56,7 +56,7 @@ public final class KafkaConsumer implements EventConsumer {
throw new RuntimeException(e);
}
Map props = new HashMap<>();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers());
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.getBootstrapServersString());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
@@ -81,7 +81,6 @@ public final class KafkaConsumer implements EventConsumer {
return KafkaReceiver.create(options);
}
- @Override
public boolean isQuickResponse() {
return quickResponse;
}
@@ -120,12 +119,12 @@ public final class KafkaConsumer implements EventConsumer {
}
@Override
- public Flux> consumeMessages(@NotNull String subGroupId) {
- return consumeMessagesInternal(subGroupId);
+ public Flux> consumeMessages() {
+ return consumeMessagesInternal();
}
- private Flux> consumeMessagesInternal(@NotNull String subGroupId) {
- return createReceiver(kafkaParameters.groupId() + (subGroupId.isBlank() ? "" : ("-" + subGroupId)))
+ private Flux> consumeMessagesInternal() {
+ return createReceiver(kafkaParameters.groupId() + "-" + channelName)
.receiveAutoAck(isQuickResponse() ? 1 : 4)
.concatMap(Function.identity())
.log("consume-messages",
diff --git a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java
index 861e4d4..4292504 100644
--- a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java
+++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java
@@ -30,7 +30,7 @@ public final class KafkaProducer implements EventProducer {
this.channelCodec = channelCodec;
this.channelName = channelName;
Map props = new HashMap<>();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers());
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.getBootstrapServersString());
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId());
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024));
diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java
new file mode 100644
index 0000000..799ebf9
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java
@@ -0,0 +1,75 @@
+package it.tdlight.reactiveapi.rsocket;
+
+import com.google.common.net.HostAndPort;
+import io.netty.buffer.ByteBuf;
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.core.RSocketConnector;
+import io.rsocket.core.Resume;
+import io.rsocket.frame.decoder.PayloadDecoder;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.util.DefaultPayload;
+import it.tdlight.reactiveapi.ChannelCodec;
+import it.tdlight.reactiveapi.EventConsumer;
+import it.tdlight.reactiveapi.RSocketParameters;
+import it.tdlight.reactiveapi.Timestamped;
+import java.util.logging.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class RSocketConsumeAsClient implements EventConsumer {
+
+ private static final Logger LOG = LogManager.getLogger(RSocketConsumeAsClient.class);
+
+ private final HostAndPort host;
+ private final ChannelCodec channelCodec;
+ private final String channelName;
+
+ public RSocketConsumeAsClient(HostAndPort hostAndPort,
+ ChannelCodec channelCodec,
+ String channelName) {
+ this.channelCodec = channelCodec;
+ this.channelName = channelName;
+ this.host = hostAndPort;
+ }
+
+ @Override
+ public ChannelCodec getChannelCodec() {
+ return channelCodec;
+ }
+
+ @Override
+ public String getChannelName() {
+ return channelName;
+ }
+
+ @Override
+ public Flux> consumeMessages() {
+ var deserializer = channelCodec.getNewDeserializer();
+ return
+ RSocketConnector.create()
+ .resume(new Resume())
+ .payloadDecoder(PayloadDecoder.ZERO_COPY)
+ .connect(TcpClientTransport.create(host.getHost(), host.getPort()))
+ .flatMapMany(socket -> socket
+ .requestStream(DefaultPayload.create("", "consume"))
+ .map(payload -> {
+ ByteBuf slice = payload.sliceData();
+ var data = new byte[slice.readableBytes()];
+ slice.readBytes(data, 0, data.length);
+ //noinspection unchecked
+ return new Timestamped(System.currentTimeMillis(), (T) deserializer.deserialize(null, data));
+ })
+ .doFinally(signalType -> socket
+ .fireAndForget(DefaultPayload.create("", "close"))
+ .then(socket.onClose())
+ .doFinally(s -> socket.dispose())
+ .subscribeOn(Schedulers.parallel())
+ .subscribe()))
+ .log("RSOCKET_CONSUMER_CLIENT", Level.FINE);
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java
new file mode 100644
index 0000000..3543b0c
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java
@@ -0,0 +1,230 @@
+package it.tdlight.reactiveapi.rsocket;
+
+import com.google.common.net.HostAndPort;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.rsocket.ConnectionSetupPayload;
+import io.rsocket.RSocket;
+import io.rsocket.SocketAcceptor;
+import io.rsocket.core.RSocketConnector;
+import io.rsocket.core.RSocketServer;
+import io.rsocket.core.Resume;
+import io.rsocket.frame.decoder.PayloadDecoder;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.transport.netty.server.CloseableChannel;
+import io.rsocket.transport.netty.server.TcpServerTransport;
+import io.rsocket.util.DefaultPayload;
+import it.tdlight.reactiveapi.ChannelCodec;
+import it.tdlight.reactiveapi.EventConsumer;
+import it.tdlight.reactiveapi.RSocketParameters;
+import it.tdlight.reactiveapi.Timestamped;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.logging.Level;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+import org.reactivestreams.Subscription;
+import reactor.core.CoreSubscriber;
+import reactor.core.publisher.BaseSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Empty;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.function.Tuple2;
+import reactor.util.function.Tuple3;
+import reactor.util.function.Tuples;
+import reactor.util.retry.Retry;
+
+public class RSocketConsumeAsServer implements EventConsumer {
+
+ private static final Logger LOG = LogManager.getLogger(RSocketConsumeAsServer.class);
+
+ private final HostAndPort host;
+ private final ChannelCodec channelCodec;
+ private final String channelName;
+
+ public RSocketConsumeAsServer(HostAndPort hostAndPort,
+ ChannelCodec channelCodec,
+ String channelName) {
+ this.channelCodec = channelCodec;
+ this.channelName = channelName;
+ this.host = hostAndPort;
+ }
+
+ @Override
+ public ChannelCodec getChannelCodec() {
+ return channelCodec;
+ }
+
+ @Override
+ public String getChannelName() {
+ return channelName;
+ }
+
+ @Override
+ public Flux> consumeMessages() {
+ Deserializer deserializer = channelCodec.getNewDeserializer();
+ return Mono
+ .>>>create(sink -> {
+ AtomicReference serverRef = new AtomicReference<>();
+ var server = RSocketServer
+ .create((setup, in) -> {
+ var inRawFlux = in.requestStream(DefaultPayload.create("", "consume"));
+ var inFlux = inRawFlux.map(payload -> {
+ ByteBuf slice = payload.sliceData();
+ var data = new byte[slice.readableBytes()];
+ slice.readBytes(data, 0, data.length);
+ return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, data));
+ });
+ sink.success(Tuples.of(serverRef.get(), in, inFlux));
+
+ return Mono.just(new RSocket() {});
+ })
+ .payloadDecoder(PayloadDecoder.ZERO_COPY)
+ .resume(new Resume())
+ .bindNow(TcpServerTransport.create(host.getHost(), host.getPort()));
+ serverRef.set(server);
+ sink.onCancel(server);
+ })
+ .subscribeOn(Schedulers.boundedElastic())
+ .flatMapMany(t -> t.getT3().doFinally(s -> {
+ t.getT2().dispose();
+ t.getT1().dispose();
+ }))
+ .log("RSOCKET_CONSUMER_SERVER", Level.FINE);
+ }
+ /*return Flux.defer(() -> {
+ var deserializer = channelCodec.getNewDeserializer();
+ AtomicReference inRef = new AtomicReference<>();
+ AtomicReference inSubRef = new AtomicReference<>();
+ return Flux.>create(sink -> {
+ var server = RSocketServer.create((setup, in) -> {
+ var prev = inRef.getAndSet(in);
+ if (prev != null) {
+ prev.dispose();
+ }
+
+ var inRawFlux = in.requestStream(DefaultPayload.create("", "consume"));
+ var inFlux = inRawFlux.map(payload -> {
+ ByteBuf slice = payload.sliceData();
+ var data = new byte[slice.readableBytes()];
+ slice.readBytes(data, 0, data.length);
+ //noinspection unchecked
+ return new Timestamped<>(System.currentTimeMillis(), (T) deserializer.deserialize(null, data));
+ });
+
+ inFlux.subscribe(new CoreSubscriber<>() {
+ @Override
+ public void onSubscribe(@NotNull Subscription s) {
+ var prevS = inSubRef.getAndSet(s);
+ if (prevS != null) {
+ prevS.cancel();
+ } else {
+ sink.onRequest(n -> {
+ s.request(n);
+ });
+ }
+ }
+
+ @Override
+ public void onNext(Timestamped val) {
+ sink.next(val);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ sink.error(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ sink.complete();
+ }
+ });
+
+ return Mono.just(new RSocket() {});
+ }).payloadDecoder(PayloadDecoder.ZERO_COPY).bindNow(TcpServerTransport.create(host.getHost(), host.getPort()));
+ sink.onCancel(() -> {
+ var inSub = inSubRef.get();
+ if (inSub != null) {
+ inSub.cancel();
+ }
+ });
+ sink.onDispose(() -> {
+ var in = inRef.get();
+ if (in != null) {
+ in.dispose();
+ }
+ server.dispose();
+ });
+ }).subscribeOn(Schedulers.boundedElastic()).log("RSOCKET_CONSUMER_SERVER", Level.FINE)
+ .retryWhen(Retry
+ .backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
+ .maxBackoff(Duration.ofSeconds(16))
+ .jitter(1.0)
+ .doBeforeRetry(rs -> LOG.warn("Failed to consume as server, retrying. {}", rs)));
+ });*/
+ /*
+ return Flux.>create(sink -> {
+ RSocketServer
+ .create((setup, socket) -> {
+ socket.requestStream(DefaultPayload.create("", "consume")).map(payload -> {
+ ByteBuf slice = payload.sliceData();
+ var data = new byte[slice.readableBytes()];
+ slice.readBytes(data, 0, data.length);
+ //noinspection unchecked
+ return new Timestamped<>(System.currentTimeMillis(), (T) deserializer.deserialize(null, data));
+ }).subscribe(new CoreSubscriber<>() {
+ @Override
+ public void onSubscribe(@NotNull Subscription s) {
+ sink.onDispose(() -> {
+ s.cancel();
+ socket.dispose();
+ });
+ sink.onRequest(n -> {
+ if (n > 8192) {
+ throw new UnsupportedOperationException(
+ "Requests count is bigger than max buffer size! " + n + " > " + 8192);
+ }
+ s.request(n);
+ });
+ sink.onCancel(() -> s.cancel());
+ }
+
+ @Override
+ public void onNext(Timestamped val) {
+ sink.next(val);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ sink.error(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ sink.complete();
+ }
+ });
+ return Mono.just(socket);
+ })
+ .payloadDecoder(PayloadDecoder.ZERO_COPY)
+ .bind(TcpServerTransport.create(host.getHost(), host.getPort()))
+ .subscribeOn(Schedulers.parallel())
+ .subscribe(v -> {
+ sink.onDispose(v);
+ }, sink::error, sink::complete);
+ })
+ .retryWhen(Retry
+ .backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
+ .maxBackoff(Duration.ofSeconds(16))
+ .jitter(1.0)
+ .doBeforeRetry(rs -> LOG.warn("Failed to consume as server, retrying. {}", rs)));
+ */
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java
new file mode 100644
index 0000000..19308c5
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java
@@ -0,0 +1,93 @@
+package it.tdlight.reactiveapi.rsocket;
+
+import com.google.common.net.HostAndPort;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.SocketAcceptor;
+import io.rsocket.core.RSocketConnector;
+import io.rsocket.core.Resume;
+import io.rsocket.frame.decoder.PayloadDecoder;
+import io.rsocket.transport.netty.client.TcpClientTransport;
+import io.rsocket.util.DefaultPayload;
+import it.tdlight.reactiveapi.ChannelCodec;
+import it.tdlight.reactiveapi.EventProducer;
+import it.tdlight.reactiveapi.RSocketParameters;
+import it.tdlight.reactiveapi.ReactorUtils;
+import it.tdlight.reactiveapi.Timestamped;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.logging.Level;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Empty;
+import reactor.util.retry.Retry;
+
+/**
+ * Event producer that acts as an RSocket client: it connects to a remote
+ * consumer over TCP and serves the outgoing event stream when the remote side
+ * issues a request-stream call.
+ */
+public final class RSocketProduceAsClient implements EventProducer {
+
+ private static final Logger LOG = LogManager.getLogger(RSocketProduceAsClient.class);
+ private final ChannelCodec channelCodec;
+ private final String channelName;
+ private final HostAndPort host;
+ // Completed by close() to tear down the active connection from our side
+ private final Empty closeRequest = Sinks.empty();
+
+ public RSocketProduceAsClient(HostAndPort host, ChannelCodec channelCodec, String channelName) {
+ this.channelCodec = channelCodec;
+ this.channelName = channelName;
+ this.host = host;
+ }
+
+ @Override
+ public ChannelCodec getChannelCodec() {
+ return channelCodec;
+ }
+
+ @Override
+ public String getChannelName() {
+ return channelName;
+ }
+
+ /**
+ * Connects to the remote consumer and streams the serialized events to it.
+ * The returned mono completes when the underlying connection closes or when
+ * {@link #close()} is invoked.
+ */
+ @Override
+ public Mono sendMessages(Flux eventsFlux) {
+ Serializer serializer = channelCodec.getNewSerializer();
+ Flux serializedEventsFlux = eventsFlux
+ .map(event -> DefaultPayload.create(serializer.serialize(null, event)))
+ .log("RSOCKET_PRODUCER_CLIENT", Level.FINE)
+ .doFinally(s -> LOG.debug("Events flux ended: {}", s));
+
+ return
+ RSocketConnector.create()
+ .payloadDecoder(PayloadDecoder.ZERO_COPY)
+ .setupPayload(DefaultPayload.create("", "connect"))
+ // The remote consumer pulls data: answer its requestStream with our events
+ .acceptor(SocketAcceptor.forRequestStream(payload -> serializedEventsFlux))
+ .connect(TcpClientTransport.create(host.getHost(), host.getPort()))
+ // Wait for the socket to close, or dispose it eagerly when close() fires
+ .flatMap(rSocket -> rSocket.onClose()
+ .takeUntilOther(closeRequest.asMono().doFinally(s -> rSocket.dispose())))
+ .log("RSOCKET_PRODUCER_CLIENT_Y", Level.FINE);
+ }
+
+ /** Requests shutdown of any connection started by sendMessages. */
+ @Override
+ public void close() {
+ closeRequest.tryEmitEmpty();
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java
new file mode 100644
index 0000000..6da58da
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java
@@ -0,0 +1,115 @@
+package it.tdlight.reactiveapi.rsocket;
+
+import com.google.common.net.HostAndPort;
+import io.netty.buffer.Unpooled;
+import io.rsocket.ConnectionSetupPayload;
+import io.rsocket.Payload;
+import io.rsocket.RSocket;
+import io.rsocket.SocketAcceptor;
+import io.rsocket.core.RSocketServer;
+import io.rsocket.core.Resume;
+import io.rsocket.frame.decoder.PayloadDecoder;
+import io.rsocket.transport.netty.server.CloseableChannel;
+import io.rsocket.transport.netty.server.TcpServerTransport;
+import io.rsocket.util.DefaultPayload;
+import it.tdlight.reactiveapi.ChannelCodec;
+import it.tdlight.reactiveapi.EventProducer;
+import it.tdlight.reactiveapi.ReactorUtils;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import reactor.core.publisher.Sinks.Empty;
+
+/**
+ * Event producer that acts as an RSocket server: it binds a TCP port and
+ * serves the outgoing event stream to the consumer that connects and issues a
+ * request-stream call.
+ */
+public final class RSocketProduceAsServer implements EventProducer {
+
+ private static final Logger LOG = LogManager.getLogger(RSocketProduceAsServer.class);
+ private final ChannelCodec channelCodec;
+ private final String channelName;
+ private final HostAndPort host;
+
+ // Completed by close() to tear the bound server down from our side
+ private final Empty closeRequest = Sinks.empty();
+
+ public RSocketProduceAsServer(HostAndPort hostAndPort, ChannelCodec channelCodec, String channelName) {
+ this.host = hostAndPort;
+ this.channelCodec = channelCodec;
+ this.channelName = channelName;
+ }
+
+ @Override
+ public ChannelCodec getChannelCodec() {
+ return channelCodec;
+ }
+
+ @Override
+ public String getChannelName() {
+ return channelName;
+ }
+
+ /**
+ * Binds the server and streams the serialized events to the connecting
+ * consumer. The returned mono completes when the bound channel closes or
+ * when {@link #close()} is invoked.
+ */
+ @Override
+ public Mono sendMessages(Flux eventsFlux) {
+ return Mono.defer(()-> {
+ AtomicReference serverRef = new AtomicReference<>();
+ Serializer serializer = channelCodec.getNewSerializer();
+ Flux serializedEventsFlux = eventsFlux
+ .log("RSOCKET_PRODUCER_SERVER", Level.FINE)
+ .map(event -> DefaultPayload.create(serializer.serialize(null, event)))
+ .doFinally(s -> LOG.debug("Events flux ended: {}", s));
+
+ return RSocketServer
+ .create(new SocketAcceptor() {
+ @Override
+ public @NotNull Mono accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) {
+ return Mono.just(new RSocket() {
+ // A fire-and-forget from the client disposes the bound server
+ @Override
+ public @NotNull Mono fireAndForget(@NotNull Payload payload) {
+ return Mono.fromRunnable(() -> {
+ var srv = serverRef.get();
+ if (srv != null) {
+ srv.dispose();
+ }
+ });
+ }
+
+ // The consumer pulls data: answer its requestStream with our events
+ @Override
+ public @NotNull Flux requestStream(@NotNull Payload payload) {
+ return serializedEventsFlux;
+ }
+ });
+ }
+ })
+ .resume(new Resume())
+ .payloadDecoder(PayloadDecoder.ZERO_COPY)
+ .bind(TcpServerTransport.create(host.getHost(), host.getPort()))
+ .doOnNext(serverRef::set)
+ // Wait for channel close, or dispose it eagerly when close() fires
+ .flatMap(closeableChannel -> closeableChannel.onClose()
+ .takeUntilOther(closeRequest.asMono().doFinally(s -> closeableChannel.dispose())));
+ });
+ }
+
+ /** Requests shutdown of the bound server started by sendMessages. */
+ @Override
+ public void close() {
+ closeRequest.tryEmitEmpty();
+ }
+}
diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java
index 23581c6..11b01c7 100644
--- a/src/main/java/module-info.java
+++ b/src/main/java/module-info.java
@@ -1,6 +1,8 @@
module tdlib.reactive.api {
exports it.tdlight.reactiveapi;
exports it.tdlight.reactiveapi.generated;
+ exports it.tdlight.reactiveapi.rsocket;
+ exports it.tdlight.reactiveapi.kafka;
requires com.fasterxml.jackson.annotation;
requires org.jetbrains.annotations;
requires org.slf4j;
@@ -22,4 +24,8 @@ module tdlib.reactive.api {
requires jdk.unsupported;
requires jakarta.xml.bind;
requires reactor.core;
+ requires rsocket.core;
+ requires rsocket.transport.local;
+ requires rsocket.transport.netty;
+ requires io.netty.buffer;
}
\ No newline at end of file
diff --git a/src/test/java/it/tdlight/reactiveapi/test/BombDeserializer.java b/src/test/java/it/tdlight/reactiveapi/test/BombDeserializer.java
new file mode 100644
index 0000000..2eecda8
--- /dev/null
+++ b/src/test/java/it/tdlight/reactiveapi/test/BombDeserializer.java
@@ -0,0 +1,11 @@
+package it.tdlight.reactiveapi.test;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+public final class BombDeserializer implements Deserializer