From 03b8cfa5794c0fd10e2fcb33ea853a2c637b7e4d Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Wed, 5 Oct 2022 02:26:30 +0200 Subject: [PATCH] Add RSocket --- pom.xml | 12 + .../reactiveapi/AtomixReactiveApi.java | 83 ++++--- .../BaseAtomixReactiveApiClient.java | 6 +- .../java/it/tdlight/reactiveapi/Channel.java | 22 ++ .../it/tdlight/reactiveapi/ChannelCodec.java | 43 ++-- .../ChannelConsumerClientBoundEvent.java | 14 +- .../ChannelConsumerTdlibRequest.java | 6 +- .../ChannelConsumerTdlibResponse.java | 6 +- .../tdlight/reactiveapi/ChannelFactory.java | 52 +++- .../ChannelProducerClientBoundEvent.java | 13 +- .../ChannelProducerTdlibRequest.java | 6 +- .../ChannelProducerTdlibResponse.java | 6 +- .../reactiveapi/ChannelsParameters.java | 8 + .../reactiveapi/ClientsSharedTdlib.java | 16 +- .../tdlight/reactiveapi/ClusterSettings.java | 17 +- .../it/tdlight/reactiveapi/Entrypoint.java | 74 +++--- .../it/tdlight/reactiveapi/EventConsumer.java | 4 +- .../tdlight/reactiveapi/InstanceSettings.java | 45 ++-- .../it/tdlight/reactiveapi/InstanceType.java | 6 + .../tdlight/reactiveapi/KafkaParameters.java | 12 +- .../LiveAtomixReactiveApiClient.java | 12 +- .../reactiveapi/RSocketParameters.java | 63 +++++ .../reactiveapi/ReactiveApiPublisher.java | 26 +- .../it/tdlight/reactiveapi/ReactorUtils.java | 161 ++++++++++++ .../TdlibChannelsSharedServer.java | 15 +- .../java/it/tdlight/reactiveapi/UtfCodec.java | 28 +++ .../reactiveapi/kafka/KafkaConsumer.java | 11 +- .../reactiveapi/kafka/KafkaProducer.java | 2 +- .../rsocket/RSocketConsumeAsClient.java | 75 ++++++ .../rsocket/RSocketConsumeAsServer.java | 230 ++++++++++++++++++ .../rsocket/RSocketProduceAsClient.java | 79 ++++++ .../rsocket/RSocketProduceAsServer.java | 100 ++++++++ src/main/java/module-info.java | 6 + .../reactiveapi/test/BombDeserializer.java | 11 + .../reactiveapi/test/BombSerializer.java | 11 + .../reactiveapi/test/FakeException.java | 3 + .../tdlight/reactiveapi/test/TestChannel.java | 200 +++++++++++++++ .../test/TestClientToServerChannel.java | 35 +++ .../test/TestServerToClientChannel.java | 42 ++++ src/test/java/module-info.java | 12 + src/test/resources/log4j2.xml | 26 ++ 41 files changed, 1401 insertions(+), 198 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/Channel.java create mode 100644 src/main/java/it/tdlight/reactiveapi/ChannelsParameters.java create mode 100644 src/main/java/it/tdlight/reactiveapi/InstanceType.java create mode 100644 src/main/java/it/tdlight/reactiveapi/RSocketParameters.java create mode 100644 src/main/java/it/tdlight/reactiveapi/UtfCodec.java create mode 100644 src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java create mode 100644 src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java create mode 100644 src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java create mode 100644 src/test/java/it/tdlight/reactiveapi/test/BombDeserializer.java create mode 100644 src/test/java/it/tdlight/reactiveapi/test/BombSerializer.java create mode 100644 src/test/java/it/tdlight/reactiveapi/test/FakeException.java create mode 100644 src/test/java/it/tdlight/reactiveapi/test/TestChannel.java create mode 100644 src/test/java/it/tdlight/reactiveapi/test/TestClientToServerChannel.java create mode 100644 src/test/java/it/tdlight/reactiveapi/test/TestServerToClientChannel.java create mode 100644 src/test/java/module-info.java create mode 100644 src/test/resources/log4j2.xml diff --git a/pom.xml b/pom.xml index 8f2d48a..6586969 100644 --- a/pom.xml +++ b/pom.xml @@ -155,6 +155,18 @@ log4j-api 2.19.0 + + org.apache.logging.log4j + log4j-core + 2.19.0 + test + + + org.apache.logging.log4j + log4j-slf4j20-impl + 2.18.1-SNAPSHOT + test + com.lmax disruptor diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index fc2c5fc..6564681 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -6,7 +6,6 @@ import it.tdlight.jni.TdApi.Object; import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest; -import it.tdlight.reactiveapi.ChannelFactory.KafkaChannelFactory; import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.OnRequest; import it.tdlight.reactiveapi.Event.OnResponse; @@ -36,9 +35,9 @@ public class AtomixReactiveApi implements ReactiveApi { private final AtomixReactiveApiMode mode; - private final ClientsSharedTdlib kafkaSharedTdlibClients; + private final ClientsSharedTdlib sharedTdlibClients; @Nullable - private final TdlibChannelsSharedServer kafkaSharedTdlibServers; + private final TdlibChannelsSharedServer sharedTdlibServers; private final ReactiveApiMultiClient client; private final Set resultingEventTransformerSet; @@ -61,42 +60,42 @@ public class AtomixReactiveApi implements ReactiveApi { } public AtomixReactiveApi(AtomixReactiveApiMode mode, - KafkaParameters kafkaParameters, + ChannelsParameters channelsParameters, @Nullable DiskSessionsManager diskSessions, @NotNull Set resultingEventTransformerSet) { this.mode = mode; - ChannelFactory channelFactory = new KafkaChannelFactory(); + ChannelFactory channelFactory = ChannelFactory.getFactoryFromParameters(channelsParameters); if (mode != AtomixReactiveApiMode.SERVER) { - EventProducer> kafkaTDLibRequestProducer = ChannelProducerTdlibRequest.create(channelFactory, kafkaParameters); - EventConsumer> kafkaTDLibResponseConsumer = ChannelConsumerTdlibResponse.create(channelFactory, kafkaParameters); - HashMap> kafkaClientBoundConsumers = new HashMap<>(); - for (String lane : kafkaParameters.getAllLanes()) { - kafkaClientBoundConsumers.put(lane, ChannelConsumerClientBoundEvent.create(channelFactory, kafkaParameters, lane)); + EventProducer> tdRequestProducer = ChannelProducerTdlibRequest.create(channelFactory, channelsParameters); + EventConsumer> tdResponseConsumer = ChannelConsumerTdlibResponse.create(channelFactory, channelsParameters); + HashMap> clientBoundConsumers = new HashMap<>(); + for (String lane : channelsParameters.getAllLanes()) { + clientBoundConsumers.put(lane, ChannelConsumerClientBoundEvent.create(channelFactory, channelsParameters, lane)); } - var kafkaTdlibClientsChannels = new TdlibChannelsClients(kafkaTDLibRequestProducer, - kafkaTDLibResponseConsumer, - kafkaClientBoundConsumers + var tdClientsChannels = new TdlibChannelsClients(tdRequestProducer, + tdResponseConsumer, + clientBoundConsumers ); - this.kafkaSharedTdlibClients = new ClientsSharedTdlib(kafkaTdlibClientsChannels); - this.client = new LiveAtomixReactiveApiClient(kafkaSharedTdlibClients); + this.sharedTdlibClients = new ClientsSharedTdlib(tdClientsChannels); + this.client = new LiveAtomixReactiveApiClient(sharedTdlibClients); } else { - this.kafkaSharedTdlibClients = null; + this.sharedTdlibClients = null; this.client = null; } if (mode != AtomixReactiveApiMode.CLIENT) { - EventConsumer> kafkaTDLibRequestConsumer = ChannelConsumerTdlibRequest.create(channelFactory, kafkaParameters); - EventProducer> kafkaTDLibResponseProducer = ChannelProducerTdlibResponse.create(channelFactory, kafkaParameters); - var kafkaClientBoundProducers = new HashMap>(); - for (String lane : kafkaParameters.getAllLanes()) { - kafkaClientBoundProducers.put(lane, ChannelProducerClientBoundEvent.create(channelFactory, kafkaParameters, lane)); + EventConsumer> tdRequestConsumer = ChannelConsumerTdlibRequest.create(channelFactory, channelsParameters); + EventProducer> tdResponseProducer = ChannelProducerTdlibResponse.create(channelFactory, channelsParameters); + var clientBoundProducers = new HashMap>(); + for (String lane : channelsParameters.getAllLanes()) { + clientBoundProducers.put(lane, ChannelProducerClientBoundEvent.create(channelFactory, channelsParameters, lane)); } - var kafkaTDLibServer = new TdlibChannelsServers(kafkaTDLibRequestConsumer, - kafkaTDLibResponseProducer, - kafkaClientBoundProducers + var tdServer = new TdlibChannelsServers(tdRequestConsumer, + tdResponseProducer, + clientBoundProducers ); - this.kafkaSharedTdlibServers = new TdlibChannelsSharedServer(kafkaTDLibServer); + this.sharedTdlibServers = new TdlibChannelsSharedServer(tdServer); } else { - this.kafkaSharedTdlibServers = null; + this.sharedTdlibServers = null; } this.resultingEventTransformerSet = resultingEventTransformerSet; @@ -142,8 +141,8 @@ public class AtomixReactiveApi implements ReactiveApi { .doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk")); return loadSessions.then(Mono.fromRunnable(() -> { - if (kafkaSharedTdlibServers != null) { - requestsSub = kafkaSharedTdlibServers.requests() + if (sharedTdlibServers != null) { + requestsSub = sharedTdlibServers.requests() .onBackpressureError() .doOnNext(req -> localSessions.get(req.data().userId()).handleRequest(req.data())) .subscribeOn(Schedulers.parallel()) @@ -173,7 +172,7 @@ public class AtomixReactiveApi implements ReactiveApi { botToken = createBotSessionRequest.token(); phoneNumber = null; lane = createBotSessionRequest.lane(); - reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaSharedTdlibServers, resultingEventTransformerSet, + reactiveApiPublisher = ReactiveApiPublisher.fromToken(sharedTdlibServers, resultingEventTransformerSet, userId, botToken, lane @@ -184,7 +183,7 @@ public class AtomixReactiveApi implements ReactiveApi { botToken = null; phoneNumber = createUserSessionRequest.phoneNumber(); lane = createUserSessionRequest.lane(); - reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaSharedTdlibServers, resultingEventTransformerSet, + reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(sharedTdlibServers, resultingEventTransformerSet, userId, phoneNumber, lane @@ -196,14 +195,14 @@ public class AtomixReactiveApi implements ReactiveApi { phoneNumber = loadSessionFromDiskRequest.phoneNumber(); lane = loadSessionFromDiskRequest.lane(); if (loadSessionFromDiskRequest.phoneNumber() != null) { - reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaSharedTdlibServers, + reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(sharedTdlibServers, resultingEventTransformerSet, userId, phoneNumber, lane ); } else { - reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaSharedTdlibServers, + reactiveApiPublisher = ReactiveApiPublisher.fromToken(sharedTdlibServers, resultingEventTransformerSet, userId, botToken, @@ -279,24 +278,24 @@ public class AtomixReactiveApi implements ReactiveApi { @Override public Mono close() { closeRequested = true; - Mono kafkaServerProducersStopper; - if (kafkaSharedTdlibServers != null) { - kafkaServerProducersStopper = Mono.fromRunnable(kafkaSharedTdlibServers::close).subscribeOn(Schedulers.boundedElastic()); + Mono serverProducersStopper; + if (sharedTdlibServers != null) { + serverProducersStopper = Mono.fromRunnable(sharedTdlibServers::close).subscribeOn(Schedulers.boundedElastic()); } else { - kafkaServerProducersStopper = Mono.empty(); + serverProducersStopper = Mono.empty(); } - Mono kafkaClientProducersStopper; - if (kafkaSharedTdlibClients != null) { - kafkaClientProducersStopper = Mono - .fromRunnable(kafkaSharedTdlibClients::close) + Mono clientProducersStopper; + if (sharedTdlibClients != null) { + clientProducersStopper = Mono + .fromRunnable(sharedTdlibClients::close) .subscribeOn(Schedulers.boundedElastic()); } else { - kafkaClientProducersStopper = Mono.empty(); + clientProducersStopper = Mono.empty(); } if (requestsSub != null) { requestsSub.dispose(); } - return Mono.when(kafkaServerProducersStopper, kafkaClientProducersStopper); + return Mono.when(serverProducersStopper, clientProducersStopper); } @Override diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index 3b5bdd0..9e71705 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -52,11 +52,11 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { private final AtomicLong requestId = new AtomicLong(0); private final Disposable subscription; - public BaseAtomixReactiveApiClient(ClientsSharedTdlib kafkaSharedTdlibClients) { + public BaseAtomixReactiveApiClient(ClientsSharedTdlib sharedTdlibClients) { this.clientId = System.nanoTime(); - this.requests = kafkaSharedTdlibClients.requests(); + this.requests = sharedTdlibClients.requests(); - this.subscription = kafkaSharedTdlibClients.responses().doOnNext(response -> { + this.subscription = sharedTdlibClients.responses().doOnNext(response -> { var responseSink = responses.get(response.data().requestId()); if (responseSink == null) { LOG.debug("Bot received a response for an unknown request id: {}", response.data().requestId()); diff --git a/src/main/java/it/tdlight/reactiveapi/Channel.java b/src/main/java/it/tdlight/reactiveapi/Channel.java new file mode 100644 index 0000000..912a862 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/Channel.java @@ -0,0 +1,22 @@ +package it.tdlight.reactiveapi; + +public enum Channel { + CLIENT_BOUND_EVENT("event"), + TDLIB_REQUEST("request"), + TDLIB_RESPONSE("response"); + + private final String name; + + Channel(String name) { + this.name = name; + } + + public String getChannelName() { + return name; + } + + @Override + public String toString() { + return name; + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java index 2312651..da307fe 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelCodec.java @@ -1,26 +1,24 @@ package it.tdlight.reactiveapi; -public enum ChannelCodec { - CLIENT_BOUND_EVENT("event", ClientBoundEventSerializer.class, ClientBoundEventDeserializer.class), - TDLIB_REQUEST("request", TdlibRequestSerializer.class, TdlibRequestDeserializer.class), - TDLIB_RESPONSE("response", TdlibResponseSerializer.class, TdlibResponseDeserializer.class); +import java.lang.reflect.InvocationTargetException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; + +public class ChannelCodec { + public static final ChannelCodec CLIENT_BOUND_EVENT = new ChannelCodec(ClientBoundEventSerializer.class, ClientBoundEventDeserializer.class); + public static final ChannelCodec TDLIB_REQUEST = new ChannelCodec(TdlibRequestSerializer.class, TdlibRequestDeserializer.class); + public static final ChannelCodec TDLIB_RESPONSE = new ChannelCodec(TdlibResponseSerializer.class, TdlibResponseDeserializer.class); + public static final ChannelCodec UTF8_TEST = new ChannelCodec(UtfCodec.class, UtfCodec.class); - private final String name; private final Class serializerClass; private final Class deserializerClass; - ChannelCodec(String kafkaName, - Class serializerClass, + public ChannelCodec(Class serializerClass, Class deserializerClass) { - this.name = kafkaName; this.serializerClass = serializerClass; this.deserializerClass = deserializerClass; } - public String getKafkaName() { - return name; - } - public Class getSerializerClass() { return serializerClass; } @@ -29,8 +27,23 @@ public enum ChannelCodec { return deserializerClass; } - @Override - public String toString() { - return name; + public Deserializer getNewDeserializer() { + try { + //noinspection unchecked + return (Deserializer) deserializerClass.getDeclaredConstructor().newInstance(); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | + ClassCastException e) { + throw new IllegalStateException("Can't instantiate the codec deserializer", e); + } + } + + public Serializer getNewSerializer() { + try { + //noinspection unchecked + return (Serializer) serializerClass.getDeclaredConstructor().newInstance(); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | + ClassCastException e) { + throw new IllegalStateException("Can't instantiate the codec serializer", e); + } } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java index 7cb6c49..ec492fc 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java @@ -1,22 +1,22 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import org.jetbrains.annotations.NotNull; public class ChannelConsumerClientBoundEvent { private ChannelConsumerClientBoundEvent() { } - public static EventConsumer create(ChannelFactory channelFactory, KafkaParameters kafkaParameters, - String lane) { - var codec = ChannelCodec.CLIENT_BOUND_EVENT; + public static EventConsumer create(ChannelFactory channelFactory, ChannelsParameters channelsParameters, + @NotNull String lane) { String name; - if (lane.isBlank()) { - name = codec.getKafkaName(); + if (lane.isEmpty()) { + name = Channel.CLIENT_BOUND_EVENT.getChannelName(); } else { - name = codec.getKafkaName() + "-" + lane; + name = Channel.CLIENT_BOUND_EVENT.getChannelName() + "-" + lane; } - return channelFactory.newConsumer(kafkaParameters, false, codec, name); + return channelFactory.newConsumer(channelsParameters, false, ChannelCodec.CLIENT_BOUND_EVENT, name); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java index 3af9244..2bcc837 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java @@ -8,11 +8,11 @@ public class ChannelConsumerTdlibRequest { private ChannelConsumerTdlibRequest() { } - public static EventConsumer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) { - return channelFactory.newConsumer(kafkaParameters, + public static EventConsumer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) { + return channelFactory.newConsumer(channelsParameters, true, ChannelCodec.TDLIB_REQUEST, - ChannelCodec.TDLIB_REQUEST.getKafkaName() + Channel.TDLIB_REQUEST.getChannelName() ); } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java index d504f89..1353e19 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java @@ -8,11 +8,11 @@ public class ChannelConsumerTdlibResponse { private ChannelConsumerTdlibResponse() { } - public static EventConsumer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) { - return channelFactory.newConsumer(kafkaParameters, + public static EventConsumer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) { + return channelFactory.newConsumer(channelsParameters, true, ChannelCodec.TDLIB_RESPONSE, - ChannelCodec.TDLIB_RESPONSE.getKafkaName() + Channel.TDLIB_RESPONSE.getChannelName() ); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java index 35855c6..cc77481 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java @@ -2,33 +2,73 @@ package it.tdlight.reactiveapi; import it.tdlight.reactiveapi.kafka.KafkaConsumer; import it.tdlight.reactiveapi.kafka.KafkaProducer; +import it.tdlight.reactiveapi.rsocket.RSocketConsumeAsClient; +import it.tdlight.reactiveapi.rsocket.RSocketProduceAsServer; +import it.tdlight.reactiveapi.rsocket.RSocketConsumeAsServer; +import it.tdlight.reactiveapi.rsocket.RSocketProduceAsClient; public interface ChannelFactory { - EventConsumer newConsumer(KafkaParameters kafkaParameters, + static ChannelFactory getFactoryFromParameters(ChannelsParameters channelsParameters) { + if (channelsParameters instanceof KafkaParameters) { + return new KafkaChannelFactory(); + } else { + return new RsocketChannelFactory(); + } + } + + EventConsumer newConsumer(ChannelsParameters channelsParameters, boolean quickResponse, ChannelCodec channelCodec, String channelName); - EventProducer newProducer(KafkaParameters kafkaParameters, + EventProducer newProducer(ChannelsParameters channelsParameters, ChannelCodec channelCodec, String channelName); class KafkaChannelFactory implements ChannelFactory { @Override - public EventConsumer newConsumer(KafkaParameters kafkaParameters, + public EventConsumer newConsumer(ChannelsParameters channelsParameters, boolean quickResponse, ChannelCodec channelCodec, String channelName) { - return new KafkaConsumer<>(kafkaParameters, quickResponse, channelCodec, channelName); + return new KafkaConsumer<>((KafkaParameters) channelsParameters, quickResponse, channelCodec, channelName); } @Override - public EventProducer newProducer(KafkaParameters kafkaParameters, + public EventProducer newProducer(ChannelsParameters channelsParameters, ChannelCodec channelCodec, String channelName) { - return new KafkaProducer<>(kafkaParameters, channelCodec, channelName); + return new KafkaProducer<>((KafkaParameters) channelsParameters, channelCodec, channelName); + } + } + + class RsocketChannelFactory implements ChannelFactory { + + @Override + public EventConsumer newConsumer(ChannelsParameters channelsParameters, + boolean quickResponse, + ChannelCodec channelCodec, + String channelName) { + var socketParameters = (RSocketParameters) channelsParameters; + if (socketParameters.isClient()) { + return new RSocketConsumeAsClient<>(socketParameters.host(), channelCodec, channelName); + } else { + return new RSocketConsumeAsServer<>(socketParameters.host(), channelCodec, channelName); + } + } + + @Override + public EventProducer newProducer(ChannelsParameters channelsParameters, + ChannelCodec channelCodec, + String channelName) { + var socketParameters = (RSocketParameters) channelsParameters; + if (socketParameters.isClient()) { + return new RSocketProduceAsServer<>(socketParameters.host(), channelCodec, channelName); + } else { + return new RSocketProduceAsClient<>(socketParameters.host(), channelCodec, channelName); + } } } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java index 88eea09..d22c9ce 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java @@ -4,21 +4,16 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; public class ChannelProducerClientBoundEvent { - private static final ChannelCodec CODEC = ChannelCodec.CLIENT_BOUND_EVENT; - private ChannelProducerClientBoundEvent() { } - public static EventProducer create(ChannelFactory channelFactory, KafkaParameters kafkaParameters, String lane) { + public static EventProducer create(ChannelFactory channelFactory, ChannelsParameters channelsParameters, String lane) { String name; if (lane.isBlank()) { - name = CODEC.getKafkaName(); + name = Channel.CLIENT_BOUND_EVENT.getChannelName(); } else { - name = CODEC.getKafkaName() + "-" + lane; + name = Channel.CLIENT_BOUND_EVENT.getChannelName() + "-" + lane; } - return channelFactory.newProducer(kafkaParameters, - CODEC, - name - ); + return channelFactory.newProducer(channelsParameters, ChannelCodec.CLIENT_BOUND_EVENT, name); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java index 58acad7..9dad97d 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java @@ -7,10 +7,10 @@ public class ChannelProducerTdlibRequest { private ChannelProducerTdlibRequest() { } - public static EventProducer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) { - return channelFactory.newProducer(kafkaParameters, + public static EventProducer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) { + return channelFactory.newProducer(channelsParameters, ChannelCodec.TDLIB_REQUEST, - ChannelCodec.TDLIB_REQUEST.getKafkaName() + Channel.TDLIB_REQUEST.getChannelName() ); } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java index 7d914da..92be3df 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java @@ -8,10 +8,10 @@ public class ChannelProducerTdlibResponse { private ChannelProducerTdlibResponse() { } - public static EventProducer> create(ChannelFactory channelFactory, KafkaParameters kafkaParameters) { - return channelFactory.newProducer(kafkaParameters, + public static EventProducer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) { + return channelFactory.newProducer(channelsParameters, ChannelCodec.TDLIB_RESPONSE, - ChannelCodec.TDLIB_RESPONSE.getKafkaName() + Channel.TDLIB_RESPONSE.getChannelName() ); } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelsParameters.java b/src/main/java/it/tdlight/reactiveapi/ChannelsParameters.java new file mode 100644 index 0000000..dbd8561 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ChannelsParameters.java @@ -0,0 +1,8 @@ +package it.tdlight.reactiveapi; + +import java.util.Set; + +public interface ChannelsParameters { + + Set getAllLanes(); +} diff --git a/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java b/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java index 789af0d..a4578bd 100644 --- a/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java +++ b/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java @@ -22,7 +22,7 @@ public class ClientsSharedTdlib implements Closeable { private static final Logger LOG = LogManager.getLogger(ClientsSharedTdlib.class); - private final TdlibChannelsClients kafkaTdlibClientsChannels; + private final TdlibChannelsClients tdClientsChannels; private final AtomicReference responsesSub = new AtomicReference<>(); private final Disposable requestsSub; private final AtomicReference eventsSub = new AtomicReference<>(); @@ -31,12 +31,12 @@ public class ClientsSharedTdlib implements Closeable { private final Many> requests = Sinks.many().unicast() .onBackpressureBuffer(Queues.>get(65535).get()); - public ClientsSharedTdlib(TdlibChannelsClients kafkaTdlibClientsChannels) { - this.kafkaTdlibClientsChannels = kafkaTdlibClientsChannels; - this.responses = kafkaTdlibClientsChannels.response().consumeMessages("td-responses"); - this.events = kafkaTdlibClientsChannels.events().entrySet().stream() - .collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().consumeMessages(e.getKey()))); - this.requestsSub = kafkaTdlibClientsChannels.request() + public ClientsSharedTdlib(TdlibChannelsClients tdClientsChannels) { + this.tdClientsChannels = tdClientsChannels; + this.responses = tdClientsChannels.response().consumeMessages(); + this.events = tdClientsChannels.events().entrySet().stream() + .collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().consumeMessages())); + this.requestsSub = tdClientsChannels.request() .sendMessages(requests.asFlux()) .subscribeOn(Schedulers.parallel()) .subscribe(); @@ -73,6 +73,6 @@ public class ClientsSharedTdlib implements Closeable { if (eventsSub != null) { eventsSub.dispose(); } - kafkaTdlibClientsChannels.close(); + tdClientsChannels.close(); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java b/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java index 61672e4..23affcb 100644 --- a/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java +++ b/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java @@ -3,6 +3,7 @@ package it.tdlight.reactiveapi; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +import org.jetbrains.annotations.Nullable; /** * Define the cluster structure @@ -11,14 +12,28 @@ public class ClusterSettings { public String id; public List kafkaBootstrapServers; + public String rsocketHost; public List lanes; @JsonCreator public ClusterSettings(@JsonProperty(required = true, value = "id") String id, - @JsonProperty(required = true, value = "kafkaBootstrapServers") List kafkaBootstrapServers, + @JsonProperty(value = "kafkaBootstrapServers") List kafkaBootstrapServers, + @JsonProperty(value = "rsocketHost") String rsocketHost, @JsonProperty(required = true, value = "lanes") List lanes) { this.id = id; this.kafkaBootstrapServers = kafkaBootstrapServers; + this.rsocketHost = rsocketHost; this.lanes = lanes; + if ((rsocketHost == null) == (kafkaBootstrapServers == null || kafkaBootstrapServers.isEmpty())) { + throw new IllegalArgumentException("Please configure either RSocket or Kafka"); + } + } + + public ChannelsParameters toParameters(String clientId, InstanceType instanceType) { + if (rsocketHost != null) { + return new RSocketParameters(instanceType, rsocketHost, lanes); + } else { + return new KafkaParameters(clientId, clientId, kafkaBootstrapServers, List.copyOf(lanes)); + } } } diff --git a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java index c69b027..2b907f1 100644 --- a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java +++ b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java @@ -9,11 +9,8 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,11 +49,10 @@ public class Entrypoint { String diskSessionsConfigPath = args.diskSessionsPath; clusterSettings = mapper.readValue(Paths.get(clusterConfigPath).toFile(), ClusterSettings.class); instanceSettings = mapper.readValue(Paths.get(instanceConfigPath).toFile(), InstanceSettings.class); - if (instanceSettings.client) { - diskSessions = null; - } else { - diskSessions = new DiskSessionsManager(mapper, diskSessionsConfigPath); - } + diskSessions = switch (instanceSettings.instanceType) { + case TDLIB -> new DiskSessionsManager(mapper, diskSessionsConfigPath); + case UPDATES_CONSUMER -> null; + }; } return start(clusterSettings, instanceSettings, diskSessions); @@ -68,43 +64,47 @@ public class Entrypoint { Set resultingEventTransformerSet; AtomixReactiveApiMode mode = AtomixReactiveApiMode.SERVER; - if (instanceSettings.client) { - if (diskSessions != null) { - throw new IllegalArgumentException("A client instance can't have a session manager!"); - } - if (instanceSettings.clientAddress == null) { - throw new IllegalArgumentException("A client instance must have an address (host:port)"); - } - mode = AtomixReactiveApiMode.CLIENT; - resultingEventTransformerSet = Set.of(); - } else { - if (diskSessions == null) { - throw new IllegalArgumentException("A full instance must have a session manager!"); + switch (instanceSettings.instanceType) { + case UPDATES_CONSUMER -> { + if (diskSessions != null) { + throw new IllegalArgumentException("An updates-consumer instance can't have a session manager!"); + } + if (instanceSettings.listenAddress == null) { + throw new IllegalArgumentException("An updates-consumer instance must have an address (host:port)"); + } + mode = AtomixReactiveApiMode.CLIENT; + resultingEventTransformerSet = Set.of(); } + case TDLIB -> { + if (diskSessions == null) { + throw new IllegalArgumentException("A tdlib instance must have a session manager!"); + } - resultingEventTransformerSet = new HashSet<>(); - if (instanceSettings.resultingEventTransformers != null) { - for (var resultingEventTransformer: instanceSettings.resultingEventTransformers) { - try { - var instance = resultingEventTransformer.getConstructor().newInstance(); - resultingEventTransformerSet.add(instance); - LOG.info("Loaded and applied resulting event transformer: " + resultingEventTransformer.getName()); - } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { - throw new IllegalArgumentException("Failed to load resulting event transformer: " - + resultingEventTransformer.getName()); - } catch (NoSuchMethodException e) { - throw new IllegalArgumentException("The client transformer must declare an empty constructor: " - + resultingEventTransformer.getName()); + resultingEventTransformerSet = new HashSet<>(); + if (instanceSettings.resultingEventTransformers != null) { + for (var resultingEventTransformer: instanceSettings.resultingEventTransformers) { + try { + var instance = resultingEventTransformer.getConstructor().newInstance(); + resultingEventTransformerSet.add(instance); + LOG.info("Loaded and applied resulting event transformer: " + resultingEventTransformer.getName()); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new IllegalArgumentException("Failed to load resulting event transformer: " + + resultingEventTransformer.getName()); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("The client transformer must declare an empty constructor: " + + resultingEventTransformer.getName()); + } } } - } - resultingEventTransformerSet = unmodifiableSet(resultingEventTransformerSet); + resultingEventTransformerSet = unmodifiableSet(resultingEventTransformerSet); + } + default -> throw new UnsupportedOperationException("Unsupported instance type: " + instanceSettings.instanceType); } - var kafkaParameters = new KafkaParameters(clusterSettings, instanceSettings.id); + ChannelsParameters channelsParameters = clusterSettings.toParameters(instanceSettings.id, instanceSettings.instanceType); - var api = new AtomixReactiveApi(mode, kafkaParameters, diskSessions, resultingEventTransformerSet); + var api = new AtomixReactiveApi(mode, channelsParameters, diskSessions, resultingEventTransformerSet); LOG.info("Starting ReactiveApi..."); diff --git a/src/main/java/it/tdlight/reactiveapi/EventConsumer.java b/src/main/java/it/tdlight/reactiveapi/EventConsumer.java index 409f2c4..4a6d689 100644 --- a/src/main/java/it/tdlight/reactiveapi/EventConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/EventConsumer.java @@ -5,11 +5,9 @@ import reactor.core.publisher.Flux; public interface EventConsumer { - boolean isQuickResponse(); - ChannelCodec getChannelCodec(); String getChannelName(); - Flux> consumeMessages(@NotNull String subGroupId); + Flux> consumeMessages(); } diff --git a/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java b/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java index abd86c3..bff1b21 100644 --- a/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java +++ b/src/main/java/it/tdlight/reactiveapi/InstanceSettings.java @@ -3,7 +3,7 @@ package it.tdlight.reactiveapi; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; -import java.util.Set; +import java.util.Objects; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -12,30 +12,47 @@ public class InstanceSettings { @NotNull public String id; - /** - * True if this is just a client, false if this is a server - */ - public boolean client; + public InstanceType instanceType; /** - * If {@link #client} is true, this will be the address of this client + * If {@link #instanceType} is true, this will be the address of this client */ - public @Nullable String clientAddress; + public @Nullable String listenAddress; /** - * If {@link #client} is false, this will transform resulting events before being sent + * If {@link #instanceType} is false, this will transform resulting events before being sent */ public @Nullable List> resultingEventTransformers; + public InstanceSettings(@NotNull String id, + @NotNull InstanceType instanceType, + @Nullable String listenAddress, + @Nullable List> resultingEventTransformers) { + this.id = id; + this.instanceType = instanceType; + this.listenAddress = listenAddress; + this.resultingEventTransformers = resultingEventTransformers; + } + @JsonCreator public InstanceSettings(@JsonProperty(required = true, value = "id") @NotNull String id, - @JsonProperty(required = true, value = "client") boolean client, - @JsonProperty("clientAddress") @Nullable String clientAddress, - @JsonProperty("resultingEventTransformers") @Nullable - List> resultingEventTransformers) { + @Deprecated @JsonProperty(value = "client", defaultValue = "null") Boolean deprecatedIsClient, + @JsonProperty(value = "instanceType", defaultValue = "null") String instanceType, + @Deprecated @JsonProperty(value = "clientAddress", defaultValue = "null") @Nullable String deprecatedClientAddress, + @JsonProperty(value = "listenAddress", defaultValue = "null") @Nullable String listenAddress, + @JsonProperty("resultingEventTransformers") + @Nullable List> resultingEventTransformers) { this.id = id; - this.client = client; - this.clientAddress = clientAddress; + if (deprecatedIsClient != null) { + this.instanceType = deprecatedIsClient ? InstanceType.UPDATES_CONSUMER : InstanceType.TDLIB; + } else { + this.instanceType = InstanceType.valueOf(instanceType.toUpperCase()); + } + if (deprecatedClientAddress != null) { + this.listenAddress = deprecatedClientAddress; + } else { + this.listenAddress = listenAddress; + } this.resultingEventTransformers = resultingEventTransformers; } } diff --git a/src/main/java/it/tdlight/reactiveapi/InstanceType.java b/src/main/java/it/tdlight/reactiveapi/InstanceType.java new file mode 100644 index 0000000..2610571 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/InstanceType.java @@ -0,0 +1,6 @@ +package it.tdlight.reactiveapi; + +public enum InstanceType { + UPDATES_CONSUMER, + TDLIB +} diff --git a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java index 0bd5378..10bf279 100644 --- a/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java +++ b/src/main/java/it/tdlight/reactiveapi/KafkaParameters.java @@ -4,16 +4,14 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; -public record KafkaParameters(String groupId, String clientId, String bootstrapServers, List lanes) { +public record KafkaParameters(String groupId, String clientId, List bootstrapServers, + List lanes) implements ChannelsParameters { - public KafkaParameters(ClusterSettings clusterSettings, String clientId) { - this(clientId, - clientId, - String.join(",", clusterSettings.kafkaBootstrapServers), - List.copyOf(clusterSettings.lanes) - ); + public String getBootstrapServersString() { + return String.join(",", bootstrapServers); } + @Override public Set getAllLanes() { var lanes = new LinkedHashSet(this.lanes.size() + 1); lanes.add("main"); diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index 2692dd6..cb53f63 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -8,21 +8,21 @@ import reactor.core.publisher.Flux; public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient { - private final ClientsSharedTdlib kafkaSharedTdlibClients; + private final ClientsSharedTdlib sharedTdlibClients; - LiveAtomixReactiveApiClient(ClientsSharedTdlib kafkaSharedTdlibClients) { - super(kafkaSharedTdlibClients); - this.kafkaSharedTdlibClients = kafkaSharedTdlibClients; + LiveAtomixReactiveApiClient(ClientsSharedTdlib sharedTdlibClients) { + super(sharedTdlibClients); + this.sharedTdlibClients = sharedTdlibClients; } @Override public Flux clientBoundEvents(String lane) { - return kafkaSharedTdlibClients.events(lane).map(Timestamped::data); + return sharedTdlibClients.events(lane).map(Timestamped::data); } @Override public Map> clientBoundEvents() { - return kafkaSharedTdlibClients.events().entrySet().stream() + return sharedTdlibClients.events().entrySet().stream() .collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().map(Timestamped::data))); } } diff --git a/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java b/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java new file mode 100644 index 0000000..94bcd3e --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java @@ -0,0 +1,63 @@ +package it.tdlight.reactiveapi; + +import com.google.common.net.HostAndPort; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import org.jetbrains.annotations.Nullable; + +public final class RSocketParameters implements ChannelsParameters { + + private final boolean client; + private final HostAndPort host; + private final List lanes; + + public RSocketParameters(InstanceType instanceType, String host, List lanes) { + this.client = instanceType != InstanceType.UPDATES_CONSUMER; + this.host = HostAndPort.fromString(host); + this.lanes = lanes; + } + + @Override + public Set getAllLanes() { + var lanes = new LinkedHashSet(this.lanes.size() + 1); + lanes.add("main"); + lanes.addAll(this.lanes); + return lanes; + } + + public boolean isClient() { + return client; + } + + public HostAndPort host() { + return host; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj == null || obj.getClass() != this.getClass()) { + return false; + } + var that = (RSocketParameters) obj; + return Objects.equals(this.client, that.client) && Objects.equals( + this.host, + that.host) && Objects.equals(this.lanes, that.lanes); + } + + @Override + public int hashCode() { + return Objects.hash(client, host, lanes); + } + + @Override + public String toString() { + return "RSocketParameters[client=" + client + ", " + "host=" + host + ", " + + "lanes=" + lanes + ']'; + } + +} diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 7ca3274..94ada35 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -71,7 +71,7 @@ public abstract class ReactiveApiPublisher { private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(5); private static final Duration HUNDRED_MS = Duration.ofMillis(100); - private final TdlibChannelsSharedServer kafkaSharedTdlibServers; + private final TdlibChannelsSharedServer sharedTdlibServers; private final Set resultingEventTransformerSet; private final ReactiveTelegramClient rawTelegramClient; private final Flux telegramClient; @@ -85,14 +85,14 @@ public abstract class ReactiveApiPublisher { private final AtomicReference disposable = new AtomicReference<>(); private final AtomicReference path = new AtomicReference<>(); - private ReactiveApiPublisher(TdlibChannelsSharedServer kafkaSharedTdlibServers, + private ReactiveApiPublisher(TdlibChannelsSharedServer sharedTdlibServers, Set resultingEventTransformerSet, long userId, String lane) { - this.kafkaSharedTdlibServers = kafkaSharedTdlibServers; + this.sharedTdlibServers = sharedTdlibServers; this.resultingEventTransformerSet = resultingEventTransformerSet; this.userId = userId; this.lane = Objects.requireNonNull(lane); - this.responses = this.kafkaSharedTdlibServers.responses(); + this.responses = this.sharedTdlibServers.responses(); this.rawTelegramClient = ClientManager.createReactive(); try { Init.start(); @@ -114,20 +114,20 @@ public abstract class ReactiveApiPublisher { }); } - public static ReactiveApiPublisher fromToken(TdlibChannelsSharedServer kafkaSharedTdlibServers, + public static ReactiveApiPublisher fromToken(TdlibChannelsSharedServer sharedTdlibServers, Set resultingEventTransformerSet, long userId, String token, String lane) { - return new ReactiveApiPublisherToken(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, token, lane); + return new ReactiveApiPublisherToken(sharedTdlibServers, resultingEventTransformerSet, userId, token, lane); } - public static ReactiveApiPublisher fromPhoneNumber(TdlibChannelsSharedServer kafkaSharedTdlibServers, + public static ReactiveApiPublisher fromPhoneNumber(TdlibChannelsSharedServer sharedTdlibServers, Set resultingEventTransformerSet, long userId, long phoneNumber, String lane) { - return new ReactiveApiPublisherPhoneNumber(kafkaSharedTdlibServers, + return new ReactiveApiPublisherPhoneNumber(sharedTdlibServers, resultingEventTransformerSet, userId, phoneNumber, @@ -209,7 +209,7 @@ public abstract class ReactiveApiPublisher { // Buffer requests to avoid halting the event loop .onBackpressureBuffer(); - kafkaSharedTdlibServers.events(lane, messagesToSend); + sharedTdlibServers.events(lane, messagesToSend); publishedResultingEvents // Obtain only cluster-bound events @@ -551,12 +551,12 @@ public abstract class ReactiveApiPublisher { private final String botToken; - public ReactiveApiPublisherToken(TdlibChannelsSharedServer kafkaSharedTdlibServers, + public ReactiveApiPublisherToken(TdlibChannelsSharedServer sharedTdlibServers, Set resultingEventTransformerSet, long userId, String botToken, String lane) { - super(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, lane); + super(sharedTdlibServers, resultingEventTransformerSet, userId, lane); this.botToken = botToken; } @@ -583,12 +583,12 @@ public abstract class ReactiveApiPublisher { private final long phoneNumber; - public ReactiveApiPublisherPhoneNumber(TdlibChannelsSharedServer kafkaSharedTdlibServers, + public ReactiveApiPublisherPhoneNumber(TdlibChannelsSharedServer sharedTdlibServers, Set resultingEventTransformerSet, long userId, long phoneNumber, String lane) { - super(kafkaSharedTdlibServers, resultingEventTransformerSet, userId, lane); + super(sharedTdlibServers, resultingEventTransformerSet, userId, lane); this.phoneNumber = phoneNumber; } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java index 62a8098..c94ebcb 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java @@ -1,11 +1,32 @@ package it.tdlight.reactiveapi; +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.LongConsumer; +import org.jetbrains.annotations.NotNull; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Disposable; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; +import reactor.core.publisher.Signal; +import reactor.util.context.Context; public class ReactorUtils { + @SuppressWarnings("rawtypes") + private static final WaitingSink WAITING_SINK = new WaitingSink<>(); + public static Flux subscribeOnce(Flux f) { AtomicBoolean subscribed = new AtomicBoolean(); return f.doOnSubscribe(s -> { @@ -23,4 +44,144 @@ public class ReactorUtils { } }); } + + public static Flux createLastestSubscriptionFlux(Flux upstream, int maxBufferSize) { + return upstream.transform(parent -> { + AtomicReference subscriptionAtomicReference = new AtomicReference<>(); + AtomicReference> prevEmitterRef = new AtomicReference<>(); + Deque> queue = new ArrayDeque<>(maxBufferSize); + + return Flux.create(emitter -> { + var prevEmitter = prevEmitterRef.getAndSet(emitter); + + if (prevEmitter != null) { + if (prevEmitter != WAITING_SINK) { + prevEmitter.error(new CancellationException()); + } + synchronized (queue) { + Signal next; + while (!emitter.isCancelled() && (next = queue.peek()) != null) { + if (next.isOnNext()) { + queue.poll(); + var nextVal = next.get(); + assert nextVal != null; + emitter.next(nextVal); + } else if (next.isOnError()) { + var throwable = next.getThrowable(); + assert throwable != null; + emitter.error(throwable); + break; + } else if (next.isOnComplete()) { + emitter.complete(); + break; + } else { + throw new UnsupportedOperationException(); + } + } + } + } else { + parent.subscribe(new CoreSubscriber<>() { + @Override + public void onSubscribe(@NotNull Subscription s) { + subscriptionAtomicReference.set(s); + } + + @Override + public void onNext(K payload) { + FluxSink prevEmitter = prevEmitterRef.get(); + if (prevEmitter != WAITING_SINK) { + prevEmitter.next(payload); + } else { + synchronized (queue) { + queue.add(Signal.next(payload)); + } + } + } + + @Override + public void onError(Throwable throwable) { + FluxSink prevEmitter = prevEmitterRef.get(); + synchronized (queue) { + queue.add(Signal.error(throwable)); + } + if (prevEmitter != WAITING_SINK) { + prevEmitter.error(throwable); + } + } + + @Override + public void onComplete() { + FluxSink prevEmitter = prevEmitterRef.get(); + synchronized (queue) { + queue.add(Signal.complete()); + } + if (prevEmitter != WAITING_SINK) { + prevEmitter.complete(); + } + } + }); + } + var s = subscriptionAtomicReference.get(); + emitter.onRequest(n -> { + if (n > maxBufferSize) { + emitter.error(new UnsupportedOperationException("Requests count is bigger than max buffer size! " + n + " > " + maxBufferSize)); + } else { + s.request(n); + } + }); + //noinspection unchecked + emitter.onCancel(() -> prevEmitterRef.compareAndSet(emitter, WAITING_SINK)); + //noinspection unchecked + emitter.onDispose(() -> prevEmitterRef.compareAndSet(emitter, WAITING_SINK)); + }, OverflowStrategy.BUFFER); + }); + } + + private static class WaitingSink implements FluxSink { + + @Override + public @NotNull FluxSink next(@NotNull T t) { + throw new UnsupportedOperationException(); + } + + @Override + public void complete() { + throw new UnsupportedOperationException(); + } + + @Override + public void error(@NotNull Throwable e) { + throw new UnsupportedOperationException(); + } + + @Override + public @NotNull Context currentContext() { + throw new UnsupportedOperationException(); + } + + @Override + public long requestedFromDownstream() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException(); + } + + @Override + public @NotNull FluxSink onRequest(@NotNull LongConsumer consumer) { + throw new UnsupportedOperationException(); + } + + @Override + public @NotNull FluxSink onCancel(@NotNull Disposable d) { + throw new UnsupportedOperationException(); + } + + @Override + public @NotNull FluxSink onDispose(@NotNull Disposable d) { + throw new UnsupportedOperationException(); + } + } } diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java index e561d82..7f62399 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java @@ -18,21 +18,20 @@ import reactor.util.concurrent.Queues; public class TdlibChannelsSharedServer implements Closeable { - private final TdlibChannelsServers kafkaTdlibServersChannels; + private final TdlibChannelsServers tdServersChannels; private final Disposable responsesSub; private final AtomicReference requestsSub = new AtomicReference<>(); private final Many> responses = Sinks.many().unicast().onBackpressureBuffer( Queues.>get(65535).get()); private final Flux>> requests; - public TdlibChannelsSharedServer(TdlibChannelsServers kafkaTdlibServersChannels) { - this.kafkaTdlibServersChannels = kafkaTdlibServersChannels; - this.responsesSub = kafkaTdlibServersChannels.response() + public TdlibChannelsSharedServer(TdlibChannelsServers tdServersChannels) { + this.tdServersChannels = tdServersChannels; + this.responsesSub = tdServersChannels.response() .sendMessages(responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT)) .subscribeOn(Schedulers.parallel()) .subscribe(); - this.requests = kafkaTdlibServersChannels.request() - .consumeMessages("td-requests"); + this.requests = tdServersChannels.request().consumeMessages(); } public Flux>> requests() { @@ -42,7 +41,7 @@ public class TdlibChannelsSharedServer implements Closeable { } public Disposable events(String lane, Flux eventFlux) { - return kafkaTdlibServersChannels.events(lane) + return tdServersChannels.events(lane) .sendMessages(eventFlux) .subscribeOn(Schedulers.parallel()) .subscribe(); @@ -59,6 +58,6 @@ public class TdlibChannelsSharedServer implements Closeable { if (requestsSub != null) { requestsSub.dispose(); } - kafkaTdlibServersChannels.close(); + tdServersChannels.close(); } } diff --git a/src/main/java/it/tdlight/reactiveapi/UtfCodec.java b/src/main/java/it/tdlight/reactiveapi/UtfCodec.java new file mode 100644 index 0000000..5485655 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/UtfCodec.java @@ -0,0 +1,28 @@ +package it.tdlight.reactiveapi; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; + +public class UtfCodec implements Serializer, Deserializer { + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public String deserialize(String topic, byte[] data) { + return new String(data, StandardCharsets.UTF_8); + } + + @Override + public byte[] serialize(String topic, String data) { + return data.getBytes(StandardCharsets.UTF_8); + } + + @Override + public void close() { + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java index 9928132..b4b6361 100644 --- a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java @@ -56,7 +56,7 @@ public final class KafkaConsumer implements EventConsumer { throw new RuntimeException(e); } Map props = new HashMap<>(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers()); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.getBootstrapServersString()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId()); props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); @@ -81,7 +81,6 @@ public final class KafkaConsumer implements EventConsumer { return KafkaReceiver.create(options); } - @Override public boolean isQuickResponse() { return quickResponse; } @@ -120,12 +119,12 @@ public final class KafkaConsumer implements EventConsumer { } @Override - public Flux> consumeMessages(@NotNull String subGroupId) { - return consumeMessagesInternal(subGroupId); + public Flux> consumeMessages() { + return consumeMessagesInternal(); } - private Flux> consumeMessagesInternal(@NotNull String subGroupId) { - return createReceiver(kafkaParameters.groupId() + (subGroupId.isBlank() ? "" : ("-" + subGroupId))) + private Flux> consumeMessagesInternal() { + return createReceiver(kafkaParameters.groupId() + "-" + channelName) .receiveAutoAck(isQuickResponse() ? 1 : 4) .concatMap(Function.identity()) .log("consume-messages", diff --git a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java index 861e4d4..4292504 100644 --- a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java @@ -30,7 +30,7 @@ public final class KafkaProducer implements EventProducer { this.channelCodec = channelCodec; this.channelName = channelName; Map props = new HashMap<>(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.bootstrapServers()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaParameters.getBootstrapServersString()); props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaParameters.clientId()); props.put(ProducerConfig.ACKS_CONFIG, "1"); props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java new file mode 100644 index 0000000..799ebf9 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java @@ -0,0 +1,75 @@ +package it.tdlight.reactiveapi.rsocket; + +import com.google.common.net.HostAndPort; +import io.netty.buffer.ByteBuf; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.Resume; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventConsumer; +import it.tdlight.reactiveapi.RSocketParameters; +import it.tdlight.reactiveapi.Timestamped; +import java.util.logging.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class RSocketConsumeAsClient implements EventConsumer { + + private static final Logger LOG = LogManager.getLogger(RSocketConsumeAsClient.class); + + private final HostAndPort host; + private final ChannelCodec channelCodec; + private final String channelName; + + public RSocketConsumeAsClient(HostAndPort hostAndPort, + ChannelCodec channelCodec, + String channelName) { + this.channelCodec = channelCodec; + this.channelName = channelName; + this.host = hostAndPort; + } + + @Override + public ChannelCodec getChannelCodec() { + return channelCodec; + } + + @Override + public String getChannelName() { + return channelName; + } + + @Override + public Flux> consumeMessages() { + var deserializer = channelCodec.getNewDeserializer(); + return + RSocketConnector.create() + .resume(new Resume()) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .connect(TcpClientTransport.create(host.getHost(), host.getPort())) + .flatMapMany(socket -> socket + .requestStream(DefaultPayload.create("", "consume")) + .map(payload -> { + ByteBuf slice = payload.sliceData(); + var data = new byte[slice.readableBytes()]; + slice.readBytes(data, 0, data.length); + //noinspection unchecked + return new Timestamped(System.currentTimeMillis(), (T) deserializer.deserialize(null, data)); + }) + .doFinally(signalType -> socket + .fireAndForget(DefaultPayload.create("", "close")) + .then(socket.onClose()) + .doFinally(s -> socket.dispose()) + .subscribeOn(Schedulers.parallel()) + .subscribe())) + .log("RSOCKET_CONSUMER_CLIENT", Level.FINE); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java new file mode 100644 index 0000000..3543b0c --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java @@ -0,0 +1,230 @@ +package it.tdlight.reactiveapi.rsocket; + +import com.google.common.net.HostAndPort; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.core.Resume; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventConsumer; +import it.tdlight.reactiveapi.RSocketParameters; +import it.tdlight.reactiveapi.Timestamped; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.logging.Level; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Empty; +import reactor.core.scheduler.Schedulers; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuple3; +import reactor.util.function.Tuples; +import reactor.util.retry.Retry; + +public class RSocketConsumeAsServer implements EventConsumer { + + private static final Logger LOG = LogManager.getLogger(RSocketConsumeAsServer.class); + + private final HostAndPort host; + private final ChannelCodec channelCodec; + private final String channelName; + + public RSocketConsumeAsServer(HostAndPort hostAndPort, + ChannelCodec channelCodec, + String channelName) { + this.channelCodec = channelCodec; + this.channelName = channelName; + this.host = hostAndPort; + } + + @Override + public ChannelCodec getChannelCodec() { + return channelCodec; + } + + @Override + public String getChannelName() { + return channelName; + } + + @Override + public Flux> consumeMessages() { + Deserializer deserializer = channelCodec.getNewDeserializer(); + return Mono + .>>>create(sink -> { + AtomicReference serverRef = new AtomicReference<>(); + var server = RSocketServer + .create((setup, in) -> { + var inRawFlux = in.requestStream(DefaultPayload.create("", "consume")); + var inFlux = inRawFlux.map(payload -> { + ByteBuf slice = payload.sliceData(); + var data = new byte[slice.readableBytes()]; + slice.readBytes(data, 0, data.length); + return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, data)); + }); + sink.success(Tuples.of(serverRef.get(), in, inFlux)); + + return Mono.just(new RSocket() {}); + }) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .resume(new Resume()) + .bindNow(TcpServerTransport.create(host.getHost(), host.getPort())); + serverRef.set(server); + sink.onCancel(server); + }) + .subscribeOn(Schedulers.boundedElastic()) + .flatMapMany(t -> t.getT3().doFinally(s -> { + t.getT2().dispose(); + t.getT1().dispose(); + })) + .log("RSOCKET_CONSUMER_SERVER", Level.FINE); + } + /*return Flux.defer(() -> { + var deserializer = channelCodec.getNewDeserializer(); + AtomicReference inRef = new AtomicReference<>(); + AtomicReference inSubRef = new AtomicReference<>(); + return Flux.>create(sink -> { + var server = RSocketServer.create((setup, in) -> { + var prev = inRef.getAndSet(in); + if (prev != null) { + prev.dispose(); + } + + var inRawFlux = in.requestStream(DefaultPayload.create("", "consume")); + var inFlux = inRawFlux.map(payload -> { + ByteBuf slice = payload.sliceData(); + var data = new byte[slice.readableBytes()]; + slice.readBytes(data, 0, data.length); + //noinspection unchecked + return new Timestamped<>(System.currentTimeMillis(), (T) deserializer.deserialize(null, data)); + }); + + inFlux.subscribe(new CoreSubscriber<>() { + @Override + public void onSubscribe(@NotNull Subscription s) { + var prevS = inSubRef.getAndSet(s); + if (prevS != null) { + prevS.cancel(); + } else { + sink.onRequest(n -> { + s.request(n); + }); + } + } + + @Override + public void onNext(Timestamped val) { + sink.next(val); + } + + @Override + public void onError(Throwable throwable) { + sink.error(throwable); + } + + @Override + public void onComplete() { + sink.complete(); + } + }); + + return Mono.just(new RSocket() {}); + }).payloadDecoder(PayloadDecoder.ZERO_COPY).bindNow(TcpServerTransport.create(host.getHost(), host.getPort())); + sink.onCancel(() -> { + var inSub = inSubRef.get(); + if (inSub != null) { + inSub.cancel(); + } + }); + sink.onDispose(() -> { + var in = inRef.get(); + if (in != null) { + in.dispose(); + } + server.dispose(); + }); + }).subscribeOn(Schedulers.boundedElastic()).log("RSOCKET_CONSUMER_SERVER", Level.FINE) + .retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(rs -> LOG.warn("Failed to consume as server, retrying. {}", rs))); + });*/ + /* + return Flux.>create(sink -> { + RSocketServer + .create((setup, socket) -> { + socket.requestStream(DefaultPayload.create("", "consume")).map(payload -> { + ByteBuf slice = payload.sliceData(); + var data = new byte[slice.readableBytes()]; + slice.readBytes(data, 0, data.length); + //noinspection unchecked + return new Timestamped<>(System.currentTimeMillis(), (T) deserializer.deserialize(null, data)); + }).subscribe(new CoreSubscriber<>() { + @Override + public void onSubscribe(@NotNull Subscription s) { + sink.onDispose(() -> { + s.cancel(); + socket.dispose(); + }); + sink.onRequest(n -> { + if (n > 8192) { + throw new UnsupportedOperationException( + "Requests count is bigger than max buffer size! " + n + " > " + 8192); + } + s.request(n); + }); + sink.onCancel(() -> s.cancel()); + } + + @Override + public void onNext(Timestamped val) { + sink.next(val); + } + + @Override + public void onError(Throwable throwable) { + sink.error(throwable); + } + + @Override + public void onComplete() { + sink.complete(); + } + }); + return Mono.just(socket); + }) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bind(TcpServerTransport.create(host.getHost(), host.getPort())) + .subscribeOn(Schedulers.parallel()) + .subscribe(v -> { + sink.onDispose(v); + }, sink::error, sink::complete); + }) + .retryWhen(Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(rs -> LOG.warn("Failed to consume as server, retrying. {}", rs))); + */ +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java new file mode 100644 index 0000000..19308c5 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java @@ -0,0 +1,79 @@ +package it.tdlight.reactiveapi.rsocket; + +import com.google.common.net.HostAndPort; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.Resume; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.util.DefaultPayload; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.RSocketParameters; +import it.tdlight.reactiveapi.ReactorUtils; +import it.tdlight.reactiveapi.Timestamped; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.logging.Level; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Empty; +import reactor.util.retry.Retry; + +public final class RSocketProduceAsClient implements EventProducer { + + private static final Logger LOG = LogManager.getLogger(RSocketProduceAsClient.class); + private final ChannelCodec channelCodec; + private final String channelName; + private final HostAndPort host; + private final Empty closeRequest = Sinks.empty(); + + public RSocketProduceAsClient(HostAndPort host, ChannelCodec channelCodec, String channelName) { + this.channelCodec = channelCodec; + this.channelName = channelName; + this.host = host; + } + + @Override + public ChannelCodec getChannelCodec() { + return channelCodec; + } + + @Override + public String getChannelName() { + return channelName; + } + + @Override + public Mono sendMessages(Flux eventsFlux) { + Serializer serializer = channelCodec.getNewSerializer(); + Flux serializedEventsFlux = eventsFlux + .map(event -> DefaultPayload.create(serializer.serialize(null, event))) + .log("RSOCKET_PRODUCER_CLIENT", Level.FINE) + .doFinally(s -> LOG.debug("Events flux ended: {}", s)); + + return + RSocketConnector.create() + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .setupPayload(DefaultPayload.create("", "connect")) + .acceptor(SocketAcceptor.forRequestStream(payload -> serializedEventsFlux)) + .connect(TcpClientTransport.create(host.getHost(), host.getPort())) + .flatMap(rSocket -> rSocket.onClose() + .takeUntilOther(closeRequest.asMono().doFinally(s -> rSocket.dispose()))) + .log("RSOCKET_PRODUCER_CLIENT_Y", Level.FINE); + } + + @Override + public void close() { + closeRequest.tryEmitEmpty(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java new file mode 100644 index 0000000..6da58da --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java @@ -0,0 +1,100 @@ +package it.tdlight.reactiveapi.rsocket; + +import com.google.common.net.HostAndPort; +import io.netty.buffer.Unpooled; +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketServer; +import io.rsocket.core.Resume; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.ReactorUtils; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Empty; + +public final class RSocketProduceAsServer implements EventProducer { + + private static final Logger LOG = LogManager.getLogger(RSocketProduceAsServer.class); + private final ChannelCodec channelCodec; + private final String channelName; + private final HostAndPort host; + + private final Empty closeRequest = Sinks.empty(); + + public RSocketProduceAsServer(HostAndPort hostAndPort, ChannelCodec channelCodec, String channelName) { + this.host = hostAndPort; + this.channelCodec = channelCodec; + this.channelName = channelName; + } + + @Override + public ChannelCodec getChannelCodec() { + return channelCodec; + } + + @Override + public String getChannelName() { + return channelName; + } + + @Override + public Mono sendMessages(Flux eventsFlux) { + return Mono.defer(()-> { + AtomicReference serverRef = new AtomicReference<>(); + Serializer serializer = channelCodec.getNewSerializer(); + Flux serializedEventsFlux = eventsFlux + .log("RSOCKET_PRODUCER_SERVER", Level.FINE) + .map(event -> DefaultPayload.create(serializer.serialize(null, event))) + .doFinally(s -> LOG.debug("Events flux ended: {}", s)); + + return RSocketServer + .create(new SocketAcceptor() { + @Override + public @NotNull Mono accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) { + return Mono.just(new RSocket() { + @Override + public @NotNull Mono fireAndForget(@NotNull Payload payload) { + return Mono.fromRunnable(() -> { + var srv = serverRef.get(); + if (srv != null) { + srv.dispose(); + } + }); + } + + @Override + public @NotNull Flux requestStream(@NotNull Payload payload) { + return serializedEventsFlux; + } + }); + } + }) + .resume(new Resume()) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bind(TcpServerTransport.create(host.getHost(), host.getPort())) + .doOnNext(serverRef::set) + .flatMap(closeableChannel -> closeableChannel.onClose() + .takeUntilOther(closeRequest.asMono().doFinally(s -> closeableChannel.dispose()))); + }); + } + + @Override + public void close() { + closeRequest.tryEmitEmpty(); + } +} diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index 23581c6..11b01c7 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -1,6 +1,8 @@ module tdlib.reactive.api { exports it.tdlight.reactiveapi; exports it.tdlight.reactiveapi.generated; + exports it.tdlight.reactiveapi.rsocket; + exports it.tdlight.reactiveapi.kafka; requires com.fasterxml.jackson.annotation; requires org.jetbrains.annotations; requires org.slf4j; @@ -22,4 +24,8 @@ module tdlib.reactive.api { requires jdk.unsupported; requires jakarta.xml.bind; requires reactor.core; + requires rsocket.core; + requires rsocket.transport.local; + requires rsocket.transport.netty; + requires io.netty.buffer; } \ No newline at end of file diff --git a/src/test/java/it/tdlight/reactiveapi/test/BombDeserializer.java b/src/test/java/it/tdlight/reactiveapi/test/BombDeserializer.java new file mode 100644 index 0000000..2eecda8 --- /dev/null +++ b/src/test/java/it/tdlight/reactiveapi/test/BombDeserializer.java @@ -0,0 +1,11 @@ +package it.tdlight.reactiveapi.test; + +import org.apache.kafka.common.serialization.Deserializer; + +public final class BombDeserializer implements Deserializer { + + @Override + public Object deserialize(String topic, byte[] data) { + throw new FakeException(); + } +} diff --git a/src/test/java/it/tdlight/reactiveapi/test/BombSerializer.java b/src/test/java/it/tdlight/reactiveapi/test/BombSerializer.java new file mode 100644 index 0000000..ed243b6 --- /dev/null +++ b/src/test/java/it/tdlight/reactiveapi/test/BombSerializer.java @@ -0,0 +1,11 @@ +package it.tdlight.reactiveapi.test; + +import org.apache.kafka.common.serialization.Serializer; + +public final class BombSerializer implements Serializer { + + @Override + public byte[] serialize(String topic, Object data) { + throw new FakeException(); + } +} diff --git a/src/test/java/it/tdlight/reactiveapi/test/FakeException.java b/src/test/java/it/tdlight/reactiveapi/test/FakeException.java new file mode 100644 index 0000000..c1b1c82 --- /dev/null +++ b/src/test/java/it/tdlight/reactiveapi/test/FakeException.java @@ -0,0 +1,3 @@ +package it.tdlight.reactiveapi.test; + +public class FakeException extends RuntimeException {} diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java new file mode 100644 index 0000000..84b5936 --- /dev/null +++ b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java @@ -0,0 +1,200 @@ +package it.tdlight.reactiveapi.test; + +import com.google.common.net.HostAndPort; +import it.tdlight.reactiveapi.EventConsumer; +import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.Timestamped; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public abstract class TestChannel { + + protected HostAndPort hostAndPort; + protected EventConsumer consumer; + protected EventProducer producer; + protected IntArrayList data; + + @BeforeEach + public void beforeEach() { + hostAndPort = HostAndPort.fromParts("localhost", 25689); + consumer = createConsumer(hostAndPort, false); + producer = createProducer(hostAndPort, false); + data = new IntArrayList(100); + for (int i = 0; i < 100; i++) { + data.add(i); + } + } + + public abstract EventConsumer createConsumer(HostAndPort hostAndPort, boolean bomb); + + public abstract EventProducer createProducer(HostAndPort hostAndPort, boolean bomb); + + @AfterEach + public void afterEach() { + producer.close(); + } + + @Test + public void testSimple() { + Mono sender = producer + .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)) + .then(Mono.empty()); + Mono receiver = consumer + .consumeMessages() + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)); + var response = Flux + .merge(isServerSender() ? (List.of(sender, receiver)) : List.of(receiver, sender)) + .blockLast(); + Assertions.assertEquals(response, data); + System.out.println(response); + } + + @Test + public void testException() { + Mono sender = producer + .sendMessages(Flux.concat( + Flux.fromIterable(data).map(Integer::toUnsignedString), + Mono.error(new FakeException()) + )) + .then(Mono.empty()); + Mono receiver = consumer + .consumeMessages() + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)); + Assertions.assertThrows(Exception.class, () -> Flux + .merge(isServerSender() ? (List.of(sender, receiver)) : List.of(receiver, sender)) + .blockLast()); + } + + @Test + public void testEmpty() { + Mono sender = producer + .sendMessages(Flux.empty()) + .then(Mono.empty()); + Mono receiver = consumer + .consumeMessages() + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)); + var data = Flux + .merge(isServerSender() ? (List.of(sender, receiver)) : List.of(receiver, sender)) + .blockLast(); + Assertions.assertNotNull(data); + Assertions.assertTrue(data.isEmpty()); + } + + @Test + public void testSimpleOneByOne() { + var sender = producer.sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)); + var receiver = consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)); + var response = Flux + .merge(isServerSender() ? (List.of(sender, receiver)) : List.of(receiver, sender)) + .blockLast(); + Assertions.assertEquals(response, data); + System.out.println(response); + } + + @Test + public void testCancel() { + var sender = producer.sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)); + var receiver = consumer + .consumeMessages() + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .take(50, true) + .collect(Collectors.toCollection(IntArrayList::new)); + var response = Flux + .merge(isServerSender() ? (List.of(sender, receiver)) : List.of(receiver, sender)) + .blockLast(); + data.removeElements(50, 100); + Assertions.assertEquals(response, data); + System.out.println(response); + } + + @Test + public void testConsumeDelay() { + var sender = producer + .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)); + var receiver = consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .concatMap(item -> item == 15 ? Mono.just(item).delaySubscription(Duration.ofSeconds(8)) : Mono.just(item)) + .collect(Collectors.toCollection(IntArrayList::new)); + var response = Flux + .merge(isServerSender() ? List.of(sender, receiver) : List.of(receiver, sender)) + .blockLast(); + Assertions.assertEquals(response, data); + System.out.println(response); + } + + @Test + public void testProduceDelay() { + var sender = producer + .sendMessages(Flux.fromIterable(data) + .concatMap(item -> item == 15 ? Mono.just(item).delaySubscription(Duration.ofSeconds(8)) : Mono.just(item)) + .map(Integer::toUnsignedString)); + var receiver = consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)); + var response = Flux + .merge(isServerSender() ? List.of(sender, receiver) : List.of(receiver, sender)) + .blockLast(); + Assertions.assertEquals(response, data); + System.out.println(response); + } + + @Test + public void testConsumeMidCancel() { + var dataFlux = Flux.fromIterable(data).publish().autoConnect(); + Mono sender = producer + .sendMessages(dataFlux.map(Integer::toUnsignedString)) + .then(Mono.empty()); + var receiver1 = consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .take(10, true) + .collect(Collectors.toCollection(IntArrayList::new)); + var receiverWait = Flux.empty().delaySubscription(Duration.ofSeconds(4)); + var receiver2 = consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)); + Flux part1 = Flux + .merge((isServerSender() ? sender : receiver1), isServerSender() ? receiver1 : sender); + Flux part2 = Flux + .merge((isServerSender() ? sender : receiver2), receiverWait.then(isServerSender() ? receiver2 : sender)); + var response = Flux.concat(part1, part2).reduce((a, b) -> { + a.addAll(b); + return a; + }).block(); + Assertions.assertEquals(response, data); + System.out.println(response); + } + + public abstract boolean isServerSender(); +} diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestClientToServerChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestClientToServerChannel.java new file mode 100644 index 0000000..df18f75 --- /dev/null +++ b/src/test/java/it/tdlight/reactiveapi/test/TestClientToServerChannel.java @@ -0,0 +1,35 @@ +package it.tdlight.reactiveapi.test; + +import com.google.common.net.HostAndPort; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventConsumer; +import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.rsocket.RSocketConsumeAsServer; +import it.tdlight.reactiveapi.rsocket.RSocketProduceAsClient; + +public class TestClientToServerChannel extends TestChannel { + + @Override + public EventConsumer createConsumer(HostAndPort hostAndPort, boolean bomb) { + ChannelCodec codec = ChannelCodec.UTF8_TEST; + if (bomb) { + codec = new ChannelCodec(codec.getSerializerClass(), BombDeserializer.class); + } + return new RSocketConsumeAsServer<>(hostAndPort, codec, "test"); + } + + @Override + public EventProducer createProducer(HostAndPort hostAndPort, boolean bomb) { + ChannelCodec codec = ChannelCodec.UTF8_TEST; + if (bomb) { + codec = new ChannelCodec(BombSerializer.class, codec.getDeserializerClass()); + } + return new RSocketProduceAsClient<>(hostAndPort, codec, "test"); + } + + @Override + public boolean isServerSender() { + return false; + } + +} diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestServerToClientChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestServerToClientChannel.java new file mode 100644 index 0000000..b109661 --- /dev/null +++ b/src/test/java/it/tdlight/reactiveapi/test/TestServerToClientChannel.java @@ -0,0 +1,42 @@ +package it.tdlight.reactiveapi.test; + +import com.google.common.net.HostAndPort; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventConsumer; +import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.Timestamped; +import it.tdlight.reactiveapi.rsocket.RSocketConsumeAsClient; +import it.tdlight.reactiveapi.rsocket.RSocketProduceAsServer; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import java.util.List; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class TestServerToClientChannel extends TestChannel { + + @Override + public EventConsumer createConsumer(HostAndPort hostAndPort, boolean bomb) { + ChannelCodec codec = ChannelCodec.UTF8_TEST; + if (bomb) { + codec = new ChannelCodec(codec.getSerializerClass(), BombDeserializer.class); + } + return new RSocketConsumeAsClient<>(hostAndPort, codec, "test"); + } + + @Override + public EventProducer createProducer(HostAndPort hostAndPort, boolean bomb) { + ChannelCodec codec = ChannelCodec.UTF8_TEST; + if (bomb) { + codec = new ChannelCodec(BombSerializer.class, codec.getDeserializerClass()); + } + return new RSocketProduceAsServer<>(hostAndPort, codec, "test"); + } + + @Override + public boolean isServerSender() { + return true; + } +} diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java new file mode 100644 index 0000000..f8bbcff --- /dev/null +++ b/src/test/java/module-info.java @@ -0,0 +1,12 @@ +module tdlib.reactive.api.test { + exports it.tdlight.reactiveapi.test; + requires org.apache.logging.log4j.core; + requires org.slf4j; + requires tdlib.reactive.api; + requires org.junit.jupiter.api; + requires reactor.core; + requires com.google.common; + requires it.unimi.dsi.fastutil; + requires org.reactivestreams; + requires kafka.clients; +} \ No newline at end of file diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml new file mode 100644 index 0000000..46e9337 --- /dev/null +++ b/src/test/resources/log4j2.xml @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + + + + + + + +