From 07c6bd11401c09e86e12a3a54aa4d0fab6aab03b Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 5 Dec 2021 23:47:54 +0100 Subject: [PATCH] Rewrite local sessions management --- src/main/java/it/tdlight/reactiveapi/Cli.java | 46 +- .../reactiveapi/CreateSessionRequest.java | 5 +- .../it/tdlight/reactiveapi/DiskSession.java | 5 +- .../it/tdlight/reactiveapi/DiskSessions.java | 11 +- .../it/tdlight/reactiveapi/Entrypoint.java | 33 +- .../it/tdlight/reactiveapi/ReactiveApi.java | 417 ++++++++++++------ .../reactiveapi/ReactiveApiPublisher.java | 37 +- src/main/resources/log4j2.xml | 6 +- 8 files changed, 382 insertions(+), 178 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/Cli.java b/src/main/java/it/tdlight/reactiveapi/Cli.java index 89aa951..490c683 100644 --- a/src/main/java/it/tdlight/reactiveapi/Cli.java +++ b/src/main/java/it/tdlight/reactiveapi/Cli.java @@ -1,11 +1,12 @@ package it.tdlight.reactiveapi; import io.atomix.core.Atomix; -import io.atomix.core.AtomixBuilder; import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest; import java.io.IOException; +import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; import net.minecrell.terminalconsole.SimpleTerminalConsole; import org.jline.reader.LineReader; @@ -34,13 +35,19 @@ public class Cli { var console = new SimpleTerminalConsole() { - private static final Set commands = Set.of("exit", "stop", "createsession", "help", "man", "?"); + private static final Set commands = Set.of("exit", + "stop", + "createsession", + "help", + "man", + "?", + "sessions", + "localsessions" + ); @Override protected LineReader buildReader(LineReaderBuilder builder) { - var reader = super.buildReader(builder); - reader.addCommandsInBuffer(commands); - return reader; + return super.buildReader(builder); } @Override @@ -59,18 +66,40 @@ public class Cli { commandArgs = ""; } switch (commandName) { - case "exit", "stop" -> acceptInputs.set(false); + case "exit", "stop" -> shutdown(); case "createsession" -> createSession(api, commandArgs); case "help", "?", "man" -> LOG.info("Commands: {}", commands); + case "sessions" -> printSessions(api, false); + case "localsessions" -> printSessions(api, true); default -> LOG.info("Unknown command \"{}\"", command); } } + private void printSessions(ReactiveApi api, boolean onlyLocal) { + api.getAllUsers().subscribe(sessions -> { + StringBuilder sb = new StringBuilder(); + sb.append("Sessions:\n"); + for (var userEntry : sessions.entrySet()) { + var userId = userEntry.getKey(); + var nodeId = userEntry.getValue(); + if (!onlyLocal || api.is(nodeId)) { + sb.append(" - session #IDU").append(userId); + if (!onlyLocal) { + sb.append(": ").append(nodeId); + } + sb.append("\n"); + } + } + LOG.info(sb.toString()); + }); + } + @Override protected void shutdown() { acceptInputs.set(false); if (alreadyShutDown.compareAndSet(false, true)) { api.getAtomix().stop().join(); + System.exit(0); } } }; @@ -91,8 +120,9 @@ public class Cli { } }; if (!invalid) { - CreateSessionResponse response = api.createSession(request).join(); - LOG.info("Created a session with session id \"{}\"", response.sessionId()); + api + .createSession(request) + .thenAccept(response -> LOG.info("Created a session with live id \"{}\"", response.sessionId())); } } else { invalid = true; diff --git a/src/main/java/it/tdlight/reactiveapi/CreateSessionRequest.java b/src/main/java/it/tdlight/reactiveapi/CreateSessionRequest.java index 0c0621e..645784c 100644 --- a/src/main/java/it/tdlight/reactiveapi/CreateSessionRequest.java +++ b/src/main/java/it/tdlight/reactiveapi/CreateSessionRequest.java @@ -28,7 +28,6 @@ public sealed interface CreateSessionRequest permits CreateUserSessionRequest, C case 2 -> { var dis = new DataInputStream(new ByteArrayInputStream(bytes, 1 + Long.BYTES, bytes.length - (1 + Long.BYTES))); try { - var pathName = dis.readUTF(); var isBot = dis.readBoolean(); String token; Long phoneNumber; @@ -39,7 +38,7 @@ public sealed interface CreateSessionRequest permits CreateUserSessionRequest, C token = null; phoneNumber = dis.readLong(); } - yield new LoadSessionFromDiskRequest(userId, pathName, token, phoneNumber); + yield new LoadSessionFromDiskRequest(userId, token, phoneNumber, dis.readBoolean()); } catch (IOException e) { throw new SerializationException(e); } @@ -52,7 +51,7 @@ public sealed interface CreateSessionRequest permits CreateUserSessionRequest, C final record CreateBotSessionRequest(long userId, String token) implements CreateSessionRequest {} - final record LoadSessionFromDiskRequest(long userId, String pathName, String token, Long phoneNumber) implements + final record LoadSessionFromDiskRequest(long userId, String token, Long phoneNumber, boolean createNew) implements CreateSessionRequest { public LoadSessionFromDiskRequest { diff --git a/src/main/java/it/tdlight/reactiveapi/DiskSession.java b/src/main/java/it/tdlight/reactiveapi/DiskSession.java index 16fd4ab..70f05b1 100644 --- a/src/main/java/it/tdlight/reactiveapi/DiskSession.java +++ b/src/main/java/it/tdlight/reactiveapi/DiskSession.java @@ -9,17 +9,14 @@ import org.jetbrains.annotations.Nullable; @JsonInclude(Include.NON_NULL) public class DiskSession { - public long userId; @Nullable public String token; @Nullable public Long phoneNumber; @JsonCreator - public DiskSession(@JsonProperty(required = true, value = "userId") long userId, - @JsonProperty("token") @Nullable String token, + public DiskSession(@JsonProperty("token") @Nullable String token, @JsonProperty("phoneNumber") @Nullable Long phoneNumber) { - this.userId = userId; this.token = token; this.phoneNumber = phoneNumber; this.validate(); diff --git a/src/main/java/it/tdlight/reactiveapi/DiskSessions.java b/src/main/java/it/tdlight/reactiveapi/DiskSessions.java index e97a280..8ad1328 100644 --- a/src/main/java/it/tdlight/reactiveapi/DiskSessions.java +++ b/src/main/java/it/tdlight/reactiveapi/DiskSessions.java @@ -2,7 +2,6 @@ package it.tdlight.reactiveapi; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import java.util.List; import java.util.Map; import org.jetbrains.annotations.NotNull; @@ -15,12 +14,16 @@ public class DiskSessions { * key: session folder name */ @NotNull - public Map sessions; + private Map sessions; @JsonCreator public DiskSessions(@JsonProperty(required = true, value = "path") @NotNull String path, - @JsonProperty(required = true, value = "sessions") @NotNull Map sessions) { + @JsonProperty(required = true, value = "sessions") @NotNull Map userIdToSession) { this.path = path; - this.sessions = sessions; + this.sessions = userIdToSession; + } + + public Map userIdToSession() { + return sessions; } } diff --git a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java index b2f8117..3dc481c 100644 --- a/src/main/java/it/tdlight/reactiveapi/Entrypoint.java +++ b/src/main/java/it/tdlight/reactiveapi/Entrypoint.java @@ -6,9 +6,11 @@ import io.atomix.cluster.Node; import io.atomix.cluster.discovery.BootstrapDiscoveryProvider; import io.atomix.core.Atomix; import io.atomix.core.AtomixBuilder; +import io.atomix.core.profile.ConsensusProfileConfig; import io.atomix.core.profile.Profile; import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup; import io.atomix.protocols.raft.partition.RaftPartitionGroup; +import io.atomix.storage.StorageLevel; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -56,8 +58,10 @@ public class Entrypoint { atomixBuilder.withClusterId(clusterSettings.id); + String nodeId; if (instanceSettings.client) { atomixBuilder.withMemberId(instanceSettings.id).withAddress(instanceSettings.clientAddress); + nodeId = null; } else { // Find node settings var nodeSettingsOptional = clusterSettings.nodes @@ -74,6 +78,7 @@ public class Entrypoint { var nodeSettings = nodeSettingsOptional.get(); atomixBuilder.withMemberId(instanceSettings.id).withAddress(nodeSettings.address); + nodeId = nodeSettings.id; } var bootstrapDiscoveryProviderNodes = new ArrayList(); @@ -92,18 +97,32 @@ public class Entrypoint { .builder("system") .withNumPartitions(1) .withMembers(systemPartitionGroupMembers) + .withDataDirectory(Paths.get(".data-" + instanceSettings.id).toFile()) .build()); - atomixBuilder.withPartitionGroups(PrimaryBackupPartitionGroup.builder("data").withNumPartitions(32).build()); + /*atomixBuilder.withPartitionGroups(RaftPartitionGroup + .builder("raft") + .withNumPartitions(3) + .withFlushOnCommit() + .withStorageLevel(StorageLevel.MAPPED) + .withDataDirectory(Paths.get(".data-" + instanceSettings.id).toFile()) + .build()); + */ atomixBuilder.withShutdownHook(false); atomixBuilder.withTypeRegistrationRequired(); if (instanceSettings.client) { - atomixBuilder.addProfile(Profile.consensus(systemPartitionGroupMembers)); - atomixBuilder.addProfile(Profile.dataGrid(32)); - } else { atomixBuilder.addProfile(Profile.client()); + } else { + var prof = Profile.consensus(systemPartitionGroupMembers); + var profCfg = (ConsensusProfileConfig) prof.config(); + //profCfg.setDataGroup("raft"); + profCfg.setDataPath(".data-" + instanceSettings.id); + //profCfg.setPartitions(3); + //profCfg.setManagementGroup("system"); + atomixBuilder.addProfile(prof); + //atomixBuilder.addProfile(Profile.dataGrid(32)); } atomixBuilder.withCompatibleSerialization(false); @@ -114,11 +133,13 @@ public class Entrypoint { atomix.start().join(); - var api = new ReactiveApi(atomix, diskSessions); + var api = new ReactiveApi(nodeId, atomix, diskSessions); LOG.info("Starting ReactiveApi..."); - api.start(); + api.start().block(); + + LOG.info("Started ReactiveApi"); return api; } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java index ee23a38..ac9044c 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java @@ -1,199 +1,348 @@ package it.tdlight.reactiveapi; +import static java.util.Collections.unmodifiableSet; +import static java.util.Objects.requireNonNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; +import static reactor.core.publisher.Mono.fromCompletionStage; import io.atomix.core.Atomix; import io.atomix.core.idgenerator.AsyncAtomicIdGenerator; import io.atomix.core.lock.AsyncAtomicLock; import io.atomix.core.map.AsyncAtomicMap; +import io.atomix.protocols.raft.MultiRaftProtocol; import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest; import java.io.IOException; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; +import java.util.Map; import java.util.Map.Entry; -import java.util.UUID; +import java.util.Objects; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import org.jetbrains.annotations.NotNull; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class ReactiveApi { private static final Logger LOG = LoggerFactory.getLogger(ReactiveApi.class); + @NotNull + private final String nodeId; private final Atomix atomix; private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.parallel()); private static final SchedulerExecutor BOUNDED_ELASTIC_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic()); - private final AsyncAtomicIdGenerator nextSessionId; + private final AsyncAtomicIdGenerator nextSessionLiveId; private final AsyncAtomicLock sessionModificationLock; - private final AsyncAtomicMap sessionIdToUserId; - private final ConcurrentMap localNodeSessions = new ConcurrentHashMap<>(); + private final AsyncAtomicMap userIdToNodeId; + /** + * User id -> session + */ + private final ConcurrentMap localLiveSessions = new ConcurrentHashMap<>(); private final DiskSessionsManager diskSessions; - public ReactiveApi(Atomix atomix, DiskSessionsManager diskSessions) { + public ReactiveApi(@NotNull String nodeId, Atomix atomix, DiskSessionsManager diskSessions) { + this.nodeId = nodeId; this.atomix = atomix; - this.nextSessionId = atomix.getAtomicIdGenerator("session-id").async(); - this.sessionIdToUserId = atomix.getAtomicMap("session-id-to-user-id").async(); - this.sessionModificationLock = atomix.getAtomicLock("session-modification").async(); + + var raftProtocol = MultiRaftProtocol.builder().build(); + this.nextSessionLiveId = atomix + .atomicIdGeneratorBuilder("session-live-id") + .withProtocol(raftProtocol) + .build() + .async(); + this.sessionModificationLock = atomix + .atomicLockBuilder("session-modification") + .withProtocol(raftProtocol) + .build() + .async(); + + this.userIdToNodeId = atomix + .atomicMapBuilder("user-id-to-node-id") + //.withCacheEnabled(true) + //.withCacheSize(4096) + .withNullValues(false) + .withProtocol(raftProtocol) + .build() + .async(); + this.diskSessions = diskSessions; } - public void start() { - CompletableFuture.runAsync(() -> { - List> requests = new ArrayList<>(); + public Mono start() { + Mono> idsSavedIntoLocalConfiguration = Mono.fromCallable(() -> { synchronized (diskSessions) { - for (Entry entry : diskSessions.getSettings().sessions.entrySet()) { - try { - entry.getValue().validate(); - } catch (Throwable ex) { - LOG.error("Failed to load disk session {}", entry.getKey(), ex); - } - var sessionFolderName = entry.getKey(); - var diskSession = entry.getValue(); - requests.add(createSession(new LoadSessionFromDiskRequest(diskSession.userId, - sessionFolderName, - diskSession.token, - diskSession.phoneNumber - ))); - } + return diskSessions.getSettings().userIdToSession().keySet(); } - CompletableFuture - .allOf(requests.toArray(CompletableFuture[]::new)) - .thenAccept(responses -> LOG.info("Loaded all saved sessions from disk")); - }, BOUNDED_ELASTIC_EXECUTOR); + }); + Mono> distributedIds = this + .getAllUsers() + .flatMapIterable(Map::entrySet) + .filter(entry -> entry.getValue().equals(nodeId)) + .map(Entry::getKey) + .collect(Collectors.toUnmodifiableSet()); + + record DiskChanges(Set normalIds, Set addedIds, Set removedIds) {} + + var diskChangesMono = Mono.zip(idsSavedIntoLocalConfiguration, distributedIds).map(tuple -> { + var localSet = tuple.getT1(); + var remoteSet = tuple.getT2(); + + var deletedUsers = new HashSet<>(remoteSet); + deletedUsers.removeAll(localSet); + + var addedUsers = new HashSet<>(localSet); + addedUsers.removeAll(remoteSet); + + var normalUsers = new HashSet<>(localSet); + normalUsers.removeAll(addedUsers); + + for (long user : addedUsers) { + LOG.warn("Detected a new user id from the disk configuration file: {}", user); + } + for (long user : normalUsers) { + LOG.info("Detected a user id from the disk configuration file: {}", user); + } + for (long user : deletedUsers) { + LOG.warn("The user id {} has been deleted from the disk configuration file", user); + } + + return new DiskChanges(unmodifiableSet(normalUsers), unmodifiableSet(addedUsers), unmodifiableSet(remoteSet)); + }).cache(); + + var removeObsoleteDiskSessions = diskChangesMono + .flatMapIterable(diskChanges -> diskChanges.removedIds) + .flatMap(removedIds -> fromCompletionStage(() -> destroySession(removedIds, nodeId))) + .then(); + + var addedDiskSessionsFlux = diskChangesMono + .flatMapIterable(diskChanges -> diskChanges.addedIds) + .flatMap(this::getLocalDiskSession); + var normalDiskSessionsFlux = diskChangesMono + .flatMapIterable(diskChanges -> diskChanges.normalIds) + .flatMap(this::getLocalDiskSession); + + var addNewDiskSessions = addedDiskSessionsFlux.flatMap(diskSessionAndId -> { + var id = diskSessionAndId.id; + var diskSession = diskSessionAndId.diskSession; + return fromCompletionStage(() -> createSession(new LoadSessionFromDiskRequest(id, + diskSession.token, + diskSession.phoneNumber, + true + ))); + }).then(); + + var loadExistingDiskSessions = normalDiskSessionsFlux.flatMap(diskSessionAndId -> { + var id = diskSessionAndId.id; + var diskSession = diskSessionAndId.diskSession; + return fromCompletionStage(() -> createSession(new LoadSessionFromDiskRequest(id, + diskSession.token, + diskSession.phoneNumber, + false + ))); + }).then(); + + var diskInitMono = Mono.when(removeObsoleteDiskSessions, loadExistingDiskSessions, addNewDiskSessions) + .subscribeOn(Schedulers.boundedElastic()) + .doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk")); // Listen for create-session signals - atomix.getEventService().subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> { - if (req instanceof LoadSessionFromDiskRequest) { - return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster")); - } else { - return createSession(req); - } - }, CreateSessionResponse::serializeBytes); + var subscriptionMono = fromCompletionStage(() -> atomix + .getEventService() + .subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> { + if (req instanceof LoadSessionFromDiskRequest) { + return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster")); + } else { + return createSession(req); + } + }, CreateSessionResponse::serializeBytes)); + + return diskInitMono.then(subscriptionMono).then(); + } + + private CompletableFuture destroySession(long userId, String nodeId) { + LOG.debug("Received session delete request: userid={}, nodeid=\"{}\"", userId, nodeId); + + // Lock sessions modification + return sessionModificationLock + .lock() + .thenCompose(lockVersion -> { + LOG.trace("Obtained session modification lock for session delete request: {} \"{}\"", userId, nodeId); + return userIdToNodeId + .remove(userId, nodeId) + .thenAccept(deleted -> LOG.debug("Deleted session {} \"{}\": {}", userId, nodeId, deleted)); + }) + .whenComplete((response, error) -> sessionModificationLock + .unlock() + .thenRun(() -> LOG.trace("Released session modification lock for session delete request: {} \"{}\"", userId, nodeId)) + ) + .whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex)); } public CompletableFuture createSession(CreateSessionRequest req) { + LOG.debug("Received create session request: {}", req); // Lock sessions creation - return sessionModificationLock.lock().thenCompose(lockVersion -> { - // Generate session id - return this.nextFreeId().thenCompose(sessionId -> { - // Create the session instance - ReactiveApiPublisher reactiveApiPublisher; - boolean loadedFromDisk; - long userId; - String botToken; - Long phoneNumber; - if (req instanceof CreateBotSessionRequest createBotSessionRequest) { - loadedFromDisk = false; - userId = createBotSessionRequest.userId(); - botToken = createBotSessionRequest.token(); - phoneNumber = null; - reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, sessionId, userId, botToken); - } else if (req instanceof CreateUserSessionRequest createUserSessionRequest) { - loadedFromDisk = false; - userId = createUserSessionRequest.userId(); - botToken = null; - phoneNumber = createUserSessionRequest.phoneNumber(); - reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, sessionId, userId, phoneNumber); - } else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) { - loadedFromDisk = true; - userId = loadSessionFromDiskRequest.userId(); - botToken = loadSessionFromDiskRequest.token(); - phoneNumber = loadSessionFromDiskRequest.phoneNumber(); - if (loadSessionFromDiskRequest.phoneNumber() != null) { - reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, sessionId, userId, phoneNumber); - } else { - reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, sessionId, userId, botToken); - } - } else { - return failedFuture(new UnsupportedOperationException("Unexpected value: " + req)); - } - - // Register the session instance to the local nodes map - var prev = localNodeSessions.put(sessionId, reactiveApiPublisher); - if (prev != null) { - LOG.error("Session id \"{}\" was already registered locally!", sessionId); - } - - // Register the session instance to the distributed nodes map - return sessionIdToUserId.put(sessionId, req.userId()).thenComposeAsync(prevDistributed -> { - if (prevDistributed != null) { - LOG.error("Session id \"{}\" was already registered in the cluster!", sessionId); - } - - CompletableFuture saveToDiskFuture; - if (!loadedFromDisk) { - // Load existing session paths - HashSet alreadyExistingPaths = new HashSet<>(); - synchronized (diskSessions) { - for (var entry : diskSessions.getSettings().sessions.entrySet()) { - var path = entry.getKey(); - var diskSessionSettings = entry.getValue(); - if (diskSessionSettings.userId == userId) { - LOG.warn("User id \"{}\" session already exists in path: \"{}\"", userId, path); - } - alreadyExistingPaths.add(entry.getKey()); + return sessionModificationLock + .lock() + .thenCompose(lockVersion -> { + LOG.trace("Obtained session modification lock for session request: {}", req); + // Generate session id + return this.nextFreeLiveId().thenCompose(liveId -> { + // Create the session instance + ReactiveApiPublisher reactiveApiPublisher; + boolean loadedFromDisk; + boolean createNew; + long userId; + String botToken; + Long phoneNumber; + if (req instanceof CreateBotSessionRequest createBotSessionRequest) { + loadedFromDisk = false; + createNew = true; + userId = createBotSessionRequest.userId(); + botToken = createBotSessionRequest.token(); + phoneNumber = null; + reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken); + } else if (req instanceof CreateUserSessionRequest createUserSessionRequest) { + loadedFromDisk = false; + createNew = true; + userId = createUserSessionRequest.userId(); + botToken = null; + phoneNumber = createUserSessionRequest.phoneNumber(); + reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber); + } else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) { + loadedFromDisk = true; + createNew = loadSessionFromDiskRequest.createNew(); + userId = loadSessionFromDiskRequest.userId(); + botToken = loadSessionFromDiskRequest.token(); + phoneNumber = loadSessionFromDiskRequest.phoneNumber(); + if (loadSessionFromDiskRequest.phoneNumber() != null) { + reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber); + } else { + reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken); } + } else { + return failedFuture(new UnsupportedOperationException("Unexpected value: " + req)); } - // Get a new disk session folder name - String diskSessionFolderName; - do { - diskSessionFolderName = UUID.randomUUID().toString(); - } while (alreadyExistingPaths.contains(diskSessionFolderName)); - - // Create the disk session configuration - var diskSession = new DiskSession(userId, botToken, phoneNumber); - Path path; - synchronized (diskSessions) { - diskSessions.getSettings().sessions.put(diskSessionFolderName, diskSession); - path = Paths.get(diskSessions.getSettings().path).resolve(diskSessionFolderName); + // Register the session instance to the local nodes map + var prev = localLiveSessions.put(liveId, reactiveApiPublisher); + if (prev != null) { + LOG.error("User id \"{}\" was already registered locally! {}", liveId, prev); } - // Start the session instance - reactiveApiPublisher.start(path); + // Register the session instance to the distributed nodes map + return userIdToNodeId.put(userId, nodeId).thenComposeAsync(prevDistributed -> { + if (prevDistributed != null && prevDistributed.value() != null && + !Objects.equals(this.nodeId, prevDistributed.value())) { + LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId, prevDistributed.value()); + } - saveToDiskFuture = CompletableFuture.runAsync(() -> { - // Save updated sessions configuration to disk - try { + Path baseSessionsPath; + synchronized (diskSessions) { + baseSessionsPath = Paths.get(diskSessions.getSettings().path); + } + String diskSessionFolderName = Long.toUnsignedString(userId); + Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName); + + CompletableFuture saveToDiskFuture; + if (!loadedFromDisk) { + // Create the disk session configuration + var diskSession = new DiskSession(botToken, phoneNumber); synchronized (diskSessions) { - diskSessions.save(); + diskSessions.getSettings().userIdToSession().put(userId, diskSession); } - } catch (IOException e) { - throw new CompletionException("Failed to save disk sessions configuration", e); - } - }, BOUNDED_ELASTIC_EXECUTOR); - } else { - saveToDiskFuture = completedFuture(null); - } - return saveToDiskFuture.thenApply(ignored -> new CreateSessionResponse(sessionId)); - }, BOUNDED_ELASTIC_EXECUTOR); - }); - }); + saveToDiskFuture = CompletableFuture.runAsync(() -> { + // Save updated sessions configuration to disk + try { + synchronized (diskSessions) { + diskSessions.save(); + } + } catch (IOException e) { + throw new CompletionException("Failed to save disk sessions configuration", e); + } + }, BOUNDED_ELASTIC_EXECUTOR); + } else { + saveToDiskFuture = completedFuture(null); + } + + // Start the session instance + reactiveApiPublisher.start(sessionPath); + + return saveToDiskFuture.thenApply(ignored -> new CreateSessionResponse(liveId)); + }, BOUNDED_ELASTIC_EXECUTOR); + }); + }) + .whenComplete((response, error) -> sessionModificationLock + .unlock() + .thenRun(() -> LOG.trace("Released session modification lock for session request: {}", req)) + ) + .whenComplete((resp, ex) -> LOG.debug("Handled session request {}, the response is: {}", req, resp, ex)); } - public CompletableFuture nextFreeId() { - return nextSessionId.nextId().thenCompose(id -> sessionIdToUserId.containsKey(id).thenCompose(exists -> { - if (exists) { - return nextFreeId(); - } else { - return completedFuture(id); - } - })); + public CompletableFuture nextFreeLiveId() { + return nextSessionLiveId.nextId(); } public Atomix getAtomix() { return atomix; } + + /** + * Get the list of current sessions + * @return map of user id -> node id + */ + public Mono> getAllUsers() { + return Flux.defer(() -> { + var it = userIdToNodeId.entrySet().iterator(); + var hasNextMono = fromCompletionStage(it::hasNext); + var strictNextMono = fromCompletionStage(it::next) + .map(elem -> Map.entry(elem.getKey(), elem.getValue().value())); + + var nextOrNothingMono = hasNextMono.flatMap(hasNext -> { + if (hasNext) { + return strictNextMono; + } else { + return Mono.empty(); + } + }); + return nextOrNothingMono.repeatWhen(s -> s.takeWhile(n -> n > 0)); + }).collectMap(Entry::getKey, Entry::getValue); + } + + public boolean is(String nodeId) { + return this.nodeId.equals(nodeId); + } + + private static record DiskSessionAndId(DiskSession diskSession, long id) {} + + private Mono getLocalDiskSession(Long localId) { + return Mono.fromCallable(() -> { + synchronized (diskSessions) { + var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localId), + "Id not found: " + localId + ); + try { + diskSession.validate(); + } catch (Throwable ex) { + LOG.error("Failed to load disk session {}", localId, ex); + return null; + } + return new DiskSessionAndId(diskSession, localId); + } + }).subscribeOn(Schedulers.boundedElastic()); + } } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index c081cf9..8a72f17 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -1,44 +1,51 @@ package it.tdlight.reactiveapi; import io.atomix.core.Atomix; -import it.tdlight.jni.TdApi; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; import java.nio.file.Path; -import java.util.concurrent.Executors; -import org.apache.commons.lang3.SerializationException; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink.OverflowStrategy; +import java.util.StringJoiner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.scheduler.Schedulers; public class ReactiveApiPublisher { + + private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class); private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic()); private final Atomix atomix; private final long userId; - private final long sessionId; + private final long liveId; private final String botToken; private final Long phoneNumber; - private ReactiveApiPublisher(Atomix atomix, long sessionId, long userId, String botToken, Long phoneNumber) { + private ReactiveApiPublisher(Atomix atomix, long liveId, long userId, String botToken, Long phoneNumber) { this.atomix = atomix; this.userId = userId; - this.sessionId = sessionId; + this.liveId = liveId; this.botToken = botToken; this.phoneNumber = phoneNumber; } - public static ReactiveApiPublisher fromToken(Atomix atomix, Long sessionId, long userId, String token) { - return new ReactiveApiPublisher(atomix, sessionId, userId, token, null); + public static ReactiveApiPublisher fromToken(Atomix atomix, Long liveId, long userId, String token) { + return new ReactiveApiPublisher(atomix, liveId, userId, token, null); } - public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, Long sessionId, long userId, long phoneNumber) { - return new ReactiveApiPublisher(atomix, sessionId, userId, null, phoneNumber); + public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) { + return new ReactiveApiPublisher(atomix, liveId, userId, null, phoneNumber); } public void start(Path path) { + LOG.info("Starting session \"{}\" in path \"{}\"", this, path); + } + @Override + public String toString() { + return new StringJoiner(", ", ReactiveApiPublisher.class.getSimpleName() + "[", "]") + .add("userId=" + userId) + .add("liveId=" + liveId) + .add("botToken='" + botToken + "'") + .add("phoneNumber=" + phoneNumber) + .toString(); } } diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 307d194..3342724 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -5,7 +5,6 @@ - @@ -14,10 +13,9 @@ - + - \ No newline at end of file +