Sockets are working

This commit is contained in:
Andrea Cavalli 2022-10-07 00:48:10 +02:00
parent 0a74e1ab1a
commit 705e5ca65e
13 changed files with 305 additions and 161 deletions

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.kafka.common.errors.SerializationException;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@ -46,7 +47,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient {
// Temporary id used to make requests
private final long clientId;
private final Many<OnRequest<?>> requests;
private final Consumer<OnRequest<?>> requests;
private final Map<Long, CompletableFuture<Timestamped<OnResponse<TdApi.Object>>>> responses
= new ConcurrentHashMap<>();
private final AtomicLong requestId = new AtomicLong(0);
@ -54,7 +55,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient {
public BaseAtomixReactiveApiClient(TdlibChannelsSharedReceive sharedTdlibClients) {
this.clientId = System.nanoTime();
this.requests = sharedTdlibClients.requests();
this.requests = sharedTdlibClients::emitRequest;
this.subscription = sharedTdlibClients.responses().doOnNext(response -> {
var responseSink = responses.get(response.data().requestId());
@ -63,7 +64,8 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient {
return;
}
responseSink.complete(response);
}).subscribeOn(Schedulers.parallel()).subscribe();
}).subscribeOn(Schedulers.parallel())
.subscribe(v -> {}, ex -> LOG.error("Reactive api client responses flux has failed unexpectedly!", ex));
}
@Override
@ -100,9 +102,7 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiMultiClient {
}
})
.doFinally(s -> this.responses.remove(requestId));
synchronized (requests) {
requests.emitNext(new Request<>(userId, clientId, requestId, request, timeout), EmitFailureHandler.FAIL_FAST);
}
requests.accept(new Request<>(userId, clientId, requestId, request, timeout));
return response;
});
}

View File

@ -1,19 +0,0 @@
package it.tdlight.reactiveapi;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class FutureEventConsumer<K> implements EventConsumer<K> {
private final Mono<EventConsumer<K>> future;
public FutureEventConsumer(Mono<EventConsumer<K>> future) {
this.future = future.cache();
}
@Override
public Flux<Timestamped<K>> consumeMessages() {
return future.flatMapMany(EventConsumer::consumeMessages);
}
}

View File

@ -1,25 +0,0 @@
package it.tdlight.reactiveapi;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class FutureEventProducer<K> implements EventProducer<K> {
private final Mono<EventProducer<K>> future;
public FutureEventProducer(Mono<EventProducer<K>> future) {
this.future = future.cache();
}
@Override
public Mono<Void> sendMessages(Flux<K> eventsFlux) {
return future.flatMap(ep -> ep.sendMessages(eventsFlux));
}
@Override
public void close() {
future.doOnNext(EventProducer::close).subscribeOn(Schedulers.parallel()).subscribe();
}
}

View File

@ -198,7 +198,7 @@ public abstract class ReactiveApiPublisher {
.then(Mono.empty())
)
.subscribeOn(Schedulers.parallel())
.subscribe();
.subscribe(v -> {}, ex -> LOG.error("Resulting events flux has failed unexpectedly! (1)", ex));
var messagesToSend = publishedResultingEvents
// Obtain only client-bound events
@ -229,7 +229,7 @@ public abstract class ReactiveApiPublisher {
} else {
LOG.error("Unknown cluster-bound event: {}", clusterBoundEvent);
}
});
}, ex -> LOG.error("Resulting events flux has failed unexpectedly! (2)", ex));
var prev = this.disposable.getAndSet(publishedResultingEvents.connect());

View File

@ -1,6 +1,9 @@
package it.tdlight.reactiveapi;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
@ -9,17 +12,20 @@ import reactor.core.publisher.Sinks.Empty;
public abstract class SimpleEventProducer<K> implements EventProducer<K> {
private final Empty<Void> closeRequest = Sinks.empty();
private AtomicReference<Subscription> closeRequest = new AtomicReference<>();
@Override
public final Mono<Void> sendMessages(Flux<K> eventsFlux) {
return handleSendMessages(eventsFlux.takeUntilOther(closeRequest.asMono()));
return handleSendMessages(eventsFlux).doOnSubscribe(s -> closeRequest.set(s));
}
public abstract Mono<Void> handleSendMessages(Flux<K> eventsFlux);
@Override
public final void close() {
closeRequest.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
var s = closeRequest.get();
if (s != null) {
s.cancel();
}
}
}

View File

@ -17,6 +17,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
@ -30,43 +31,48 @@ import reactor.util.retry.RetryBackoffSpec;
public class TdlibChannelsSharedHost implements Closeable {
private static final Logger LOG = LogManager.getLogger(TdlibChannelsSharedHost.class);
private static final RetryBackoffSpec RETRY_STRATEGY = Retry
public static final RetryBackoffSpec RETRY_STRATEGY = Retry
.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(16))
.jitter(1.0)
.doBeforeRetry(signal -> LOG.warn("Retrying channel with signal {}", signal));
.doBeforeRetry(signal -> LogManager.getLogger("Channels").warn("Retrying channel with signal {}", signal));
public static final Function<Flux<Long>, Flux<Long>> REPEAT_STRATEGY = n -> n
.doOnNext(i -> LogManager.getLogger("Channels").debug("Resubscribing to channel"))
.delayElements(Duration.ofSeconds(5));
private final TdlibChannelsServers tdServersChannels;
private final Disposable responsesSub;
private final AtomicReference<Disposable> requestsSub = new AtomicReference<>();
private final Many<OnResponse<TdApi.Object>> responses = Sinks.many().multicast().onBackpressureBuffer(65535);
private final Many<OnResponse<TdApi.Object>> responses = Sinks.many().multicast().directAllOrNothing();
private final Map<String, Many<Flux<ClientBoundEvent>>> events;
private final Flux<Timestamped<OnRequest<Object>>> requests;
public TdlibChannelsSharedHost(Set<String> allLanes, TdlibChannelsServers tdServersChannels) {
this.tdServersChannels = tdServersChannels;
this.responsesSub = tdServersChannels.response()
.sendMessages(responses.asFlux().log("responses", Level.FINEST, SignalType.ON_NEXT))
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(5)))
this.responsesSub = Mono.defer(() -> tdServersChannels.response()
.sendMessages(responses.asFlux().log("responses", Level.FINE)))
.repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY)
.subscribeOn(Schedulers.parallel())
.subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending responses", ex));
events = allLanes.stream().collect(Collectors.toUnmodifiableMap(Function.identity(), lane -> {
Many<Flux<ClientBoundEvent>> sink = Sinks.many().multicast().onBackpressureBuffer(65535);
var outputEventsFlux = Flux
.merge(sink.asFlux().map(flux -> flux.subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE)
.merge(sink.asFlux().cache().map(flux -> flux.publish().autoConnect().subscribeOn(Schedulers.parallel())), Integer.MAX_VALUE)
.doFinally(s -> LOG.debug("Output events flux of lane \"{}\" terminated with signal {}", lane, s));
tdServersChannels
Mono.defer(() -> tdServersChannels
.events(lane)
.sendMessages(outputEventsFlux)
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(5)))
.sendMessages(outputEventsFlux))
.repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY)
.subscribeOn(Schedulers.parallel())
.subscribe(n -> {}, ex -> LOG.error("Unexpected error when sending events to lane {}", lane, ex));
return sink;
}));
this.requests = tdServersChannels.request().consumeMessages()
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(5)))
.doFinally(s -> LOG.debug("Input requests consumer terminated with signal {}", s))
.repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY)
.doOnError(ex -> LOG.error("Unexpected error when receiving requests", ex))
.doFinally(s -> LOG.debug("Input requests flux terminated with signal {}", s));

View File

@ -1,5 +1,8 @@
package it.tdlight.reactiveapi;
import static it.tdlight.reactiveapi.TdlibChannelsSharedHost.REPEAT_STRATEGY;
import static it.tdlight.reactiveapi.TdlibChannelsSharedHost.RETRY_STRATEGY;
import it.tdlight.jni.TdApi.Object;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnRequest;
@ -9,6 +12,7 @@ import java.time.Duration;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -27,45 +31,46 @@ public class TdlibChannelsSharedReceive implements Closeable {
private static final Logger LOG = LogManager.getLogger(TdlibChannelsSharedReceive.class);
private static final RetryBackoffSpec RETRY_STRATEGY = Retry
.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
.maxBackoff(Duration.ofSeconds(16))
.jitter(1.0)
.doBeforeRetry(signal -> LOG.warn("Retrying channel with signal {}", signal));
private final TdlibChannelsClients tdClientsChannels;
private final AtomicReference<Disposable> responsesSub = new AtomicReference<>();
private final Disposable requestsSub;
private final AtomicReference<Disposable> eventsSub = new AtomicReference<>();
private final Flux<Timestamped<OnResponse<Object>>> responses;
private final Map<String, Flux<Timestamped<ClientBoundEvent>>> events;
private final Many<OnRequest<?>> requests = Sinks.many().multicast().onBackpressureBuffer(65535);
private final Many<OnRequest<?>> requests = Sinks.many().multicast().directAllOrNothing();
public TdlibChannelsSharedReceive(TdlibChannelsClients tdClientsChannels) {
this.tdClientsChannels = tdClientsChannels;
this.responses = tdClientsChannels
.response()
.consumeMessages()
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(5)))
this.responses = Flux
.defer(() -> tdClientsChannels.response().consumeMessages())
.log("responses", Level.FINE)
.repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY)
.publish()
.autoConnect()
.doFinally(s -> LOG.debug("Input responses flux terminated with signal {}", s));
this.events = tdClientsChannels.events().entrySet().stream()
.collect(Collectors.toUnmodifiableMap(Entry::getKey,
e -> e
.getValue()
.consumeMessages()
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(5)))
e -> Flux
.defer(() -> e.getValue().consumeMessages())
.repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY)
.doFinally(s -> LOG.debug("Input events flux of lane \"{}\" terminated with signal {}", e.getKey(), s))
));
var requestsFlux = Flux.defer(() -> requests.asFlux()
.doFinally(s -> LOG.debug("Output requests flux terminated with signal {}", s)));
this.requestsSub = tdClientsChannels.request()
.sendMessages(requestsFlux)
.repeatWhen(n -> n.delayElements(Duration.ofSeconds(5)))
this.requestsSub = tdClientsChannels
.request()
.sendMessages(Flux
.defer(() -> requests
.asFlux()
.doFinally(s -> LOG.debug("Output requests flux terminated with signal {}", s))))
.doFinally(s -> LOG.debug("Output requests sender terminated with signal {}", s))
.repeatWhen(REPEAT_STRATEGY)
.retryWhen(RETRY_STRATEGY)
.subscribeOn(Schedulers.parallel())
.subscribe(n -> {}, ex -> requests.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100))));
.subscribe(n -> {}, ex -> {
LOG.error("An error when handling requests killed the requests subscriber!", ex);
requests.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
});
}
public Flux<Timestamped<OnResponse<Object>>> responses() {
@ -84,8 +89,10 @@ public class TdlibChannelsSharedReceive implements Closeable {
return events;
}
public Many<OnRequest<?>> requests() {
return requests;
public void emitRequest(OnRequest<?> request) {
synchronized (requests) {
requests.emitNext(request, EmitFailureHandler.FAIL_FAST);
}
}
@Override

View File

@ -3,88 +3,165 @@ package it.tdlight.reactiveapi.rsocket;
import io.rsocket.Payload;
import it.tdlight.reactiveapi.Timestamped;
import java.time.Duration;
import java.util.Optional;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.scheduler.Schedulers;
class ConsumerConnection<T> {
public class ConsumerConnection<T> {
private Flux<Timestamped<T>> remote;
private static final Logger LOG = LogManager.getLogger(ConsumerConnection.class);
private final String channel;
private Flux<Payload> remote;
private Deserializer<T> local;
private Empty<Void> connected = Sinks.empty();
private Empty<Void> remoteResult = Sinks.empty();
private boolean connectedState = false;
private Empty<Void> connectedSink = Sinks.empty();
private Optional<Throwable> localTerminationState = null;
private Empty<Void> localTerminationSink = Sinks.empty();
public ConsumerConnection(String channel) {
this.channel = channel;
if (LOG.isDebugEnabled()) LOG.debug("{} Create new blank connection", this.printStatus());
}
private String printStatus() {
return "[\"%s\" (%d)%s%s%s]".formatted(channel,
System.identityHashCode(this),
local != null ? ", local" : "",
remote != null ? ", remote" : "",
connectedState ? ((localTerminationState != null) ? (localTerminationState.isPresent() ? ", done with error" : ", done") : ", connected") : ", waiting"
);
}
public synchronized Flux<Timestamped<T>> connectLocal() {
return connected.asMono().publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> {
if (LOG.isDebugEnabled()) LOG.debug("{} Local is asking to connect", this.printStatus());
return Mono.defer(() -> {
synchronized (ConsumerConnection.this) {
return remote;
return connectedSink.asMono();
}
}));
}).publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> {
synchronized (ConsumerConnection.this) {
if (LOG.isDebugEnabled()) LOG.debug("{} Local is connected", this.printStatus());
return RSocketUtils.deserialize(remote, local)
.map(element -> new Timestamped<>(System.currentTimeMillis(), element));
}
})).doOnError(ex -> {
synchronized (ConsumerConnection.this) {
if (remote != null && localTerminationState == null) {
localTerminationState = Optional.of(ex);
if (LOG.isDebugEnabled()) LOG.debug("%s Local connection ended with failure, emitting termination failure".formatted(this.printStatus()), ex);
var sink = localTerminationSink;
reset(true);
sink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (LOG.isDebugEnabled()) LOG.debug("%s Local connection ended with failure, emitted termination failure".formatted(this.printStatus()));
}
}
}).doFinally(s -> {
if (s != SignalType.ON_ERROR) {
synchronized (ConsumerConnection.this) {
if (remote != null && localTerminationState == null) {
assert connectedState;
localTerminationState = Optional.empty();
LOG.debug("{} Remote connection ended with status {}, emitting termination complete", this::printStatus, () -> s);
if (s == SignalType.CANCEL) {
localTerminationSink.emitError(new CancelledChannelException(), EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
} else {
localTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
}
LOG.debug("{} Remote connection ended with status {}, emitted termination complete", this::printStatus, () -> s);
}
reset(false);
}
}
});
}
public synchronized Mono<Void> connectRemote() {
return connected.asMono().publishOn(Schedulers.parallel()).then(Mono.defer(() -> {
if (LOG.isDebugEnabled()) LOG.debug("{} Remote is asking to connect", this.printStatus());
return Mono.defer(() -> {
synchronized (ConsumerConnection.this) {
return remoteResult.asMono();
return connectedSink.asMono();
}
}));
}).publishOn(Schedulers.parallel()).then(Mono.defer(() -> {
synchronized (ConsumerConnection.this) {
if (LOG.isDebugEnabled()) LOG.debug("{} Remote is connected", this.printStatus());
return localTerminationSink.asMono().publishOn(Schedulers.parallel());
}
})).doFinally(s -> {
if (s != SignalType.ON_ERROR) {
synchronized (ConsumerConnection.this) {
//reset(true);
}
}
});
}
public synchronized void resetRemote() {
connected = Sinks.empty();
remoteResult = Sinks.empty();
remote = null;
public synchronized void reset(boolean resettingFromRemote) {
if (LOG.isDebugEnabled()) LOG.debug("{} Reset started", this.printStatus());
if (connectedState) {
if (localTerminationState == null) {
if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection is still marked as open but not terminated, interrupting it", this.printStatus());
var ex = new InterruptedException();
localTerminationState = Optional.of(ex);
localTerminationSink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus());
}
} else {
if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection is still marked as waiting for a connection, interrupting it", this.printStatus());
localTerminationState = Optional.empty();
localTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus());
}
local = null;
remote = null;
connectedState = false;
connectedSink = Sinks.empty();
localTerminationState = null;
localTerminationSink = Sinks.empty();
if (LOG.isDebugEnabled()) LOG.debug("{} Reset ended", this.printStatus());
}
public synchronized void registerRemote(Flux<Payload> remote) {
if (LOG.isDebugEnabled()) LOG.debug("{} Remote is trying to register", this.printStatus());
if (this.remote != null) {
if (LOG.isDebugEnabled()) LOG.debug("{} Remote was already registered", this.printStatus());
throw new IllegalStateException("Remote is already registered");
}
this.remote = remote
.transformDeferred(flux -> {
synchronized (ConsumerConnection.this) {
assert local != null;
return RSocketUtils.deserialize(flux, local);
}
})
.map(element -> new Timestamped<>(System.currentTimeMillis(), element))
.doOnError(ex -> {
synchronized (ConsumerConnection.this) {
remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
}
})
.doFinally(s -> {
synchronized (ConsumerConnection.this) {
remoteResult.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
resetRemote();
}
});
this.remote = remote;
if (LOG.isDebugEnabled()) LOG.debug("{} Remote registered", this.printStatus());
onChanged();
}
public synchronized void registerLocal(Deserializer<T> local) {
if (LOG.isDebugEnabled()) LOG.debug("{} Local is trying to register", this.printStatus());
if (this.local != null) {
if (LOG.isDebugEnabled()) LOG.debug("{} Local was already registered", this.printStatus());
throw new IllegalStateException("Local is already registered");
}
this.local = local;
if (LOG.isDebugEnabled()) LOG.debug("{} Local registered", this.printStatus());
onChanged();
}
private synchronized void onChanged() {
if (LOG.isDebugEnabled()) LOG.debug("{} Checking connection changes", this.printStatus());
if (local != null && remote != null) {
connected.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
connectedState = true;
if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitting connected event", this.printStatus());
connectedSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitted connected event", this.printStatus());
} else {
if (LOG.isDebugEnabled()) LOG.debug("{} Still not connected", this.printStatus());
}
}
}

View File

@ -43,10 +43,10 @@ public class MyRSocketClient implements RSocketChannelManager {
this.nextClient = RSocketConnector.create()
//.setupPayload(DefaultPayload.create("client", "setup-info"))
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.reconnect(retryStrategy)
//.reconnect(retryStrategy)
.connect(transport)
.doOnNext(lastClient::set)
.cache();
.cacheInvalidateIf(RSocket::isDisposed);
}
@Override
@ -70,9 +70,11 @@ public class MyRSocketClient implements RSocketChannelManager {
@Override
public Mono<Void> handleSendMessages(Flux<K> eventsFlux) {
Flux<Payload> rawFlux = eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer));
Flux<Payload> combinedRawFlux = Flux.just(DefaultPayload.create(channelName, "channel")).concatWith(rawFlux);
return nextClient.flatMapMany(client -> client.requestChannel(combinedRawFlux)).take(1, true).then();
return Mono.defer(() -> {
Flux<Payload> rawFlux = eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer));
Flux<Payload> combinedRawFlux = Flux.just(DefaultPayload.create(channelName, "channel")).concatWith(rawFlux);
return nextClient.flatMapMany(client -> client.requestChannel(combinedRawFlux).take(1, true)).then();
});
}
};

View File

@ -43,7 +43,7 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket {
.payloadDecoder(PayloadDecoder.ZERO_COPY)
.bind(TcpServerTransport.create(baseHost.getHost(), baseHost.getPort()))
.doOnNext(d -> logger.debug("Server up"))
.cache();
.cacheInvalidateIf(CloseableChannel::isDisposed);
serverMono.subscribeOn(Schedulers.parallel()).subscribe(v -> {}, ex -> logger.warn("Failed to bind server"));
@ -93,7 +93,7 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket {
return new EventConsumer<K>() {
@Override
public Flux<Timestamped<K>> consumeMessages() {
return Flux.defer(() -> {
return serverCloseable.flatMapMany(x -> {
//noinspection unchecked
var conn = (ConsumerConnection<K>) consumerRegistry.computeIfAbsent(channelName, ConsumerConnection::new);
conn.registerLocal(deserializer);
@ -116,7 +116,7 @@ public class MyRSocketServer implements RSocketChannelManager, RSocket {
return new EventProducer<K>() {
@Override
public Mono<Void> sendMessages(Flux<K> eventsFlux) {
return Mono.defer(() -> {
return serverCloseable.flatMap(x -> {
//noinspection unchecked
var conn = (ProducerConnection<K>) producerRegistry.computeIfAbsent(channelName, ProducerConnection::new);
conn.registerLocal(eventsFlux.transform(flux -> RSocketUtils.serialize(flux, serializer)));

View File

@ -2,80 +2,167 @@ package it.tdlight.reactiveapi.rsocket;
import io.rsocket.Payload;
import java.time.Duration;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.scheduler.Schedulers;
class ProducerConnection<T> {
public class ProducerConnection<T> {
private static final Logger LOG = LogManager.getLogger(ProducerConnection.class);
private final String channel;
private Object remote;
private Flux<Payload> local;
private Empty<Void> connected = Sinks.empty();
private Empty<Void> remoteResult = Sinks.empty();
private boolean connectedState = false;
private Empty<Void> connectedSink = Sinks.empty();
private Optional<Throwable> remoteTerminationState = null;
private Empty<Void> remoteTerminationSink = Sinks.empty();
public ProducerConnection(String channel) {
this.channel = channel;
if (LOG.isDebugEnabled()) LOG.debug("{} Create new blank connection", this.printStatus());
}
private synchronized String printStatus() {
return "[\"%s\" (%d)%s%s%s]".formatted(channel,
System.identityHashCode(this),
local != null ? ", local" : "",
remote != null ? ", remote" : "",
connectedState ? ((remoteTerminationState != null) ? (remoteTerminationState.isPresent() ? ", done with error" : ", done") : ", connected") : ", waiting"
);
}
public synchronized Mono<Void> connectLocal() {
return connected.asMono().publishOn(Schedulers.parallel()).then(Mono.defer(() -> {
if (LOG.isDebugEnabled()) LOG.debug("{} Local is asking to connect", this.printStatus());
return Mono.defer(() -> {
synchronized (ProducerConnection.this) {
return remoteResult.asMono().doFinally(r -> reset());
return connectedSink.asMono();
}
})).doOnError(ex -> {
}).publishOn(Schedulers.parallel()).then(Mono.defer(() -> {
synchronized (ProducerConnection.this) {
remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (LOG.isDebugEnabled()) LOG.debug("{} Local is connected", this.printStatus());
return remoteTerminationSink.asMono().publishOn(Schedulers.parallel());
}
}).doFinally(ended -> reset());
})).doFinally(s -> {
if (s != SignalType.ON_ERROR) {
synchronized (ProducerConnection.this) {
//reset(false);
}
}
});
}
public synchronized Flux<Payload> connectRemote() {
return connected.asMono().publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> {
if (LOG.isDebugEnabled()) LOG.debug("{} Remote is asking to connect", this.printStatus());
return Mono.defer(() -> {
synchronized (ProducerConnection.this) {
return connectedSink.asMono();
}
}).publishOn(Schedulers.parallel()).thenMany(Flux.defer(() -> {
synchronized (ProducerConnection.this) {
if (LOG.isDebugEnabled()) LOG.debug("{} Remote is connected", this.printStatus());
return local;
}
}).doOnError(ex -> {
})).doOnError(ex -> {
synchronized (ProducerConnection.this) {
remoteResult.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (local != null && remoteTerminationState == null) {
remoteTerminationState = Optional.of(ex);
if (LOG.isDebugEnabled()) LOG.debug("%s Remote connection ended with failure, emitting termination failure".formatted(this.printStatus()), ex);
var sink = remoteTerminationSink;
reset(true);
sink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (LOG.isDebugEnabled()) LOG.debug("%s Remote connection ended with failure, emitted termination failure".formatted(this.printStatus()));
}
}
})).doFinally(ended -> reset());
}).doFinally(s -> {
if (s != SignalType.ON_ERROR) {
synchronized (ProducerConnection.this) {
if (local != null && remoteTerminationState == null) {
assert connectedState;
remoteTerminationState = Optional.empty();
if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, emitting termination complete", this.printStatus(), s);
if (s == SignalType.CANCEL) {
remoteTerminationSink.emitError(new CancelledChannelException(), EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
} else {
remoteTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
}
if (LOG.isDebugEnabled()) LOG.debug("{} Remote connection ended with status {}, emitted termination complete", this.printStatus(), s);
}
reset(true);
}
}
});
}
public synchronized void reset() {
if (local != null && remote != null) {
remoteResult.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
local = null;
remote = null;
connected = Sinks.empty();
remoteResult = Sinks.empty();
public synchronized void reset(boolean resettingFromRemote) {
if (LOG.isDebugEnabled()) LOG.debug("{} Reset started", this.printStatus());
if (connectedState) {
if (remoteTerminationState == null) {
if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection is still marked as open but not terminated, interrupting it", this.printStatus());
var ex = new InterruptedException();
remoteTerminationState = Optional.of(ex);
remoteTerminationSink.emitError(ex, EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus());
}
} else {
if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection is still marked as waiting for a connection, interrupting it", this.printStatus());
remoteTerminationState = Optional.empty();
remoteTerminationSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (LOG.isDebugEnabled()) LOG.debug("{} The previous connection has been interrupted", this.printStatus());
}
local = null;
remote = null;
connectedState = false;
connectedSink = Sinks.empty();
remoteTerminationState = null;
remoteTerminationSink = Sinks.empty();
if (LOG.isDebugEnabled()) LOG.debug("{} Reset ended", this.printStatus());
}
public synchronized void registerRemote() {
if (LOG.isDebugEnabled()) LOG.debug("{} Remote is trying to register", this.printStatus());
if (this.remote != null) {
if (LOG.isDebugEnabled()) LOG.debug("{} Remote was already registered", this.printStatus());
throw new IllegalStateException("Remote is already registered");
}
this.remote = new Object();
if (LOG.isDebugEnabled()) LOG.debug("{} Remote registered", this.printStatus());
onChanged();
}
public synchronized void registerLocal(Flux<Payload> local) {
if (LOG.isDebugEnabled()) LOG.debug("{} Local is trying to register", this.printStatus());
if (this.local != null) {
if (LOG.isDebugEnabled()) LOG.debug("{} Local was already registered", this.printStatus());
throw new IllegalStateException("Local is already registered");
}
this.local = local;
if (LOG.isDebugEnabled()) LOG.debug("{} Local registered", this.printStatus());
onChanged();
}
private synchronized void onChanged() {
if (LOG.isDebugEnabled()) LOG.debug("{} Checking connection changes", this.printStatus());
if (local != null && remote != null) {
connected.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
connectedState = true;
if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitting connected event", this.printStatus());
connectedSink.emitEmpty(EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
if (LOG.isDebugEnabled()) LOG.debug("{} Connected successfully! Emitted connected event", this.printStatus());
} else {
if (LOG.isDebugEnabled()) LOG.debug("{} Still not connected", this.printStatus());
}
}
}

View File

@ -16,6 +16,7 @@
<!-- log only INFO, WARN, ERROR and FATAL logging by classes in this package -->
<AsyncLogger name="io.netty" level="INFO" additivity="false"/>
<AsyncLogger name="io.rsocket" level="INFO" additivity="false"/>
<AsyncLogger name="it.tdlight.reactiveapi.rsocket" level="DEBUG"/>
<AsyncRoot level="DEBUG">
<filters>

View File

@ -212,12 +212,14 @@ public abstract class TestChannel {
.map(Integer::parseUnsignedInt)
.take(50, true)
.collect(Collectors.toCollection(IntArrayList::new));
var response = Flux
.merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer))
.blockLast(Duration.ofSeconds(5));
data.removeElements(50, 100);
Assertions.assertEquals(response, data);
System.out.println(response);
Assertions.assertThrows(Throwable.class, () -> {
var response = Flux
.merge(isConsumerClient() ? (List.of(eventProducer, eventConsumer)) : List.of(eventConsumer, eventProducer))
.blockLast(Duration.ofSeconds(5));
data.removeElements(50, 100);
Assertions.assertEquals(response, data);
System.out.println(response);
});
}
@Test
@ -274,14 +276,14 @@ public abstract class TestChannel {
.map(Integer::parseUnsignedInt)
.take(10, true)
.collect(Collectors.toCollection(IntArrayList::new))
.block();
.block(Duration.ofSeconds(5));
var receiver2 = consumer
.consumeMessages()
.limitRate(1)
.map(Timestamped::data)
.map(Integer::parseUnsignedInt)
.collect(Collectors.toCollection(IntArrayList::new))
.block();
.block(Duration.ofSeconds(5));
Assertions.assertNotNull(receiver1);
Assertions.assertNotNull(receiver2);
receiver1.addAll(receiver2);