diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index 726b02a..b11f606 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -1,9 +1,9 @@ package it.tdlight.reactiveapi; +import static it.tdlight.reactiveapi.AtomixUtils.fromCf; import static java.util.Collections.unmodifiableSet; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.failedFuture; -import static reactor.core.publisher.Mono.fromCompletionStage; import com.google.common.primitives.Longs; import io.atomix.cluster.messaging.MessagingException; @@ -163,7 +163,7 @@ public class AtomixReactiveApi implements ReactiveApi { var removeObsoleteDiskSessions = diskChangesMono .flatMapIterable(diskChanges -> diskChanges.removedIds) - .concatMap(removedIds -> fromCompletionStage(() -> destroySession(removedIds, nodeId))) + .concatMap(removedIds -> fromCf(() -> destroySession(removedIds, nodeId))) .then(); var addedDiskSessionsFlux = diskChangesMono @@ -200,7 +200,7 @@ public class AtomixReactiveApi implements ReactiveApi { // Listen for create-session signals Mono createSessionSubscriptionMono; if (nodeId != null) { - createSessionSubscriptionMono = fromCompletionStage(() -> atomix + createSessionSubscriptionMono = fromCf(() -> atomix .getEventService() .subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> { if (req instanceof LoadSessionFromDiskRequest) { @@ -216,7 +216,7 @@ public class AtomixReactiveApi implements ReactiveApi { // Listen for revive-session signals Mono reviveSessionSubscriptionMono; if (nodeId != null) { - reviveSessionSubscriptionMono = fromCompletionStage(() -> atomix + reviveSessionSubscriptionMono = fromCf(() -> atomix .getEventService() .subscribe("revive-session", (Long userId) -> this.getLocalDiskSession(userId).flatMap(sessionAndId -> { var diskSession = sessionAndId.diskSession(); @@ -325,8 +325,8 @@ public class AtomixReactiveApi implements ReactiveApi { } // Register the session instance to the distributed nodes map - return Mono - .fromCompletionStage(() -> userIdToNodeId.put(userId, nodeId).thenApply(Optional::ofNullable)) + return AtomixUtils + .fromCf(() -> userIdToNodeId.put(userId, nodeId).thenApply(Optional::ofNullable)) .flatMap(prevDistributed -> { if (prevDistributed.isPresent() && prevDistributed.get().value() != null && !Objects.equals(this.nodeId, prevDistributed.get().value())) { @@ -388,10 +388,10 @@ public class AtomixReactiveApi implements ReactiveApi { // Lock sessions creation return Mono - .usingWhen(Mono.fromCompletionStage(sessionModificationLock::lock), + .usingWhen(AtomixUtils.fromCf(sessionModificationLock::lock), lockVersion -> unlockedSessionCreationMono, - lockVersion -> Mono - .fromCompletionStage(sessionModificationLock::unlock) + lockVersion -> AtomixUtils + .fromCf(sessionModificationLock::unlock) .doOnTerminate(() -> LOG.trace("Released session modification lock for session request: {}", req)) ) .doOnNext(resp -> LOG.debug("Handled session request {}, the response is: {}", req, resp)) @@ -410,7 +410,7 @@ public class AtomixReactiveApi implements ReactiveApi { } private Mono nextFreeLiveId() { - return Mono.fromCompletionStage(nextSessionLiveId::nextId); + return fromCf(nextSessionLiveId::nextId); } public Atomix getAtomix() { @@ -425,8 +425,8 @@ public class AtomixReactiveApi implements ReactiveApi { public Mono> getAllUsers() { return Flux.defer(() -> { var it = userIdToNodeId.entrySet().iterator(); - var hasNextMono = fromCompletionStage(it::hasNext); - var strictNextMono = fromCompletionStage(it::next) + var hasNextMono = fromCf(it::hasNext); + var strictNextMono = fromCf(it::next) .map(elem -> Map.entry(elem.getKey(), elem.getValue().value())); var nextOrNothingMono = hasNextMono.flatMap(hasNext -> { @@ -459,8 +459,8 @@ public class AtomixReactiveApi implements ReactiveApi { @Override public Mono resolveUserLiveId(long userId) { - return Mono - .fromCompletionStage(() -> atomix + return AtomixUtils + .fromCf(() -> atomix .getEventService() .send(SubjectNaming.getDynamicIdResolveSubject(userId), userId, @@ -494,7 +494,7 @@ public class AtomixReactiveApi implements ReactiveApi { @Override public Mono close() { - var atomixStopper = Mono.fromCompletionStage(this.atomix::stop).timeout(Duration.ofSeconds(8), Mono.empty()); + var atomixStopper = fromCf(this.atomix::stop).timeout(Duration.ofSeconds(8), Mono.empty()); var kafkaStopper = Mono.fromRunnable(kafkaProducer::close).subscribeOn(Schedulers.boundedElastic()); return Mono.when(atomixStopper, kafkaStopper); } diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java index 7129844..ea80246 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java @@ -1,5 +1,7 @@ package it.tdlight.reactiveapi; +import static it.tdlight.reactiveapi.AtomixUtils.fromCf; + import io.atomix.cluster.messaging.ClusterEventService; import io.atomix.cluster.messaging.MessagingException; import it.tdlight.jni.TdApi; @@ -8,6 +10,8 @@ import it.tdlight.reactiveapi.Event.Request; import java.time.Duration; import java.time.Instant; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -36,7 +40,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut @Override public Mono request(long userId, long liveId, TdApi.Function request, Instant timeout) { - return Mono.fromCompletionStage(() -> { + return fromCf(() -> { if (closed) { return CompletableFuture.failedFuture(new TdError(500, "Session is closed")); } @@ -56,6 +60,10 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut }).onErrorMap(ex -> { if (ex instanceof MessagingException.NoRemoteHandler) { return new TdError(404, "Bot #IDU" + userId + " (live id: " + liveId + ") is not found on the cluster"); + } else if (ex instanceof CompletionException && ex.getCause() instanceof TimeoutException) { + return new TdError(408, "Request Timeout", ex); + } else if (ex instanceof TimeoutException) { + return new TdError(408, "Request Timeout", ex); } else { return ex; } diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixUtils.java b/src/main/java/it/tdlight/reactiveapi/AtomixUtils.java new file mode 100644 index 0000000..e03f6d4 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/AtomixUtils.java @@ -0,0 +1,29 @@ +package it.tdlight.reactiveapi; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.function.Supplier; +import reactor.core.publisher.Mono; + +public class AtomixUtils { + + public static Mono fromCf(Supplier> completableFutureSupplier) { + return Mono.create(sink -> { + var cf = completableFutureSupplier.get(); + cf.whenComplete((result, ex) -> { + if (ex != null) { + if (ex instanceof CompletionException) { + sink.error(ex.getCause()); + } else { + sink.error(ex); + } + } else if (result != null) { + sink.success(result); + } else { + sink.success(); + } + }); + sink.onCancel(() -> cf.cancel(true)); + }); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java index 32802a8..db42be0 100644 --- a/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/BaseAtomixReactiveApiClient.java @@ -21,6 +21,7 @@ import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.SerializationException; import org.slf4j.Logger; @@ -62,8 +63,8 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo .take(1, true) .singleOrEmpty() .switchIfEmpty(emptyIdErrorMono) - .flatMap(liveId -> Mono - .fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", + .flatMap(liveId -> AtomixUtils + .fromCf(() -> eventService.send("session-" + liveId + "-requests", new Request<>(liveId, request, timeout), LiveAtomixReactiveApiClient::serializeRequest, LiveAtomixReactiveApiClient::deserializeResponse, @@ -73,6 +74,8 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo .onErrorMap(ex -> { if (ex instanceof MessagingException.NoRemoteHandler) { return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster"); + } else if (ex instanceof CompletionException && ex.getCause() instanceof TimeoutException) { + return new TdError(408, "Request Timeout", ex); } else if (ex instanceof TimeoutException) { return new TdError(408, "Request Timeout", ex); } else { @@ -80,13 +83,24 @@ abstract class BaseAtomixReactiveApiClient implements ReactiveApiClient, AutoClo } }) ) - .handle((item, sink) -> { + .handle((item, sink) -> { if (item instanceof TdApi.Error error) { sink.error(new TdError(error.code, error.message)); } else { //noinspection unchecked sink.next((T) item); } + }) + .onErrorMap(ex -> { + if (ex instanceof MessagingException.NoRemoteHandler) { + return new TdError(404, "Bot #IDU" + this.userId + " is not found on the cluster"); + } else if (ex instanceof CompletionException && ex.getCause() instanceof TimeoutException) { + return new TdError(408, "Request Timeout", ex); + } else if (ex instanceof TimeoutException) { + return new TdError(408, "Request Timeout", ex); + } else { + return ex; + } }); }