package it.tdlight.reactiveapi; import static java.util.Collections.unmodifiableSet; import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.failedFuture; import static reactor.core.publisher.Mono.fromCompletionStage; import; import io.atomix.cluster.messaging.MessagingException; import io.atomix.cluster.messaging.Subscription; import io.atomix.core.Atomix; import io.atomix.core.idgenerator.AsyncAtomicIdGenerator; import io.atomix.core.lock.AsyncAtomicLock; import; import io.atomix.protocols.raft.MultiRaftProtocol; import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest; import; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class AtomixReactiveApi implements ReactiveApi { private static final Logger LOG = LoggerFactory.getLogger(AtomixReactiveApi.class); /** * nodeId is null when DiskSessions is null */ @Nullable private final String nodeId; private final Atomix atomix; private final KafkaProducer kafkaProducer; private final KafkaConsumer kafkaConsumer; private final Set resultingEventTransformerSet; private final AsyncAtomicIdGenerator nextSessionLiveId; private final AsyncAtomicLock sessionModificationLock; private final AsyncAtomicMap userIdToNodeId; /** * live id -> session */ private final ConcurrentMap localLiveSessions = new ConcurrentHashMap<>(); /** * DiskSessions is null when nodeId is null */ @Nullable private final DiskSessionsManager diskSessions; public AtomixReactiveApi(@Nullable String nodeId, Atomix atomix, KafkaParameters kafkaParameters, @Nullable DiskSessionsManager diskSessions, @NotNull Set resultingEventTransformerSet) { this.nodeId = nodeId; this.atomix = atomix; this.kafkaProducer = new KafkaProducer(kafkaParameters); this.kafkaConsumer = new KafkaConsumer(kafkaParameters); this.resultingEventTransformerSet = resultingEventTransformerSet; if (nodeId == null) { if (diskSessions != null) { throw new IllegalArgumentException("A client must not manage disk sessions"); } } else { if (diskSessions == null) { throw new IllegalArgumentException("A node must be able to manage disk sessions"); } } var raftProtocol = MultiRaftProtocol.builder().build(); this.nextSessionLiveId = atomix .atomicIdGeneratorBuilder("session-live-id") .withProtocol(raftProtocol) .build() .async(); this.sessionModificationLock = atomix .atomicLockBuilder("session-modification") .withProtocol(raftProtocol) .build() .async(); this.userIdToNodeId = atomix .atomicMapBuilder("user-id-to-node-id") //.withCacheEnabled(true) //.withCacheSize(4096) .withNullValues(false) .withProtocol(raftProtocol) .build() .async(); this.diskSessions = diskSessions; } @Override public Mono start() { Mono> idsSavedIntoLocalConfiguration = Mono.fromCallable(() -> { if (diskSessions == null) { return Set.of(); } synchronized (diskSessions) { return diskSessions.getSettings().userIdToSession().keySet(); } }); Mono> distributedIds; if (this.nodeId == null) { distributedIds = Mono.just(Set.of()); } else { distributedIds = this .getAllUsers() .flatMapIterable(Map::entrySet) .filter(entry -> entry.getValue().equals(this.nodeId)) .map(Entry::getKey) .collect(Collectors.toUnmodifiableSet()); } record DiskChanges(Set normalIds, Set addedIds, Set removedIds) {} var diskChangesMono =, distributedIds).map(tuple -> { var localSet = tuple.getT1(); var remoteSet = tuple.getT2(); var deletedUsers = new HashSet<>(remoteSet); deletedUsers.removeAll(localSet); var addedUsers = new HashSet<>(localSet); addedUsers.removeAll(remoteSet); var normalUsers = new HashSet<>(localSet); normalUsers.removeAll(addedUsers); for (long user : addedUsers) { LOG.warn("Detected a new user id from the disk configuration file: {}", user); } for (long user : normalUsers) {"Detected a user id from the disk configuration file: {}", user); } for (long user : deletedUsers) { LOG.warn("The user id {} has been deleted from the disk configuration file", user); } return new DiskChanges(unmodifiableSet(normalUsers), unmodifiableSet(addedUsers), unmodifiableSet(deletedUsers)); }).cache(); var removeObsoleteDiskSessions = diskChangesMono .flatMapIterable(diskChanges -> diskChanges.removedIds) .flatMap(removedIds -> fromCompletionStage(() -> destroySession(removedIds, nodeId))) .then(); var addedDiskSessionsFlux = diskChangesMono .flatMapIterable(diskChanges -> diskChanges.addedIds) .flatMap(this::getLocalDiskSession); var normalDiskSessionsFlux = diskChangesMono .flatMapIterable(diskChanges -> diskChanges.normalIds) .flatMap(this::getLocalDiskSession); var addNewDiskSessions = addedDiskSessionsFlux.flatMap(diskSessionAndId -> { var id =; var diskSession = diskSessionAndId.diskSession; return createSession(new LoadSessionFromDiskRequest(id, diskSession.token, diskSession.phoneNumber, true )); }).then(); var loadExistingDiskSessions = normalDiskSessionsFlux.flatMap(diskSessionAndId -> { var id =; var diskSession = diskSessionAndId.diskSession; return createSession(new LoadSessionFromDiskRequest(id, diskSession.token, diskSession.phoneNumber, false )); }).then(); var diskInitMono = Mono.when(removeObsoleteDiskSessions, loadExistingDiskSessions, addNewDiskSessions) .subscribeOn(Schedulers.boundedElastic()) .doOnTerminate(() ->"Loaded all saved sessions from disk")); // Listen for create-session signals Mono createSessionSubscriptionMono; if (nodeId != null) { createSessionSubscriptionMono = fromCompletionStage(() -> atomix .getEventService() .subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> { if (req instanceof LoadSessionFromDiskRequest) { return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster")); } else { return createSession(req).toFuture(); } }, CreateSessionResponse::serializeBytes)); } else { createSessionSubscriptionMono = Mono.empty(); } // Listen for revive-session signals Mono reviveSessionSubscriptionMono; if (nodeId != null) { reviveSessionSubscriptionMono = fromCompletionStage(() -> atomix .getEventService() .subscribe("revive-session", (Long userId) -> this.getLocalDiskSession(userId).flatMap(sessionAndId -> { var diskSession = sessionAndId.diskSession(); var request = new LoadSessionFromDiskRequest(userId, diskSession.token, diskSession.phoneNumber, false); return this.createSession(request); }).onErrorResume(ex -> Mono.empty()).then(Mono.empty()).toFuture())); } else { reviveSessionSubscriptionMono = Mono.empty(); } return diskInitMono.then(Mono.when(createSessionSubscriptionMono, reviveSessionSubscriptionMono)); } private CompletableFuture destroySession(long userId, String nodeId) { LOG.debug("Received session delete request: user_id={}, node_id=\"{}\"", userId, nodeId); // Lock sessions modification return sessionModificationLock .lock() .thenCompose(lockVersion -> { LOG.trace("Obtained session modification lock for session delete request: {} \"{}\"", userId, nodeId); return userIdToNodeId .remove(userId, nodeId) .thenAccept(deleted -> LOG.debug("Deleted session {} \"{}\": {}", userId, nodeId, deleted)); }) .whenComplete((response, error) -> sessionModificationLock .unlock() .thenRun(() -> LOG.trace("Released session modification lock for session delete request: {} \"{}\"", userId, nodeId)) ) .whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex)); } /** * Send a request to the cluster to load that user id from disk */ public Mono tryReviveSession(long userId) { return Mono.fromRunnable(() -> atomix.getEventService().broadcast("revive-session", userId)); } @Override public Mono createSession(CreateSessionRequest req) { LOG.debug("Received create session request: {}", req); if (nodeId == null) { return Mono.error(new UnsupportedOperationException("This is a client, it can't have own sessions")); } Mono unlockedSessionCreationMono = Mono.defer(() -> { LOG.trace("Obtained session modification lock for session request: {}", req); // Generate session id return this .nextFreeLiveId() .flatMap(liveId -> { // Create the session instance ReactiveApiPublisher reactiveApiPublisher; boolean loadedFromDisk; long userId; String botToken; Long phoneNumber; if (req instanceof CreateBotSessionRequest createBotSessionRequest) { loadedFromDisk = false; userId = createBotSessionRequest.userId(); botToken = createBotSessionRequest.token(); phoneNumber = null; reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId, botToken ); } else if (req instanceof CreateUserSessionRequest createUserSessionRequest) { loadedFromDisk = false; userId = createUserSessionRequest.userId(); botToken = null; phoneNumber = createUserSessionRequest.phoneNumber(); reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId, phoneNumber ); } else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) { loadedFromDisk = true; userId = loadSessionFromDiskRequest.userId(); botToken = loadSessionFromDiskRequest.token(); phoneNumber = loadSessionFromDiskRequest.phoneNumber(); if (loadSessionFromDiskRequest.phoneNumber() != null) { reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId, phoneNumber ); } else { reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, kafkaProducer, resultingEventTransformerSet, liveId, userId, botToken ); } } else { return Mono.error(new UnsupportedOperationException("Unexpected value: " + req)); } // Register the session instance to the local nodes map var prev = localLiveSessions.put(liveId, reactiveApiPublisher); if (prev != null) { LOG.error("User id \"{}\" was already registered locally! {}", liveId, prev); } // Register the session instance to the distributed nodes map return Mono .fromCompletionStage(() -> userIdToNodeId.put(userId, nodeId).thenApply(Optional::ofNullable)) .flatMap(prevDistributed -> { if (prevDistributed.isPresent() && prevDistributed.get().value() != null && !Objects.equals(this.nodeId, prevDistributed.get().value())) { LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId, prevDistributed.get().value()); } var saveToDiskMono = Mono .fromCallable(() -> { // Save updated sessions configuration to disk try { Objects.requireNonNull(diskSessions); synchronized (diskSessions) {; return null; } } catch (IOException e) { throw new CompletionException("Failed to save disk sessions configuration", e); } }) .subscribeOn(Schedulers.boundedElastic()); // Start the session instance return Mono .fromCallable(() -> { Objects.requireNonNull(diskSessions); synchronized (diskSessions) { return Objects.requireNonNull(Paths.get(diskSessions.getSettings().path), "Session " + userId + " path is missing"); } }) .subscribeOn(Schedulers.boundedElastic()) .flatMap(baseSessionsPath -> { String diskSessionFolderName = "id" + Long.toUnsignedString(userId); Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName); if (!loadedFromDisk) { // Create the disk session configuration var diskSession = new DiskSession(botToken, phoneNumber); return Mono.fromCallable(() -> { Objects.requireNonNull(diskSessions); synchronized (diskSessions) { diskSessions.getSettings().userIdToSession().put(userId, diskSession); return null; } }).subscribeOn(Schedulers.boundedElastic()).then(saveToDiskMono).thenReturn(sessionPath); } else { return Mono.just(sessionPath); } }) .doOnNext(path -> reactiveApiPublisher.start(path, () -> AtomixReactiveApi.this.onPublisherClosed(userId, liveId) )) .thenReturn(new CreateSessionResponse(liveId)); }); }); }); // Lock sessions creation return Mono .usingWhen(Mono.fromCompletionStage(sessionModificationLock::lock), lockVersion -> unlockedSessionCreationMono, lockVersion -> Mono .fromCompletionStage(sessionModificationLock::unlock) .doOnTerminate(() -> LOG.trace("Released session modification lock for session request: {}", req)) ) .doOnNext(resp -> LOG.debug("Handled session request {}, the response is: {}", req, resp)) .doOnError(ex -> LOG.debug("Handled session request {}, the response is: error", req, ex)); } private void onPublisherClosed(long userId, Long liveId) { this.destroySession(userId, nodeId).whenComplete((result, ex) -> { localLiveSessions.remove(liveId); if (ex != null) { LOG.error("Failed to close the session for user {} after it was closed itself", userId); } else { LOG.debug("Closed the session for user {} after it was closed itself", userId); } }); } private Mono nextFreeLiveId() { return Mono.fromCompletionStage(nextSessionLiveId::nextId); } public Atomix getAtomix() { return atomix; } /** * Get the list of current sessions * @return map of user id -> node id */ @Override public Mono> getAllUsers() { return Flux.defer(() -> { var it = userIdToNodeId.entrySet().iterator(); var hasNextMono = fromCompletionStage(it::hasNext); var strictNextMono = fromCompletionStage(it::next) .map(elem -> Map.entry(elem.getKey(), elem.getValue().value())); var nextOrNothingMono = hasNextMono.flatMap(hasNext -> { if (hasNext) { return strictNextMono; } else { return Mono.empty(); } }); return nextOrNothingMono.repeatWhen(s -> s.takeWhile(n -> n > 0)); }).collectMap(Entry::getKey, Entry::getValue); } @Override public Set getLocalLiveSessionIds() { return localLiveSessions .values() .stream() .map(reactiveApiPublisher -> new UserIdAndLiveId(reactiveApiPublisher.userId, reactiveApiPublisher.liveId)) .collect(Collectors.toUnmodifiableSet()); } @Override public boolean is(String nodeId) { if (this.nodeId == null) { return nodeId == null; } return this.nodeId.equals(nodeId); } @Override public Mono resolveUserLiveId(long userId) { return Mono .fromCompletionStage(() -> atomix .getEventService() .send(SubjectNaming.getDynamicIdResolveSubject(userId), userId, Longs::toByteArray, Longs::fromByteArray, Duration.ofSeconds(1) )) .onErrorResume(ex -> { if (ex instanceof MessagingException.NoRemoteHandler) { return Mono.empty(); } else { return Mono.error(ex); } }); } @Override public ReactiveApiClient dynamicClient(long userId) { return new DynamicAtomixReactiveApiClient(this, kafkaConsumer, userId); } @Override public ReactiveApiClient liveClient(long liveId, long userId) { return new LiveAtomixReactiveApiClient(atomix, kafkaConsumer, liveId, userId); } @Override public ReactiveApiMultiClient multiClient() { return new AtomixReactiveApiMultiClient(this, kafkaConsumer); } @Override public Mono close() { var atomixStopper = Mono.fromCompletionStage(this.atomix::stop).timeout(Duration.ofSeconds(8), Mono.empty()); var kafkaStopper = Mono.fromRunnable(kafkaProducer::close).subscribeOn(Schedulers.boundedElastic()); return Mono.when(atomixStopper, kafkaStopper); } private record DiskSessionAndId(DiskSession diskSession, long id) {} private Mono getLocalDiskSession(Long localUserId) { return Mono.fromCallable(() -> { Objects.requireNonNull(diskSessions); synchronized (diskSessions) { var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localUserId), "Id not found: " + localUserId ); try { diskSession.validate(); } catch (Throwable ex) { LOG.error("Failed to load disk session {}", localUserId, ex); return null; } return new DiskSessionAndId(diskSession, localUserId); } }).subscribeOn(Schedulers.boundedElastic()); } }