diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index 6564681..d45112d 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -35,9 +35,9 @@ public class AtomixReactiveApi implements ReactiveApi { private final AtomixReactiveApiMode mode; - private final ClientsSharedTdlib sharedTdlibClients; + private final TdlibChannelsSharedReceive sharedTdlibClients; @Nullable - private final TdlibChannelsSharedServer sharedTdlibServers; + private final TdlibChannelsSharedHost sharedTdlibServers; private final ReactiveApiMultiClient client; private final Set resultingEventTransformerSet; @@ -66,34 +66,34 @@ public class AtomixReactiveApi implements ReactiveApi { this.mode = mode; ChannelFactory channelFactory = ChannelFactory.getFactoryFromParameters(channelsParameters); if (mode != AtomixReactiveApiMode.SERVER) { - EventProducer> tdRequestProducer = ChannelProducerTdlibRequest.create(channelFactory, channelsParameters); - EventConsumer> tdResponseConsumer = ChannelConsumerTdlibResponse.create(channelFactory, channelsParameters); + EventProducer> tdRequestProducer = ChannelProducerTdlibRequest.create(channelFactory); + EventConsumer> tdResponseConsumer = ChannelConsumerTdlibResponse.create(channelFactory); HashMap> clientBoundConsumers = new HashMap<>(); for (String lane : channelsParameters.getAllLanes()) { - clientBoundConsumers.put(lane, ChannelConsumerClientBoundEvent.create(channelFactory, channelsParameters, lane)); + clientBoundConsumers.put(lane, ChannelConsumerClientBoundEvent.create(channelFactory, lane)); } var tdClientsChannels = new TdlibChannelsClients(tdRequestProducer, tdResponseConsumer, clientBoundConsumers ); - this.sharedTdlibClients = new ClientsSharedTdlib(tdClientsChannels); + this.sharedTdlibClients = new TdlibChannelsSharedReceive(tdClientsChannels); this.client = new LiveAtomixReactiveApiClient(sharedTdlibClients); } else { this.sharedTdlibClients = null; this.client = null; } if (mode != AtomixReactiveApiMode.CLIENT) { - EventConsumer> tdRequestConsumer = ChannelConsumerTdlibRequest.create(channelFactory, channelsParameters); - EventProducer> tdResponseProducer = ChannelProducerTdlibResponse.create(channelFactory, channelsParameters); + EventConsumer> tdRequestConsumer = ChannelConsumerTdlibRequest.create(channelFactory); + EventProducer> tdResponseProducer = ChannelProducerTdlibResponse.create(channelFactory); var clientBoundProducers = new HashMap>(); for (String lane : channelsParameters.getAllLanes()) { - clientBoundProducers.put(lane, ChannelProducerClientBoundEvent.create(channelFactory, channelsParameters, lane)); + clientBoundProducers.put(lane, ChannelProducerClientBoundEvent.create(channelFactory, lane)); } var tdServer = new TdlibChannelsServers(tdRequestConsumer, tdResponseProducer, clientBoundProducers ); - this.sharedTdlibServers = new TdlibChannelsSharedServer(tdServer); + this.sharedTdlibServers = new TdlibChannelsSharedHost(channelsParameters.getAllLanes(), tdServer); } else { this.sharedTdlibServers = null; } @@ -143,10 +143,9 @@ public class AtomixReactiveApi implements ReactiveApi { return loadSessions.then(Mono.fromRunnable(() -> { if (sharedTdlibServers != null) { requestsSub = sharedTdlibServers.requests() - .onBackpressureError() .doOnNext(req -> localSessions.get(req.data().userId()).handleRequest(req.data())) .subscribeOn(Schedulers.parallel()) - .subscribe(); + .subscribe(n -> {}, ex -> LOG.error("Requests channel broke unexpectedly", ex)); } })).transform(ReactorUtils::subscribeOnce); } diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index 9e71705..0cba26a 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -52,7 +52,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { private final AtomicLong requestId = new AtomicLong(0); private final Disposable subscription; - public BaseAtomixReactiveApiClient(ClientsSharedTdlib sharedTdlibClients) { + public BaseAtomixReactiveApiClient(TdlibChannelsSharedReceive sharedTdlibClients) { this.clientId = System.nanoTime(); this.requests = sharedTdlibClients.requests(); @@ -78,7 +78,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient { } var cf = new CompletableFuture>>(); this.responses.put(requestId, cf); - Mono response = Mono.fromFuture(cf) + Mono response = Mono.fromFuture(() -> cf) .timeout(timeoutDuration, Mono.fromSupplier(() -> new Timestamped<>(requestTimestamp.toEpochMilli(), new Response<>(clientId, requestId, userId, new TdApi.Error(408, "Request Timeout"))))) .handle((responseObj, sink) -> { diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java index ec492fc..32c4cec 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerClientBoundEvent.java @@ -8,15 +8,14 @@ public class ChannelConsumerClientBoundEvent { private ChannelConsumerClientBoundEvent() { } - public static EventConsumer create(ChannelFactory channelFactory, ChannelsParameters channelsParameters, - @NotNull String lane) { + public static EventConsumer create(ChannelFactory channelFactory, @NotNull String lane) { String name; if (lane.isEmpty()) { name = Channel.CLIENT_BOUND_EVENT.getChannelName(); } else { name = Channel.CLIENT_BOUND_EVENT.getChannelName() + "-" + lane; } - return channelFactory.newConsumer(channelsParameters, false, ChannelCodec.CLIENT_BOUND_EVENT, name); + return channelFactory.newConsumer(false, ChannelCodec.CLIENT_BOUND_EVENT, name); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java index 2bcc837..0f74881 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibRequest.java @@ -8,12 +8,8 @@ public class ChannelConsumerTdlibRequest { private ChannelConsumerTdlibRequest() { } - public static EventConsumer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) { - return channelFactory.newConsumer(channelsParameters, - true, - ChannelCodec.TDLIB_REQUEST, - Channel.TDLIB_REQUEST.getChannelName() - ); + public static EventConsumer> create(ChannelFactory channelFactory) { + return channelFactory.newConsumer(true, ChannelCodec.TDLIB_REQUEST, Channel.TDLIB_REQUEST.getChannelName()); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java index 1353e19..9da3428 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelConsumerTdlibResponse.java @@ -8,11 +8,7 @@ public class ChannelConsumerTdlibResponse { private ChannelConsumerTdlibResponse() { } - public static EventConsumer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) { - return channelFactory.newConsumer(channelsParameters, - true, - ChannelCodec.TDLIB_RESPONSE, - Channel.TDLIB_RESPONSE.getChannelName() - ); + public static EventConsumer> create(ChannelFactory channelFactory) { + return channelFactory.newConsumer(true, ChannelCodec.TDLIB_RESPONSE, Channel.TDLIB_RESPONSE.getChannelName()); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java index e4f58d7..7111e9d 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelFactory.java @@ -1,74 +1,77 @@ package it.tdlight.reactiveapi; +import io.rsocket.core.RSocketClient; import it.tdlight.reactiveapi.kafka.KafkaConsumer; import it.tdlight.reactiveapi.kafka.KafkaProducer; -import it.tdlight.reactiveapi.rsocket.RSocketConsumeAsClient; -import it.tdlight.reactiveapi.rsocket.RSocketProduceAsServer; -import it.tdlight.reactiveapi.rsocket.RSocketConsumeAsServer; -import it.tdlight.reactiveapi.rsocket.RSocketProduceAsClient; +import it.tdlight.reactiveapi.rsocket.MyRSocketClient; +import it.tdlight.reactiveapi.rsocket.MyRSocketServer; +import it.tdlight.reactiveapi.rsocket.RSocketChannelManager; +import java.io.Closeable; +import java.io.IOException; public interface ChannelFactory { static ChannelFactory getFactoryFromParameters(ChannelsParameters channelsParameters) { - if (channelsParameters instanceof KafkaParameters) { - return new KafkaChannelFactory(); + if (channelsParameters instanceof KafkaParameters kafkaParameters) { + return new KafkaChannelFactory(kafkaParameters); + } else if (channelsParameters instanceof RSocketParameters socketParameters) { + return new RSocketChannelFactory(socketParameters); } else { - return new RsocketChannelFactory(); + throw new UnsupportedOperationException("Unsupported parameters type: " + channelsParameters); } } - EventConsumer newConsumer(ChannelsParameters channelsParameters, - boolean quickResponse, - ChannelCodec channelCodec, - String channelName); + EventConsumer newConsumer(boolean quickResponse, ChannelCodec channelCodec, String channelName); - EventProducer newProducer(ChannelsParameters channelsParameters, - ChannelCodec channelCodec, - String channelName); + EventProducer newProducer(ChannelCodec channelCodec, String channelName); class KafkaChannelFactory implements ChannelFactory { - @Override - public EventConsumer newConsumer(ChannelsParameters channelsParameters, - boolean quickResponse, - ChannelCodec channelCodec, - String channelName) { - return new KafkaConsumer<>((KafkaParameters) channelsParameters, quickResponse, channelCodec, channelName); + private final KafkaParameters channelsParameters; + + public KafkaChannelFactory(KafkaParameters channelsParameters) { + this.channelsParameters = channelsParameters; } @Override - public EventProducer newProducer(ChannelsParameters channelsParameters, - ChannelCodec channelCodec, - String channelName) { - return new KafkaProducer<>((KafkaParameters) channelsParameters, channelCodec, channelName); + public EventConsumer newConsumer(boolean quickResponse, ChannelCodec channelCodec, String channelName) { + return new KafkaConsumer<>(channelsParameters, quickResponse, channelCodec, channelName); + } + + @Override + public EventProducer newProducer(ChannelCodec channelCodec, String channelName) { + return new KafkaProducer<>(channelsParameters, channelCodec, channelName); } } - class RsocketChannelFactory implements ChannelFactory { + class RSocketChannelFactory implements ChannelFactory, Closeable { - @Override - public EventConsumer newConsumer(ChannelsParameters channelsParameters, - boolean quickResponse, - ChannelCodec channelCodec, - String channelName) { - var socketParameters = (RSocketParameters) channelsParameters; - if (socketParameters.isClient()) { - return new RSocketConsumeAsClient<>(socketParameters.channelHost(channelName), channelCodec, channelName); + private final RSocketParameters channelsParameters; + private final RSocketChannelManager manager; + + public RSocketChannelFactory(RSocketParameters channelsParameters) { + this.channelsParameters = channelsParameters; + if (channelsParameters.isClient()) { + this.manager = new MyRSocketClient(channelsParameters.baseHost()); } else { - return new RSocketConsumeAsServer<>(socketParameters.channelHost(channelName), channelCodec, channelName); + this.manager = new MyRSocketServer(channelsParameters.baseHost()); } } @Override - public EventProducer newProducer(ChannelsParameters channelsParameters, - ChannelCodec channelCodec, - String channelName) { - var socketParameters = (RSocketParameters) channelsParameters; - if (socketParameters.isClient()) { - return new RSocketProduceAsClient<>(socketParameters.channelHost(channelName), channelCodec, channelName); - } else { - return new RSocketProduceAsServer<>(socketParameters.channelHost(channelName), channelCodec, channelName); - } + public EventConsumer newConsumer(boolean quickResponse, ChannelCodec channelCodec, String channelName) { + return manager.registerConsumer(channelCodec, channelName); + } + + @Override + public EventProducer newProducer(ChannelCodec channelCodec, String channelName) { + return manager.registerProducer(channelCodec, channelName); + } + + @Override + public void close() throws IOException { + manager.dispose(); + manager.onClose().block(); } } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java index d22c9ce..d78682e 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerClientBoundEvent.java @@ -7,13 +7,13 @@ public class ChannelProducerClientBoundEvent { private ChannelProducerClientBoundEvent() { } - public static EventProducer create(ChannelFactory channelFactory, ChannelsParameters channelsParameters, String lane) { + public static EventProducer create(ChannelFactory channelFactory, String lane) { String name; if (lane.isBlank()) { name = Channel.CLIENT_BOUND_EVENT.getChannelName(); } else { name = Channel.CLIENT_BOUND_EVENT.getChannelName() + "-" + lane; } - return channelFactory.newProducer(channelsParameters, ChannelCodec.CLIENT_BOUND_EVENT, name); + return channelFactory.newProducer(ChannelCodec.CLIENT_BOUND_EVENT, name); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java index 9dad97d..9425e8d 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibRequest.java @@ -7,11 +7,8 @@ public class ChannelProducerTdlibRequest { private ChannelProducerTdlibRequest() { } - public static EventProducer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) { - return channelFactory.newProducer(channelsParameters, - ChannelCodec.TDLIB_REQUEST, - Channel.TDLIB_REQUEST.getChannelName() - ); + public static EventProducer> create(ChannelFactory channelFactory) { + return channelFactory.newProducer(ChannelCodec.TDLIB_REQUEST, Channel.TDLIB_REQUEST.getChannelName()); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java index 92be3df..5327c48 100644 --- a/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java +++ b/src/main/java/it/tdlight/reactiveapi/ChannelProducerTdlibResponse.java @@ -8,11 +8,8 @@ public class ChannelProducerTdlibResponse { private ChannelProducerTdlibResponse() { } - public static EventProducer> create(ChannelFactory channelFactory, ChannelsParameters channelsParameters) { - return channelFactory.newProducer(channelsParameters, - ChannelCodec.TDLIB_RESPONSE, - Channel.TDLIB_RESPONSE.getChannelName() - ); + public static EventProducer> create(ChannelFactory channelFactory) { + return channelFactory.newProducer(ChannelCodec.TDLIB_RESPONSE, Channel.TDLIB_RESPONSE.getChannelName()); } } diff --git a/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java b/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java index 23affcb..a78bc3d 100644 --- a/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java +++ b/src/main/java/it/tdlight/reactiveapi/ClusterSettings.java @@ -31,7 +31,7 @@ public class ClusterSettings { public ChannelsParameters toParameters(String clientId, InstanceType instanceType) { if (rsocketHost != null) { - return new RSocketParameters(instanceType, rsocketHost, lanes); + return new RSocketParameters(instanceType != InstanceType.UPDATES_CONSUMER, rsocketHost, lanes); } else { return new KafkaParameters(clientId, clientId, kafkaBootstrapServers, List.copyOf(lanes)); } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index cb53f63..dfddd0f 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -8,9 +8,9 @@ import reactor.core.publisher.Flux; public class LiveAtomixReactiveApiClient extends BaseAtomixReactiveApiClient { - private final ClientsSharedTdlib sharedTdlibClients; + private final TdlibChannelsSharedReceive sharedTdlibClients; - LiveAtomixReactiveApiClient(ClientsSharedTdlib sharedTdlibClients) { + LiveAtomixReactiveApiClient(TdlibChannelsSharedReceive sharedTdlibClients) { super(sharedTdlibClients); this.sharedTdlibClients = sharedTdlibClients; } diff --git a/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java b/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java index 44322c8..520713f 100644 --- a/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java +++ b/src/main/java/it/tdlight/reactiveapi/RSocketParameters.java @@ -13,8 +13,8 @@ public final class RSocketParameters implements ChannelsParameters { private final HostAndPort host; private final List lanes; - public RSocketParameters(InstanceType instanceType, String host, List lanes) { - this.client = instanceType != InstanceType.UPDATES_CONSUMER; + public RSocketParameters(boolean client, String host, List lanes) { + this.client = client; this.host = HostAndPort.fromString(host); this.lanes = lanes; } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 94ada35..41b6671 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -71,7 +71,7 @@ public abstract class ReactiveApiPublisher { private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofMinutes(5); private static final Duration HUNDRED_MS = Duration.ofMillis(100); - private final TdlibChannelsSharedServer sharedTdlibServers; + private final TdlibChannelsSharedHost sharedTdlibServers; private final Set resultingEventTransformerSet; private final ReactiveTelegramClient rawTelegramClient; private final Flux telegramClient; @@ -85,7 +85,7 @@ public abstract class ReactiveApiPublisher { private final AtomicReference disposable = new AtomicReference<>(); private final AtomicReference path = new AtomicReference<>(); - private ReactiveApiPublisher(TdlibChannelsSharedServer sharedTdlibServers, + private ReactiveApiPublisher(TdlibChannelsSharedHost sharedTdlibServers, Set resultingEventTransformerSet, long userId, String lane) { this.sharedTdlibServers = sharedTdlibServers; @@ -114,7 +114,7 @@ public abstract class ReactiveApiPublisher { }); } - public static ReactiveApiPublisher fromToken(TdlibChannelsSharedServer sharedTdlibServers, + public static ReactiveApiPublisher fromToken(TdlibChannelsSharedHost sharedTdlibServers, Set resultingEventTransformerSet, long userId, String token, @@ -122,7 +122,7 @@ public abstract class ReactiveApiPublisher { return new ReactiveApiPublisherToken(sharedTdlibServers, resultingEventTransformerSet, userId, token, lane); } - public static ReactiveApiPublisher fromPhoneNumber(TdlibChannelsSharedServer sharedTdlibServers, + public static ReactiveApiPublisher fromPhoneNumber(TdlibChannelsSharedHost sharedTdlibServers, Set resultingEventTransformerSet, long userId, long phoneNumber, @@ -551,7 +551,7 @@ public abstract class ReactiveApiPublisher { private final String botToken; - public ReactiveApiPublisherToken(TdlibChannelsSharedServer sharedTdlibServers, + public ReactiveApiPublisherToken(TdlibChannelsSharedHost sharedTdlibServers, Set resultingEventTransformerSet, long userId, String botToken, @@ -583,7 +583,7 @@ public abstract class ReactiveApiPublisher { private final long phoneNumber; - public ReactiveApiPublisherPhoneNumber(TdlibChannelsSharedServer sharedTdlibServers, + public ReactiveApiPublisherPhoneNumber(TdlibChannelsSharedHost sharedTdlibServers, Set resultingEventTransformerSet, long userId, long phoneNumber, diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java new file mode 100644 index 0000000..af86e9a --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java @@ -0,0 +1,106 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Object; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.Event.OnRequest; +import it.tdlight.reactiveapi.Event.OnResponse; +import java.io.Closeable; +import java.time.Duration; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.logging.Level; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.SignalType; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.Empty; +import reactor.core.publisher.Sinks.Many; +import reactor.core.scheduler.Schedulers; +import reactor.util.concurrent.Queues; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; + +public class TdlibChannelsSharedHost implements Closeable { + + private static final Logger LOG = LogManager.getLogger(TdlibChannelsSharedHost.class); + private static final RetryBackoffSpec RETRY_STRATEGY = Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(signal -> LOG.warn("Retrying channel with signal {}", signal)); + + private final TdlibChannelsServers tdServersChannels; + private final Disposable responsesSub; + private final AtomicReference requestsSub = new AtomicReference<>(); + private final Many> responses = Sinks.many().multicast().onBackpressureBuffer(65535); + private final Map>> events; + private final Flux>> requests; + + public TdlibChannelsSharedHost(Set allLanes, TdlibChannelsServers tdServersChannels) { + this.tdServersChannels = tdServersChannels; + this.responsesSub = tdServersChannels.response() + .sendMessages(responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT)) + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + .retryWhen(RETRY_STRATEGY) + .subscribeOn(Schedulers.parallel()) + .subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending responses", ex)); + events = allLanes.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), lane -> { + Many> sink = Sinks.many().multicast().onBackpressureBuffer(65535); + var outputEventsFlux = Flux + .merge(sink.asFlux().map(flux -> flux.subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE) + .doFinally(s -> LOG.debug("Output events flux of lane \"{}\" terminated with signal {}", lane, s)); + tdServersChannels + .events(lane) + .sendMessages(outputEventsFlux) + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + .retryWhen(RETRY_STRATEGY) + .subscribeOn(Schedulers.parallel()) + .subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending events to lane {}", lane, ex)); + return sink; + })); + this.requests = tdServersChannels.request().consumeMessages() + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + .retryWhen(RETRY_STRATEGY) + .doOnError(ex -> LOG.error("Unexpected error when receiving requests", ex)) + .doFinally(s -> LOG.debug("Input requests flux terminated with signal {}", s)); + } + + public Flux>> requests() { + return requests + //.onBackpressureBuffer(8192, BufferOverflowStrategy.DROP_OLDEST) + .log("requests", Level.FINEST, SignalType.REQUEST, SignalType.ON_NEXT); + } + + public Disposable events(String lane, Flux eventFlux) { + Empty canceller = Sinks.empty(); + var eventsSink = events.get(lane); + if (eventsSink == null) { + throw new IllegalArgumentException("Lane " + lane + " does not exist"); + } + eventsSink.emitNext(eventFlux.takeUntilOther(canceller.asMono()), + EmitFailureHandler.busyLooping(Duration.ofMillis(100)) + ); + return () -> canceller.tryEmitEmpty(); + } + + public Many> responses() { + return responses; + } + + @Override + public void close() { + responsesSub.dispose(); + var requestsSub = this.requestsSub.get(); + if (requestsSub != null) { + requestsSub.dispose(); + } + tdServersChannels.close(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java similarity index 53% rename from src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java rename to src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java index ef2457a..2917991 100644 --- a/src/main/java/it/tdlight/reactiveapi/ClientsSharedTdlib.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedReceive.java @@ -5,6 +5,7 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.OnRequest; import it.tdlight.reactiveapi.Event.OnResponse; import java.io.Closeable; +import java.time.Duration; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicReference; @@ -13,14 +14,24 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import reactor.core.Disposable; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; import reactor.util.concurrent.Queues; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; -public class ClientsSharedTdlib implements Closeable { +public class TdlibChannelsSharedReceive implements Closeable { - private static final Logger LOG = LogManager.getLogger(ClientsSharedTdlib.class); + private static final Logger LOG = LogManager.getLogger(TdlibChannelsSharedReceive.class); + + private static final RetryBackoffSpec RETRY_STRATEGY = Retry + .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .maxBackoff(Duration.ofSeconds(16)) + .jitter(1.0) + .doBeforeRetry(signal -> LOG.warn("Retrying channel with signal {}", signal)); private final TdlibChannelsClients tdClientsChannels; private final AtomicReference responsesSub = new AtomicReference<>(); @@ -28,19 +39,33 @@ public class ClientsSharedTdlib implements Closeable { private final AtomicReference eventsSub = new AtomicReference<>(); private final Flux>> responses; private final Map>> events; - private final Many> requests = Sinks.many().unicast() - .onBackpressureBuffer(Queues.>get(65535).get()); + private final Many> requests = Sinks.many().multicast().onBackpressureBuffer(65535); - public ClientsSharedTdlib(TdlibChannelsClients tdClientsChannels) { + public TdlibChannelsSharedReceive(TdlibChannelsClients tdClientsChannels) { this.tdClientsChannels = tdClientsChannels; - this.responses = tdClientsChannels.response().consumeMessages().repeat(); + this.responses = tdClientsChannels + .response() + .consumeMessages() + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + .retryWhen(RETRY_STRATEGY) + .doFinally(s -> LOG.debug("Input responses flux terminated with signal {}", s)); this.events = tdClientsChannels.events().entrySet().stream() - .collect(Collectors.toUnmodifiableMap(Entry::getKey, e -> e.getValue().consumeMessages().repeat())); + .collect(Collectors.toUnmodifiableMap(Entry::getKey, + e -> e + .getValue() + .consumeMessages() + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + .retryWhen(RETRY_STRATEGY) + .doFinally(s -> LOG.debug("Input events flux of lane \"{}\" terminated with signal {}", e.getKey(), s)) + )); + var requestsFlux = Flux.defer(() -> requests.asFlux() + .doFinally(s -> LOG.debug("Output requests flux terminated with signal {}", s))); this.requestsSub = tdClientsChannels.request() - .sendMessages(requests.asFlux()) - .repeat() + .sendMessages(requestsFlux) + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(5))) + .retryWhen(RETRY_STRATEGY) .subscribeOn(Schedulers.parallel()) - .subscribe(); + .subscribe(n -> {}, ex -> requests.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)))); } public Flux>> responses() { diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java deleted file mode 100644 index 2113353..0000000 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedServer.java +++ /dev/null @@ -1,65 +0,0 @@ -package it.tdlight.reactiveapi; - -import it.tdlight.jni.TdApi; -import it.tdlight.jni.TdApi.Object; -import it.tdlight.reactiveapi.Event.ClientBoundEvent; -import it.tdlight.reactiveapi.Event.OnRequest; -import it.tdlight.reactiveapi.Event.OnResponse; -import java.io.Closeable; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Level; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.SignalType; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.Many; -import reactor.core.scheduler.Schedulers; -import reactor.util.concurrent.Queues; - -public class TdlibChannelsSharedServer implements Closeable { - - private final TdlibChannelsServers tdServersChannels; - private final Disposable responsesSub; - private final AtomicReference requestsSub = new AtomicReference<>(); - private final Many> responses = Sinks.many().unicast().onBackpressureBuffer( - Queues.>get(65535).get()); - private final Flux>> requests; - - public TdlibChannelsSharedServer(TdlibChannelsServers tdServersChannels) { - this.tdServersChannels = tdServersChannels; - this.responsesSub = tdServersChannels.response() - .sendMessages(responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT)) - .repeat() - .subscribeOn(Schedulers.parallel()) - .subscribe(); - this.requests = tdServersChannels.request().consumeMessages().repeat(); - } - - public Flux>> requests() { - return requests - //.onBackpressureBuffer(8192, BufferOverflowStrategy.DROP_OLDEST) - .log("requests", Level.FINEST, SignalType.REQUEST, SignalType.ON_NEXT); - } - - public Disposable events(String lane, Flux eventFlux) { - return tdServersChannels.events(lane) - .sendMessages(eventFlux) - .repeat() - .subscribeOn(Schedulers.parallel()) - .subscribe(); - } - - public Many> responses() { - return responses; - } - - @Override - public void close() { - responsesSub.dispose(); - var requestsSub = this.requestsSub.get(); - if (requestsSub != null) { - requestsSub.dispose(); - } - tdServersChannels.close(); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/CancelledChannelException.java b/src/main/java/it/tdlight/reactiveapi/rsocket/CancelledChannelException.java new file mode 100644 index 0000000..dbb52ff --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/CancelledChannelException.java @@ -0,0 +1,22 @@ +package it.tdlight.reactiveapi.rsocket; + +import java.nio.channels.ClosedChannelException; + +public class CancelledChannelException extends java.io.IOException { + + public CancelledChannelException() { + } + + public CancelledChannelException(String message) { + super(message); + } + + public CancelledChannelException(String message, Throwable cause) { + super(message, cause); + } + + public CancelledChannelException(Throwable cause) { + super(cause); + } + +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java new file mode 100644 index 0000000..fabe5b8 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java @@ -0,0 +1,243 @@ +package it.tdlight.reactiveapi.rsocket; + +import com.google.common.net.HostAndPort; +import io.rsocket.Closeable; +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketConnector; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.util.DefaultPayload; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventConsumer; +import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.Timestamped; +import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ClientPendingEventsToProduce; +import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ServerPendingEventsToProduce; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.logging.Level; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.CopyOnWriteMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Empty; +import reactor.core.publisher.UnicastProcessor; +import reactor.util.function.Tuple2; +import reactor.util.function.Tuples; +import reactor.util.retry.Retry; +import reactor.util.retry.RetryBackoffSpec; + +public class MyRSocketClient implements Closeable, RSocketChannelManager, SocketAcceptor { + + private static final Logger LOG = LogManager.getLogger(MyRSocketClient.class); + + private final Empty closeRequest = Sinks.empty(); + private final AtomicReference clientRef = new AtomicReference<>(); + private final Mono client; + private final ConcurrentMap messagesToProduce = new ConcurrentHashMap<>(); + + public MyRSocketClient(HostAndPort baseHost) { + RetryBackoffSpec retryStrategy = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(16)).jitter(1.0); + var transport = TcpClientTransport.create(baseHost.getHost(), baseHost.getPort()); + + this.client = RSocketConnector.create() + .setupPayload(DefaultPayload.create("client", "setup-info")) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .acceptor(this) + .connect(transport) + .retryWhen(retryStrategy) + .doOnNext(clientRef::set) + .cacheInvalidateIf(RSocket::isDisposed) + .takeUntilOther(closeRequest.asMono()) + .doOnDiscard(RSocket.class, RSocket::dispose); + } + + @Override + public EventConsumer registerConsumer(ChannelCodec channelCodec, String channelName) { + LOG.debug("Registering consumer for channel \"{}\"", channelName); + Deserializer deserializer; + try { + deserializer = channelCodec.getNewDeserializer(); + } catch (Throwable ex) { + LOG.error("Failed to create codec for channel \"{}\"", channelName, ex); + throw new IllegalStateException("Failed to create codec for channel \"" + channelName + "\"", ex); + } + return new EventConsumer() { + @Override + public ChannelCodec getChannelCodec() { + return channelCodec; + } + + @Override + public String getChannelName() { + return channelName; + } + + @Override + public Flux> consumeMessages() { + return client.flatMapMany(client -> { + var rawFlux = client.requestStream(DefaultPayload.create(channelName, "notify-can-consume")); + return rawFlux.map(elementPayload -> { + var slice = elementPayload.sliceData(); + byte[] elementBytes = new byte[slice.readableBytes()]; + slice.readBytes(elementBytes, 0, elementBytes.length); + return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, elementBytes)); + }).log("CLIENT_CONSUME_MESSAGES", Level.FINE); + }); + } + }; + } + + @Override + public EventProducer registerProducer(ChannelCodec channelCodec, String channelName) { + LOG.debug("Registering producer for channel \"{}\"", channelName); + Serializer serializer; + try { + serializer = channelCodec.getNewSerializer(); + } catch (Throwable ex) { + LOG.error("Failed to create codec for channel \"{}\"", channelName, ex); + throw new IllegalStateException("Failed to create codec for channel \"" + channelName + "\"", ex); + } + Empty emitCloseRequest = Sinks.empty(); + return new EventProducer() { + @Override + public ChannelCodec getChannelCodec() { + return channelCodec; + } + + @Override + public String getChannelName() { + return channelName; + } + + @Override + public Mono sendMessages(Flux eventsFlux) { + return client.flatMap(client -> { + LOG.debug("Subscribed to channel \"{}\", sending messages to server", channelName); + var rawFlux = eventsFlux + .map(event -> DefaultPayload.create(serializer.serialize(null, event))) + .log("CLIENT_PRODUCE_MESSAGES", Level.FINE) + .takeUntilOther(emitCloseRequest.asMono().doFinally(s -> LOG.debug("Producer of channel \"{}\" ended the flux because emit close request ended with signal {}", channelName, s))) + .doFinally(s -> LOG.debug("Producer of channel \"{}\" ended the flux with signal {}", channelName, s)) + .doOnError(ex -> LOG.error("Producer of channel \"{}\" ended the flux with an error", channelName, ex)); + final ServerPendingEventsToProduce myPendingEventsToProduce = new ServerPendingEventsToProduce(rawFlux, new CompletableFuture<>(), new CompletableFuture<>()); + var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channelName, n -> myPendingEventsToProduce); + if (pendingEventsToProduce instanceof ClientPendingEventsToProduce clientPendingEventsToProduce) { + if (!clientPendingEventsToProduce.fluxCf().complete(rawFlux)) { + LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec); + return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\"")); + } + return Mono + .firstWithSignal(client.onClose(), Mono.fromFuture(clientPendingEventsToProduce::doneCf)) + .doOnError(clientPendingEventsToProduce.doneCf()::completeExceptionally) + .doFinally(s -> { + messagesToProduce.remove(channelName, clientPendingEventsToProduce); + clientPendingEventsToProduce.doneCf().complete(null); + LOG.debug("Producer of channel \"{}\" ended the execution with signal {}", channelName, s); + }); + } else if (pendingEventsToProduce == myPendingEventsToProduce) { + return client + .requestResponse(DefaultPayload.create(channelName, "notify-can-produce")) + .then(Mono.firstWithSignal(client.onClose(), Mono.fromFuture(myPendingEventsToProduce::doneCf))) + .doOnError(myPendingEventsToProduce.doneCf()::completeExceptionally) + .doFinally(s -> { + messagesToProduce.remove(channelName, myPendingEventsToProduce); + myPendingEventsToProduce.doneCf().complete(null); + LOG.debug("Producer of channel \"{}\" ended the execution with signal {}", channelName, s); + }); + } else { + LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec); + return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\"")); + } + }); + } + + @Override + public void close() { + emitCloseRequest.tryEmitEmpty(); + } + }; + } + + @Override + public @NotNull Mono onClose() { + return closeRequest.asMono().then(Mono.fromSupplier(clientRef::get).flatMap(Closeable::onClose)); + } + + @Override + public void dispose() { + var client = clientRef.get(); + if (client != null) { + client.dispose(); + } + closeRequest.tryEmitEmpty(); + } + + @Override + public @NotNull Mono accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) { + return Mono.just(new RSocket() { + @Override + public @NotNull Flux requestStream(@NotNull Payload payload) { + return MyRSocketClient.this.requestStream(sendingSocket, payload); + } + }); + } + + @NotNull + private Flux requestStream(RSocket sendingSocket, Payload payload) { + if (payload.getMetadataUtf8().equals("notify-can-consume")) { + var channel = payload.getDataUtf8(); + LOG.debug("Received request for channel \"{}\", sending stream to server", channel); + + final ClientPendingEventsToProduce myPendingEventsToProduce = new ClientPendingEventsToProduce(new CompletableFuture<>(), + new CompletableFuture<>(), + new CompletableFuture<>() + ); + var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channel, n -> myPendingEventsToProduce); + if (pendingEventsToProduce instanceof ServerPendingEventsToProduce serverPendingEventsToProduce) { + if (serverPendingEventsToProduce.initCf().complete(null)) { + return serverPendingEventsToProduce.events() + .doOnError(serverPendingEventsToProduce.doneCf()::completeExceptionally) + .doFinally(s -> { + messagesToProduce.remove(channel, serverPendingEventsToProduce); + serverPendingEventsToProduce.doneCf().complete(null); + }); + } else { + LOG.error("The channel \"{}\" is already active", channel); + return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); + } + } else if (pendingEventsToProduce == myPendingEventsToProduce) { + if (myPendingEventsToProduce.initCf().complete(null)) { + return Mono + .fromFuture(myPendingEventsToProduce::fluxCf) + .flatMapMany(flux -> flux) + .doOnError(myPendingEventsToProduce.doneCf()::completeExceptionally) + .doFinally(s -> myPendingEventsToProduce.doneCf().complete(null)); + } else { + LOG.error("The channel \"{}\" is already active", channel); + return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); + } + } else { + LOG.error("The channel \"{}\" is already active", channel); + return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); + } + } else { + LOG.warn("Received invalid request stream"); + return Flux.empty(); + } + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java new file mode 100644 index 0000000..14a44d5 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java @@ -0,0 +1,314 @@ +package it.tdlight.reactiveapi.rsocket; + +import com.google.common.net.HostAndPort; +import io.rsocket.Closeable; +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketServer; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventConsumer; +import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.Timestamped; +import it.tdlight.reactiveapi.rsocket.MyRSocketServer.PendingEventsToConsume.ClientPendingEventsToConsume; +import it.tdlight.reactiveapi.rsocket.MyRSocketServer.PendingEventsToConsume.ServerPendingEventsToConsume; +import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ClientPendingEventsToProduce; +import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ServerPendingEventsToProduce; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; +import java.util.logging.Level; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.CopyOnWriteMap; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class MyRSocketServer implements Closeable, RSocketChannelManager, SocketAcceptor { + + private static final Logger LOG = LogManager.getLogger(MyRSocketServer.class); + + private final Mono server; + private final Map> consumers = new CopyOnWriteMap<>(); + private final Map> producers = new CopyOnWriteMap<>(); + + private final ConcurrentMap messagesToConsume + = new ConcurrentHashMap<>(); + + sealed interface PendingEventsToConsume { + record ClientPendingEventsToConsume(Flux doneCf, + CompletableFuture initCf) implements PendingEventsToConsume {} + record ServerPendingEventsToConsume(CompletableFuture> doneCf, + CompletableFuture initCf) implements PendingEventsToConsume {} + } + + private final ConcurrentMap messagesToProduce = new ConcurrentHashMap<>(); + + public MyRSocketServer(HostAndPort baseHost) { + this.server = RSocketServer + .create(this) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bind(TcpServerTransport.create(baseHost.getHost(), baseHost.getPort())) + .cache(); + } + + @Override + public EventConsumer registerConsumer(ChannelCodec channelCodec, String channelName) { + LOG.debug("Registering consumer for channel \"{}\"", channelName); + var consumer = new EventConsumer() { + @Override + public ChannelCodec getChannelCodec() { + return channelCodec; + } + + @Override + public String getChannelName() { + return channelName; + } + + @Override + public Flux> consumeMessages() { + Deserializer deserializer; + try { + deserializer = channelCodec.getNewDeserializer(); + } catch (Throwable ex) { + LOG.error("Failed to create codec for channel \"{}\"", channelName, ex); + return Flux.error(new IllegalStateException("Failed to create codec for channel " + channelName)); + } + return Flux.defer(() -> { + var myPendingEventsToConsume = new ServerPendingEventsToConsume(new CompletableFuture<>(), new CompletableFuture<>()); + var pendingEventsToConsume = messagesToConsume.computeIfAbsent(channelName, n -> myPendingEventsToConsume); + if (pendingEventsToConsume instanceof ClientPendingEventsToConsume clientPendingEventsToConsume) { + if (clientPendingEventsToConsume.initCf.complete(null)) { + return server.thenMany(clientPendingEventsToConsume.doneCf + .map(elementPayload -> { + var slice = elementPayload.sliceData(); + byte[] elementBytes = new byte[slice.readableBytes()]; + slice.readBytes(elementBytes, 0, elementBytes.length); + return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, elementBytes)); + }) + .log("SERVER_CONSUME_MESSAGES", Level.FINE) + .doFinally(s -> messagesToConsume.remove(channelName, clientPendingEventsToConsume))); + } else { + LOG.error("Channel is already consuming"); + return Mono.error(new CancelledChannelException("Channel is already consuming")); + } + } else if (pendingEventsToConsume == myPendingEventsToConsume) { + return server.thenMany(Mono + .fromFuture(myPendingEventsToConsume::doneCf) + .flatMapMany(Function.identity()) + .map(elementPayload -> { + var slice = elementPayload.sliceData(); + byte[] elementBytes = new byte[slice.readableBytes()]; + slice.readBytes(elementBytes, 0, elementBytes.length); + return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, elementBytes)); + }) + .doFinally(s -> messagesToConsume.remove(channelName, myPendingEventsToConsume))); + } else { + LOG.error("Channel is already consuming"); + return Mono.error(new CancelledChannelException("Channel is already consuming")); + } + }); + } + }; + var prev = this.consumers.put(channelName, consumer); + if (prev != null) { + LOG.error("Consumer \"{}\" was already registered", channelName); + } + return consumer; + } + + @Override + public EventProducer registerProducer(ChannelCodec channelCodec, String channelName) { + LOG.debug("Registering producer for channel \"{}\"", channelName); + Serializer serializer; + try { + serializer = channelCodec.getNewSerializer(); + } catch (Throwable ex) { + LOG.error("Failed to create codec for channel \"{}\"", channelName, ex); + throw new UnsupportedOperationException("Failed to create codec for channel \"" + channelName + "\"", ex); + } + var producer = new EventProducer() { + @Override + public ChannelCodec getChannelCodec() { + return channelCodec; + } + + @Override + public String getChannelName() { + return channelName; + } + + @Override + public Mono sendMessages(Flux eventsFlux) { + var serverCloseEvent = server + .flatMap(CloseableChannel::onClose) + .doOnSuccess(s -> + LOG.debug("Channel \"{}\" messages send flux will end because the server is closed", channelName)); + return Mono.defer(() -> { + var rawFlux = eventsFlux + .log("SERVER_PRODUCE_MESSAGES", Level.FINE) + .map(element -> DefaultPayload.create(serializer.serialize(null, element))); + final ServerPendingEventsToProduce myPendingEventsToProduce = new ServerPendingEventsToProduce(rawFlux, new CompletableFuture<>(), new CompletableFuture<>()); + var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channelName, n -> myPendingEventsToProduce); + if (pendingEventsToProduce instanceof ClientPendingEventsToProduce clientPendingEventsToProduce) { + if (clientPendingEventsToProduce.fluxCf().complete(rawFlux)) { + return Mono + .firstWithSignal(Mono.fromFuture(clientPendingEventsToProduce::doneCf), serverCloseEvent) + .doFinally(s -> { + messagesToProduce.remove(channelName, clientPendingEventsToProduce); + clientPendingEventsToProduce.doneCf().complete(null); + }); + } else { + LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec); + return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\"")); + } + } else if (pendingEventsToProduce == myPendingEventsToProduce) { + return Mono.firstWithSignal(Mono.fromFuture(myPendingEventsToProduce::doneCf), serverCloseEvent) + .doFinally(s -> { + messagesToProduce.remove(channelName, myPendingEventsToProduce); + myPendingEventsToProduce.doneCf().complete(null); + }); + } else { + LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec); + return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\"")); + } + }); + } + + @Override + public void close() { + + } + }; + var prev = this.producers.put(channelName, producer); + if (prev != null) { + LOG.error("Producer \"{}\" was already registered", channelName); + prev.close(); + } + return producer; + } + + @Override + public @NotNull Mono onClose() { + return server.flatMap(CloseableChannel::onClose); + } + + @Override + public void dispose() { + server.doOnNext(CloseableChannel::dispose).subscribe(n -> {}, ex -> LOG.error("Failed to dispose the server", ex)); + } + + @Override + public @NotNull Mono accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) { + if (!setup.getMetadataUtf8().equals("setup-info") || !setup.getDataUtf8().equals("client")) { + LOG.warn("Invalid setup metadata!"); + return Mono.just(new RSocket() {}); + } + return Mono.just(new RSocket() { + @Override + public @NotNull Flux requestStream(@NotNull Payload payload) { + return MyRSocketServer.this.requestStream(sendingSocket, payload); + } + + @Override + public @NotNull Mono requestResponse(@NotNull Payload payload) { + return MyRSocketServer.this.requestResponse(sendingSocket, payload); + } + }); + } + + private Mono requestResponse(RSocket sendingSocket, Payload payload) { + if (payload.getMetadataUtf8().equals("notify-can-produce")) { + var channel = payload.getDataUtf8(); + var consumer = consumers.get(channel); + if (consumer != null) { + var rawFlux = sendingSocket.requestStream(DefaultPayload.create(channel, "notify-can-consume")); + var myNewPendingEventsToConsume = new ClientPendingEventsToConsume(rawFlux, new CompletableFuture<>()); + var pendingEventsToConsume = messagesToConsume.computeIfAbsent(channel, n -> myNewPendingEventsToConsume); + LOG.debug("Received request for channel \"{}\", requesting stream to client", channel); + if (pendingEventsToConsume instanceof ServerPendingEventsToConsume serverPendingEventsToConsume) { + //messagesToConsume.remove(channel, pendingEventsToConsume); + if (!serverPendingEventsToConsume.doneCf.complete(rawFlux)) { + LOG.error("The server is already producing to channel \"{}\", the request will be rejected", channel); + return Mono.error(new IllegalStateException("The server is already producing to channel \"" + channel + "\"")); + } + return Mono.just(DefaultPayload.create("ok", "response")); + } else if (pendingEventsToConsume == myNewPendingEventsToConsume) { + //messagesToConsume.remove(channel, pendingEventsToConsume); + return Mono + .fromFuture(myNewPendingEventsToConsume::initCf) + .thenReturn(DefaultPayload.create("ok", "response")); + } else { + LOG.warn("Received request for channel \"{}\", but the channel is already active", channel); + return Mono.error(new IllegalStateException("Channel " + channel + " is already active")); + } + } else { + LOG.warn("Received request for channel \"{}\", but no channel with that name is registered", channel); + return Mono.error(new IllegalStateException("Channel " + channel + " does not exist, or it has not been registered")); + } + } else { + LOG.warn("Received invalid request"); + return Mono.error(new UnsupportedOperationException("Invalid request")); + } + } + + @NotNull + private Flux requestStream(RSocket sendingSocket, Payload payload) { + if (payload.getMetadataUtf8().equals("notify-can-consume")) { + var channel = payload.getDataUtf8(); + var producer = producers.get(channel); + if (producer != null) { + final ClientPendingEventsToProduce myPendingEventsToProduce = new ClientPendingEventsToProduce(new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>()); + var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channel, n -> myPendingEventsToProduce); + if (pendingEventsToProduce instanceof ServerPendingEventsToProduce serverPendingEventsToProduce) { + if (serverPendingEventsToProduce.initCf().complete(null)) { + return serverPendingEventsToProduce.events() + .doOnError(serverPendingEventsToProduce.doneCf()::completeExceptionally) + .doFinally(s -> { + messagesToProduce.remove(channel, serverPendingEventsToProduce); + serverPendingEventsToProduce.doneCf().complete(null); + }); + } else { + LOG.error("The channel \"{}\" is already active", channel); + return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); + } + } else if (pendingEventsToProduce == myPendingEventsToProduce) { + if (myPendingEventsToProduce.initCf().complete(null)) { + return Mono + .fromFuture(myPendingEventsToProduce::fluxCf) + .flatMapMany(flux -> flux) + .doOnError(myPendingEventsToProduce.doneCf()::completeExceptionally) + .doFinally(s -> { + messagesToProduce.remove(channel, myPendingEventsToProduce); + myPendingEventsToProduce.doneCf().complete(null); + }); + + } else { + LOG.error("The channel \"{}\" is already active", channel); + return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); + } + } else { + LOG.error("The channel \"{}\" is already active", channel); + return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); + } + } else { + LOG.warn("No producer registered for channel \"{}\"", channel); + return Flux.error(new CancelledChannelException("No producer registered for channel \"" + channel + "\"")); + } + } else { + LOG.warn("Received invalid request stream"); + return Flux.error(new CancelledChannelException("Received invalid request stream")); + } + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/PendingEventsToProduce.java b/src/main/java/it/tdlight/reactiveapi/rsocket/PendingEventsToProduce.java new file mode 100644 index 0000000..a0ff915 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/PendingEventsToProduce.java @@ -0,0 +1,15 @@ +package it.tdlight.reactiveapi.rsocket; + +import io.rsocket.Payload; +import java.util.concurrent.CompletableFuture; +import reactor.core.publisher.Flux; + +sealed interface PendingEventsToProduce { + + record ServerPendingEventsToProduce(Flux events, CompletableFuture initCf, + CompletableFuture doneCf) implements PendingEventsToProduce {} + + record ClientPendingEventsToProduce(CompletableFuture doneCf, + CompletableFuture> fluxCf, + CompletableFuture initCf) implements PendingEventsToProduce {} +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketChannelManager.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketChannelManager.java new file mode 100644 index 0000000..7aeb81c --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketChannelManager.java @@ -0,0 +1,13 @@ +package it.tdlight.reactiveapi.rsocket; + +import io.rsocket.Closeable; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventConsumer; +import it.tdlight.reactiveapi.EventProducer; + +public interface RSocketChannelManager extends Closeable { + + EventConsumer registerConsumer(ChannelCodec channelCodec, String channelName); + + EventProducer registerProducer(ChannelCodec channelCodec, String channelName); +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java deleted file mode 100644 index 55c53aa..0000000 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsClient.java +++ /dev/null @@ -1,94 +0,0 @@ -package it.tdlight.reactiveapi.rsocket; - -import com.google.common.net.HostAndPort; -import io.netty.buffer.ByteBuf; -import io.rsocket.Payload; -import io.rsocket.RSocket; -import io.rsocket.core.RSocketConnector; -import io.rsocket.core.Resume; -import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.netty.client.TcpClientTransport; -import io.rsocket.util.DefaultPayload; -import it.tdlight.reactiveapi.ChannelCodec; -import it.tdlight.reactiveapi.EventConsumer; -import it.tdlight.reactiveapi.RSocketParameters; -import it.tdlight.reactiveapi.Timestamped; -import java.nio.channels.ClosedChannelException; -import java.time.Duration; -import java.util.logging.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.jetbrains.annotations.NotNull; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; -import reactor.core.scheduler.Schedulers; -import reactor.util.retry.Retry; - -public class RSocketConsumeAsClient implements EventConsumer { - - private static final Logger LOG = LogManager.getLogger(RSocketConsumeAsClient.class); - - private final HostAndPort host; - private final ChannelCodec channelCodec; - private final String channelName; - - public RSocketConsumeAsClient(HostAndPort hostAndPort, - ChannelCodec channelCodec, - String channelName) { - this.channelCodec = channelCodec; - this.channelName = channelName; - this.host = hostAndPort; - } - - @Override - public ChannelCodec getChannelCodec() { - return channelCodec; - } - - @Override - public String getChannelName() { - return channelName; - } - - @Override - public Flux> consumeMessages() { - var deserializer = channelCodec.getNewDeserializer(); - return - RSocketConnector.create() - //.resume(new Resume()) - .payloadDecoder(PayloadDecoder.ZERO_COPY) - .connect(TcpClientTransport.create(host.getHost(), host.getPort())) - .retryWhen(Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(rs -> LOG.warn("Failed to bind, retrying. {}", rs))) - .flatMapMany(socket -> socket - .requestStream(DefaultPayload.create("", "consume")) - .map(payload -> { - ByteBuf slice = payload.sliceData(); - var data = new byte[slice.readableBytes()]; - slice.readBytes(data, 0, data.length); - //noinspection unchecked - return new Timestamped(System.currentTimeMillis(), (T) deserializer.deserialize(null, data)); - }) - .doFinally(signalType -> { - socket - .fireAndForget(DefaultPayload.create("", "close")) - .then(socket.onClose().timeout(Duration.ofSeconds(5), Mono.empty())) - .doFinally(s -> socket.dispose()) - .onErrorResume(ex -> Mono.empty()) - .subscribeOn(Schedulers.parallel()) - .subscribe(); - }) - ) - .retryWhen(Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .filter(ex -> ex instanceof ClosedChannelException) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(rs -> LOG.warn("Failed to communicate, retrying. {}", rs))) - .log("RSOCKET_CONSUMER_CLIENT", Level.FINE); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java deleted file mode 100644 index 437c09a..0000000 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketConsumeAsServer.java +++ /dev/null @@ -1,246 +0,0 @@ -package it.tdlight.reactiveapi.rsocket; - -import com.google.common.net.HostAndPort; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.rsocket.ConnectionSetupPayload; -import io.rsocket.RSocket; -import io.rsocket.SocketAcceptor; -import io.rsocket.core.RSocketConnector; -import io.rsocket.core.RSocketServer; -import io.rsocket.core.Resume; -import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.netty.client.TcpClientTransport; -import io.rsocket.transport.netty.server.CloseableChannel; -import io.rsocket.transport.netty.server.TcpServerTransport; -import io.rsocket.util.DefaultPayload; -import it.tdlight.reactiveapi.ChannelCodec; -import it.tdlight.reactiveapi.EventConsumer; -import it.tdlight.reactiveapi.RSocketParameters; -import it.tdlight.reactiveapi.Timestamped; -import java.nio.channels.ClosedChannelException; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import java.util.logging.Level; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.jetbrains.annotations.NotNull; -import org.reactivestreams.Subscription; -import reactor.core.CoreSubscriber; -import reactor.core.Disposable; -import reactor.core.publisher.BaseSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.Empty; -import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuple3; -import reactor.util.function.Tuples; -import reactor.util.retry.Retry; - -public class RSocketConsumeAsServer implements EventConsumer { - - private static final Logger LOG = LogManager.getLogger(RSocketConsumeAsServer.class); - - private final HostAndPort host; - private final ChannelCodec channelCodec; - private final String channelName; - - public RSocketConsumeAsServer(HostAndPort hostAndPort, - ChannelCodec channelCodec, - String channelName) { - this.channelCodec = channelCodec; - this.channelName = channelName; - this.host = hostAndPort; - } - - @Override - public ChannelCodec getChannelCodec() { - return channelCodec; - } - - @Override - public String getChannelName() { - return channelName; - } - - @Override - public Flux> consumeMessages() { - Deserializer deserializer = channelCodec.getNewDeserializer(); - return Mono - .>>>create(sink -> { - AtomicReference serverRef = new AtomicReference<>(); - var disposable = RSocketServer - .create((setup, in) -> { - var inRawFlux = in.requestStream(DefaultPayload.create("", "consume")); - var inFlux = inRawFlux.map(payload -> { - ByteBuf slice = payload.sliceData(); - var data = new byte[slice.readableBytes()]; - slice.readBytes(data, 0, data.length); - return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, data)); - }); - sink.success(Tuples.of(serverRef.get(), in, inFlux)); - - return Mono.just(new RSocket() {}); - }) - .payloadDecoder(PayloadDecoder.ZERO_COPY) - //.resume(new Resume()) - .bind(TcpServerTransport.create(host.getHost(), host.getPort())) - .retryWhen(Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(rs -> LOG.warn("Failed to bind, retrying. {}", rs))) - .subscribe(server -> { - serverRef.set(server); - sink.onCancel(server); - }); - sink.onDispose(disposable); - }) - .subscribeOn(Schedulers.boundedElastic()) - .flatMapMany(t -> t.getT3().doFinally(s -> { - t.getT2().dispose(); - t.getT1().dispose(); - })) - .retryWhen(Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .filter(ex -> ex instanceof ClosedChannelException) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(rs -> LOG.warn("Failed to communicate, retrying. {}", rs))) - .log("RSOCKET_CONSUMER_SERVER", Level.FINE); - } - /*return Flux.defer(() -> { - var deserializer = channelCodec.getNewDeserializer(); - AtomicReference inRef = new AtomicReference<>(); - AtomicReference inSubRef = new AtomicReference<>(); - return Flux.>create(sink -> { - var server = RSocketServer.create((setup, in) -> { - var prev = inRef.getAndSet(in); - if (prev != null) { - prev.dispose(); - } - - var inRawFlux = in.requestStream(DefaultPayload.create("", "consume")); - var inFlux = inRawFlux.map(payload -> { - ByteBuf slice = payload.sliceData(); - var data = new byte[slice.readableBytes()]; - slice.readBytes(data, 0, data.length); - //noinspection unchecked - return new Timestamped<>(System.currentTimeMillis(), (T) deserializer.deserialize(null, data)); - }); - - inFlux.subscribe(new CoreSubscriber<>() { - @Override - public void onSubscribe(@NotNull Subscription s) { - var prevS = inSubRef.getAndSet(s); - if (prevS != null) { - prevS.cancel(); - } else { - sink.onRequest(n -> { - s.request(n); - }); - } - } - - @Override - public void onNext(Timestamped val) { - sink.next(val); - } - - @Override - public void onError(Throwable throwable) { - sink.error(throwable); - } - - @Override - public void onComplete() { - sink.complete(); - } - }); - - return Mono.just(new RSocket() {}); - }).payloadDecoder(PayloadDecoder.ZERO_COPY).bindNow(TcpServerTransport.create(host.getHost(), host.getPort())); - sink.onCancel(() -> { - var inSub = inSubRef.get(); - if (inSub != null) { - inSub.cancel(); - } - }); - sink.onDispose(() -> { - var in = inRef.get(); - if (in != null) { - in.dispose(); - } - server.dispose(); - }); - }).subscribeOn(Schedulers.boundedElastic()).log("RSOCKET_CONSUMER_SERVER", Level.FINE) - .retryWhen(Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(rs -> LOG.warn("Failed to consume as server, retrying. {}", rs))); - });*/ - /* - return Flux.>create(sink -> { - RSocketServer - .create((setup, socket) -> { - socket.requestStream(DefaultPayload.create("", "consume")).map(payload -> { - ByteBuf slice = payload.sliceData(); - var data = new byte[slice.readableBytes()]; - slice.readBytes(data, 0, data.length); - //noinspection unchecked - return new Timestamped<>(System.currentTimeMillis(), (T) deserializer.deserialize(null, data)); - }).subscribe(new CoreSubscriber<>() { - @Override - public void onSubscribe(@NotNull Subscription s) { - sink.onDispose(() -> { - s.cancel(); - socket.dispose(); - }); - sink.onRequest(n -> { - if (n > 8192) { - throw new UnsupportedOperationException( - "Requests count is bigger than max buffer size! " + n + " > " + 8192); - } - s.request(n); - }); - sink.onCancel(() -> s.cancel()); - } - - @Override - public void onNext(Timestamped val) { - sink.next(val); - } - - @Override - public void onError(Throwable throwable) { - sink.error(throwable); - } - - @Override - public void onComplete() { - sink.complete(); - } - }); - return Mono.just(socket); - }) - .payloadDecoder(PayloadDecoder.ZERO_COPY) - .bind(TcpServerTransport.create(host.getHost(), host.getPort())) - .subscribeOn(Schedulers.parallel()) - .subscribe(v -> { - sink.onDispose(v); - }, sink::error, sink::complete); - }) - .retryWhen(Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(rs -> LOG.warn("Failed to consume as server, retrying. {}", rs))); - */ -} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java deleted file mode 100644 index e34e3e1..0000000 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsClient.java +++ /dev/null @@ -1,92 +0,0 @@ -package it.tdlight.reactiveapi.rsocket; - -import com.google.common.net.HostAndPort; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.rsocket.Payload; -import io.rsocket.RSocket; -import io.rsocket.SocketAcceptor; -import io.rsocket.core.RSocketConnector; -import io.rsocket.core.Resume; -import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.netty.client.TcpClientTransport; -import io.rsocket.util.DefaultPayload; -import it.tdlight.reactiveapi.ChannelCodec; -import it.tdlight.reactiveapi.EventProducer; -import it.tdlight.reactiveapi.RSocketParameters; -import it.tdlight.reactiveapi.ReactorUtils; -import it.tdlight.reactiveapi.Timestamped; -import java.nio.channels.ClosedChannelException; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.logging.Level; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.jetbrains.annotations.NotNull; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.Empty; -import reactor.util.retry.Retry; - -public final class RSocketProduceAsClient implements EventProducer { - - private static final Logger LOG = LogManager.getLogger(RSocketProduceAsClient.class); - private final ChannelCodec channelCodec; - private final String channelName; - private final HostAndPort host; - private final Empty closeRequest = Sinks.empty(); - - public RSocketProduceAsClient(HostAndPort host, ChannelCodec channelCodec, String channelName) { - this.channelCodec = channelCodec; - this.channelName = channelName; - this.host = host; - } - - @Override - public ChannelCodec getChannelCodec() { - return channelCodec; - } - - @Override - public String getChannelName() { - return channelName; - } - - @Override - public Mono sendMessages(Flux eventsFlux) { - Serializer serializer = channelCodec.getNewSerializer(); - Flux serializedEventsFlux = eventsFlux - .map(event -> DefaultPayload.create(serializer.serialize(null, event))) - .log("RSOCKET_PRODUCER_CLIENT", Level.FINE) - .doFinally(s -> LOG.debug("Events flux ended: {}", s)); - - return - RSocketConnector.create() - .payloadDecoder(PayloadDecoder.ZERO_COPY) - .setupPayload(DefaultPayload.create("", "connect")) - .acceptor(SocketAcceptor.forRequestStream(payload -> serializedEventsFlux)) - //.resume(new Resume()) - .connect(TcpClientTransport.create(host.getHost(), host.getPort())) - .retryWhen(Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(rs -> LOG.warn("Failed to bind, retrying. {}", rs))) - .flatMap(rSocket -> rSocket.onClose() - .takeUntilOther(closeRequest.asMono().doFinally(s -> rSocket.dispose()))) - .retryWhen(Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .filter(ex -> ex instanceof ClosedChannelException) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(rs -> LOG.warn("Failed to communicate, retrying. {}", rs))) - .log("RSOCKET_PRODUCER_CLIENT_Y", Level.FINE); - } - - @Override - public void close() { - closeRequest.tryEmitEmpty(); - } -} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java deleted file mode 100644 index b9dd7c1..0000000 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketProduceAsServer.java +++ /dev/null @@ -1,114 +0,0 @@ -package it.tdlight.reactiveapi.rsocket; - -import com.google.common.net.HostAndPort; -import io.netty.buffer.Unpooled; -import io.rsocket.ConnectionSetupPayload; -import io.rsocket.Payload; -import io.rsocket.RSocket; -import io.rsocket.SocketAcceptor; -import io.rsocket.core.RSocketServer; -import io.rsocket.core.Resume; -import io.rsocket.frame.decoder.PayloadDecoder; -import io.rsocket.transport.netty.server.CloseableChannel; -import io.rsocket.transport.netty.server.TcpServerTransport; -import io.rsocket.util.DefaultPayload; -import it.tdlight.reactiveapi.ChannelCodec; -import it.tdlight.reactiveapi.EventProducer; -import it.tdlight.reactiveapi.ReactorUtils; -import java.nio.channels.ClosedChannelException; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Level; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.jetbrains.annotations.NotNull; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.Empty; -import reactor.util.retry.Retry; - -public final class RSocketProduceAsServer implements EventProducer { - - private static final Logger LOG = LogManager.getLogger(RSocketProduceAsServer.class); - private final ChannelCodec channelCodec; - private final String channelName; - private final HostAndPort host; - - private final Empty closeRequest = Sinks.empty(); - - public RSocketProduceAsServer(HostAndPort hostAndPort, ChannelCodec channelCodec, String channelName) { - this.host = hostAndPort; - this.channelCodec = channelCodec; - this.channelName = channelName; - } - - @Override - public ChannelCodec getChannelCodec() { - return channelCodec; - } - - @Override - public String getChannelName() { - return channelName; - } - - @Override - public Mono sendMessages(Flux eventsFlux) { - return Mono.defer(()-> { - AtomicReference serverRef = new AtomicReference<>(); - Serializer serializer = channelCodec.getNewSerializer(); - Flux serializedEventsFlux = eventsFlux - .log("RSOCKET_PRODUCER_SERVER", Level.FINE) - .map(event -> DefaultPayload.create(serializer.serialize(null, event))) - .doFinally(s -> LOG.debug("Events flux ended: {}", s)); - - return RSocketServer - .create(new SocketAcceptor() { - @Override - public @NotNull Mono accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) { - return Mono.just(new RSocket() { - @Override - public @NotNull Mono fireAndForget(@NotNull Payload payload) { - return Mono.fromRunnable(() -> { - var srv = serverRef.get(); - if (srv != null) { - srv.dispose(); - } - }); - } - - @Override - public @NotNull Flux requestStream(@NotNull Payload payload) { - return serializedEventsFlux; - } - }); - } - }) - //.resume(new Resume()) - .payloadDecoder(PayloadDecoder.ZERO_COPY) - .bind(TcpServerTransport.create(host.getHost(), host.getPort())) - .retryWhen(Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(rs -> LOG.warn("Failed to bind, retrying. {}", rs))) - .doOnNext(serverRef::set) - .flatMap(closeableChannel -> closeableChannel.onClose() - .takeUntilOther(closeRequest.asMono().doFinally(s -> closeableChannel.dispose()))) - .retryWhen(Retry - .backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .filter(ex -> ex instanceof ClosedChannelException) - .maxBackoff(Duration.ofSeconds(16)) - .jitter(1.0) - .doBeforeRetry(rs -> LOG.warn("Failed to communicate, retrying. {}", rs))); - }); - } - - @Override - public void close() { - closeRequest.tryEmitEmpty(); - } -} diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 46e9337..0b9f56b 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -14,13 +14,14 @@ - + + - + - + diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java index 906cd7a..4ed7342 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java @@ -1,59 +1,142 @@ package it.tdlight.reactiveapi.test; -import com.google.common.net.HostAndPort; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.ChannelFactory; +import it.tdlight.reactiveapi.ChannelFactory.RSocketChannelFactory; import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.RSocketParameters; import it.tdlight.reactiveapi.Timestamped; +import it.tdlight.reactiveapi.rsocket.CancelledChannelException; import it.unimi.dsi.fastutil.ints.IntArrayList; +import java.io.Closeable; +import java.io.IOException; import java.time.Duration; +import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.function.Function; +import java.util.logging.Level; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.util.retry.Retry; public abstract class TestChannel { - protected HostAndPort hostAndPort; - protected EventConsumer consumer; - protected EventProducer producer; + protected ChannelFactory channelFactory; protected IntArrayList data; + protected ConcurrentLinkedDeque closeables = new ConcurrentLinkedDeque<>(); + protected Function> consumerFactory; + protected Function> producerFactory; + private EventConsumer consumer; + private EventProducer producer; @BeforeEach public void beforeEach() { - hostAndPort = HostAndPort.fromParts("localhost", 25689); - consumer = createConsumer(hostAndPort, false); - producer = createProducer(hostAndPort, false); + var consumerFactory = new RSocketChannelFactory(new RSocketParameters(isConsumerClient(), "localhost:25689", List.of())); + var producerFactory = new RSocketChannelFactory(new RSocketParameters(!isConsumerClient(), "localhost:25689", List.of())); + + closeables.offer(consumerFactory); + closeables.offer(producerFactory); + this.consumerFactory = name -> consumerFactory.newConsumer(true, ChannelCodec.UTF8_TEST, name); + this.producerFactory = name -> { + EventProducer p = producerFactory.newProducer(ChannelCodec.UTF8_TEST, name); + closeables.addFirst(p::close); + return p; + }; + consumer = this.consumerFactory.apply("test"); + producer = this.producerFactory.apply("test"); data = new IntArrayList(100); for (int i = 0; i < 100; i++) { data.add(i); } } - public abstract EventConsumer createConsumer(HostAndPort hostAndPort, boolean bomb); - - public abstract EventProducer createProducer(HostAndPort hostAndPort, boolean bomb); - @AfterEach public void afterEach() { - producer.close(); + while (!closeables.isEmpty()) { + var c = closeables.poll(); + try { + c.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } } @Test - public void testSimple() { - Mono sender = producer + public void testProducerEndsSuccessfully() { + Mono eventProducer = producer .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)) - .then(Mono.empty()); - Mono receiver = consumer + .then(Mono.empty()) + .timeout(Duration.ofSeconds(5)); + Mono eventConsumer = consumer .consumeMessages() .map(Timestamped::data) .map(Integer::parseUnsignedInt) .collect(Collectors.toCollection(IntArrayList::new)); var response = Flux - .merge(isServerSender() ? (List.of(sender, receiver)) : List.of(receiver, sender)) + .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) + .blockLast(); + Assertions.assertEquals(response, data); + System.out.println(response); + } + + @Test + public void testConsumerEndsSuccessfully() { + Mono eventProducer = producer + .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)) + .then(Mono.empty()); + Mono eventConsumer = consumer + .consumeMessages() + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)) + .timeout(Duration.ofSeconds(5)); + var response = Flux + .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) + .blockLast(); + Assertions.assertEquals(response, data); + System.out.println(response); + } + + @Test + public void testSimple() { + Mono eventProducer = producer + .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)) + .then(Mono.empty()); + Mono eventConsumer = consumer + .consumeMessages() + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)); + var response = Flux + .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) + .blockLast(); + Assertions.assertEquals(response, data); + System.out.println(response); + } + + @Test + public void testInvertedSubscription() { + Mono eventProducer = producer + .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)) + .then(Mono.empty()); + Mono eventConsumer = consumer + .consumeMessages() + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)); + var response = Flux + .merge(isConsumerClient() ? List.of(eventConsumer, eventProducer.delaySubscription(Duration.ofSeconds(1))) + : List.of(eventProducer, eventConsumer.delaySubscription(Duration.ofSeconds(1)))) .blockLast(); Assertions.assertEquals(response, data); System.out.println(response); @@ -61,34 +144,34 @@ public abstract class TestChannel { @Test public void testException() { - Mono sender = producer + Mono eventProducer = producer .sendMessages(Flux.concat( Flux.fromIterable(data).map(Integer::toUnsignedString), Mono.error(new FakeException()) )) .then(Mono.empty()); - Mono receiver = consumer + Mono eventConsumer = consumer .consumeMessages() .map(Timestamped::data) .map(Integer::parseUnsignedInt) .collect(Collectors.toCollection(IntArrayList::new)); Assertions.assertThrows(Exception.class, () -> Flux - .merge(isServerSender() ? (List.of(sender, receiver)) : List.of(receiver, sender)) + .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) .blockLast()); } @Test public void testEmpty() { - Mono sender = producer + Mono eventProducer = producer .sendMessages(Flux.empty()) .then(Mono.empty()); - Mono receiver = consumer + Mono eventConsumer = consumer .consumeMessages() .map(Timestamped::data) .map(Integer::parseUnsignedInt) .collect(Collectors.toCollection(IntArrayList::new)); var data = Flux - .merge(isServerSender() ? (List.of(sender, receiver)) : List.of(receiver, sender)) + .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) .blockLast(); Assertions.assertNotNull(data); Assertions.assertTrue(data.isEmpty()); @@ -96,15 +179,15 @@ public abstract class TestChannel { @Test public void testSimpleOneByOne() { - var sender = producer.sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)); - var receiver = consumer + var eventProducer = producer.sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)); + var eventConsumer = consumer .consumeMessages() .limitRate(1) .map(Timestamped::data) .map(Integer::parseUnsignedInt) .collect(Collectors.toCollection(IntArrayList::new)); var response = Flux - .merge(isServerSender() ? (List.of(sender, receiver)) : List.of(receiver, sender)) + .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) .blockLast(); Assertions.assertEquals(response, data); System.out.println(response); @@ -112,15 +195,15 @@ public abstract class TestChannel { @Test public void testCancel() { - var sender = producer.sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)); - var receiver = consumer + var eventProducer = producer.sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)); + var eventConsumer = consumer .consumeMessages() .map(Timestamped::data) .map(Integer::parseUnsignedInt) .take(50, true) .collect(Collectors.toCollection(IntArrayList::new)); var response = Flux - .merge(isServerSender() ? (List.of(sender, receiver)) : List.of(receiver, sender)) + .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) .blockLast(); data.removeElements(50, 100); Assertions.assertEquals(response, data); @@ -129,9 +212,9 @@ public abstract class TestChannel { @Test public void testConsumeDelay() { - var sender = producer + var eventProducer = producer .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)); - var receiver = consumer + var eventConsumer = consumer .consumeMessages() .limitRate(1) .map(Timestamped::data) @@ -139,7 +222,7 @@ public abstract class TestChannel { .concatMap(item -> item == 15 ? Mono.just(item).delaySubscription(Duration.ofSeconds(8)) : Mono.just(item)) .collect(Collectors.toCollection(IntArrayList::new)); var response = Flux - .merge(isServerSender() ? List.of(sender, receiver) : List.of(receiver, sender)) + .merge(isConsumerClient() ? List.of(eventProducer, eventConsumer) : List.of(eventConsumer, eventProducer)) .blockLast(); Assertions.assertEquals(response, data); System.out.println(response); @@ -147,18 +230,18 @@ public abstract class TestChannel { @Test public void testProduceDelay() { - var sender = producer + var eventProducer = producer .sendMessages(Flux.fromIterable(data) .concatMap(item -> item == 15 ? Mono.just(item).delaySubscription(Duration.ofSeconds(8)) : Mono.just(item)) .map(Integer::toUnsignedString)); - var receiver = consumer + var eventConsumer = consumer .consumeMessages() .limitRate(1) .map(Timestamped::data) .map(Integer::parseUnsignedInt) .collect(Collectors.toCollection(IntArrayList::new)); var response = Flux - .merge(isServerSender() ? List.of(sender, receiver) : List.of(receiver, sender)) + .merge(isConsumerClient() ? List.of(eventProducer, eventConsumer) : List.of(eventConsumer, eventProducer)) .blockLast(); Assertions.assertEquals(response, data); System.out.println(response); @@ -167,33 +250,235 @@ public abstract class TestChannel { @Test public void testConsumeMidCancel() { var dataFlux = Flux.fromIterable(data).publish().autoConnect(); - Mono sender = producer + var eventProducer = producer .sendMessages(dataFlux.map(Integer::toUnsignedString)) - .then(Mono.empty()); - var receiver1 = consumer - .consumeMessages() - .limitRate(1) - .map(Timestamped::data) - .map(Integer::parseUnsignedInt) - .take(10, true) - .collect(Collectors.toCollection(IntArrayList::new)); - var receiver2 = consumer - .consumeMessages() - .limitRate(1) - .map(Timestamped::data) - .map(Integer::parseUnsignedInt) - .collect(Collectors.toCollection(IntArrayList::new)); - Flux part1 = Flux - .merge((isServerSender() ? sender : receiver1), isServerSender() ? receiver1 : sender); - Flux part2 = Flux - .merge((isServerSender() ? sender : receiver2), isServerSender() ? receiver2 : sender); - var response = Flux.concat(part1, part2).reduce((a, b) -> { - a.addAll(b); - return a; - }).block(); - Assertions.assertEquals(response, data); - System.out.println(response); + .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) + .subscribeOn(Schedulers.parallel()) + .subscribe(n -> {}, ex -> Assertions.fail(ex)); + try { + var receiver1 = consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .take(10, true) + .collect(Collectors.toCollection(IntArrayList::new)) + .block(); + var receiver2 = consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)) + .block(); + Assertions.assertNotNull(receiver1); + Assertions.assertNotNull(receiver2); + receiver1.addAll(receiver2); + Assertions.assertEquals(data, receiver1); + System.out.println(receiver1); + } finally { + eventProducer.dispose(); + } } - public abstract boolean isServerSender(); + @Test + public void testConsumeMidFail() { + var dataFlux = Flux.fromIterable(data).publish().autoConnect(); + var eventProducer = producer + .sendMessages(dataFlux.map(Integer::toUnsignedString)) + .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) + .subscribeOn(Schedulers.parallel()) + .subscribe(n -> {}, ex -> Assertions.fail(ex)); + try { + Assertions.assertThrows(FakeException.class, () -> { + consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .doOnNext(n -> { + if (n == 10) { + throw new FakeException(); + } + }) + .collect(Collectors.toCollection(IntArrayList::new)) + .block(); + }); + var receiver2 = consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .collect(Collectors.toCollection(IntArrayList::new)) + .block(); + Assertions.assertNotNull(receiver2); + data.removeElements(0, 11); + Assertions.assertEquals(data, receiver2); + System.out.println(receiver2); + } finally { + eventProducer.dispose(); + } + } + + @Test + public void testProduceMidCancel() { + var dataFlux = Flux.fromIterable(data).publish(1).autoConnect(); + var numbers = new ConcurrentLinkedDeque(); + var eventConsumer = consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .doOnNext(numbers::offer) + .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) + .subscribeOn(Schedulers.parallel()) + .subscribe(n -> {}, ex -> Assertions.fail(ex)); + try { + producer + .sendMessages(dataFlux.limitRate(1).take(10, true).map(Integer::toUnsignedString)) + .block(); + producer + .sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString)) + .block(); + data.removeInt(data.size() - 1); + Assertions.assertEquals(data, List.copyOf(numbers)); + } finally { + eventConsumer.dispose(); + } + } + + @Test + public void testProduceMidFail() { + var dataFlux = Flux.fromIterable(data).publish(1).autoConnect(); + var numbers = new ConcurrentLinkedDeque(); + var eventConsumer = consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .doOnNext(numbers::offer) + .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1))) + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) + .subscribeOn(Schedulers.parallel()) + .subscribe(n -> {}, ex -> Assertions.fail(ex)); + try { + Assertions.assertThrows(FakeException.class, () -> { + producer + .sendMessages(dataFlux.limitRate(1).doOnNext(i -> { + if (i == 10) { + throw new FakeException(); + } + }).map(Integer::toUnsignedString)) + .block(); + }); + producer + .sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString)) + .block(); + data.removeInt(data.size() - 1); + data.removeInt(10); + Assertions.assertEquals(data, List.copyOf(numbers)); + } finally { + eventConsumer.dispose(); + } + } + + @Test + public void testResubscribe() throws InterruptedException { + var dataFlux = Flux.fromIterable(data).publish().autoConnect(); + var eventProducer = producer + .sendMessages(dataFlux.map(Integer::toUnsignedString)) + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) + .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) + .subscribe(n -> {}, ex -> Assertions.fail(ex)); + try { + consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .take(10, true) + .log("consumer-1", Level.INFO) + .blockLast(); + + Thread.sleep(4000); + + consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .log("consumer-2", Level.INFO) + .blockLast(); + } finally { + eventProducer.dispose(); + } + } + + @Test + public void testFailTwoSubscribers() { + var dataFlux = Flux.fromIterable(data).publish().autoConnect(); + var eventProducer = producer + .sendMessages(dataFlux.map(Integer::toUnsignedString)) + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) + .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) + .subscribe(n -> {}, ex -> Assertions.fail(ex)); + + Assertions.assertThrows(CancelledChannelException.class, () -> { + try { + Mono + .when(consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .log("consumer-1", Level.INFO) + .doOnError(ex -> Assertions.fail(ex)), + consumer + .consumeMessages() + .limitRate(1) + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .log("consumer-2", Level.INFO) + .onErrorResume(io.rsocket.exceptions.ApplicationErrorException.class, + ex -> Mono.error(new CancelledChannelException(ex)) + ) + ) + .block(); + } catch (RuntimeException ex) { + throw Exceptions.unwrap(ex); + } finally { + eventProducer.dispose(); + } + }); + } + + @Test + public void testRepublish() throws InterruptedException { + var dataFlux = Flux.fromIterable(data).publish().autoConnect(); + var eventConsumer = consumer.consumeMessages() + .map(Timestamped::data) + .map(Integer::parseUnsignedInt) + .repeatWhen(n -> n.delayElements(Duration.ofSeconds(1))) + .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) + .subscribe(n -> {}, ex -> Assertions.fail(ex)); + try { + producer + .sendMessages(dataFlux + .take(10, true) + .log("producer-1", Level.INFO) + .map(Integer::toUnsignedString)) + .block(); + Thread.sleep(4000); + producer + .sendMessages(dataFlux.log("producer-2", Level.INFO).map(Integer::toUnsignedString)) + .block(); + } finally { + eventConsumer.dispose(); + } + } + + public abstract boolean isConsumerClient(); } diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestClientToServerChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestClientToServerChannel.java index df18f75..4e8f018 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestClientToServerChannel.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestClientToServerChannel.java @@ -4,31 +4,11 @@ import com.google.common.net.HostAndPort; import it.tdlight.reactiveapi.ChannelCodec; import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; -import it.tdlight.reactiveapi.rsocket.RSocketConsumeAsServer; -import it.tdlight.reactiveapi.rsocket.RSocketProduceAsClient; public class TestClientToServerChannel extends TestChannel { @Override - public EventConsumer createConsumer(HostAndPort hostAndPort, boolean bomb) { - ChannelCodec codec = ChannelCodec.UTF8_TEST; - if (bomb) { - codec = new ChannelCodec(codec.getSerializerClass(), BombDeserializer.class); - } - return new RSocketConsumeAsServer<>(hostAndPort, codec, "test"); - } - - @Override - public EventProducer createProducer(HostAndPort hostAndPort, boolean bomb) { - ChannelCodec codec = ChannelCodec.UTF8_TEST; - if (bomb) { - codec = new ChannelCodec(BombSerializer.class, codec.getDeserializerClass()); - } - return new RSocketProduceAsClient<>(hostAndPort, codec, "test"); - } - - @Override - public boolean isServerSender() { + public boolean isConsumerClient() { return false; } diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestServerToClientChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestServerToClientChannel.java index b109661..ab4f00b 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestServerToClientChannel.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestServerToClientChannel.java @@ -5,8 +5,6 @@ import it.tdlight.reactiveapi.ChannelCodec; import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.Timestamped; -import it.tdlight.reactiveapi.rsocket.RSocketConsumeAsClient; -import it.tdlight.reactiveapi.rsocket.RSocketProduceAsServer; import it.unimi.dsi.fastutil.ints.IntArrayList; import java.util.List; import java.util.stream.Collectors; @@ -18,25 +16,7 @@ import reactor.core.publisher.Mono; public class TestServerToClientChannel extends TestChannel { @Override - public EventConsumer createConsumer(HostAndPort hostAndPort, boolean bomb) { - ChannelCodec codec = ChannelCodec.UTF8_TEST; - if (bomb) { - codec = new ChannelCodec(codec.getSerializerClass(), BombDeserializer.class); - } - return new RSocketConsumeAsClient<>(hostAndPort, codec, "test"); - } - - @Override - public EventProducer createProducer(HostAndPort hostAndPort, boolean bomb) { - ChannelCodec codec = ChannelCodec.UTF8_TEST; - if (bomb) { - codec = new ChannelCodec(BombSerializer.class, codec.getDeserializerClass()); - } - return new RSocketProduceAsServer<>(hostAndPort, codec, "test"); - } - - @Override - public boolean isServerSender() { + public boolean isConsumerClient() { return true; } } diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java index f8bbcff..98c9820 100644 --- a/src/test/java/module-info.java +++ b/src/test/java/module-info.java @@ -9,4 +9,6 @@ module tdlib.reactive.api.test { requires it.unimi.dsi.fastutil; requires org.reactivestreams; requires kafka.clients; + requires java.logging; + requires rsocket.core; } \ No newline at end of file diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml index 46e9337..0b9f56b 100644 --- a/src/test/resources/log4j2.xml +++ b/src/test/resources/log4j2.xml @@ -14,13 +14,14 @@ - + + - + - +