diff --git a/pom.xml b/pom.xml index 6586969..cb1f5ab 100644 --- a/pom.xml +++ b/pom.xml @@ -81,6 +81,11 @@ runtime 3.4.23 + + io.projectreactor + reactor-test + test + org.jetbrains annotations diff --git a/src/main/java/it/tdlight/reactiveapi/EventConsumer.java b/src/main/java/it/tdlight/reactiveapi/EventConsumer.java index 4a6d689..d7ea483 100644 --- a/src/main/java/it/tdlight/reactiveapi/EventConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/EventConsumer.java @@ -5,9 +5,5 @@ import reactor.core.publisher.Flux; public interface EventConsumer { - ChannelCodec getChannelCodec(); - - String getChannelName(); - Flux> consumeMessages(); } diff --git a/src/main/java/it/tdlight/reactiveapi/EventProducer.java b/src/main/java/it/tdlight/reactiveapi/EventProducer.java index b1f91f5..faf8a6b 100644 --- a/src/main/java/it/tdlight/reactiveapi/EventProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/EventProducer.java @@ -5,10 +5,6 @@ import reactor.core.publisher.Mono; public interface EventProducer { - ChannelCodec getChannelCodec(); - - String getChannelName(); - Mono sendMessages(Flux eventsFlux); void close(); diff --git a/src/main/java/it/tdlight/reactiveapi/FutureEventConsumer.java b/src/main/java/it/tdlight/reactiveapi/FutureEventConsumer.java new file mode 100644 index 0000000..93a2488 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/FutureEventConsumer.java @@ -0,0 +1,19 @@ +package it.tdlight.reactiveapi; + +import java.util.function.Function; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class FutureEventConsumer implements EventConsumer { + + private final Mono> future; + + public FutureEventConsumer(Mono> future) { + this.future = future.cache(); + } + + @Override + public Flux> consumeMessages() { + return future.flatMapMany(EventConsumer::consumeMessages); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/FutureEventProducer.java b/src/main/java/it/tdlight/reactiveapi/FutureEventProducer.java new file mode 100644 index 0000000..98ab837 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/FutureEventProducer.java @@ -0,0 +1,25 @@ +package it.tdlight.reactiveapi; + +import java.util.function.Function; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class FutureEventProducer implements EventProducer { + + private final Mono> future; + + public FutureEventProducer(Mono> future) { + this.future = future.cache(); + } + + @Override + public Mono sendMessages(Flux eventsFlux) { + return future.flatMap(ep -> ep.sendMessages(eventsFlux)); + } + + @Override + public void close() { + future.doOnNext(EventProducer::close).subscribeOn(Schedulers.parallel()).subscribe(); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/SimpleEventProducer.java b/src/main/java/it/tdlight/reactiveapi/SimpleEventProducer.java new file mode 100644 index 0000000..dac3a34 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/SimpleEventProducer.java @@ -0,0 +1,25 @@ +package it.tdlight.reactiveapi; + +import java.time.Duration; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.Empty; + +public abstract class SimpleEventProducer implements EventProducer { + + private final Empty closeRequest = Sinks.empty(); + + @Override + public final Mono sendMessages(Flux eventsFlux) { + return 
handleSendMessages(eventsFlux.takeUntilOther(closeRequest.asMono())); + } + + public abstract Mono handleSendMessages(Flux eventsFlux); + + @Override + public final void close() { + closeRequest.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java index b4b6361..9d8daf7 100644 --- a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java +++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaConsumer.java @@ -85,12 +85,10 @@ public final class KafkaConsumer implements EventConsumer { return quickResponse; } - @Override public ChannelCodec getChannelCodec() { return channelCodec; } - @Override public String getChannelName() { return channelName; } diff --git a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java index 4292504..f76e403 100644 --- a/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java +++ b/src/main/java/it/tdlight/reactiveapi/kafka/KafkaProducer.java @@ -43,12 +43,10 @@ public final class KafkaProducer implements EventProducer { sender = KafkaSender.create(senderOptions.maxInFlight(1024)); } - @Override public ChannelCodec getChannelCodec() { return channelCodec; } - @Override public String getChannelName() { return channelName; } diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java new file mode 100644 index 0000000..cc1b914 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ConsumerConnection.java @@ -0,0 +1,90 @@ +package it.tdlight.reactiveapi.rsocket; + +import io.rsocket.Payload; +import it.tdlight.reactiveapi.Timestamped; +import java.time.Duration; +import org.apache.kafka.common.serialization.Deserializer; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.Empty; +import reactor.core.scheduler.Schedulers; + +class ConsumerConnection { + + private Flux> remote; + + private Deserializer local; + + private Empty connected = Sinks.empty(); + + private Empty remoteResult = Sinks.empty(); + + public ConsumerConnection(String channel) { + + } + + public synchronized Flux> connectLocal() { + return connected.asMono().publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> { + synchronized (ConsumerConnection.this) { + return remote; + } + })); + } + + public synchronized Mono connectRemote() { + return connected.asMono().publishOn(Schedulers.parallel()).then(Mono.defer(() -> { + synchronized (ConsumerConnection.this) { + return remoteResult.asMono(); + } + })); + } + + public synchronized void resetRemote() { + connected = Sinks.empty(); + remoteResult = Sinks.empty(); + remote = null; + local = null; + } + + public synchronized void registerRemote(Flux remote) { + if (this.remote != null) { + throw new IllegalStateException("Remote is already registered"); + } + this.remote = remote + .transformDeferred(flux -> { + synchronized (ConsumerConnection.this) { + assert local != null; + return RSocketUtils.deserialize(flux, local); + } + }) + .map(element -> new Timestamped<>(System.currentTimeMillis(), element)) + .doOnError(ex -> { + synchronized (ConsumerConnection.this) { + remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } + }) 
+ .doFinally(s -> { + synchronized (ConsumerConnection.this) { + remoteResult.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + resetRemote(); + } + }); + onChanged(); + } + + public synchronized void registerLocal(Deserializer local) { + if (this.local != null) { + throw new IllegalStateException("Local is already registered"); + } + this.local = local; + onChanged(); + } + + private synchronized void onChanged() { + if (local != null && remote != null) { + connected.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java index fabe5b8..2995522 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketClient.java @@ -1,7 +1,6 @@ package it.tdlight.reactiveapi.rsocket; import com.google.common.net.HostAndPort; -import io.rsocket.Closeable; import io.rsocket.ConnectionSetupPayload; import io.rsocket.Payload; import io.rsocket.RSocket; @@ -9,235 +8,88 @@ import io.rsocket.SocketAcceptor; import io.rsocket.core.RSocketConnector; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.client.TcpClientTransport; -import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.util.DefaultPayload; import it.tdlight.reactiveapi.ChannelCodec; import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.SimpleEventProducer; import it.tdlight.reactiveapi.Timestamped; -import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ClientPendingEventsToProduce; -import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ServerPendingEventsToProduce; import java.time.Duration; -import java.util.concurrent.CompletableFuture; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import java.util.logging.Level; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.CopyOnWriteMap; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; -import reactor.core.Disposable; import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Empty; -import reactor.core.publisher.UnicastProcessor; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; import reactor.util.retry.Retry; import reactor.util.retry.RetryBackoffSpec; -public class MyRSocketClient implements Closeable, RSocketChannelManager, SocketAcceptor { +public class MyRSocketClient implements RSocketChannelManager { - private static final Logger LOG = LogManager.getLogger(MyRSocketClient.class); - - private final Empty closeRequest = Sinks.empty(); - private final AtomicReference clientRef = new AtomicReference<>(); - private final Mono client; - private final ConcurrentMap messagesToProduce = new ConcurrentHashMap<>(); + private final Mono nextClient; + private final AtomicReference lastClient = new AtomicReference<>(); + private final Empty disposeRequest = Sinks.empty(); public 
MyRSocketClient(HostAndPort baseHost) { RetryBackoffSpec retryStrategy = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(16)).jitter(1.0); var transport = TcpClientTransport.create(baseHost.getHost(), baseHost.getPort()); - this.client = RSocketConnector.create() - .setupPayload(DefaultPayload.create("client", "setup-info")) + this.nextClient = RSocketConnector.create() + //.setupPayload(DefaultPayload.create("client", "setup-info")) .payloadDecoder(PayloadDecoder.ZERO_COPY) - .acceptor(this) + .reconnect(retryStrategy) .connect(transport) - .retryWhen(retryStrategy) - .doOnNext(clientRef::set) - .cacheInvalidateIf(RSocket::isDisposed) - .takeUntilOther(closeRequest.asMono()) - .doOnDiscard(RSocket.class, RSocket::dispose); + .doOnNext(lastClient::set) + .cache(); } @Override public EventConsumer registerConsumer(ChannelCodec channelCodec, String channelName) { - LOG.debug("Registering consumer for channel \"{}\"", channelName); - Deserializer deserializer; - try { - deserializer = channelCodec.getNewDeserializer(); - } catch (Throwable ex) { - LOG.error("Failed to create codec for channel \"{}\"", channelName, ex); - throw new IllegalStateException("Failed to create codec for channel \"" + channelName + "\"", ex); - } + Deserializer deserializer = channelCodec.getNewDeserializer(); return new EventConsumer() { - @Override - public ChannelCodec getChannelCodec() { - return channelCodec; - } - - @Override - public String getChannelName() { - return channelName; - } - @Override public Flux> consumeMessages() { - return client.flatMapMany(client -> { - var rawFlux = client.requestStream(DefaultPayload.create(channelName, "notify-can-consume")); - return rawFlux.map(elementPayload -> { - var slice = elementPayload.sliceData(); - byte[] elementBytes = new byte[slice.readableBytes()]; - slice.readBytes(elementBytes, 0, elementBytes.length); - return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, elementBytes)); - }).log("CLIENT_CONSUME_MESSAGES", Level.FINE); - }); + return nextClient.flatMapMany(client -> client + .requestStream(DefaultPayload.create(channelName, "channel")) + .transform(flux -> RSocketUtils.deserialize(flux, deserializer)) + .map(event -> new Timestamped<>(System.currentTimeMillis(), event))); } }; } @Override public EventProducer registerProducer(ChannelCodec channelCodec, String channelName) { - LOG.debug("Registering producer for channel \"{}\"", channelName); - Serializer serializer; - try { - serializer = channelCodec.getNewSerializer(); - } catch (Throwable ex) { - LOG.error("Failed to create codec for channel \"{}\"", channelName, ex); - throw new IllegalStateException("Failed to create codec for channel \"" + channelName + "\"", ex); - } - Empty emitCloseRequest = Sinks.empty(); - return new EventProducer() { - @Override - public ChannelCodec getChannelCodec() { - return channelCodec; - } + Serializer serializer = channelCodec.getNewSerializer(); + return new SimpleEventProducer() { @Override - public String getChannelName() { - return channelName; + public Mono handleSendMessages(Flux eventsFlux) { + Flux rawFlux = eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer)); + Flux combinedRawFlux = Flux.just(DefaultPayload.create(channelName, "channel")).concatWith(rawFlux); + return nextClient.flatMapMany(client -> client.requestChannel(combinedRawFlux)).take(1, true).then(); } - @Override - public Mono sendMessages(Flux eventsFlux) { - return client.flatMap(client -> { - 
LOG.debug("Subscribed to channel \"{}\", sending messages to server", channelName); - var rawFlux = eventsFlux - .map(event -> DefaultPayload.create(serializer.serialize(null, event))) - .log("CLIENT_PRODUCE_MESSAGES", Level.FINE) - .takeUntilOther(emitCloseRequest.asMono().doFinally(s -> LOG.debug("Producer of channel \"{}\" ended the flux because emit close request ended with signal {}", channelName, s))) - .doFinally(s -> LOG.debug("Producer of channel \"{}\" ended the flux with signal {}", channelName, s)) - .doOnError(ex -> LOG.error("Producer of channel \"{}\" ended the flux with an error", channelName, ex)); - final ServerPendingEventsToProduce myPendingEventsToProduce = new ServerPendingEventsToProduce(rawFlux, new CompletableFuture<>(), new CompletableFuture<>()); - var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channelName, n -> myPendingEventsToProduce); - if (pendingEventsToProduce instanceof ClientPendingEventsToProduce clientPendingEventsToProduce) { - if (!clientPendingEventsToProduce.fluxCf().complete(rawFlux)) { - LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec); - return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\"")); - } - return Mono - .firstWithSignal(client.onClose(), Mono.fromFuture(clientPendingEventsToProduce::doneCf)) - .doOnError(clientPendingEventsToProduce.doneCf()::completeExceptionally) - .doFinally(s -> { - messagesToProduce.remove(channelName, clientPendingEventsToProduce); - clientPendingEventsToProduce.doneCf().complete(null); - LOG.debug("Producer of channel \"{}\" ended the execution with signal {}", channelName, s); - }); - } else if (pendingEventsToProduce == myPendingEventsToProduce) { - return client - .requestResponse(DefaultPayload.create(channelName, "notify-can-produce")) - .then(Mono.firstWithSignal(client.onClose(), Mono.fromFuture(myPendingEventsToProduce::doneCf))) - .doOnError(myPendingEventsToProduce.doneCf()::completeExceptionally) - .doFinally(s -> { - messagesToProduce.remove(channelName, myPendingEventsToProduce); - myPendingEventsToProduce.doneCf().complete(null); - LOG.debug("Producer of channel \"{}\" ended the execution with signal {}", channelName, s); - }); - } else { - LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec); - return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\"")); - } - }); - } - - @Override - public void close() { - emitCloseRequest.tryEmitEmpty(); - } }; } @Override - public @NotNull Mono onClose() { - return closeRequest.asMono().then(Mono.fromSupplier(clientRef::get).flatMap(Closeable::onClose)); + public Mono onClose() { + return disposeRequest.asMono(); } @Override public void dispose() { - var client = clientRef.get(); - if (client != null) { - client.dispose(); - } - closeRequest.tryEmitEmpty(); - } - - @Override - public @NotNull Mono accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) { - return Mono.just(new RSocket() { - @Override - public @NotNull Flux requestStream(@NotNull Payload payload) { - return MyRSocketClient.this.requestStream(sendingSocket, payload); - } - }); - } - - @NotNull - private Flux requestStream(RSocket sendingSocket, Payload payload) { - if (payload.getMetadataUtf8().equals("notify-can-consume")) { - var channel = payload.getDataUtf8(); - LOG.debug("Received request for channel \"{}\", sending stream to server", channel); - - final ClientPendingEventsToProduce myPendingEventsToProduce 
= new ClientPendingEventsToProduce(new CompletableFuture<>(), - new CompletableFuture<>(), - new CompletableFuture<>() - ); - var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channel, n -> myPendingEventsToProduce); - if (pendingEventsToProduce instanceof ServerPendingEventsToProduce serverPendingEventsToProduce) { - if (serverPendingEventsToProduce.initCf().complete(null)) { - return serverPendingEventsToProduce.events() - .doOnError(serverPendingEventsToProduce.doneCf()::completeExceptionally) - .doFinally(s -> { - messagesToProduce.remove(channel, serverPendingEventsToProduce); - serverPendingEventsToProduce.doneCf().complete(null); - }); - } else { - LOG.error("The channel \"{}\" is already active", channel); - return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); - } - } else if (pendingEventsToProduce == myPendingEventsToProduce) { - if (myPendingEventsToProduce.initCf().complete(null)) { - return Mono - .fromFuture(myPendingEventsToProduce::fluxCf) - .flatMapMany(flux -> flux) - .doOnError(myPendingEventsToProduce.doneCf()::completeExceptionally) - .doFinally(s -> myPendingEventsToProduce.doneCf().complete(null)); - } else { - LOG.error("The channel \"{}\" is already active", channel); - return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); - } - } else { - LOG.error("The channel \"{}\" is already active", channel); - return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); - } - } else { - LOG.warn("Received invalid request stream"); - return Flux.empty(); + disposeRequest.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + var c = lastClient.get(); + if (c != null) { + c.dispose(); } } + } diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java index 14a44d5..fbc6c0a 100644 --- a/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/MyRSocketServer.java @@ -1,7 +1,6 @@ package it.tdlight.reactiveapi.rsocket; import com.google.common.net.HostAndPort; -import io.rsocket.Closeable; import io.rsocket.ConnectionSetupPayload; import io.rsocket.Payload; import io.rsocket.RSocket; @@ -15,174 +14,113 @@ import it.tdlight.reactiveapi.ChannelCodec; import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.Timestamped; -import it.tdlight.reactiveapi.rsocket.MyRSocketServer.PendingEventsToConsume.ClientPendingEventsToConsume; -import it.tdlight.reactiveapi.rsocket.MyRSocketServer.PendingEventsToConsume.ServerPendingEventsToConsume; -import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ClientPendingEventsToProduce; -import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ServerPendingEventsToProduce; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Function; -import java.util.logging.Level; +import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.utils.CopyOnWriteMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; +import 
org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; -public class MyRSocketServer implements Closeable, RSocketChannelManager, SocketAcceptor { +public class MyRSocketServer implements RSocketChannelManager, RSocket { - private static final Logger LOG = LogManager.getLogger(MyRSocketServer.class); + private final Logger logger = LogManager.getLogger(this.getClass()); - private final Mono server; - private final Map> consumers = new CopyOnWriteMap<>(); - private final Map> producers = new CopyOnWriteMap<>(); + private final Mono serverCloseable; - private final ConcurrentMap messagesToConsume - = new ConcurrentHashMap<>(); + protected final Map> consumerRegistry = new ConcurrentHashMap<>(); - sealed interface PendingEventsToConsume { - record ClientPendingEventsToConsume(Flux doneCf, - CompletableFuture initCf) implements PendingEventsToConsume {} - record ServerPendingEventsToConsume(CompletableFuture> doneCf, - CompletableFuture initCf) implements PendingEventsToConsume {} - } - - private final ConcurrentMap messagesToProduce = new ConcurrentHashMap<>(); + protected final Map> producerRegistry = new ConcurrentHashMap<>(); public MyRSocketServer(HostAndPort baseHost) { - this.server = RSocketServer - .create(this) + var serverMono = RSocketServer + .create(SocketAcceptor.with(this)) .payloadDecoder(PayloadDecoder.ZERO_COPY) .bind(TcpServerTransport.create(baseHost.getHost(), baseHost.getPort())) + .doOnNext(d -> logger.debug("Server up")) .cache(); + + serverMono.subscribeOn(Schedulers.parallel()).subscribe(v -> {}, ex -> logger.warn("Failed to bind server")); + + this.serverCloseable = serverMono; } @Override - public EventConsumer registerConsumer(ChannelCodec channelCodec, String channelName) { - LOG.debug("Registering consumer for channel \"{}\"", channelName); - var consumer = new EventConsumer() { - @Override - public ChannelCodec getChannelCodec() { - return channelCodec; + public @NotNull Flux requestChannel(@NotNull Publisher payloads) { + return Flux.from(payloads).switchOnFirst((first, flux) -> { + if (first.isOnNext()) { + var firstValue = first.get(); + assert firstValue != null; + var meta = firstValue.getMetadataUtf8(); + if (!meta.equals("channel")) { + return Mono.error(new CancelledChannelException("Metadata is wrong")); + } + var channel = firstValue.getDataUtf8(); + var conn = MyRSocketServer.this.consumerRegistry.computeIfAbsent(channel, ConsumerConnection::new); + conn.registerRemote(flux.skip(1)); + return conn.connectRemote().then(Mono.fromSupplier(() -> DefaultPayload.create("ok", "result"))); + } else { + return flux.take(1, true); } + }); + } - @Override - public String getChannelName() { - return channelName; - } + @Override + public @NotNull Flux requestStream(@NotNull Payload payload) { + var channel = payload.getDataUtf8(); + return Flux.defer(() -> { + var conn = MyRSocketServer.this.producerRegistry.computeIfAbsent(channel, ProducerConnection::new); + conn.registerRemote(); + return conn.connectRemote(); + }); + } + @Override + public final EventConsumer registerConsumer(ChannelCodec channelCodec, String channelName) { + logger.debug("Registering consumer for channel \"{}\"", channelName); + Deserializer deserializer; + try { + deserializer = channelCodec.getNewDeserializer(); + } catch (Throwable ex) { + logger.error("Failed to create codec for channel \"{}\"", channelName, ex); + throw new IllegalStateException("Failed to create codec for channel " + 
channelName); + } + return new EventConsumer() { @Override public Flux> consumeMessages() { - Deserializer deserializer; - try { - deserializer = channelCodec.getNewDeserializer(); - } catch (Throwable ex) { - LOG.error("Failed to create codec for channel \"{}\"", channelName, ex); - return Flux.error(new IllegalStateException("Failed to create codec for channel " + channelName)); - } return Flux.defer(() -> { - var myPendingEventsToConsume = new ServerPendingEventsToConsume(new CompletableFuture<>(), new CompletableFuture<>()); - var pendingEventsToConsume = messagesToConsume.computeIfAbsent(channelName, n -> myPendingEventsToConsume); - if (pendingEventsToConsume instanceof ClientPendingEventsToConsume clientPendingEventsToConsume) { - if (clientPendingEventsToConsume.initCf.complete(null)) { - return server.thenMany(clientPendingEventsToConsume.doneCf - .map(elementPayload -> { - var slice = elementPayload.sliceData(); - byte[] elementBytes = new byte[slice.readableBytes()]; - slice.readBytes(elementBytes, 0, elementBytes.length); - return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, elementBytes)); - }) - .log("SERVER_CONSUME_MESSAGES", Level.FINE) - .doFinally(s -> messagesToConsume.remove(channelName, clientPendingEventsToConsume))); - } else { - LOG.error("Channel is already consuming"); - return Mono.error(new CancelledChannelException("Channel is already consuming")); - } - } else if (pendingEventsToConsume == myPendingEventsToConsume) { - return server.thenMany(Mono - .fromFuture(myPendingEventsToConsume::doneCf) - .flatMapMany(Function.identity()) - .map(elementPayload -> { - var slice = elementPayload.sliceData(); - byte[] elementBytes = new byte[slice.readableBytes()]; - slice.readBytes(elementBytes, 0, elementBytes.length); - return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, elementBytes)); - }) - .doFinally(s -> messagesToConsume.remove(channelName, myPendingEventsToConsume))); - } else { - LOG.error("Channel is already consuming"); - return Mono.error(new CancelledChannelException("Channel is already consuming")); - } + //noinspection unchecked + var conn = (ConsumerConnection) consumerRegistry.computeIfAbsent(channelName, ConsumerConnection::new); + conn.registerLocal(deserializer); + return conn.connectLocal(); }); } }; - var prev = this.consumers.put(channelName, consumer); - if (prev != null) { - LOG.error("Consumer \"{}\" was already registered", channelName); - } - return consumer; } @Override public EventProducer registerProducer(ChannelCodec channelCodec, String channelName) { - LOG.debug("Registering producer for channel \"{}\"", channelName); + logger.debug("Registering producer for channel \"{}\"", channelName); Serializer serializer; try { serializer = channelCodec.getNewSerializer(); } catch (Throwable ex) { - LOG.error("Failed to create codec for channel \"{}\"", channelName, ex); - throw new UnsupportedOperationException("Failed to create codec for channel \"" + channelName + "\"", ex); + logger.error("Failed to create codec for channel \"{}\"", channelName, ex); + throw new IllegalStateException("Failed to create codec for channel " + channelName); } - var producer = new EventProducer() { - @Override - public ChannelCodec getChannelCodec() { - return channelCodec; - } - - @Override - public String getChannelName() { - return channelName; - } - + return new EventProducer() { @Override public Mono sendMessages(Flux eventsFlux) { - var serverCloseEvent = server - 
.flatMap(CloseableChannel::onClose) - .doOnSuccess(s -> - LOG.debug("Channel \"{}\" messages send flux will end because the server is closed", channelName)); return Mono.defer(() -> { - var rawFlux = eventsFlux - .log("SERVER_PRODUCE_MESSAGES", Level.FINE) - .map(element -> DefaultPayload.create(serializer.serialize(null, element))); - final ServerPendingEventsToProduce myPendingEventsToProduce = new ServerPendingEventsToProduce(rawFlux, new CompletableFuture<>(), new CompletableFuture<>()); - var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channelName, n -> myPendingEventsToProduce); - if (pendingEventsToProduce instanceof ClientPendingEventsToProduce clientPendingEventsToProduce) { - if (clientPendingEventsToProduce.fluxCf().complete(rawFlux)) { - return Mono - .firstWithSignal(Mono.fromFuture(clientPendingEventsToProduce::doneCf), serverCloseEvent) - .doFinally(s -> { - messagesToProduce.remove(channelName, clientPendingEventsToProduce); - clientPendingEventsToProduce.doneCf().complete(null); - }); - } else { - LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec); - return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\"")); - } - } else if (pendingEventsToProduce == myPendingEventsToProduce) { - return Mono.firstWithSignal(Mono.fromFuture(myPendingEventsToProduce::doneCf), serverCloseEvent) - .doFinally(s -> { - messagesToProduce.remove(channelName, myPendingEventsToProduce); - myPendingEventsToProduce.doneCf().complete(null); - }); - } else { - LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec); - return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\"")); - } + //noinspection unchecked + var conn = (ProducerConnection) producerRegistry.computeIfAbsent(channelName, ProducerConnection::new); + conn.registerLocal(eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer))); + return conn.connectLocal(); }); } @@ -191,124 +129,18 @@ public class MyRSocketServer implements Closeable, RSocketChannelManager, Socket } }; - var prev = this.producers.put(channelName, producer); - if (prev != null) { - LOG.error("Producer \"{}\" was already registered", channelName); - prev.close(); - } - return producer; } @Override public @NotNull Mono onClose() { - return server.flatMap(CloseableChannel::onClose); + return Mono.when(serverCloseable.flatMap(CloseableChannel::onClose)); } @Override public void dispose() { - server.doOnNext(CloseableChannel::dispose).subscribe(n -> {}, ex -> LOG.error("Failed to dispose the server", ex)); - } - - @Override - public @NotNull Mono accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) { - if (!setup.getMetadataUtf8().equals("setup-info") || !setup.getDataUtf8().equals("client")) { - LOG.warn("Invalid setup metadata!"); - return Mono.just(new RSocket() {}); - } - return Mono.just(new RSocket() { - @Override - public @NotNull Flux requestStream(@NotNull Payload payload) { - return MyRSocketServer.this.requestStream(sendingSocket, payload); - } - - @Override - public @NotNull Mono requestResponse(@NotNull Payload payload) { - return MyRSocketServer.this.requestResponse(sendingSocket, payload); - } - }); - } - - private Mono requestResponse(RSocket sendingSocket, Payload payload) { - if (payload.getMetadataUtf8().equals("notify-can-produce")) { - var channel = payload.getDataUtf8(); - var consumer = consumers.get(channel); - if (consumer != null) { - var rawFlux = 
sendingSocket.requestStream(DefaultPayload.create(channel, "notify-can-consume")); - var myNewPendingEventsToConsume = new ClientPendingEventsToConsume(rawFlux, new CompletableFuture<>()); - var pendingEventsToConsume = messagesToConsume.computeIfAbsent(channel, n -> myNewPendingEventsToConsume); - LOG.debug("Received request for channel \"{}\", requesting stream to client", channel); - if (pendingEventsToConsume instanceof ServerPendingEventsToConsume serverPendingEventsToConsume) { - //messagesToConsume.remove(channel, pendingEventsToConsume); - if (!serverPendingEventsToConsume.doneCf.complete(rawFlux)) { - LOG.error("The server is already producing to channel \"{}\", the request will be rejected", channel); - return Mono.error(new IllegalStateException("The server is already producing to channel \"" + channel + "\"")); - } - return Mono.just(DefaultPayload.create("ok", "response")); - } else if (pendingEventsToConsume == myNewPendingEventsToConsume) { - //messagesToConsume.remove(channel, pendingEventsToConsume); - return Mono - .fromFuture(myNewPendingEventsToConsume::initCf) - .thenReturn(DefaultPayload.create("ok", "response")); - } else { - LOG.warn("Received request for channel \"{}\", but the channel is already active", channel); - return Mono.error(new IllegalStateException("Channel " + channel + " is already active")); - } - } else { - LOG.warn("Received request for channel \"{}\", but no channel with that name is registered", channel); - return Mono.error(new IllegalStateException("Channel " + channel + " does not exist, or it has not been registered")); - } - } else { - LOG.warn("Received invalid request"); - return Mono.error(new UnsupportedOperationException("Invalid request")); - } - } - - @NotNull - private Flux requestStream(RSocket sendingSocket, Payload payload) { - if (payload.getMetadataUtf8().equals("notify-can-consume")) { - var channel = payload.getDataUtf8(); - var producer = producers.get(channel); - if (producer != null) { - final ClientPendingEventsToProduce myPendingEventsToProduce = new ClientPendingEventsToProduce(new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>()); - var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channel, n -> myPendingEventsToProduce); - if (pendingEventsToProduce instanceof ServerPendingEventsToProduce serverPendingEventsToProduce) { - if (serverPendingEventsToProduce.initCf().complete(null)) { - return serverPendingEventsToProduce.events() - .doOnError(serverPendingEventsToProduce.doneCf()::completeExceptionally) - .doFinally(s -> { - messagesToProduce.remove(channel, serverPendingEventsToProduce); - serverPendingEventsToProduce.doneCf().complete(null); - }); - } else { - LOG.error("The channel \"{}\" is already active", channel); - return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); - } - } else if (pendingEventsToProduce == myPendingEventsToProduce) { - if (myPendingEventsToProduce.initCf().complete(null)) { - return Mono - .fromFuture(myPendingEventsToProduce::fluxCf) - .flatMapMany(flux -> flux) - .doOnError(myPendingEventsToProduce.doneCf()::completeExceptionally) - .doFinally(s -> { - messagesToProduce.remove(channel, myPendingEventsToProduce); - myPendingEventsToProduce.doneCf().complete(null); - }); - - } else { - LOG.error("The channel \"{}\" is already active", channel); - return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); - } - } else { - LOG.error("The channel \"{}\" is already 
active", channel); - return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active")); - } - } else { - LOG.warn("No producer registered for channel \"{}\"", channel); - return Flux.error(new CancelledChannelException("No producer registered for channel \"" + channel + "\"")); - } - } else { - LOG.warn("Received invalid request stream"); - return Flux.error(new CancelledChannelException("Received invalid request stream")); - } + serverCloseable + .doOnNext(CloseableChannel::dispose) + .subscribeOn(Schedulers.parallel()) + .subscribe(v -> {}, ex -> logger.error("Failed to dispose the server", ex)); } } diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java b/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java new file mode 100644 index 0000000..7c48312 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/ProducerConnection.java @@ -0,0 +1,81 @@ +package it.tdlight.reactiveapi.rsocket; + +import io.rsocket.Payload; +import java.time.Duration; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.EmitFailureHandler; +import reactor.core.publisher.Sinks.Empty; +import reactor.core.scheduler.Schedulers; + +class ProducerConnection { + + private Object remote; + + private Flux local; + + private Empty connected = Sinks.empty(); + + private Empty remoteResult = Sinks.empty(); + + public ProducerConnection(String channel) { + + } + + public synchronized Mono connectLocal() { + return connected.asMono().publishOn(Schedulers.parallel()).then(Mono.defer(() -> { + synchronized (ProducerConnection.this) { + return remoteResult.asMono().doFinally(r -> reset()); + } + })).doOnError(ex -> { + synchronized (ProducerConnection.this) { + remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } + }).doFinally(ended -> reset()); + } + + public synchronized Flux connectRemote() { + return connected.asMono().publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> { + synchronized (ProducerConnection.this) { + return local; + } + }).doOnError(ex -> { + synchronized (ProducerConnection.this) { + remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } + })).doFinally(ended -> reset()); + } + + public synchronized void reset() { + if (local != null && remote != null) { + remoteResult.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + local = null; + remote = null; + connected = Sinks.empty(); + remoteResult = Sinks.empty(); + } + } + + public synchronized void registerRemote() { + if (this.remote != null) { + throw new IllegalStateException("Remote is already registered"); + } + this.remote = new Object(); + onChanged(); + } + + public synchronized void registerLocal(Flux local) { + if (this.local != null) { + throw new IllegalStateException("Local is already registered"); + } + this.local = local; + onChanged(); + } + + private synchronized void onChanged() { + if (local != null && remote != null) { + connected.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100))); + } + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java new file mode 100644 index 0000000..51d200d --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/rsocket/RSocketUtils.java @@ -0,0 +1,23 @@ +package it.tdlight.reactiveapi.rsocket; + +import io.rsocket.Payload; +import 
io.rsocket.util.DefaultPayload; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import reactor.core.publisher.Flux; + +public class RSocketUtils { + + public static Flux deserialize(Flux payloadFlux, Deserializer deserializer) { + return payloadFlux.map(payload -> { + var slice = payload.sliceData(); + byte[] bytes = new byte[slice.readableBytes()]; + slice.readBytes(bytes, 0, bytes.length); + return deserializer.deserialize(null, bytes); + }); + } + + public static Flux serialize(Flux flux, Serializer serializer) { + return flux.map(element -> DefaultPayload.create(serializer.serialize(null, element))); + } +} diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java index 4ed7342..240c4af 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java +++ b/src/test/java/it/tdlight/reactiveapi/test/TestChannel.java @@ -18,6 +18,8 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.function.Function; import java.util.logging.Level; import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -30,6 +32,8 @@ import reactor.util.retry.Retry; public abstract class TestChannel { + private static final Logger LOG = LogManager.getLogger(TestChannel.class); + protected ChannelFactory channelFactory; protected IntArrayList data; protected ConcurrentLinkedDeque closeables = new ConcurrentLinkedDeque<>(); @@ -40,8 +44,8 @@ public abstract class TestChannel { @BeforeEach public void beforeEach() { - var consumerFactory = new RSocketChannelFactory(new RSocketParameters(isConsumerClient(), "localhost:25689", List.of())); - var producerFactory = new RSocketChannelFactory(new RSocketParameters(!isConsumerClient(), "localhost:25689", List.of())); + var consumerFactory = new RSocketChannelFactory(new RSocketParameters(isConsumerClient(), "127.0.0.1:25689", List.of())); + var producerFactory = new RSocketChannelFactory(new RSocketParameters(!isConsumerClient(), "127.0.0.1:25689", List.of())); closeables.offer(consumerFactory); closeables.offer(producerFactory); @@ -61,11 +65,13 @@ public abstract class TestChannel { @AfterEach public void afterEach() { + System.out.println("Cleaning up..."); + producer.close(); while (!closeables.isEmpty()) { var c = closeables.poll(); try { c.close(); - } catch (IOException e) { + } catch (Throwable e) { e.printStackTrace(); } } @@ -102,7 +108,7 @@ public abstract class TestChannel { .timeout(Duration.ofSeconds(5)); var response = Flux .merge(isConsumerClient() ? 
(List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) - .blockLast(); + .blockLast(Duration.ofSeconds(5)); Assertions.assertEquals(response, data); System.out.println(response); } @@ -111,15 +117,17 @@ public abstract class TestChannel { public void testSimple() { Mono eventProducer = producer .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)) - .then(Mono.empty()); + .doOnSuccess(s -> LOG.warn("Done producer")) + .then(Mono.empty()); Mono eventConsumer = consumer .consumeMessages() .map(Timestamped::data) .map(Integer::parseUnsignedInt) - .collect(Collectors.toCollection(IntArrayList::new)); + .collect(Collectors.toCollection(IntArrayList::new)) + .doOnSuccess(s -> LOG.warn("Done consumer")); var response = Flux .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) - .blockLast(); + .blockLast(Duration.ofSeconds(5)); Assertions.assertEquals(response, data); System.out.println(response); } @@ -128,16 +136,18 @@ public abstract class TestChannel { public void testInvertedSubscription() { Mono eventProducer = producer .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)) + .doOnSuccess(s -> LOG.warn("Done producer")) .then(Mono.empty()); Mono eventConsumer = consumer .consumeMessages() .map(Timestamped::data) .map(Integer::parseUnsignedInt) - .collect(Collectors.toCollection(IntArrayList::new)); + .collect(Collectors.toCollection(IntArrayList::new)) + .doOnSuccess(s -> LOG.warn("Done consumer")); var response = Flux .merge(isConsumerClient() ? List.of(eventConsumer, eventProducer.delaySubscription(Duration.ofSeconds(1))) : List.of(eventProducer, eventConsumer.delaySubscription(Duration.ofSeconds(1)))) - .blockLast(); + .blockLast(Duration.ofSeconds(60)); Assertions.assertEquals(response, data); System.out.println(response); } @@ -157,7 +167,7 @@ public abstract class TestChannel { .collect(Collectors.toCollection(IntArrayList::new)); Assertions.assertThrows(Exception.class, () -> Flux .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) - .blockLast()); + .blockLast(Duration.ofSeconds(5))); } @Test @@ -172,7 +182,7 @@ public abstract class TestChannel { .collect(Collectors.toCollection(IntArrayList::new)); var data = Flux .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) - .blockLast(); + .blockLast(Duration.ofSeconds(5)); Assertions.assertNotNull(data); Assertions.assertTrue(data.isEmpty()); } @@ -188,7 +198,7 @@ public abstract class TestChannel { .collect(Collectors.toCollection(IntArrayList::new)); var response = Flux .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) - .blockLast(); + .blockLast(Duration.ofSeconds(5)); Assertions.assertEquals(response, data); System.out.println(response); } @@ -204,7 +214,7 @@ public abstract class TestChannel { .collect(Collectors.toCollection(IntArrayList::new)); var response = Flux .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) - .blockLast(); + .blockLast(Duration.ofSeconds(5)); data.removeElements(50, 100); Assertions.assertEquals(response, data); System.out.println(response); @@ -223,7 +233,7 @@ public abstract class TestChannel { .collect(Collectors.toCollection(IntArrayList::new)); var response = Flux .merge(isConsumerClient() ? 
List.of(eventProducer, eventConsumer) : List.of(eventConsumer, eventProducer)) - .blockLast(); + .blockLast(Duration.ofSeconds(12)); Assertions.assertEquals(response, data); System.out.println(response); } @@ -242,7 +252,7 @@ public abstract class TestChannel { .collect(Collectors.toCollection(IntArrayList::new)); var response = Flux .merge(isConsumerClient() ? List.of(eventProducer, eventConsumer) : List.of(eventConsumer, eventProducer)) - .blockLast(); + .blockLast(Duration.ofSeconds(12)); Assertions.assertEquals(response, data); System.out.println(response); } @@ -343,7 +353,9 @@ public abstract class TestChannel { producer .sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString)) .block(); - data.removeInt(data.size() - 1); + if (numbers.size() < data.size()) { + data.removeInt(data.size() - 1); + } Assertions.assertEquals(data, List.copyOf(numbers)); } finally { eventConsumer.dispose(); @@ -372,13 +384,15 @@ public abstract class TestChannel { throw new FakeException(); } }).map(Integer::toUnsignedString)) - .block(); + .block(Duration.ofSeconds(5)); }); producer .sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString)) - .block(); - data.removeInt(data.size() - 1); + .block(Duration.ofSeconds(5)); data.removeInt(10); + if (numbers.size() < data.size()) { + data.removeInt(data.size() - 1); + } Assertions.assertEquals(data, List.copyOf(numbers)); } finally { eventConsumer.dispose(); @@ -401,7 +415,7 @@ public abstract class TestChannel { .map(Integer::parseUnsignedInt) .take(10, true) .log("consumer-1", Level.INFO) - .blockLast(); + .blockLast(Duration.ofSeconds(5)); Thread.sleep(4000); @@ -411,7 +425,7 @@ public abstract class TestChannel { .map(Timestamped::data) .map(Integer::parseUnsignedInt) .log("consumer-2", Level.INFO) - .blockLast(); + .blockLast(Duration.ofSeconds(5)); } finally { eventProducer.dispose(); } @@ -426,7 +440,7 @@ public abstract class TestChannel { .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) .subscribe(n -> {}, ex -> Assertions.fail(ex)); - Assertions.assertThrows(CancelledChannelException.class, () -> { + Assertions.assertThrows(IllegalStateException.class, () -> { try { Mono .when(consumer @@ -443,7 +457,7 @@ public abstract class TestChannel { .map(Integer::parseUnsignedInt) .log("consumer-2", Level.INFO) .onErrorResume(io.rsocket.exceptions.ApplicationErrorException.class, - ex -> Mono.error(new CancelledChannelException(ex)) + ex -> Mono.error(new IllegalStateException(ex)) ) ) .block(); @@ -470,11 +484,11 @@ public abstract class TestChannel { .take(10, true) .log("producer-1", Level.INFO) .map(Integer::toUnsignedString)) - .block(); + .block(Duration.ofSeconds(5)); Thread.sleep(4000); producer .sendMessages(dataFlux.log("producer-2", Level.INFO).map(Integer::toUnsignedString)) - .block(); + .block(Duration.ofSeconds(5)); } finally { eventConsumer.dispose(); } diff --git a/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java b/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java new file mode 100644 index 0000000..2d4731b --- /dev/null +++ b/src/test/java/it/tdlight/reactiveapi/test/TestRSocket.java @@ -0,0 +1,184 @@ +package it.tdlight.reactiveapi.test; + +import com.google.common.collect.Collections2; +import com.google.common.net.HostAndPort; +import io.rsocket.Payload; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.transport.ClientTransport; +import 
io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import it.tdlight.reactiveapi.ChannelCodec; +import it.tdlight.reactiveapi.EventConsumer; +import it.tdlight.reactiveapi.EventProducer; +import it.tdlight.reactiveapi.Timestamped; +import it.tdlight.reactiveapi.rsocket.MyRSocketClient; +import it.tdlight.reactiveapi.rsocket.MyRSocketServer; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class TestRSocket { + + @Test + public void testClientOnClose() { + Assertions.assertThrows(IllegalStateException.class, () -> { + var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085)); + try { + client.onClose().block(Duration.ofSeconds(1)); + } finally { + client.dispose(); + } + }); + } + + @Test + public void testServerConsumer() { + var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085)); + try { + var rawClient = RSocketConnector.create() + .setupPayload(DefaultPayload.create("client", "setup-info")) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .connect(TcpClientTransport.create("127.0.0.1", 8085)) + .block(Duration.ofSeconds(5)); + Assertions.assertNotNull(rawClient); + var outputSequence = Flux.just("a", "b", "c").map(DefaultPayload::create); + var disposable = rawClient + .requestChannel(Flux.just(DefaultPayload.create("test", "channel")).concatWith(outputSequence)) + .subscribeOn(Schedulers.parallel()) + .subscribe(v -> {}, ex -> Assertions.fail(ex)); + try { + EventConsumer consumer = server.registerConsumer(ChannelCodec.UTF8_TEST, "test"); + var events = consumer.consumeMessages().collectList().block(Duration.ofSeconds(5)); + Assertions.assertNotNull(events); + var mappedEvents = List.copyOf(Collections2.transform(events, Timestamped::data)); + Assertions.assertEquals(List.of("a", "b", "c"), mappedEvents); + } finally { + disposable.dispose(); + } + } finally { + server.dispose(); + } + } + + @Test + public void testServerProducer() { + var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085)); + try { + var rawClient = RSocketConnector.create() + .setupPayload(DefaultPayload.create("client", "setup-info")) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .connect(TcpClientTransport.create("127.0.0.1", 8085)) + .block(Duration.ofSeconds(5)); + Assertions.assertNotNull(rawClient); + EventProducer producer = server.registerProducer(ChannelCodec.UTF8_TEST, "test"); + var disposable = producer + .sendMessages(Flux.just("a", "b", "c")) + .subscribeOn(Schedulers.parallel()) + .subscribe(v -> {}, ex -> Assertions.fail(ex)); + try { + var events = rawClient + .requestStream(DefaultPayload.create("test", "channel")) + .map(Payload::getDataUtf8) + .collectList() + .block(); + Assertions.assertNotNull(events); + Assertions.assertEquals(List.of("a", "b", "c"), events); + } finally { + disposable.dispose(); + } + } finally { + server.dispose(); + } + } + + @Test + public void testClientConsumer() { + var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085)); + try { + var rawServer = RSocketServer.create(SocketAcceptor.forRequestStream(payload -> { + var metadata = payload.getMetadataUtf8(); + 
Assertions.assertEquals("channel", metadata); + var data = payload.getDataUtf8(); + Assertions.assertEquals("test", data); + return Flux.just("a", "b", "c").map(DefaultPayload::create); + })) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bindNow(TcpServerTransport.create("127.0.0.1", 8085)); + try { + var events = client + .registerConsumer(ChannelCodec.UTF8_TEST, "test") + .consumeMessages() + .map(Timestamped::data) + .collectList() + .block(Duration.ofSeconds(5)); + Assertions.assertNotNull(events); + Assertions.assertEquals(List.of("a", "b", "c"), events); + } finally { + rawServer.dispose(); + } + } finally { + client.dispose(); + } + } + + @Test + public void testClientProducer() { + AtomicBoolean received = new AtomicBoolean(); + var rawServer = RSocketServer + .create(SocketAcceptor.forRequestChannel(payloads -> Flux.from(payloads).switchOnFirst((first, flux) -> { + if (first.isOnNext()) { + var payload = first.get(); + Assertions.assertNotNull(payload); + var metadata = payload.getMetadataUtf8(); + Assertions.assertEquals("channel", metadata); + var data = payload.getDataUtf8(); + Assertions.assertEquals("test", data); + return flux.skip(1).map(Payload::getDataUtf8).collectList().doOnSuccess(val -> { + received.set(true); + Assertions.assertEquals(List.of("a", "b", "c"), val); + }).then(Mono.just(DefaultPayload.create("ok", "response"))); + } else { + return flux.take(1, true); + } + }))) + .payloadDecoder(PayloadDecoder.ZERO_COPY) + .bindNow(TcpServerTransport.create("127.0.0.1", 8085)); + try { + var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085)); + try { + client + .registerProducer(ChannelCodec.UTF8_TEST, "test") + .sendMessages(Flux.just("a", "b", "c")) + .block(Duration.ofMinutes(1)); + Assertions.assertTrue(received.get()); + } finally { + client.dispose(); + } + } finally { + rawServer.dispose(); + } + } + + @Test + public void testServerOnClose() { + Assertions.assertThrows(IllegalStateException.class, () -> { + var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085)); + try { + server.onClose().block(Duration.ofSeconds(1)); + } finally { + server.dispose(); + } + }); + } +} diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java index 98c9820..936e638 100644 --- a/src/test/java/module-info.java +++ b/src/test/java/module-info.java @@ -11,4 +11,12 @@ module tdlib.reactive.api.test { requires kafka.clients; requires java.logging; requires rsocket.core; + requires org.junit.platform.engine; + requires org.junit.platform.commons; + requires org.junit.jupiter.engine; + requires reactor.tools; + requires rsocket.transport.netty; + requires reactor.test; + requires reactor.netty.core; + requires org.apache.logging.log4j; } \ No newline at end of file diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml index 0b9f56b..bf120f9 100644 --- a/src/test/resources/log4j2.xml +++ b/src/test/resources/log4j2.xml @@ -15,7 +15,7 @@ - +
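Usage sketch (assumptions labeled): the tests above exercise the server and client sides separately; the snippet below shows how the new MyRSocketServer/MyRSocketClient pair could be wired end to end through one channel. It assumes the generic type parameters elided from this extracted diff are String (UTF-8 payloads), that ChannelCodec.UTF8_TEST is the same codec used by TestRSocket, and that host, port, and the channel name "example" are placeholders.

// Minimal end-to-end sketch, assuming the generic parameters stripped from this
// diff are String and that ChannelCodec.UTF8_TEST matches the codec used in
// TestRSocket above. Host, port, and channel name are placeholders.
import com.google.common.net.HostAndPort;
import it.tdlight.reactiveapi.ChannelCodec;
import it.tdlight.reactiveapi.EventConsumer;
import it.tdlight.reactiveapi.EventProducer;
import it.tdlight.reactiveapi.Timestamped;
import it.tdlight.reactiveapi.rsocket.MyRSocketClient;
import it.tdlight.reactiveapi.rsocket.MyRSocketServer;
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ChannelUsageSketch {

	public static void main(String[] args) {
		var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085));
		var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085));
		try {
			// Server side: local consumer for channel "example"
			EventConsumer<String> consumer = server.registerConsumer(ChannelCodec.UTF8_TEST, "example");
			Mono<List<String>> received = consumer
					.consumeMessages()
					.map(Timestamped::data)
					.collectList()
					.doOnNext(System.out::println); // expected: [a, b, c]

			// Client side: producer that pushes three messages and then completes
			EventProducer<String> producer = client.registerProducer(ChannelCodec.UTF8_TEST, "example");
			Mono<Void> sent = producer.sendMessages(Flux.just("a", "b", "c"));

			// Subscribe both ends; the consumer completes once the producer's flux ends
			Mono.when(received, sent).block(Duration.ofSeconds(5));
		} finally {
			client.dispose();
			server.dispose();
		}
	}
}

As a side note on the other new classes in this patch: FutureEventProducer and FutureEventConsumer cover the case where the underlying EventProducer/EventConsumer is only available asynchronously; they cache the supplied Mono and defer sendMessages/consumeMessages to it, so either side of the sketch above could be wrapped that way when the channel manager is created lazily.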