Fix sockets

This commit is contained in:
Andrea Cavalli 2022-10-06 19:06:35 +02:00
parent 5dc543f090
commit 0a74e1ab1a
17 changed files with 600 additions and 454 deletions

View File

@ -81,6 +81,11 @@
<scope>runtime</scope> <scope>runtime</scope>
<version>3.4.23</version> <version>3.4.23</version>
</dependency> </dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency> <dependency>
<groupId>org.jetbrains</groupId> <groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId> <artifactId>annotations</artifactId>

View File

@ -5,9 +5,5 @@ import reactor.core.publisher.Flux;
public interface EventConsumer<K> { public interface EventConsumer<K> {
ChannelCodec getChannelCodec();
String getChannelName();
Flux<Timestamped<K>> consumeMessages(); Flux<Timestamped<K>> consumeMessages();
} }

View File

@ -5,10 +5,6 @@ import reactor.core.publisher.Mono;
public interface EventProducer<K> { public interface EventProducer<K> {
ChannelCodec getChannelCodec();
String getChannelName();
Mono<Void> sendMessages(Flux<K> eventsFlux); Mono<Void> sendMessages(Flux<K> eventsFlux);
void close(); void close();

View File

@ -0,0 +1,19 @@
package it.tdlight.reactiveapi;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class FutureEventConsumer<K> implements EventConsumer<K> {
private final Mono<EventConsumer<K>> future;
public FutureEventConsumer(Mono<EventConsumer<K>> future) {
this.future = future.cache();
}
@Override
public Flux<Timestamped<K>> consumeMessages() {
return future.flatMapMany(EventConsumer::consumeMessages);
}
}

View File

@ -0,0 +1,25 @@
package it.tdlight.reactiveapi;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class FutureEventProducer<K> implements EventProducer<K> {
private final Mono<EventProducer<K>> future;
public FutureEventProducer(Mono<EventProducer<K>> future) {
this.future = future.cache();
}
@Override
public Mono<Void> sendMessages(Flux<K> eventsFlux) {
return future.flatMap(ep -> ep.sendMessages(eventsFlux));
}
@Override
public void close() {
future.doOnNext(EventProducer::close).subscribeOn(Schedulers.parallel()).subscribe();
}
}

View File

@ -0,0 +1,25 @@
package it.tdlight.reactiveapi;
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.Empty;
public abstract class SimpleEventProducer<K> implements EventProducer<K> {
private final Empty<Void> closeRequest = Sinks.empty();
@Override
public final Mono<Void> sendMessages(Flux<K> eventsFlux) {
return handleSendMessages(eventsFlux.takeUntilOther(closeRequest.asMono()));
}
public abstract Mono<Void> handleSendMessages(Flux<K> eventsFlux);
@Override
public final void close() {
closeRequest.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
}
}

View File

@ -85,12 +85,10 @@ public final class KafkaConsumer<K> implements EventConsumer<K> {
return quickResponse; return quickResponse;
} }
@Override
public ChannelCodec getChannelCodec() { public ChannelCodec getChannelCodec() {
return channelCodec; return channelCodec;
} }
@Override
public String getChannelName() { public String getChannelName() {
return channelName; return channelName;
} }

View File

@ -43,12 +43,10 @@ public final class KafkaProducer<K> implements EventProducer<K> {
sender = KafkaSender.create(senderOptions.maxInFlight(1024)); sender = KafkaSender.create(senderOptions.maxInFlight(1024));
} }
@Override
public ChannelCodec getChannelCodec() { public ChannelCodec getChannelCodec() {
return channelCodec; return channelCodec;
} }
@Override
public String getChannelName() { public String getChannelName() {
return channelName; return channelName;
} }

View File

@ -0,0 +1,90 @@
package it.tdlight.reactiveapi.rsocket;
import io.rsocket.Payload;
import it.tdlight.reactiveapi.Timestamped;
import java.time.Duration;
import org.apache.kafka.common.serialization.Deserializer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.scheduler.Schedulers;
class ConsumerConnection<T> {
private Flux<Timestamped<T>> remote;
private Deserializer<T> local;
private Empty<Void> connected = Sinks.empty();
private Empty<Void> remoteResult = Sinks.empty();
public ConsumerConnection(String channel) {
}
public synchronized Flux<Timestamped<T>> connectLocal() {
return connected.asMono().publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> {
synchronized (ConsumerConnection.this) {
return remote;
}
}));
}
public synchronized Mono<Void> connectRemote() {
return connected.asMono().publishOn(Schedulers.parallel()).then(Mono.defer(() -> {
synchronized (ConsumerConnection.this) {
return remoteResult.asMono();
}
}));
}
public synchronized void resetRemote() {
connected = Sinks.empty();
remoteResult = Sinks.empty();
remote = null;
local = null;
}
public synchronized void registerRemote(Flux<Payload> remote) {
if (this.remote != null) {
throw new IllegalStateException("Remote is already registered");
}
this.remote = remote
.transformDeferred(flux -> {
synchronized (ConsumerConnection.this) {
assert local != null;
return RSocketUtils.deserialize(flux, local);
}
})
.map(element -> new Timestamped<>(System.currentTimeMillis(), element))
.doOnError(ex -> {
synchronized (ConsumerConnection.this) {
remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
}
})
.doFinally(s -> {
synchronized (ConsumerConnection.this) {
remoteResult.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
resetRemote();
}
});
onChanged();
}
public synchronized void registerLocal(Deserializer<T> local) {
if (this.local != null) {
throw new IllegalStateException("Local is already registered");
}
this.local = local;
onChanged();
}
private synchronized void onChanged() {
if (local != null && remote != null) {
connected.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
}
}
}

View File

@ -1,7 +1,6 @@
package it.tdlight.reactiveapi.rsocket; package it.tdlight.reactiveapi.rsocket;
import com.google.common.net.HostAndPort; import com.google.common.net.HostAndPort;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload; import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload; import io.rsocket.Payload;
import io.rsocket.RSocket; import io.rsocket.RSocket;
@ -9,235 +8,88 @@ import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector; import io.rsocket.core.RSocketConnector;
import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.util.DefaultPayload; import io.rsocket.util.DefaultPayload;
import it.tdlight.reactiveapi.ChannelCodec; import it.tdlight.reactiveapi.ChannelCodec;
import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventConsumer;
import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.EventProducer;
import it.tdlight.reactiveapi.SimpleEventProducer;
import it.tdlight.reactiveapi.Timestamped; import it.tdlight.reactiveapi.Timestamped;
import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ClientPendingEventsToProduce;
import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ServerPendingEventsToProduce;
import java.time.Duration; import java.time.Duration;
import java.util.concurrent.CompletableFuture; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.logging.Level;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import reactor.core.Disposable;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks; import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.Empty; import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.UnicastProcessor;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry; import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec; import reactor.util.retry.RetryBackoffSpec;
public class MyRSocketClient implements Closeable, RSocketChannelManager, SocketAcceptor { public class MyRSocketClient implements RSocketChannelManager {
private static final Logger LOG = LogManager.getLogger(MyRSocketClient.class); private final Mono<RSocket> nextClient;
private final AtomicReference<RSocket> lastClient = new AtomicReference<>();
private final Empty<Void> closeRequest = Sinks.empty(); private final Empty<Void> disposeRequest = Sinks.empty();
private final AtomicReference<Closeable> clientRef = new AtomicReference<>();
private final Mono<RSocket> client;
private final ConcurrentMap<String, PendingEventsToProduce> messagesToProduce = new ConcurrentHashMap<>();
public MyRSocketClient(HostAndPort baseHost) { public MyRSocketClient(HostAndPort baseHost) {
RetryBackoffSpec retryStrategy = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(16)).jitter(1.0); RetryBackoffSpec retryStrategy = Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)).maxBackoff(Duration.ofSeconds(16)).jitter(1.0);
var transport = TcpClientTransport.create(baseHost.getHost(), baseHost.getPort()); var transport = TcpClientTransport.create(baseHost.getHost(), baseHost.getPort());
this.client = RSocketConnector.create() this.nextClient = RSocketConnector.create()
.setupPayload(DefaultPayload.create("client", "setup-info")) //.setupPayload(DefaultPayload.create("client", "setup-info"))
.payloadDecoder(PayloadDecoder.ZERO_COPY) .payloadDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(this) .reconnect(retryStrategy)
.connect(transport) .connect(transport)
.retryWhen(retryStrategy) .doOnNext(lastClient::set)
.doOnNext(clientRef::set) .cache();
.cacheInvalidateIf(RSocket::isDisposed)
.takeUntilOther(closeRequest.asMono())
.doOnDiscard(RSocket.class, RSocket::dispose);
} }
@Override @Override
public <K> EventConsumer<K> registerConsumer(ChannelCodec channelCodec, String channelName) { public <K> EventConsumer<K> registerConsumer(ChannelCodec channelCodec, String channelName) {
LOG.debug("Registering consumer for channel \"{}\"", channelName); Deserializer<K> deserializer = channelCodec.getNewDeserializer();
Deserializer<K> deserializer;
try {
deserializer = channelCodec.getNewDeserializer();
} catch (Throwable ex) {
LOG.error("Failed to create codec for channel \"{}\"", channelName, ex);
throw new IllegalStateException("Failed to create codec for channel \"" + channelName + "\"", ex);
}
return new EventConsumer<K>() { return new EventConsumer<K>() {
@Override
public ChannelCodec getChannelCodec() {
return channelCodec;
}
@Override
public String getChannelName() {
return channelName;
}
@Override @Override
public Flux<Timestamped<K>> consumeMessages() { public Flux<Timestamped<K>> consumeMessages() {
return client.flatMapMany(client -> { return nextClient.flatMapMany(client -> client
var rawFlux = client.requestStream(DefaultPayload.create(channelName, "notify-can-consume")); .requestStream(DefaultPayload.create(channelName, "channel"))
return rawFlux.map(elementPayload -> { .transform(flux -> RSocketUtils.deserialize(flux, deserializer))
var slice = elementPayload.sliceData(); .map(event -> new Timestamped<>(System.currentTimeMillis(), event)));
byte[] elementBytes = new byte[slice.readableBytes()];
slice.readBytes(elementBytes, 0, elementBytes.length);
return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, elementBytes));
}).log("CLIENT_CONSUME_MESSAGES", Level.FINE);
});
} }
}; };
} }
@Override @Override
public <K> EventProducer<K> registerProducer(ChannelCodec channelCodec, String channelName) { public <K> EventProducer<K> registerProducer(ChannelCodec channelCodec, String channelName) {
LOG.debug("Registering producer for channel \"{}\"", channelName); Serializer<K> serializer = channelCodec.getNewSerializer();
Serializer<K> serializer; return new SimpleEventProducer<K>() {
try {
serializer = channelCodec.getNewSerializer();
} catch (Throwable ex) {
LOG.error("Failed to create codec for channel \"{}\"", channelName, ex);
throw new IllegalStateException("Failed to create codec for channel \"" + channelName + "\"", ex);
}
Empty<Void> emitCloseRequest = Sinks.empty();
return new EventProducer<K>() {
@Override
public ChannelCodec getChannelCodec() {
return channelCodec;
}
@Override @Override
public String getChannelName() { public Mono<Void> handleSendMessages(Flux<K> eventsFlux) {
return channelName; Flux<Payload> rawFlux = eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer));
Flux<Payload> combinedRawFlux = Flux.just(DefaultPayload.create(channelName, "channel")).concatWith(rawFlux);
return nextClient.flatMapMany(client -> client.requestChannel(combinedRawFlux)).take(1, true).then();
} }
@Override
public Mono<Void> sendMessages(Flux<K> eventsFlux) {
return client.flatMap(client -> {
LOG.debug("Subscribed to channel \"{}\", sending messages to server", channelName);
var rawFlux = eventsFlux
.map(event -> DefaultPayload.create(serializer.serialize(null, event)))
.log("CLIENT_PRODUCE_MESSAGES", Level.FINE)
.takeUntilOther(emitCloseRequest.asMono().doFinally(s -> LOG.debug("Producer of channel \"{}\" ended the flux because emit close request ended with signal {}", channelName, s)))
.doFinally(s -> LOG.debug("Producer of channel \"{}\" ended the flux with signal {}", channelName, s))
.doOnError(ex -> LOG.error("Producer of channel \"{}\" ended the flux with an error", channelName, ex));
final ServerPendingEventsToProduce myPendingEventsToProduce = new ServerPendingEventsToProduce(rawFlux, new CompletableFuture<>(), new CompletableFuture<>());
var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channelName, n -> myPendingEventsToProduce);
if (pendingEventsToProduce instanceof ClientPendingEventsToProduce clientPendingEventsToProduce) {
if (!clientPendingEventsToProduce.fluxCf().complete(rawFlux)) {
LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec);
return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\""));
}
return Mono
.firstWithSignal(client.onClose(), Mono.fromFuture(clientPendingEventsToProduce::doneCf))
.doOnError(clientPendingEventsToProduce.doneCf()::completeExceptionally)
.doFinally(s -> {
messagesToProduce.remove(channelName, clientPendingEventsToProduce);
clientPendingEventsToProduce.doneCf().complete(null);
LOG.debug("Producer of channel \"{}\" ended the execution with signal {}", channelName, s);
});
} else if (pendingEventsToProduce == myPendingEventsToProduce) {
return client
.requestResponse(DefaultPayload.create(channelName, "notify-can-produce"))
.then(Mono.firstWithSignal(client.onClose(), Mono.fromFuture(myPendingEventsToProduce::doneCf)))
.doOnError(myPendingEventsToProduce.doneCf()::completeExceptionally)
.doFinally(s -> {
messagesToProduce.remove(channelName, myPendingEventsToProduce);
myPendingEventsToProduce.doneCf().complete(null);
LOG.debug("Producer of channel \"{}\" ended the execution with signal {}", channelName, s);
});
} else {
LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec);
return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\""));
}
});
}
@Override
public void close() {
emitCloseRequest.tryEmitEmpty();
}
}; };
} }
@Override @Override
public @NotNull Mono<Void> onClose() { public Mono<Void> onClose() {
return closeRequest.asMono().then(Mono.fromSupplier(clientRef::get).flatMap(Closeable::onClose)); return disposeRequest.asMono();
} }
@Override @Override
public void dispose() { public void dispose() {
var client = clientRef.get(); disposeRequest.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (client != null) { var c = lastClient.get();
client.dispose(); if (c != null) {
} c.dispose();
closeRequest.tryEmitEmpty();
}
@Override
public @NotNull Mono<RSocket> accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) {
return Mono.just(new RSocket() {
@Override
public @NotNull Flux<Payload> requestStream(@NotNull Payload payload) {
return MyRSocketClient.this.requestStream(sendingSocket, payload);
}
});
}
@NotNull
private Flux<Payload> requestStream(RSocket sendingSocket, Payload payload) {
if (payload.getMetadataUtf8().equals("notify-can-consume")) {
var channel = payload.getDataUtf8();
LOG.debug("Received request for channel \"{}\", sending stream to server", channel);
final ClientPendingEventsToProduce myPendingEventsToProduce = new ClientPendingEventsToProduce(new CompletableFuture<>(),
new CompletableFuture<>(),
new CompletableFuture<>()
);
var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channel, n -> myPendingEventsToProduce);
if (pendingEventsToProduce instanceof ServerPendingEventsToProduce serverPendingEventsToProduce) {
if (serverPendingEventsToProduce.initCf().complete(null)) {
return serverPendingEventsToProduce.events()
.doOnError(serverPendingEventsToProduce.doneCf()::completeExceptionally)
.doFinally(s -> {
messagesToProduce.remove(channel, serverPendingEventsToProduce);
serverPendingEventsToProduce.doneCf().complete(null);
});
} else {
LOG.error("The channel \"{}\" is already active", channel);
return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active"));
}
} else if (pendingEventsToProduce == myPendingEventsToProduce) {
if (myPendingEventsToProduce.initCf().complete(null)) {
return Mono
.fromFuture(myPendingEventsToProduce::fluxCf)
.flatMapMany(flux -> flux)
.doOnError(myPendingEventsToProduce.doneCf()::completeExceptionally)
.doFinally(s -> myPendingEventsToProduce.doneCf().complete(null));
} else {
LOG.error("The channel \"{}\" is already active", channel);
return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active"));
}
} else {
LOG.error("The channel \"{}\" is already active", channel);
return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active"));
}
} else {
LOG.warn("Received invalid request stream");
return Flux.empty();
} }
} }
} }

View File

@ -1,7 +1,6 @@
package it.tdlight.reactiveapi.rsocket; package it.tdlight.reactiveapi.rsocket;
import com.google.common.net.HostAndPort; import com.google.common.net.HostAndPort;
import io.rsocket.Closeable;
import io.rsocket.ConnectionSetupPayload; import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload; import io.rsocket.Payload;
import io.rsocket.RSocket; import io.rsocket.RSocket;
@ -15,174 +14,113 @@ import it.tdlight.reactiveapi.ChannelCodec;
import it.tdlight.reactiveapi.EventConsumer; import it.tdlight.reactiveapi.EventConsumer;
import it.tdlight.reactiveapi.EventProducer; import it.tdlight.reactiveapi.EventProducer;
import it.tdlight.reactiveapi.Timestamped; import it.tdlight.reactiveapi.Timestamped;
import it.tdlight.reactiveapi.rsocket.MyRSocketServer.PendingEventsToConsume.ClientPendingEventsToConsume;
import it.tdlight.reactiveapi.rsocket.MyRSocketServer.PendingEventsToConsume.ServerPendingEventsToConsume;
import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ClientPendingEventsToProduce;
import it.tdlight.reactiveapi.rsocket.PendingEventsToProduce.ServerPendingEventsToProduce;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.logging.Level;
import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class MyRSocketServer implements Closeable, RSocketChannelManager, SocketAcceptor { public class MyRSocketServer implements RSocketChannelManager, RSocket {
private static final Logger LOG = LogManager.getLogger(MyRSocketServer.class); private final Logger logger = LogManager.getLogger(this.getClass());
private final Mono<CloseableChannel> server; private final Mono<CloseableChannel> serverCloseable;
private final Map<String, EventConsumer<?>> consumers = new CopyOnWriteMap<>();
private final Map<String, EventProducer<?>> producers = new CopyOnWriteMap<>();
private final ConcurrentMap<String, PendingEventsToConsume> messagesToConsume protected final Map<String, ConsumerConnection<?>> consumerRegistry = new ConcurrentHashMap<>();
= new ConcurrentHashMap<>();
sealed interface PendingEventsToConsume { protected final Map<String, ProducerConnection<?>> producerRegistry = new ConcurrentHashMap<>();
record ClientPendingEventsToConsume(Flux<Payload> doneCf,
CompletableFuture<Void> initCf) implements PendingEventsToConsume {}
record ServerPendingEventsToConsume(CompletableFuture<Flux<Payload>> doneCf,
CompletableFuture<Void> initCf) implements PendingEventsToConsume {}
}
private final ConcurrentMap<String, PendingEventsToProduce> messagesToProduce = new ConcurrentHashMap<>();
public MyRSocketServer(HostAndPort baseHost) { public MyRSocketServer(HostAndPort baseHost) {
this.server = RSocketServer var serverMono = RSocketServer
.create(this) .create(SocketAcceptor.with(this))
.payloadDecoder(PayloadDecoder.ZERO_COPY) .payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create(baseHost.getHost(), baseHost.getPort())) .bind(TcpServerTransport.create(baseHost.getHost(), baseHost.getPort()))
.doOnNext(d -> logger.debug("Server up"))
.cache(); .cache();
serverMono.subscribeOn(Schedulers.parallel()).subscribe(v -> {}, ex -> logger.warn("Failed to bind server"));
this.serverCloseable = serverMono;
} }
@Override @Override
public <K> EventConsumer<K> registerConsumer(ChannelCodec channelCodec, String channelName) { public @NotNull Flux<Payload> requestChannel(@NotNull Publisher<Payload> payloads) {
LOG.debug("Registering consumer for channel \"{}\"", channelName); return Flux.from(payloads).switchOnFirst((first, flux) -> {
var consumer = new EventConsumer<K>() { if (first.isOnNext()) {
@Override var firstValue = first.get();
public ChannelCodec getChannelCodec() { assert firstValue != null;
return channelCodec; var meta = firstValue.getMetadataUtf8();
if (!meta.equals("channel")) {
return Mono.error(new CancelledChannelException("Metadata is wrong"));
}
var channel = firstValue.getDataUtf8();
var conn = MyRSocketServer.this.consumerRegistry.computeIfAbsent(channel, ConsumerConnection::new);
conn.registerRemote(flux.skip(1));
return conn.connectRemote().then(Mono.fromSupplier(() -> DefaultPayload.create("ok", "result")));
} else {
return flux.take(1, true);
} }
});
}
@Override @Override
public String getChannelName() { public @NotNull Flux<Payload> requestStream(@NotNull Payload payload) {
return channelName; var channel = payload.getDataUtf8();
} return Flux.defer(() -> {
var conn = MyRSocketServer.this.producerRegistry.computeIfAbsent(channel, ProducerConnection::new);
conn.registerRemote();
return conn.connectRemote();
});
}
@Override
public final <K> EventConsumer<K> registerConsumer(ChannelCodec channelCodec, String channelName) {
logger.debug("Registering consumer for channel \"{}\"", channelName);
Deserializer<K> deserializer;
try {
deserializer = channelCodec.getNewDeserializer();
} catch (Throwable ex) {
logger.error("Failed to create codec for channel \"{}\"", channelName, ex);
throw new IllegalStateException("Failed to create codec for channel " + channelName);
}
return new EventConsumer<K>() {
@Override @Override
public Flux<Timestamped<K>> consumeMessages() { public Flux<Timestamped<K>> consumeMessages() {
Deserializer<K> deserializer;
try {
deserializer = channelCodec.getNewDeserializer();
} catch (Throwable ex) {
LOG.error("Failed to create codec for channel \"{}\"", channelName, ex);
return Flux.error(new IllegalStateException("Failed to create codec for channel " + channelName));
}
return Flux.defer(() -> { return Flux.defer(() -> {
var myPendingEventsToConsume = new ServerPendingEventsToConsume(new CompletableFuture<>(), new CompletableFuture<>()); //noinspection unchecked
var pendingEventsToConsume = messagesToConsume.computeIfAbsent(channelName, n -> myPendingEventsToConsume); var conn = (ConsumerConnection<K>) consumerRegistry.computeIfAbsent(channelName, ConsumerConnection::new);
if (pendingEventsToConsume instanceof ClientPendingEventsToConsume clientPendingEventsToConsume) { conn.registerLocal(deserializer);
if (clientPendingEventsToConsume.initCf.complete(null)) { return conn.connectLocal();
return server.thenMany(clientPendingEventsToConsume.doneCf
.map(elementPayload -> {
var slice = elementPayload.sliceData();
byte[] elementBytes = new byte[slice.readableBytes()];
slice.readBytes(elementBytes, 0, elementBytes.length);
return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, elementBytes));
})
.log("SERVER_CONSUME_MESSAGES", Level.FINE)
.doFinally(s -> messagesToConsume.remove(channelName, clientPendingEventsToConsume)));
} else {
LOG.error("Channel is already consuming");
return Mono.error(new CancelledChannelException("Channel is already consuming"));
}
} else if (pendingEventsToConsume == myPendingEventsToConsume) {
return server.thenMany(Mono
.fromFuture(myPendingEventsToConsume::doneCf)
.flatMapMany(Function.identity())
.map(elementPayload -> {
var slice = elementPayload.sliceData();
byte[] elementBytes = new byte[slice.readableBytes()];
slice.readBytes(elementBytes, 0, elementBytes.length);
return new Timestamped<>(System.currentTimeMillis(), deserializer.deserialize(null, elementBytes));
})
.doFinally(s -> messagesToConsume.remove(channelName, myPendingEventsToConsume)));
} else {
LOG.error("Channel is already consuming");
return Mono.error(new CancelledChannelException("Channel is already consuming"));
}
}); });
} }
}; };
var prev = this.consumers.put(channelName, consumer);
if (prev != null) {
LOG.error("Consumer \"{}\" was already registered", channelName);
}
return consumer;
} }
@Override @Override
public <K> EventProducer<K> registerProducer(ChannelCodec channelCodec, String channelName) { public <K> EventProducer<K> registerProducer(ChannelCodec channelCodec, String channelName) {
LOG.debug("Registering producer for channel \"{}\"", channelName); logger.debug("Registering producer for channel \"{}\"", channelName);
Serializer<K> serializer; Serializer<K> serializer;
try { try {
serializer = channelCodec.getNewSerializer(); serializer = channelCodec.getNewSerializer();
} catch (Throwable ex) { } catch (Throwable ex) {
LOG.error("Failed to create codec for channel \"{}\"", channelName, ex); logger.error("Failed to create codec for channel \"{}\"", channelName, ex);
throw new UnsupportedOperationException("Failed to create codec for channel \"" + channelName + "\"", ex); throw new IllegalStateException("Failed to create codec for channel " + channelName);
} }
var producer = new EventProducer<K>() { return new EventProducer<K>() {
@Override
public ChannelCodec getChannelCodec() {
return channelCodec;
}
@Override
public String getChannelName() {
return channelName;
}
@Override @Override
public Mono<Void> sendMessages(Flux<K> eventsFlux) { public Mono<Void> sendMessages(Flux<K> eventsFlux) {
var serverCloseEvent = server
.flatMap(CloseableChannel::onClose)
.doOnSuccess(s ->
LOG.debug("Channel \"{}\" messages send flux will end because the server is closed", channelName));
return Mono.defer(() -> { return Mono.defer(() -> {
var rawFlux = eventsFlux //noinspection unchecked
.log("SERVER_PRODUCE_MESSAGES", Level.FINE) var conn = (ProducerConnection<K>) producerRegistry.computeIfAbsent(channelName, ProducerConnection::new);
.map(element -> DefaultPayload.create(serializer.serialize(null, element))); conn.registerLocal(eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer)));
final ServerPendingEventsToProduce myPendingEventsToProduce = new ServerPendingEventsToProduce(rawFlux, new CompletableFuture<>(), new CompletableFuture<>()); return conn.connectLocal();
var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channelName, n -> myPendingEventsToProduce);
if (pendingEventsToProduce instanceof ClientPendingEventsToProduce clientPendingEventsToProduce) {
if (clientPendingEventsToProduce.fluxCf().complete(rawFlux)) {
return Mono
.firstWithSignal(Mono.fromFuture(clientPendingEventsToProduce::doneCf), serverCloseEvent)
.doFinally(s -> {
messagesToProduce.remove(channelName, clientPendingEventsToProduce);
clientPendingEventsToProduce.doneCf().complete(null);
});
} else {
LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec);
return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\""));
}
} else if (pendingEventsToProduce == myPendingEventsToProduce) {
return Mono.firstWithSignal(Mono.fromFuture(myPendingEventsToProduce::doneCf), serverCloseEvent)
.doFinally(s -> {
messagesToProduce.remove(channelName, myPendingEventsToProduce);
myPendingEventsToProduce.doneCf().complete(null);
});
} else {
LOG.error("Called sendMessage twice for channel \"{}\"", channelCodec);
return Mono.error(new CancelledChannelException("Called sendMessage twice for channel \"" + channelName + "\""));
}
}); });
} }
@ -191,124 +129,18 @@ public class MyRSocketServer implements Closeable, RSocketChannelManager, Socket
} }
}; };
var prev = this.producers.put(channelName, producer);
if (prev != null) {
LOG.error("Producer \"{}\" was already registered", channelName);
prev.close();
}
return producer;
} }
@Override @Override
public @NotNull Mono<Void> onClose() { public @NotNull Mono<Void> onClose() {
return server.flatMap(CloseableChannel::onClose); return Mono.when(serverCloseable.flatMap(CloseableChannel::onClose));
} }
@Override @Override
public void dispose() { public void dispose() {
server.doOnNext(CloseableChannel::dispose).subscribe(n -> {}, ex -> LOG.error("Failed to dispose the server", ex)); serverCloseable
} .doOnNext(CloseableChannel::dispose)
.subscribeOn(Schedulers.parallel())
@Override .subscribe(v -> {}, ex -> logger.error("Failed to dispose the server", ex));
public @NotNull Mono<RSocket> accept(@NotNull ConnectionSetupPayload setup, @NotNull RSocket sendingSocket) {
if (!setup.getMetadataUtf8().equals("setup-info") || !setup.getDataUtf8().equals("client")) {
LOG.warn("Invalid setup metadata!");
return Mono.just(new RSocket() {});
}
return Mono.just(new RSocket() {
@Override
public @NotNull Flux<Payload> requestStream(@NotNull Payload payload) {
return MyRSocketServer.this.requestStream(sendingSocket, payload);
}
@Override
public @NotNull Mono<Payload> requestResponse(@NotNull Payload payload) {
return MyRSocketServer.this.requestResponse(sendingSocket, payload);
}
});
}
private Mono<Payload> requestResponse(RSocket sendingSocket, Payload payload) {
if (payload.getMetadataUtf8().equals("notify-can-produce")) {
var channel = payload.getDataUtf8();
var consumer = consumers.get(channel);
if (consumer != null) {
var rawFlux = sendingSocket.requestStream(DefaultPayload.create(channel, "notify-can-consume"));
var myNewPendingEventsToConsume = new ClientPendingEventsToConsume(rawFlux, new CompletableFuture<>());
var pendingEventsToConsume = messagesToConsume.computeIfAbsent(channel, n -> myNewPendingEventsToConsume);
LOG.debug("Received request for channel \"{}\", requesting stream to client", channel);
if (pendingEventsToConsume instanceof ServerPendingEventsToConsume serverPendingEventsToConsume) {
//messagesToConsume.remove(channel, pendingEventsToConsume);
if (!serverPendingEventsToConsume.doneCf.complete(rawFlux)) {
LOG.error("The server is already producing to channel \"{}\", the request will be rejected", channel);
return Mono.error(new IllegalStateException("The server is already producing to channel \"" + channel + "\""));
}
return Mono.just(DefaultPayload.create("ok", "response"));
} else if (pendingEventsToConsume == myNewPendingEventsToConsume) {
//messagesToConsume.remove(channel, pendingEventsToConsume);
return Mono
.fromFuture(myNewPendingEventsToConsume::initCf)
.thenReturn(DefaultPayload.create("ok", "response"));
} else {
LOG.warn("Received request for channel \"{}\", but the channel is already active", channel);
return Mono.error(new IllegalStateException("Channel " + channel + " is already active"));
}
} else {
LOG.warn("Received request for channel \"{}\", but no channel with that name is registered", channel);
return Mono.error(new IllegalStateException("Channel " + channel + " does not exist, or it has not been registered"));
}
} else {
LOG.warn("Received invalid request");
return Mono.error(new UnsupportedOperationException("Invalid request"));
}
}
@NotNull
private <T> Flux<Payload> requestStream(RSocket sendingSocket, Payload payload) {
if (payload.getMetadataUtf8().equals("notify-can-consume")) {
var channel = payload.getDataUtf8();
var producer = producers.get(channel);
if (producer != null) {
final ClientPendingEventsToProduce myPendingEventsToProduce = new ClientPendingEventsToProduce(new CompletableFuture<>(), new CompletableFuture<>(), new CompletableFuture<>());
var pendingEventsToProduce = messagesToProduce.computeIfAbsent(channel, n -> myPendingEventsToProduce);
if (pendingEventsToProduce instanceof ServerPendingEventsToProduce serverPendingEventsToProduce) {
if (serverPendingEventsToProduce.initCf().complete(null)) {
return serverPendingEventsToProduce.events()
.doOnError(serverPendingEventsToProduce.doneCf()::completeExceptionally)
.doFinally(s -> {
messagesToProduce.remove(channel, serverPendingEventsToProduce);
serverPendingEventsToProduce.doneCf().complete(null);
});
} else {
LOG.error("The channel \"{}\" is already active", channel);
return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active"));
}
} else if (pendingEventsToProduce == myPendingEventsToProduce) {
if (myPendingEventsToProduce.initCf().complete(null)) {
return Mono
.fromFuture(myPendingEventsToProduce::fluxCf)
.flatMapMany(flux -> flux)
.doOnError(myPendingEventsToProduce.doneCf()::completeExceptionally)
.doFinally(s -> {
messagesToProduce.remove(channel, myPendingEventsToProduce);
myPendingEventsToProduce.doneCf().complete(null);
});
} else {
LOG.error("The channel \"{}\" is already active", channel);
return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active"));
}
} else {
LOG.error("The channel \"{}\" is already active", channel);
return Flux.error(new CancelledChannelException("The channel \"" + channel + "\" is already active"));
}
} else {
LOG.warn("No producer registered for channel \"{}\"", channel);
return Flux.error(new CancelledChannelException("No producer registered for channel \"" + channel + "\""));
}
} else {
LOG.warn("Received invalid request stream");
return Flux.error(new CancelledChannelException("Received invalid request stream"));
}
} }
} }

View File

@ -0,0 +1,81 @@
package it.tdlight.reactiveapi.rsocket;
import io.rsocket.Payload;
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.scheduler.Schedulers;
class ProducerConnection<T> {
private Object remote;
private Flux<Payload> local;
private Empty<Void> connected = Sinks.empty();
private Empty<Void> remoteResult = Sinks.empty();
public ProducerConnection(String channel) {
}
public synchronized Mono<Void> connectLocal() {
return connected.asMono().publishOn(Schedulers.parallel()).then(Mono.defer(() -> {
synchronized (ProducerConnection.this) {
return remoteResult.asMono().doFinally(r -> reset());
}
})).doOnError(ex -> {
synchronized (ProducerConnection.this) {
remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
}
}).doFinally(ended -> reset());
}
public synchronized Flux<Payload> connectRemote() {
return connected.asMono().publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> {
synchronized (ProducerConnection.this) {
return local;
}
}).doOnError(ex -> {
synchronized (ProducerConnection.this) {
remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
}
})).doFinally(ended -> reset());
}
public synchronized void reset() {
if (local != null && remote != null) {
remoteResult.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
local = null;
remote = null;
connected = Sinks.empty();
remoteResult = Sinks.empty();
}
}
public synchronized void registerRemote() {
if (this.remote != null) {
throw new IllegalStateException("Remote is already registered");
}
this.remote = new Object();
onChanged();
}
public synchronized void registerLocal(Flux<Payload> local) {
if (this.local != null) {
throw new IllegalStateException("Local is already registered");
}
this.local = local;
onChanged();
}
private synchronized void onChanged() {
if (local != null && remote != null) {
connected.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
}
}
}

View File

@ -0,0 +1,23 @@
package it.tdlight.reactiveapi.rsocket;
import io.rsocket.Payload;
import io.rsocket.util.DefaultPayload;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import reactor.core.publisher.Flux;
public class RSocketUtils {
public static <T> Flux<T> deserialize(Flux<Payload> payloadFlux, Deserializer<T> deserializer) {
return payloadFlux.map(payload -> {
var slice = payload.sliceData();
byte[] bytes = new byte[slice.readableBytes()];
slice.readBytes(bytes, 0, bytes.length);
return deserializer.deserialize(null, bytes);
});
}
public static <T> Flux<Payload> serialize(Flux<T> flux, Serializer<T> serializer) {
return flux.map(element -> DefaultPayload.create(serializer.serialize(null, element)));
}
}

View File

@ -18,6 +18,8 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Function; import java.util.function.Function;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
@ -30,6 +32,8 @@ import reactor.util.retry.Retry;
public abstract class TestChannel { public abstract class TestChannel {
private static final Logger LOG = LogManager.getLogger(TestChannel.class);
protected ChannelFactory channelFactory; protected ChannelFactory channelFactory;
protected IntArrayList data; protected IntArrayList data;
protected ConcurrentLinkedDeque<Closeable> closeables = new ConcurrentLinkedDeque<>(); protected ConcurrentLinkedDeque<Closeable> closeables = new ConcurrentLinkedDeque<>();
@ -40,8 +44,8 @@ public abstract class TestChannel {
@BeforeEach @BeforeEach
public void beforeEach() { public void beforeEach() {
var consumerFactory = new RSocketChannelFactory(new RSocketParameters(isConsumerClient(), "localhost:25689", List.of())); var consumerFactory = new RSocketChannelFactory(new RSocketParameters(isConsumerClient(), "127.0.0.1:25689", List.of()));
var producerFactory = new RSocketChannelFactory(new RSocketParameters(!isConsumerClient(), "localhost:25689", List.of())); var producerFactory = new RSocketChannelFactory(new RSocketParameters(!isConsumerClient(), "127.0.0.1:25689", List.of()));
closeables.offer(consumerFactory); closeables.offer(consumerFactory);
closeables.offer(producerFactory); closeables.offer(producerFactory);
@ -61,11 +65,13 @@ public abstract class TestChannel {
@AfterEach @AfterEach
public void afterEach() { public void afterEach() {
System.out.println("Cleaning up...");
producer.close();
while (!closeables.isEmpty()) { while (!closeables.isEmpty()) {
var c = closeables.poll(); var c = closeables.poll();
try { try {
c.close(); c.close();
} catch (IOException e) { } catch (Throwable e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
@ -102,7 +108,7 @@ public abstract class TestChannel {
.timeout(Duration.ofSeconds(5)); .timeout(Duration.ofSeconds(5));
var response = Flux var response = Flux
.merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer))
.blockLast(); .blockLast(Duration.ofSeconds(5));
Assertions.assertEquals(response, data); Assertions.assertEquals(response, data);
System.out.println(response); System.out.println(response);
} }
@ -111,15 +117,17 @@ public abstract class TestChannel {
public void testSimple() { public void testSimple() {
Mono<IntArrayList> eventProducer = producer Mono<IntArrayList> eventProducer = producer
.sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)) .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString))
.then(Mono.empty()); .doOnSuccess(s -> LOG.warn("Done producer"))
.then(Mono.<IntArrayList>empty());
Mono<IntArrayList> eventConsumer = consumer Mono<IntArrayList> eventConsumer = consumer
.consumeMessages() .consumeMessages()
.map(Timestamped::data) .map(Timestamped::data)
.map(Integer::parseUnsignedInt) .map(Integer::parseUnsignedInt)
.collect(Collectors.toCollection(IntArrayList::new)); .collect(Collectors.toCollection(IntArrayList::new))
.doOnSuccess(s -> LOG.warn("Done consumer"));
var response = Flux var response = Flux
.merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer))
.blockLast(); .blockLast(Duration.ofSeconds(5));
Assertions.assertEquals(response, data); Assertions.assertEquals(response, data);
System.out.println(response); System.out.println(response);
} }
@ -128,16 +136,18 @@ public abstract class TestChannel {
public void testInvertedSubscription() { public void testInvertedSubscription() {
Mono<IntArrayList> eventProducer = producer Mono<IntArrayList> eventProducer = producer
.sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString)) .sendMessages(Flux.fromIterable(data).map(Integer::toUnsignedString))
.doOnSuccess(s -> LOG.warn("Done producer"))
.then(Mono.empty()); .then(Mono.empty());
Mono<IntArrayList> eventConsumer = consumer Mono<IntArrayList> eventConsumer = consumer
.consumeMessages() .consumeMessages()
.map(Timestamped::data) .map(Timestamped::data)
.map(Integer::parseUnsignedInt) .map(Integer::parseUnsignedInt)
.collect(Collectors.toCollection(IntArrayList::new)); .collect(Collectors.toCollection(IntArrayList::new))
.doOnSuccess(s -> LOG.warn("Done consumer"));
var response = Flux var response = Flux
.merge(isConsumerClient() ? List.of(eventConsumer, eventProducer.delaySubscription(Duration.ofSeconds(1))) .merge(isConsumerClient() ? List.of(eventConsumer, eventProducer.delaySubscription(Duration.ofSeconds(1)))
: List.of(eventProducer, eventConsumer.delaySubscription(Duration.ofSeconds(1)))) : List.of(eventProducer, eventConsumer.delaySubscription(Duration.ofSeconds(1))))
.blockLast(); .blockLast(Duration.ofSeconds(60));
Assertions.assertEquals(response, data); Assertions.assertEquals(response, data);
System.out.println(response); System.out.println(response);
} }
@ -157,7 +167,7 @@ public abstract class TestChannel {
.collect(Collectors.toCollection(IntArrayList::new)); .collect(Collectors.toCollection(IntArrayList::new));
Assertions.assertThrows(Exception.class, () -> Flux Assertions.assertThrows(Exception.class, () -> Flux
.merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer))
.blockLast()); .blockLast(Duration.ofSeconds(5)));
} }
@Test @Test
@ -172,7 +182,7 @@ public abstract class TestChannel {
.collect(Collectors.toCollection(IntArrayList::new)); .collect(Collectors.toCollection(IntArrayList::new));
var data = Flux var data = Flux
.merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer))
.blockLast(); .blockLast(Duration.ofSeconds(5));
Assertions.assertNotNull(data); Assertions.assertNotNull(data);
Assertions.assertTrue(data.isEmpty()); Assertions.assertTrue(data.isEmpty());
} }
@ -188,7 +198,7 @@ public abstract class TestChannel {
.collect(Collectors.toCollection(IntArrayList::new)); .collect(Collectors.toCollection(IntArrayList::new));
var response = Flux var response = Flux
.merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer))
.blockLast(); .blockLast(Duration.ofSeconds(5));
Assertions.assertEquals(response, data); Assertions.assertEquals(response, data);
System.out.println(response); System.out.println(response);
} }
@ -204,7 +214,7 @@ public abstract class TestChannel {
.collect(Collectors.toCollection(IntArrayList::new)); .collect(Collectors.toCollection(IntArrayList::new));
var response = Flux var response = Flux
.merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer)) .merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer))
.blockLast(); .blockLast(Duration.ofSeconds(5));
data.removeElements(50, 100); data.removeElements(50, 100);
Assertions.assertEquals(response, data); Assertions.assertEquals(response, data);
System.out.println(response); System.out.println(response);
@ -223,7 +233,7 @@ public abstract class TestChannel {
.collect(Collectors.toCollection(IntArrayList::new)); .collect(Collectors.toCollection(IntArrayList::new));
var response = Flux var response = Flux
.merge(isConsumerClient() ? List.of(eventProducer, eventConsumer) : List.of(eventConsumer, eventProducer)) .merge(isConsumerClient() ? List.of(eventProducer, eventConsumer) : List.of(eventConsumer, eventProducer))
.blockLast(); .blockLast(Duration.ofSeconds(12));
Assertions.assertEquals(response, data); Assertions.assertEquals(response, data);
System.out.println(response); System.out.println(response);
} }
@ -242,7 +252,7 @@ public abstract class TestChannel {
.collect(Collectors.toCollection(IntArrayList::new)); .collect(Collectors.toCollection(IntArrayList::new));
var response = Flux var response = Flux
.merge(isConsumerClient() ? List.of(eventProducer, eventConsumer) : List.of(eventConsumer, eventProducer)) .merge(isConsumerClient() ? List.of(eventProducer, eventConsumer) : List.of(eventConsumer, eventProducer))
.blockLast(); .blockLast(Duration.ofSeconds(12));
Assertions.assertEquals(response, data); Assertions.assertEquals(response, data);
System.out.println(response); System.out.println(response);
} }
@ -343,7 +353,9 @@ public abstract class TestChannel {
producer producer
.sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString)) .sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString))
.block(); .block();
data.removeInt(data.size() - 1); if (numbers.size() < data.size()) {
data.removeInt(data.size() - 1);
}
Assertions.assertEquals(data, List.copyOf(numbers)); Assertions.assertEquals(data, List.copyOf(numbers));
} finally { } finally {
eventConsumer.dispose(); eventConsumer.dispose();
@ -372,13 +384,15 @@ public abstract class TestChannel {
throw new FakeException(); throw new FakeException();
} }
}).map(Integer::toUnsignedString)) }).map(Integer::toUnsignedString))
.block(); .block(Duration.ofSeconds(5));
}); });
producer producer
.sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString)) .sendMessages(dataFlux.limitRate(1).map(Integer::toUnsignedString))
.block(); .block(Duration.ofSeconds(5));
data.removeInt(data.size() - 1);
data.removeInt(10); data.removeInt(10);
if (numbers.size() < data.size()) {
data.removeInt(data.size() - 1);
}
Assertions.assertEquals(data, List.copyOf(numbers)); Assertions.assertEquals(data, List.copyOf(numbers));
} finally { } finally {
eventConsumer.dispose(); eventConsumer.dispose();
@ -401,7 +415,7 @@ public abstract class TestChannel {
.map(Integer::parseUnsignedInt) .map(Integer::parseUnsignedInt)
.take(10, true) .take(10, true)
.log("consumer-1", Level.INFO) .log("consumer-1", Level.INFO)
.blockLast(); .blockLast(Duration.ofSeconds(5));
Thread.sleep(4000); Thread.sleep(4000);
@ -411,7 +425,7 @@ public abstract class TestChannel {
.map(Timestamped::data) .map(Timestamped::data)
.map(Integer::parseUnsignedInt) .map(Integer::parseUnsignedInt)
.log("consumer-2", Level.INFO) .log("consumer-2", Level.INFO)
.blockLast(); .blockLast(Duration.ofSeconds(5));
} finally { } finally {
eventProducer.dispose(); eventProducer.dispose();
} }
@ -426,7 +440,7 @@ public abstract class TestChannel {
.retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1))) .retryWhen(Retry.fixedDelay(5, Duration.ofSeconds(1)))
.subscribe(n -> {}, ex -> Assertions.fail(ex)); .subscribe(n -> {}, ex -> Assertions.fail(ex));
Assertions.assertThrows(CancelledChannelException.class, () -> { Assertions.assertThrows(IllegalStateException.class, () -> {
try { try {
Mono Mono
.when(consumer .when(consumer
@ -443,7 +457,7 @@ public abstract class TestChannel {
.map(Integer::parseUnsignedInt) .map(Integer::parseUnsignedInt)
.log("consumer-2", Level.INFO) .log("consumer-2", Level.INFO)
.onErrorResume(io.rsocket.exceptions.ApplicationErrorException.class, .onErrorResume(io.rsocket.exceptions.ApplicationErrorException.class,
ex -> Mono.error(new CancelledChannelException(ex)) ex -> Mono.error(new IllegalStateException(ex))
) )
) )
.block(); .block();
@ -470,11 +484,11 @@ public abstract class TestChannel {
.take(10, true) .take(10, true)
.log("producer-1", Level.INFO) .log("producer-1", Level.INFO)
.map(Integer::toUnsignedString)) .map(Integer::toUnsignedString))
.block(); .block(Duration.ofSeconds(5));
Thread.sleep(4000); Thread.sleep(4000);
producer producer
.sendMessages(dataFlux.log("producer-2", Level.INFO).map(Integer::toUnsignedString)) .sendMessages(dataFlux.log("producer-2", Level.INFO).map(Integer::toUnsignedString))
.block(); .block(Duration.ofSeconds(5));
} finally { } finally {
eventConsumer.dispose(); eventConsumer.dispose();
} }

View File

@ -0,0 +1,184 @@
package it.tdlight.reactiveapi.test;
import com.google.common.collect.Collections2;
import com.google.common.net.HostAndPort;
import io.rsocket.Payload;
import io.rsocket.SocketAcceptor;
import io.rsocket.core.RSocketConnector;
import io.rsocket.core.RSocketServer;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import it.tdlight.reactiveapi.ChannelCodec;
import it.tdlight.reactiveapi.EventConsumer;
import it.tdlight.reactiveapi.EventProducer;
import it.tdlight.reactiveapi.Timestamped;
import it.tdlight.reactiveapi.rsocket.MyRSocketClient;
import it.tdlight.reactiveapi.rsocket.MyRSocketServer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class TestRSocket {
@Test
public void testClientOnClose() {
Assertions.assertThrows(IllegalStateException.class, () -> {
var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085));
try {
client.onClose().block(Duration.ofSeconds(1));
} finally {
client.dispose();
}
});
}
@Test
public void testServerConsumer() {
var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085));
try {
var rawClient = RSocketConnector.create()
.setupPayload(DefaultPayload.create("client", "setup-info"))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(TcpClientTransport.create("127.0.0.1", 8085))
.block(Duration.ofSeconds(5));
Assertions.assertNotNull(rawClient);
var outputSequence = Flux.just("a", "b", "c").map(DefaultPayload::create);
var disposable = rawClient
.requestChannel(Flux.just(DefaultPayload.create("test", "channel")).concatWith(outputSequence))
.subscribeOn(Schedulers.parallel())
.subscribe(v -> {}, ex -> Assertions.fail(ex));
try {
EventConsumer<String> consumer = server.registerConsumer(ChannelCodec.UTF8_TEST, "test");
var events = consumer.consumeMessages().collectList().block(Duration.ofSeconds(5));
Assertions.assertNotNull(events);
var mappedEvents = List.copyOf(Collections2.transform(events, Timestamped::data));
Assertions.assertEquals(List.of("a", "b", "c"), mappedEvents);
} finally {
disposable.dispose();
}
} finally {
server.dispose();
}
}
@Test
public void testServerProducer() {
var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085));
try {
var rawClient = RSocketConnector.create()
.setupPayload(DefaultPayload.create("client", "setup-info"))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.connect(TcpClientTransport.create("127.0.0.1", 8085))
.block(Duration.ofSeconds(5));
Assertions.assertNotNull(rawClient);
EventProducer<String> producer = server.registerProducer(ChannelCodec.UTF8_TEST, "test");
var disposable = producer
.sendMessages(Flux.just("a", "b", "c"))
.subscribeOn(Schedulers.parallel())
.subscribe(v -> {}, ex -> Assertions.fail(ex));
try {
var events = rawClient
.requestStream(DefaultPayload.create("test", "channel"))
.map(Payload::getDataUtf8)
.collectList()
.block();
Assertions.assertNotNull(events);
Assertions.assertEquals(List.of("a", "b", "c"), events);
} finally {
disposable.dispose();
}
} finally {
server.dispose();
}
}
@Test
public void testClientConsumer() {
var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085));
try {
var rawServer = RSocketServer.create(SocketAcceptor.forRequestStream(payload -> {
var metadata = payload.getMetadataUtf8();
Assertions.assertEquals("channel", metadata);
var data = payload.getDataUtf8();
Assertions.assertEquals("test", data);
return Flux.just("a", "b", "c").map(DefaultPayload::create);
}))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bindNow(TcpServerTransport.create("127.0.0.1", 8085));
try {
var events = client
.<String>registerConsumer(ChannelCodec.UTF8_TEST, "test")
.consumeMessages()
.map(Timestamped::data)
.collectList()
.block(Duration.ofSeconds(5));
Assertions.assertNotNull(events);
Assertions.assertEquals(List.of("a", "b", "c"), events);
} finally {
rawServer.dispose();
}
} finally {
client.dispose();
}
}
@Test
public void testClientProducer() {
AtomicBoolean received = new AtomicBoolean();
var rawServer = RSocketServer
.create(SocketAcceptor.forRequestChannel(payloads -> Flux.from(payloads).switchOnFirst((first, flux) -> {
if (first.isOnNext()) {
var payload = first.get();
Assertions.assertNotNull(payload);
var metadata = payload.getMetadataUtf8();
Assertions.assertEquals("channel", metadata);
var data = payload.getDataUtf8();
Assertions.assertEquals("test", data);
return flux.skip(1).map(Payload::getDataUtf8).collectList().doOnSuccess(val -> {
received.set(true);
Assertions.assertEquals(List.of("a", "b", "c"), val);
}).then(Mono.just(DefaultPayload.create("ok", "response")));
} else {
return flux.take(1, true);
}
})))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bindNow(TcpServerTransport.create("127.0.0.1", 8085));
try {
var client = new MyRSocketClient(HostAndPort.fromParts("127.0.0.1", 8085));
try {
client
.<String>registerProducer(ChannelCodec.UTF8_TEST, "test")
.sendMessages(Flux.just("a", "b", "c"))
.block(Duration.ofMinutes(1));
Assertions.assertTrue(received.get());
} finally {
client.dispose();
}
} finally {
rawServer.dispose();
}
}
@Test
public void testServerOnClose() {
Assertions.assertThrows(IllegalStateException.class, () -> {
var server = new MyRSocketServer(HostAndPort.fromParts("127.0.0.1", 8085));
try {
server.onClose().block(Duration.ofSeconds(1));
} finally {
server.dispose();
}
});
}
}

View File

@ -11,4 +11,12 @@ module tdlib.reactive.api.test {
requires kafka.clients; requires kafka.clients;
requires java.logging; requires java.logging;
requires rsocket.core; requires rsocket.core;
requires org.junit.platform.engine;
requires org.junit.platform.commons;
requires org.junit.jupiter.engine;
requires reactor.tools;
requires rsocket.transport.netty;
requires reactor.test;
requires reactor.netty.core;
requires org.apache.logging.log4j;
} }

View File

@ -15,7 +15,7 @@
<AsyncLogger name="org.apache.kafka" level="WARN" additivity="false"/> <AsyncLogger name="org.apache.kafka" level="WARN" additivity="false"/>
<!-- log only INFO, WARN, ERROR and FATAL logging by classes in this package --> <!-- log only INFO, WARN, ERROR and FATAL logging by classes in this package -->
<AsyncLogger name="io.netty" level="INFO" additivity="false"/> <AsyncLogger name="io.netty" level="INFO" additivity="false"/>
<AsyncLogger name="io.rsocket" level="INFO" additivity="false"/> <!--<AsyncLogger name="io.rsocket" level="INFO" additivity="false"/>-->
<AsyncRoot level="DEBUG"> <AsyncRoot level="DEBUG">
<filters> <filters>