diff --git a/pom.xml b/pom.xml
index 90f426e..8f454c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,6 @@
UTF-8
0-SNAPSHOT
- 3.1.12
33
@@ -108,31 +107,6 @@
it.unimi.dsi
fastutil
-
- io.atomix
- atomix
- ${atomix.version}
-
-
- org.ow2.asm
- asm
-
-
- io.netty
- netty-handler
-
-
-
-
- io.atomix
- atomix-raft
- ${atomix.version}
-
-
- io.atomix
- atomix-primary-backup
- ${atomix.version}
-
org.ow2.asm
asm
diff --git a/src/main/java/it/tdlight/reactiveapi/Address.java b/src/main/java/it/tdlight/reactiveapi/Address.java
deleted file mode 100644
index 91f45d2..0000000
--- a/src/main/java/it/tdlight/reactiveapi/Address.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package it.tdlight.reactiveapi;
-
-import java.util.Arrays;
-
-public record Address(String host, int port) {
- public Address {
- if (host.isBlank()) {
- throw new IllegalArgumentException("Host is blank");
- }
- if (port < 0) {
- throw new IndexOutOfBoundsException(port);
- }
- if (port >= 65536) {
- throw new IndexOutOfBoundsException(port);
- }
- }
-
- public static Address fromString(String address) {
- var parts = address.split(":");
- if (parts.length < 2) {
- throw new IllegalArgumentException("Malformed client address, it must have a port (host:port)");
- }
- var host = String.join(":", Arrays.copyOf(parts, parts.length - 1));
- var port = Integer.parseUnsignedInt(parts[parts.length - 1]);
- return new Address(host, port);
- }
-}
diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
index 1406dc3..bd3acb9 100644
--- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
+++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
@@ -1,18 +1,7 @@
package it.tdlight.reactiveapi;
-import static it.tdlight.reactiveapi.AtomixUtils.fromCf;
-import static java.util.Collections.unmodifiableSet;
import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.CompletableFuture.failedFuture;
-import com.google.common.primitives.Longs;
-import io.atomix.cluster.messaging.MessagingException;
-import io.atomix.cluster.messaging.Subscription;
-import io.atomix.core.Atomix;
-import io.atomix.core.idgenerator.AsyncAtomicIdGenerator;
-import io.atomix.core.lock.AsyncAtomicLock;
-import io.atomix.core.map.AsyncAtomicMap;
-import io.atomix.protocols.raft.MultiRaftProtocol;
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
@@ -20,23 +9,17 @@ import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
-import java.util.HashSet;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
+import java.util.concurrent.locks.LockSupport;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -44,460 +27,227 @@ public class AtomixReactiveApi implements ReactiveApi {
private static final Logger LOG = LoggerFactory.getLogger(AtomixReactiveApi.class);
- /**
- * nodeId is null when DiskSessions is null
- */
- @Nullable
- private final String nodeId;
- private final Atomix atomix;
- private final KafkaProducer kafkaProducer;
- private final KafkaConsumer kafkaConsumer;
- private final Set resultingEventTransformerSet;
- private final AsyncAtomicIdGenerator nextSessionLiveId;
+ private final boolean clientOnly;
- private final AsyncAtomicLock sessionModificationLock;
- private final AsyncAtomicMap userIdToNodeId;
+ private final KafkaTdlibClient kafkaTDLibClient;
+ @Nullable
+ private final KafkaTdlibServer kafkaTDLibServer;
+
+ private final Set resultingEventTransformerSet;
/**
- * live id -> session
+ * user id -> session
*/
- private final ConcurrentMap localLiveSessions = new ConcurrentHashMap<>();
+ private final ConcurrentMap localSessions = new ConcurrentHashMap<>();
/**
* DiskSessions is null when nodeId is null
*/
@Nullable
private final DiskSessionsManager diskSessions;
+ private volatile boolean closeRequested;
- public AtomixReactiveApi(@Nullable String nodeId,
- Atomix atomix,
+ public AtomixReactiveApi(boolean clientOnly,
KafkaParameters kafkaParameters,
@Nullable DiskSessionsManager diskSessions,
@NotNull Set resultingEventTransformerSet) {
- this.nodeId = nodeId;
- this.atomix = atomix;
- this.kafkaProducer = new KafkaProducer(kafkaParameters);
- this.kafkaConsumer = new KafkaConsumer(kafkaParameters);
- this.resultingEventTransformerSet = resultingEventTransformerSet;
-
- if (nodeId == null) {
- if (diskSessions != null) {
- throw new IllegalArgumentException("A client must not manage disk sessions");
- }
+ this.clientOnly = clientOnly;
+ var kafkaTDLibRequestProducer = new KafkaTdlibRequestProducer(kafkaParameters);
+ var kafkaTDLibResponseConsumer = new KafkaTdlibResponseConsumer(kafkaParameters);
+ var kafkaClientBoundConsumer = new KafkaClientBoundConsumer(kafkaParameters);
+ this.kafkaTDLibClient = new KafkaTdlibClient(kafkaTDLibRequestProducer,
+ kafkaTDLibResponseConsumer,
+ kafkaClientBoundConsumer
+ );
+ if (clientOnly) {
+ this.kafkaTDLibServer = null;
} else {
- if (diskSessions == null) {
- throw new IllegalArgumentException("A node must be able to manage disk sessions");
- }
+ var kafkaTDLibRequestConsumer = new KafkaTdlibRequestConsumer(kafkaParameters);
+ var kafkaTDLibResponseProducer = new KafkaTdlibResponseProducer(kafkaParameters);
+ var kafkaClientBoundProducer = new KafkaClientBoundProducer(kafkaParameters);
+ this.kafkaTDLibServer = new KafkaTdlibServer(kafkaTDLibRequestConsumer,
+ kafkaTDLibResponseProducer,
+ kafkaClientBoundProducer
+ );
}
-
- var raftProtocol = MultiRaftProtocol.builder().build();
- this.nextSessionLiveId = atomix
- .atomicIdGeneratorBuilder("session-live-id")
- .withProtocol(raftProtocol)
- .build()
- .async();
- this.sessionModificationLock = atomix
- .atomicLockBuilder("session-modification")
- .withProtocol(raftProtocol)
- .build()
- .async();
-
- this.userIdToNodeId = atomix
- .atomicMapBuilder("user-id-to-node-id")
- //.withCacheEnabled(true)
- //.withCacheSize(4096)
- .withNullValues(false)
- .withProtocol(raftProtocol)
- .build()
- .async();
+ this.resultingEventTransformerSet = resultingEventTransformerSet;
this.diskSessions = diskSessions;
}
@Override
public Mono start() {
- Mono> idsSavedIntoLocalConfiguration = Mono.fromCallable(() -> {
- if (diskSessions == null) {
- return Set.of();
- }
- synchronized (diskSessions) {
- return diskSessions.getSettings().userIdToSession().keySet();
- }
- });
- Mono> distributedIds;
- if (this.nodeId == null) {
- distributedIds = Mono.just(Set.of());
- } else {
- distributedIds = this
- .getAllUsers()
- .flatMapIterable(Map::entrySet)
- .filter(entry -> entry.getValue().equals(this.nodeId))
- .map(Entry::getKey)
- .collect(Collectors.toUnmodifiableSet());
- }
-
- record DiskChanges(Set normalIds, Set addedIds, Set removedIds) {}
-
- var diskChangesMono = Mono.zip(idsSavedIntoLocalConfiguration, distributedIds).map(tuple -> {
- var localSet = tuple.getT1();
- var remoteSet = tuple.getT2();
-
- var deletedUsers = new HashSet<>(remoteSet);
- deletedUsers.removeAll(localSet);
-
- var addedUsers = new HashSet<>(localSet);
- addedUsers.removeAll(remoteSet);
-
- var normalUsers = new HashSet<>(localSet);
- normalUsers.removeAll(addedUsers);
-
- for (long user : addedUsers) {
- LOG.warn("Detected a new user id from the disk configuration file: {}", user);
- }
- for (long user : normalUsers) {
- LOG.info("Detected a user id from the disk configuration file: {}", user);
- }
- for (long user : deletedUsers) {
- LOG.warn("The user id {} has been deleted from the disk configuration file", user);
- }
-
- return new DiskChanges(unmodifiableSet(normalUsers), unmodifiableSet(addedUsers), unmodifiableSet(deletedUsers));
- }).cache();
-
- var removeObsoleteDiskSessions = diskChangesMono
- .flatMapIterable(diskChanges -> diskChanges.removedIds)
- .concatMap(removedIds -> fromCf(() -> destroySession(removedIds, nodeId)))
- .then();
-
- var addedDiskSessionsFlux = diskChangesMono
- .flatMapIterable(diskChanges -> diskChanges.addedIds)
- .concatMap(this::getLocalDiskSession);
- var normalDiskSessionsFlux = diskChangesMono
- .flatMapIterable(diskChanges -> diskChanges.normalIds)
- .concatMap(this::getLocalDiskSession);
-
- var addNewDiskSessions = addedDiskSessionsFlux.concatMap(diskSessionAndId -> {
- var id = diskSessionAndId.id;
- var diskSession = diskSessionAndId.diskSession;
- return createSession(new LoadSessionFromDiskRequest(id,
- diskSession.token,
- diskSession.phoneNumber,
- true
- ));
- }).then();
-
- var loadExistingDiskSessions = normalDiskSessionsFlux.concatMap(diskSessionAndId -> {
- var id = diskSessionAndId.id;
- var diskSession = diskSessionAndId.diskSession;
- return createSession(new LoadSessionFromDiskRequest(id,
- diskSession.token,
- diskSession.phoneNumber,
- false
- ));
- }).then();
-
- var diskInitMono = Mono.when(removeObsoleteDiskSessions, loadExistingDiskSessions, addNewDiskSessions)
- .subscribeOn(Schedulers.boundedElastic())
- .doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk"));
-
- // Listen for create-session signals
- Mono createSessionSubscriptionMono;
- if (nodeId != null) {
- createSessionSubscriptionMono = fromCf(() -> atomix
- .getEventService()
- .subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> {
- if (req instanceof LoadSessionFromDiskRequest) {
- return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster"));
- } else {
- return createSession(req).toFuture();
- }
- }, CreateSessionResponse::serializeBytes));
- } else {
- createSessionSubscriptionMono = Mono.empty();
- }
-
- // Listen for revive-session signals
- Mono reviveSessionSubscriptionMono;
- if (nodeId != null) {
- reviveSessionSubscriptionMono = fromCf(() -> atomix
- .getEventService()
- .subscribe("revive-session", (Long userId) -> this.getLocalDiskSession(userId).flatMap(sessionAndId -> {
- var diskSession = sessionAndId.diskSession();
- var request = new LoadSessionFromDiskRequest(userId, diskSession.token, diskSession.phoneNumber, false);
- return this.createSession(request);
- }).onErrorResume(ex -> Mono.empty()).then(Mono.empty()).toFuture()));
- } else {
- reviveSessionSubscriptionMono = Mono.empty();
- }
-
- return diskInitMono.then(Mono.when(createSessionSubscriptionMono, reviveSessionSubscriptionMono));
- }
-
- private CompletableFuture destroySession(long userId, String nodeId) {
- LOG.debug("Received session delete request: user_id={}, node_id=\"{}\"", userId, nodeId);
-
- // Lock sessions modification
- return sessionModificationLock
- .lock()
- .thenCompose(lockVersion -> {
- LOG.trace("Obtained session modification lock for session delete request: {} \"{}\"", userId, nodeId);
- return userIdToNodeId
- .remove(userId, nodeId)
- .thenAccept(deleted -> LOG.debug("Deleted session {} \"{}\": {}", userId, nodeId, deleted));
+ var idsSavedIntoLocalConfiguration = Mono
+ .>>fromCallable(() -> {
+ if (diskSessions == null) {
+ return Set.of();
+ }
+ synchronized (diskSessions) {
+ return diskSessions.getSettings().userIdToSession().entrySet();
+ }
})
- .whenComplete((response, error) -> sessionModificationLock
- .unlock()
- .thenRun(() -> LOG.trace("Released session modification lock for session delete request: {} \"{}\"", userId, nodeId))
- )
- .whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex));
- }
+ .subscribeOn(Schedulers.boundedElastic())
+ .flatMapIterable(a -> a)
+ .map(a -> new DiskSessionAndId(a.getValue(), a.getKey()));
- /**
- * Send a request to the cluster to load that user id from disk
- */
- public Mono tryReviveSession(long userId) {
- return Mono.fromRunnable(() -> atomix.getEventService().broadcast("revive-session", userId));
+ return idsSavedIntoLocalConfiguration
+ .filter(diskSessionAndId -> {
+ try {
+ diskSessionAndId.diskSession().validate();
+ } catch (Throwable ex) {
+ LOG.error("Failed to load disk session {}", diskSessionAndId.id, ex);
+ return false;
+ }
+ return true;
+ })
+ .flatMap(diskSessionAndId -> {
+ var id = diskSessionAndId.id;
+ var diskSession = diskSessionAndId.diskSession;
+ return createSession(new LoadSessionFromDiskRequest(id,
+ diskSession.token,
+ diskSession.phoneNumber,
+ true
+ ));
+ })
+ .then()
+ .doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk"));
}
@Override
public Mono createSession(CreateSessionRequest req) {
LOG.debug("Received create session request: {}", req);
- if (nodeId == null) {
+ if (clientOnly) {
return Mono.error(new UnsupportedOperationException("This is a client, it can't have own sessions"));
}
- Mono unlockedSessionCreationMono = Mono.defer(() -> {
- LOG.trace("Obtained session modification lock for session request: {}", req);
- // Generate session id
- return this
- .nextFreeLiveId()
- .flatMap(liveId -> {
- // Create the session instance
- ReactiveApiPublisher reactiveApiPublisher;
- boolean loadedFromDisk;
- long userId;
- String botToken;
- Long phoneNumber;
- if (req instanceof CreateBotSessionRequest createBotSessionRequest) {
- loadedFromDisk = false;
- userId = createBotSessionRequest.userId();
- botToken = createBotSessionRequest.token();
- phoneNumber = null;
- reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, kafkaProducer, resultingEventTransformerSet,
- liveId,
- userId,
- botToken
- );
- } else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
- loadedFromDisk = false;
- userId = createUserSessionRequest.userId();
- botToken = null;
- phoneNumber = createUserSessionRequest.phoneNumber();
- reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, kafkaProducer, resultingEventTransformerSet,
- liveId,
- userId,
- phoneNumber
- );
- } else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
- loadedFromDisk = true;
- userId = loadSessionFromDiskRequest.userId();
- botToken = loadSessionFromDiskRequest.token();
- phoneNumber = loadSessionFromDiskRequest.phoneNumber();
- if (loadSessionFromDiskRequest.phoneNumber() != null) {
- reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, kafkaProducer, resultingEventTransformerSet,
- liveId,
- userId,
- phoneNumber
- );
- } else {
- reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, kafkaProducer, resultingEventTransformerSet,
- liveId,
- userId,
- botToken
- );
- }
- } else {
- return Mono.error(new UnsupportedOperationException("Unexpected value: " + req));
- }
-
- // Register the session instance to the local nodes map
- var prev = localLiveSessions.put(liveId, reactiveApiPublisher);
- if (prev != null) {
- LOG.error("User id \"{}\" was already registered locally! {}", liveId, prev);
- }
-
- // Register the session instance to the distributed nodes map
- return AtomixUtils
- .fromCf(() -> userIdToNodeId.put(userId, nodeId).thenApply(Optional::ofNullable))
- .flatMap(prevDistributed -> {
- if (prevDistributed.isPresent() && prevDistributed.get().value() != null &&
- !Objects.equals(this.nodeId, prevDistributed.get().value())) {
- LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId,
- prevDistributed.get().value());
- }
-
- var saveToDiskMono = Mono
- .fromCallable(() -> {
- // Save updated sessions configuration to disk
- try {
- Objects.requireNonNull(diskSessions);
-
- synchronized (diskSessions) {
- diskSessions.save();
- return null;
- }
- } catch (IOException e) {
- throw new CompletionException("Failed to save disk sessions configuration", e);
- }
- })
- .subscribeOn(Schedulers.boundedElastic());
-
- // Start the session instance
- return Mono
- .fromCallable(() -> {
- Objects.requireNonNull(diskSessions);
- synchronized (diskSessions) {
- return Objects.requireNonNull(Paths.get(diskSessions.getSettings().path),
- "Session " + userId + " path is missing");
- }
- })
- .subscribeOn(Schedulers.boundedElastic())
- .flatMap(baseSessionsPath -> {
- String diskSessionFolderName = "id" + Long.toUnsignedString(userId);
- Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName);
-
- if (!loadedFromDisk) {
- // Create the disk session configuration
- var diskSession = new DiskSession(botToken, phoneNumber);
- return Mono.fromCallable(() -> {
- Objects.requireNonNull(diskSessions);
- synchronized (diskSessions) {
- diskSessions.getSettings().userIdToSession().put(userId, diskSession);
- return null;
- }
- }).subscribeOn(Schedulers.boundedElastic()).then(saveToDiskMono).thenReturn(sessionPath);
- } else {
- return Mono.just(sessionPath);
- }
- })
- .doOnNext(path -> reactiveApiPublisher.start(path,
- () -> AtomixReactiveApi.this.onPublisherClosed(userId, liveId)
- ))
- .thenReturn(new CreateSessionResponse(liveId));
- });
- });
- });
-
- // Lock sessions creation
- return Mono
- .usingWhen(AtomixUtils.fromCf(sessionModificationLock::lock),
- lockVersion -> unlockedSessionCreationMono,
- lockVersion -> AtomixUtils
- .fromCf(sessionModificationLock::unlock)
- .doOnTerminate(() -> LOG.trace("Released session modification lock for session request: {}", req))
- )
- .doOnNext(resp -> LOG.debug("Handled session request {}, the response is: {}", req, resp))
- .doOnError(ex -> LOG.debug("Handled session request {}, the response is: error", req, ex));
- }
-
- private void onPublisherClosed(long userId, Long liveId) {
- this.destroySession(userId, nodeId).whenComplete((result, ex) -> {
- localLiveSessions.remove(liveId);
- if (ex != null) {
- LOG.error("Failed to close the session for user {} after it was closed itself", userId);
+ // Create the session instance
+ ReactiveApiPublisher reactiveApiPublisher;
+ boolean loadedFromDisk;
+ long userId;
+ String botToken;
+ Long phoneNumber;
+ if (req instanceof CreateBotSessionRequest createBotSessionRequest) {
+ loadedFromDisk = false;
+ userId = createBotSessionRequest.userId();
+ botToken = createBotSessionRequest.token();
+ phoneNumber = null;
+ reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaTDLibServer, resultingEventTransformerSet,
+ userId,
+ botToken
+ );
+ } else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
+ loadedFromDisk = false;
+ userId = createUserSessionRequest.userId();
+ botToken = null;
+ phoneNumber = createUserSessionRequest.phoneNumber();
+ reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaTDLibServer, resultingEventTransformerSet,
+ userId,
+ phoneNumber
+ );
+ } else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
+ loadedFromDisk = true;
+ userId = loadSessionFromDiskRequest.userId();
+ botToken = loadSessionFromDiskRequest.token();
+ phoneNumber = loadSessionFromDiskRequest.phoneNumber();
+ if (loadSessionFromDiskRequest.phoneNumber() != null) {
+ reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaTDLibServer,
+ resultingEventTransformerSet,
+ userId,
+ phoneNumber
+ );
} else {
- LOG.debug("Closed the session for user {} after it was closed itself", userId);
+ reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaTDLibServer,
+ resultingEventTransformerSet,
+ userId,
+ botToken
+ );
}
- });
- }
-
- private Mono nextFreeLiveId() {
- return fromCf(nextSessionLiveId::nextId);
- }
-
- public Atomix getAtomix() {
- return atomix;
- }
-
- /**
- * Get the list of current sessions
- * @return map of user id -> node id
- */
- @Override
- public Mono