Implement live and dynamic clients
This commit is contained in:
parent
af96cfb7dc
commit
473783b501
438
src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
Normal file
438
src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java
Normal file
@ -0,0 +1,438 @@
|
|||||||
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
|
import static java.util.Collections.unmodifiableSet;
|
||||||
|
import static java.util.Objects.requireNonNull;
|
||||||
|
import static java.util.concurrent.CompletableFuture.failedFuture;
|
||||||
|
import static reactor.core.publisher.Mono.fromCompletionStage;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Longs;
|
||||||
|
import io.atomix.cluster.messaging.MessagingException;
|
||||||
|
import io.atomix.cluster.messaging.Subscription;
|
||||||
|
import io.atomix.core.Atomix;
|
||||||
|
import io.atomix.core.idgenerator.AsyncAtomicIdGenerator;
|
||||||
|
import io.atomix.core.lock.AsyncAtomicLock;
|
||||||
|
import io.atomix.core.map.AsyncAtomicMap;
|
||||||
|
import io.atomix.protocols.raft.MultiRaftProtocol;
|
||||||
|
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
|
||||||
|
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
|
||||||
|
import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.CompletionException;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
|
public class AtomixReactiveApi implements ReactiveApi {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(AtomixReactiveApi.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* nodeId is null when DiskSessions is null
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
private final String nodeId;
|
||||||
|
private final Atomix atomix;
|
||||||
|
private final AsyncAtomicIdGenerator nextSessionLiveId;
|
||||||
|
|
||||||
|
private final AsyncAtomicLock sessionModificationLock;
|
||||||
|
private final AsyncAtomicMap<Long, String> userIdToNodeId;
|
||||||
|
/**
|
||||||
|
* User id -> session
|
||||||
|
*/
|
||||||
|
private final ConcurrentMap<Long, ReactiveApiPublisher> localLiveSessions = new ConcurrentHashMap<>();
|
||||||
|
/**
|
||||||
|
* DiskSessions is null when nodeId is null
|
||||||
|
*/
|
||||||
|
@Nullable
|
||||||
|
private final DiskSessionsManager diskSessions;
|
||||||
|
|
||||||
|
public AtomixReactiveApi(@Nullable String nodeId, Atomix atomix, @Nullable DiskSessionsManager diskSessions) {
|
||||||
|
this.nodeId = nodeId;
|
||||||
|
this.atomix = atomix;
|
||||||
|
|
||||||
|
if (nodeId == null) {
|
||||||
|
if (diskSessions != null) {
|
||||||
|
throw new IllegalArgumentException("A client must not manage disk sessions");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (diskSessions == null) {
|
||||||
|
throw new IllegalArgumentException("A node must be able to manage disk sessions");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var raftProtocol = MultiRaftProtocol.builder().build();
|
||||||
|
this.nextSessionLiveId = atomix
|
||||||
|
.atomicIdGeneratorBuilder("session-live-id")
|
||||||
|
.withProtocol(raftProtocol)
|
||||||
|
.build()
|
||||||
|
.async();
|
||||||
|
this.sessionModificationLock = atomix
|
||||||
|
.atomicLockBuilder("session-modification")
|
||||||
|
.withProtocol(raftProtocol)
|
||||||
|
.build()
|
||||||
|
.async();
|
||||||
|
|
||||||
|
this.userIdToNodeId = atomix
|
||||||
|
.<Long, String>atomicMapBuilder("user-id-to-node-id")
|
||||||
|
//.withCacheEnabled(true)
|
||||||
|
//.withCacheSize(4096)
|
||||||
|
.withNullValues(false)
|
||||||
|
.withProtocol(raftProtocol)
|
||||||
|
.build()
|
||||||
|
.async();
|
||||||
|
|
||||||
|
this.diskSessions = diskSessions;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<Void> start() {
|
||||||
|
Mono<Set<Long>> idsSavedIntoLocalConfiguration = Mono.fromCallable(() -> {
|
||||||
|
if (diskSessions == null) {
|
||||||
|
return Set.of();
|
||||||
|
}
|
||||||
|
synchronized (diskSessions) {
|
||||||
|
return diskSessions.getSettings().userIdToSession().keySet();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Mono<Set<Long>> distributedIds;
|
||||||
|
if (this.nodeId == null) {
|
||||||
|
distributedIds = Mono.just(Set.of());
|
||||||
|
} else {
|
||||||
|
distributedIds = this
|
||||||
|
.getAllUsers()
|
||||||
|
.flatMapIterable(Map::entrySet)
|
||||||
|
.filter(entry -> entry.getValue().equals(this.nodeId))
|
||||||
|
.map(Entry::getKey)
|
||||||
|
.collect(Collectors.toUnmodifiableSet());
|
||||||
|
}
|
||||||
|
|
||||||
|
record DiskChanges(Set<Long> normalIds, Set<Long> addedIds, Set<Long> removedIds) {}
|
||||||
|
|
||||||
|
var diskChangesMono = Mono.zip(idsSavedIntoLocalConfiguration, distributedIds).map(tuple -> {
|
||||||
|
var localSet = tuple.getT1();
|
||||||
|
var remoteSet = tuple.getT2();
|
||||||
|
|
||||||
|
var deletedUsers = new HashSet<>(remoteSet);
|
||||||
|
deletedUsers.removeAll(localSet);
|
||||||
|
|
||||||
|
var addedUsers = new HashSet<>(localSet);
|
||||||
|
addedUsers.removeAll(remoteSet);
|
||||||
|
|
||||||
|
var normalUsers = new HashSet<>(localSet);
|
||||||
|
normalUsers.removeAll(addedUsers);
|
||||||
|
|
||||||
|
for (long user : addedUsers) {
|
||||||
|
LOG.warn("Detected a new user id from the disk configuration file: {}", user);
|
||||||
|
}
|
||||||
|
for (long user : normalUsers) {
|
||||||
|
LOG.info("Detected a user id from the disk configuration file: {}", user);
|
||||||
|
}
|
||||||
|
for (long user : deletedUsers) {
|
||||||
|
LOG.warn("The user id {} has been deleted from the disk configuration file", user);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new DiskChanges(unmodifiableSet(normalUsers), unmodifiableSet(addedUsers), unmodifiableSet(deletedUsers));
|
||||||
|
}).cache();
|
||||||
|
|
||||||
|
var removeObsoleteDiskSessions = diskChangesMono
|
||||||
|
.flatMapIterable(diskChanges -> diskChanges.removedIds)
|
||||||
|
.flatMap(removedIds -> fromCompletionStage(() -> destroySession(removedIds, nodeId)))
|
||||||
|
.then();
|
||||||
|
|
||||||
|
var addedDiskSessionsFlux = diskChangesMono
|
||||||
|
.flatMapIterable(diskChanges -> diskChanges.addedIds)
|
||||||
|
.flatMap(this::getLocalDiskSession);
|
||||||
|
var normalDiskSessionsFlux = diskChangesMono
|
||||||
|
.flatMapIterable(diskChanges -> diskChanges.normalIds)
|
||||||
|
.flatMap(this::getLocalDiskSession);
|
||||||
|
|
||||||
|
var addNewDiskSessions = addedDiskSessionsFlux.flatMap(diskSessionAndId -> {
|
||||||
|
var id = diskSessionAndId.id;
|
||||||
|
var diskSession = diskSessionAndId.diskSession;
|
||||||
|
return createSession(new LoadSessionFromDiskRequest(id,
|
||||||
|
diskSession.token,
|
||||||
|
diskSession.phoneNumber,
|
||||||
|
true
|
||||||
|
));
|
||||||
|
}).then();
|
||||||
|
|
||||||
|
var loadExistingDiskSessions = normalDiskSessionsFlux.flatMap(diskSessionAndId -> {
|
||||||
|
var id = diskSessionAndId.id;
|
||||||
|
var diskSession = diskSessionAndId.diskSession;
|
||||||
|
return createSession(new LoadSessionFromDiskRequest(id,
|
||||||
|
diskSession.token,
|
||||||
|
diskSession.phoneNumber,
|
||||||
|
false
|
||||||
|
));
|
||||||
|
}).then();
|
||||||
|
|
||||||
|
var diskInitMono = Mono.when(removeObsoleteDiskSessions, loadExistingDiskSessions, addNewDiskSessions)
|
||||||
|
.subscribeOn(Schedulers.boundedElastic())
|
||||||
|
.doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk"));
|
||||||
|
|
||||||
|
// Listen for create-session signals
|
||||||
|
Mono<Subscription> subscriptionMono;
|
||||||
|
if (nodeId != null) {
|
||||||
|
subscriptionMono = fromCompletionStage(() -> atomix
|
||||||
|
.getEventService()
|
||||||
|
.subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> {
|
||||||
|
if (req instanceof LoadSessionFromDiskRequest) {
|
||||||
|
return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster"));
|
||||||
|
} else {
|
||||||
|
return createSession(req).toFuture();
|
||||||
|
}
|
||||||
|
}, CreateSessionResponse::serializeBytes));
|
||||||
|
} else {
|
||||||
|
subscriptionMono = Mono.empty();
|
||||||
|
}
|
||||||
|
return diskInitMono.then(subscriptionMono).then();
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<Void> destroySession(long userId, String nodeId) {
|
||||||
|
LOG.debug("Received session delete request: user_id={}, node_id=\"{}\"", userId, nodeId);
|
||||||
|
|
||||||
|
// Lock sessions modification
|
||||||
|
return sessionModificationLock
|
||||||
|
.lock()
|
||||||
|
.thenCompose(lockVersion -> {
|
||||||
|
LOG.trace("Obtained session modification lock for session delete request: {} \"{}\"", userId, nodeId);
|
||||||
|
return userIdToNodeId
|
||||||
|
.remove(userId, nodeId)
|
||||||
|
.thenAccept(deleted -> LOG.debug("Deleted session {} \"{}\": {}", userId, nodeId, deleted));
|
||||||
|
})
|
||||||
|
.whenComplete((response, error) -> sessionModificationLock
|
||||||
|
.unlock()
|
||||||
|
.thenRun(() -> LOG.trace("Released session modification lock for session delete request: {} \"{}\"", userId, nodeId))
|
||||||
|
)
|
||||||
|
.whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<CreateSessionResponse> createSession(CreateSessionRequest req) {
|
||||||
|
LOG.debug("Received create session request: {}", req);
|
||||||
|
|
||||||
|
if (nodeId == null) {
|
||||||
|
return Mono.error(new UnsupportedOperationException("This is a client, it can't have own sessions"));
|
||||||
|
}
|
||||||
|
|
||||||
|
Mono<CreateSessionResponse> unlockedSessionCreationMono = Mono.defer(() -> {
|
||||||
|
LOG.trace("Obtained session modification lock for session request: {}", req);
|
||||||
|
// Generate session id
|
||||||
|
return this
|
||||||
|
.nextFreeLiveId()
|
||||||
|
.flatMap(liveId -> {
|
||||||
|
// Create the session instance
|
||||||
|
ReactiveApiPublisher reactiveApiPublisher;
|
||||||
|
boolean loadedFromDisk;
|
||||||
|
long userId;
|
||||||
|
String botToken;
|
||||||
|
Long phoneNumber;
|
||||||
|
if (req instanceof CreateBotSessionRequest createBotSessionRequest) {
|
||||||
|
loadedFromDisk = false;
|
||||||
|
userId = createBotSessionRequest.userId();
|
||||||
|
botToken = createBotSessionRequest.token();
|
||||||
|
phoneNumber = null;
|
||||||
|
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken);
|
||||||
|
} else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
|
||||||
|
loadedFromDisk = false;
|
||||||
|
userId = createUserSessionRequest.userId();
|
||||||
|
botToken = null;
|
||||||
|
phoneNumber = createUserSessionRequest.phoneNumber();
|
||||||
|
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber);
|
||||||
|
} else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
|
||||||
|
loadedFromDisk = true;
|
||||||
|
userId = loadSessionFromDiskRequest.userId();
|
||||||
|
botToken = loadSessionFromDiskRequest.token();
|
||||||
|
phoneNumber = loadSessionFromDiskRequest.phoneNumber();
|
||||||
|
if (loadSessionFromDiskRequest.phoneNumber() != null) {
|
||||||
|
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber);
|
||||||
|
} else {
|
||||||
|
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Mono.error(new UnsupportedOperationException("Unexpected value: " + req));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the session instance to the local nodes map
|
||||||
|
var prev = localLiveSessions.put(liveId, reactiveApiPublisher);
|
||||||
|
if (prev != null) {
|
||||||
|
LOG.error("User id \"{}\" was already registered locally! {}", liveId, prev);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the session instance to the distributed nodes map
|
||||||
|
return Mono
|
||||||
|
.fromCompletionStage(() -> userIdToNodeId.put(userId, nodeId).thenApply(Optional::ofNullable))
|
||||||
|
.flatMap(prevDistributed -> {
|
||||||
|
if (prevDistributed.isPresent() && prevDistributed.get().value() != null &&
|
||||||
|
!Objects.equals(this.nodeId, prevDistributed.get().value())) {
|
||||||
|
LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId,
|
||||||
|
prevDistributed.get().value());
|
||||||
|
}
|
||||||
|
|
||||||
|
var saveToDiskMono = Mono
|
||||||
|
.<Void>fromCallable(() -> {
|
||||||
|
// Save updated sessions configuration to disk
|
||||||
|
try {
|
||||||
|
Objects.requireNonNull(diskSessions);
|
||||||
|
|
||||||
|
synchronized (diskSessions) {
|
||||||
|
diskSessions.save();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new CompletionException("Failed to save disk sessions configuration", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.subscribeOn(Schedulers.boundedElastic());
|
||||||
|
|
||||||
|
// Start the session instance
|
||||||
|
return Mono
|
||||||
|
.fromCallable(() -> {
|
||||||
|
Objects.requireNonNull(diskSessions);
|
||||||
|
synchronized (diskSessions) {
|
||||||
|
return Objects.requireNonNull(Paths.get(diskSessions.getSettings().path),
|
||||||
|
"Session " + userId + " path is missing");
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.subscribeOn(Schedulers.boundedElastic())
|
||||||
|
.flatMap(baseSessionsPath -> {
|
||||||
|
String diskSessionFolderName = Long.toUnsignedString(userId);
|
||||||
|
Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName);
|
||||||
|
|
||||||
|
if (!loadedFromDisk) {
|
||||||
|
// Create the disk session configuration
|
||||||
|
var diskSession = new DiskSession(botToken, phoneNumber);
|
||||||
|
return Mono.<Void>fromCallable(() -> {
|
||||||
|
Objects.requireNonNull(diskSessions);
|
||||||
|
synchronized (diskSessions) {
|
||||||
|
diskSessions.getSettings().userIdToSession().put(userId, diskSession);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).subscribeOn(Schedulers.boundedElastic()).then(saveToDiskMono).thenReturn(sessionPath);
|
||||||
|
} else {
|
||||||
|
return Mono.just(sessionPath);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.doOnNext(path -> reactiveApiPublisher.start(path,
|
||||||
|
() -> AtomixReactiveApi.this.onPublisherClosed(userId)
|
||||||
|
))
|
||||||
|
.thenReturn(new CreateSessionResponse(liveId));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// Lock sessions creation
|
||||||
|
return Mono
|
||||||
|
.usingWhen(Mono.fromCompletionStage(sessionModificationLock::lock),
|
||||||
|
lockVersion -> unlockedSessionCreationMono,
|
||||||
|
lockVersion -> Mono
|
||||||
|
.fromCompletionStage(sessionModificationLock::unlock)
|
||||||
|
.doOnTerminate(() -> LOG.trace("Released session modification lock for session request: {}", req))
|
||||||
|
)
|
||||||
|
.doOnNext(resp -> LOG.debug("Handled session request {}, the response is: {}", req, resp))
|
||||||
|
.doOnError(ex -> LOG.debug("Handled session request {}, the response is: error", req, ex));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onPublisherClosed(long userId) {
|
||||||
|
this.destroySession(userId, nodeId).whenComplete((result, ex) -> {
|
||||||
|
if (ex != null) {
|
||||||
|
LOG.error("Failed to close the session for user {} after it was closed itself", userId);
|
||||||
|
} else {
|
||||||
|
LOG.debug("Closed the session for user {} after it was closed itself", userId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private Mono<Long> nextFreeLiveId() {
|
||||||
|
return Mono.fromCompletionStage(nextSessionLiveId::nextId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Atomix getAtomix() {
|
||||||
|
return atomix;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the list of current sessions
|
||||||
|
* @return map of user id -> node id
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Mono<Map<Long, String>> getAllUsers() {
|
||||||
|
return Flux.defer(() -> {
|
||||||
|
var it = userIdToNodeId.entrySet().iterator();
|
||||||
|
var hasNextMono = fromCompletionStage(it::hasNext);
|
||||||
|
var strictNextMono = fromCompletionStage(it::next)
|
||||||
|
.map(elem -> Map.entry(elem.getKey(), elem.getValue().value()));
|
||||||
|
|
||||||
|
var nextOrNothingMono = hasNextMono.flatMap(hasNext -> {
|
||||||
|
if (hasNext) {
|
||||||
|
return strictNextMono;
|
||||||
|
} else {
|
||||||
|
return Mono.empty();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return nextOrNothingMono.repeatWhen(s -> s.takeWhile(n -> n > 0));
|
||||||
|
}).collectMap(Entry::getKey, Entry::getValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean is(String nodeId) {
|
||||||
|
if (this.nodeId == null) {
|
||||||
|
return nodeId == null;
|
||||||
|
}
|
||||||
|
return this.nodeId.equals(nodeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<Long> resolveUserLiveId(long userId) {
|
||||||
|
return Mono.fromCompletionStage(() -> atomix
|
||||||
|
.getEventService()
|
||||||
|
.send(SubjectNaming.getDynamicIdResolveSubject(userId), userId, Longs::toByteArray, Longs::fromByteArray))
|
||||||
|
.onErrorResume(ex -> {
|
||||||
|
if (ex instanceof MessagingException.NoRemoteHandler) {
|
||||||
|
return Mono.empty();
|
||||||
|
} else {
|
||||||
|
return Mono.error(ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Mono<Void> close() {
|
||||||
|
return Mono.fromCompletionStage(this.atomix::stop);
|
||||||
|
}
|
||||||
|
|
||||||
|
private record DiskSessionAndId(DiskSession diskSession, long id) {}
|
||||||
|
|
||||||
|
private Mono<DiskSessionAndId> getLocalDiskSession(Long localId) {
|
||||||
|
return Mono.fromCallable(() -> {
|
||||||
|
Objects.requireNonNull(diskSessions);
|
||||||
|
synchronized (diskSessions) {
|
||||||
|
var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localId),
|
||||||
|
"Id not found: " + localId
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
diskSession.validate();
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
LOG.error("Failed to load disk session {}", localId, ex);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new DiskSessionAndId(diskSession, localId);
|
||||||
|
}
|
||||||
|
}).subscribeOn(Schedulers.boundedElastic());
|
||||||
|
}
|
||||||
|
}
|
@ -21,7 +21,7 @@ public class Cli {
|
|||||||
public static void main(String[] args) throws IOException {
|
public static void main(String[] args) throws IOException {
|
||||||
var validArgs = Entrypoint.parseArguments(args);
|
var validArgs = Entrypoint.parseArguments(args);
|
||||||
var atomixBuilder = Atomix.builder();
|
var atomixBuilder = Atomix.builder();
|
||||||
var api = Entrypoint.start(validArgs, atomixBuilder);
|
var api = (AtomixReactiveApi) Entrypoint.start(validArgs, atomixBuilder);
|
||||||
|
|
||||||
AtomicBoolean alreadyShutDown = new AtomicBoolean(false);
|
AtomicBoolean alreadyShutDown = new AtomicBoolean(false);
|
||||||
AtomicBoolean acceptInputs = new AtomicBoolean(true);
|
AtomicBoolean acceptInputs = new AtomicBoolean(true);
|
||||||
|
@ -0,0 +1,130 @@
|
|||||||
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
|
import io.atomix.cluster.messaging.ClusterEventService;
|
||||||
|
import io.atomix.cluster.messaging.MessagingException;
|
||||||
|
import io.atomix.cluster.messaging.Subscription;
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
|
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
|
||||||
|
import it.tdlight.reactiveapi.Event.Request;
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import reactor.core.Disposable;
|
||||||
|
import reactor.core.publisher.BufferOverflowStrategy;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.core.publisher.FluxSink.OverflowStrategy;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
|
public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCloseable {
|
||||||
|
|
||||||
|
private static final long LIVE_ID_UNSET = -1L;
|
||||||
|
private static final long LIVE_ID_FAILED = -2L;
|
||||||
|
|
||||||
|
private final ReactiveApi api;
|
||||||
|
private final ClusterEventService eventService;
|
||||||
|
private final AtomicLong liveId = new AtomicLong(LIVE_ID_UNSET);
|
||||||
|
private final Disposable liveIdSubscription;
|
||||||
|
private final long userId;
|
||||||
|
|
||||||
|
private final Flux<ClientBoundEvent> clientBoundEvents;
|
||||||
|
private final Flux<Long> liveIdChange;
|
||||||
|
private final Mono<Long> liveIdResolution;
|
||||||
|
|
||||||
|
public DynamicAtomixReactiveApiClient(AtomixReactiveApi api, long userId) {
|
||||||
|
this.api = api;
|
||||||
|
this.eventService = api.getAtomix().getEventService();
|
||||||
|
this.userId = userId;
|
||||||
|
|
||||||
|
clientBoundEvents = Flux
|
||||||
|
.<ClientBoundEvent>push(sink -> {
|
||||||
|
var subscriptionFuture = eventService.subscribe("session-client-bound-events",
|
||||||
|
LiveAtomixReactiveApiClient::deserializeEvent,
|
||||||
|
s -> {
|
||||||
|
sink.next(s);
|
||||||
|
return CompletableFuture.completedFuture(null);
|
||||||
|
},
|
||||||
|
(a) -> null
|
||||||
|
);
|
||||||
|
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
|
||||||
|
}, OverflowStrategy.ERROR)
|
||||||
|
.filter(e -> e.userId() == userId)
|
||||||
|
.doOnNext(e -> liveId.set(e.liveId()))
|
||||||
|
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
|
||||||
|
.share();
|
||||||
|
|
||||||
|
liveIdChange = this.clientBoundEvents()
|
||||||
|
.sample(Duration.ofSeconds(1))
|
||||||
|
.map(Event::liveId)
|
||||||
|
.distinctUntilChanged();
|
||||||
|
|
||||||
|
this.liveIdSubscription = liveIdChange.subscribeOn(Schedulers.parallel()).subscribe(liveId::set);
|
||||||
|
this.liveIdResolution = this.resolveLiveId();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Flux<ClientBoundEvent> clientBoundEvents() {
|
||||||
|
return clientBoundEvents;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) {
|
||||||
|
return liveIdResolution
|
||||||
|
.flatMap(liveId -> Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
|
||||||
|
new Request<>(liveId, request, timeout),
|
||||||
|
LiveAtomixReactiveApiClient::serializeRequest,
|
||||||
|
LiveAtomixReactiveApiClient::deserializeResponse,
|
||||||
|
Duration.between(Instant.now(), timeout)
|
||||||
|
)))
|
||||||
|
.<T>handle((item, sink) -> {
|
||||||
|
if (item instanceof TdApi.Error error) {
|
||||||
|
sink.error(new TdError(error.code, error.message));
|
||||||
|
} else {
|
||||||
|
//noinspection unchecked
|
||||||
|
sink.next((T) item);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.onErrorMap(ex -> {
|
||||||
|
if (ex instanceof MessagingException.NoRemoteHandler) {
|
||||||
|
return new TdError(404, "Bot #IDU" + this.userId + " is not found on the cluster");
|
||||||
|
} else {
|
||||||
|
return ex;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private Mono<Long> resolveLiveId() {
|
||||||
|
return Mono
|
||||||
|
.fromSupplier(this.liveId::get)
|
||||||
|
.flatMap(liveId -> {
|
||||||
|
if (liveId == LIVE_ID_UNSET) {
|
||||||
|
return api.resolveUserLiveId(userId)
|
||||||
|
.switchIfEmpty(Mono.error(this::createLiveIdFailed))
|
||||||
|
.doOnError(ex -> this.liveId.compareAndSet(LIVE_ID_UNSET, LIVE_ID_FAILED));
|
||||||
|
} else if (liveId == LIVE_ID_FAILED) {
|
||||||
|
return Mono.error(createLiveIdFailed());
|
||||||
|
} else {
|
||||||
|
return Mono.just(liveId);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private Throwable createLiveIdFailed() {
|
||||||
|
return new TdError(404, "Bot #IDU" + this.userId
|
||||||
|
+ " is not found on the cluster, no live id has been associated with it locally");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getUserId() {
|
||||||
|
return userId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Flux<Long> liveIdChange() {
|
||||||
|
return liveIdChange;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
liveIdSubscription.dispose();
|
||||||
|
}
|
||||||
|
}
|
@ -8,14 +8,13 @@ import io.atomix.core.Atomix;
|
|||||||
import io.atomix.core.AtomixBuilder;
|
import io.atomix.core.AtomixBuilder;
|
||||||
import io.atomix.core.profile.ConsensusProfileConfig;
|
import io.atomix.core.profile.ConsensusProfileConfig;
|
||||||
import io.atomix.core.profile.Profile;
|
import io.atomix.core.profile.Profile;
|
||||||
import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup;
|
|
||||||
import io.atomix.protocols.raft.partition.RaftPartitionGroup;
|
import io.atomix.protocols.raft.partition.RaftPartitionGroup;
|
||||||
import io.atomix.storage.StorageLevel;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import org.jetbrains.annotations.Nullable;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -53,16 +52,35 @@ public class Entrypoint {
|
|||||||
String diskSessionsConfigPath = args.diskSessionsPath;
|
String diskSessionsConfigPath = args.diskSessionsPath;
|
||||||
clusterSettings = mapper.readValue(Paths.get(clusterConfigPath).toFile(), ClusterSettings.class);
|
clusterSettings = mapper.readValue(Paths.get(clusterConfigPath).toFile(), ClusterSettings.class);
|
||||||
instanceSettings = mapper.readValue(Paths.get(instanceConfigPath).toFile(), InstanceSettings.class);
|
instanceSettings = mapper.readValue(Paths.get(instanceConfigPath).toFile(), InstanceSettings.class);
|
||||||
|
if (instanceSettings.client) {
|
||||||
|
diskSessions = null;
|
||||||
|
} else {
|
||||||
diskSessions = new DiskSessionsManager(mapper, diskSessionsConfigPath);
|
diskSessions = new DiskSessionsManager(mapper, diskSessionsConfigPath);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return start(clusterSettings, instanceSettings, diskSessions, atomixBuilder);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ReactiveApi start(ClusterSettings clusterSettings,
|
||||||
|
InstanceSettings instanceSettings,
|
||||||
|
@Nullable DiskSessionsManager diskSessions,
|
||||||
|
AtomixBuilder atomixBuilder) {
|
||||||
|
|
||||||
|
atomixBuilder.withCompatibleSerialization(false);
|
||||||
atomixBuilder.withClusterId(clusterSettings.id);
|
atomixBuilder.withClusterId(clusterSettings.id);
|
||||||
|
|
||||||
String nodeId;
|
String nodeId;
|
||||||
if (instanceSettings.client) {
|
if (instanceSettings.client) {
|
||||||
|
if (diskSessions != null) {
|
||||||
|
throw new IllegalArgumentException("A client instance can't have a session manager!");
|
||||||
|
}
|
||||||
atomixBuilder.withMemberId(instanceSettings.id).withAddress(instanceSettings.clientAddress);
|
atomixBuilder.withMemberId(instanceSettings.id).withAddress(instanceSettings.clientAddress);
|
||||||
nodeId = null;
|
nodeId = null;
|
||||||
} else {
|
} else {
|
||||||
|
if (diskSessions == null) {
|
||||||
|
throw new IllegalArgumentException("A full instance must have a session manager!");
|
||||||
|
}
|
||||||
// Find node settings
|
// Find node settings
|
||||||
var nodeSettingsOptional = clusterSettings.nodes
|
var nodeSettingsOptional = clusterSettings.nodes
|
||||||
.stream()
|
.stream()
|
||||||
@ -115,15 +133,13 @@ public class Entrypoint {
|
|||||||
//atomixBuilder.addProfile(Profile.dataGrid(32));
|
//atomixBuilder.addProfile(Profile.dataGrid(32));
|
||||||
}
|
}
|
||||||
|
|
||||||
atomixBuilder.withCompatibleSerialization(false);
|
|
||||||
|
|
||||||
var atomix = atomixBuilder.build();
|
var atomix = atomixBuilder.build();
|
||||||
|
|
||||||
TdSerializer.register(atomix.getSerializationService());
|
TdSerializer.register(atomix.getSerializationService());
|
||||||
|
|
||||||
atomix.start().join();
|
atomix.start().join();
|
||||||
|
|
||||||
var api = new ReactiveApi(nodeId, atomix, diskSessions);
|
var api = new AtomixReactiveApi(nodeId, atomix, diskSessions);
|
||||||
|
|
||||||
LOG.info("Starting ReactiveApi...");
|
LOG.info("Starting ReactiveApi...");
|
||||||
|
|
||||||
|
@ -46,7 +46,7 @@ public sealed interface Event permits ClientBoundEvent, ServerBoundEvent {
|
|||||||
|
|
||||||
record OnBotLoginCodeRequested(long liveId, long userId, String token) implements OnLoginCodeRequested {}
|
record OnBotLoginCodeRequested(long liveId, long userId, String token) implements OnLoginCodeRequested {}
|
||||||
|
|
||||||
record OnOtherDeviceLoginRequested(long liveId, long userId) implements ClientBoundEvent {}
|
record OnOtherDeviceLoginRequested(long liveId, long userId, String link) implements ClientBoundEvent {}
|
||||||
|
|
||||||
record OnPasswordRequested(long liveId, long userId) implements ClientBoundEvent {}
|
record OnPasswordRequested(long liveId, long userId) implements ClientBoundEvent {}
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import io.atomix.core.Atomix;
|
|||||||
import it.tdlight.jni.TdApi;
|
import it.tdlight.jni.TdApi;
|
||||||
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
|
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
|
||||||
import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested;
|
import it.tdlight.reactiveapi.Event.OnBotLoginCodeRequested;
|
||||||
|
import it.tdlight.reactiveapi.Event.OnOtherDeviceLoginRequested;
|
||||||
import it.tdlight.reactiveapi.Event.OnUpdateData;
|
import it.tdlight.reactiveapi.Event.OnUpdateData;
|
||||||
import it.tdlight.reactiveapi.Event.OnUpdateError;
|
import it.tdlight.reactiveapi.Event.OnUpdateError;
|
||||||
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
|
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
|
||||||
@ -24,53 +25,67 @@ import reactor.core.publisher.Flux;
|
|||||||
import reactor.core.publisher.FluxSink.OverflowStrategy;
|
import reactor.core.publisher.FluxSink.OverflowStrategy;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
|
||||||
public class AtomixReactiveApiClient implements ReactiveApiClient {
|
public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
|
||||||
|
|
||||||
private final ClusterEventService eventService;
|
private final ClusterEventService eventService;
|
||||||
private final long liveId;
|
private final long liveId;
|
||||||
private final long userId;
|
private final long userId;
|
||||||
|
|
||||||
public AtomixReactiveApiClient(Atomix atomix, long liveId, long userId) {
|
private final Flux<ClientBoundEvent> clientBoundEvents;
|
||||||
|
|
||||||
|
public LiveAtomixReactiveApiClient(Atomix atomix, long liveId, long userId) {
|
||||||
this.eventService = atomix.getEventService();
|
this.eventService = atomix.getEventService();
|
||||||
this.liveId = liveId;
|
this.liveId = liveId;
|
||||||
this.userId = userId;
|
this.userId = userId;
|
||||||
|
this.clientBoundEvents = Flux
|
||||||
|
.<ClientBoundEvent>push(sink -> {
|
||||||
|
var subscriptionFuture = eventService.subscribe("session-client-bound-events", LiveAtomixReactiveApiClient::deserializeEvent, s -> {
|
||||||
|
sink.next(s);
|
||||||
|
return CompletableFuture.completedFuture(null);
|
||||||
|
}, (a) -> null);
|
||||||
|
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
|
||||||
|
}, OverflowStrategy.ERROR)
|
||||||
|
.filter(e -> e.userId() == userId && e.liveId() == liveId)
|
||||||
|
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
|
||||||
|
.share();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<ClientBoundEvent> clientBoundEvents() {
|
public Flux<ClientBoundEvent> clientBoundEvents() {
|
||||||
return Flux.<ClientBoundEvent>push(sink -> {
|
return clientBoundEvents;
|
||||||
var subscriptionFuture = eventService.subscribe("session-" + liveId + "-client-bound-events",
|
|
||||||
this::deserializeEvent,
|
|
||||||
s -> {
|
|
||||||
sink.next(s);
|
|
||||||
return CompletableFuture.completedFuture(null);
|
|
||||||
},
|
|
||||||
(a) -> null
|
|
||||||
);
|
|
||||||
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
|
|
||||||
}, OverflowStrategy.ERROR).onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) {
|
public <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) {
|
||||||
return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
|
return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
|
||||||
new Request<>(liveId, request, timeout),
|
new Request<>(liveId, request, timeout),
|
||||||
AtomixReactiveApiClient::serializeRequest,
|
LiveAtomixReactiveApiClient::serializeRequest,
|
||||||
AtomixReactiveApiClient::deserializeResponse,
|
LiveAtomixReactiveApiClient::deserializeResponse,
|
||||||
Duration.between(Instant.now(), timeout)
|
Duration.between(Instant.now(), timeout)
|
||||||
));
|
)).handle((item, sink) -> {
|
||||||
|
if (item instanceof TdApi.Error error) {
|
||||||
|
sink.error(new TdError(error.code, error.message));
|
||||||
|
} else {
|
||||||
|
//noinspection unchecked
|
||||||
|
sink.next((T) item);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@Override
|
||||||
private static <R extends TdApi.Object> R deserializeResponse(byte[] bytes) {
|
public long getUserId() {
|
||||||
|
return userId;
|
||||||
|
}
|
||||||
|
|
||||||
|
static TdApi.Object deserializeResponse(byte[] bytes) {
|
||||||
try {
|
try {
|
||||||
return (R) TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
|
return TdApi.Deserializer.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
throw new SerializationException(ex);
|
throw new SerializationException(ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] serializeRequest(Request<?> request) {
|
static byte[] serializeRequest(Request<?> request) {
|
||||||
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
|
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
|
||||||
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
|
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
|
||||||
dataOutputStream.writeLong(request.liveId());
|
dataOutputStream.writeLong(request.liveId());
|
||||||
@ -83,14 +98,17 @@ public class AtomixReactiveApiClient implements ReactiveApiClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClientBoundEvent deserializeEvent(byte[] bytes) {
|
static ClientBoundEvent deserializeEvent(byte[] bytes) {
|
||||||
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
|
try (var byteArrayInputStream = new ByteArrayInputStream(bytes)) {
|
||||||
try (var dataInputStream = new DataInputStream(byteArrayInputStream)) {
|
try (var dataInputStream = new DataInputStream(byteArrayInputStream)) {
|
||||||
|
var liveId = dataInputStream.readLong();
|
||||||
|
var userId = dataInputStream.readLong();
|
||||||
return switch (dataInputStream.readByte()) {
|
return switch (dataInputStream.readByte()) {
|
||||||
case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(dataInputStream));
|
case 0x01 -> new OnUpdateData(liveId, userId, (TdApi.Update) TdApi.Deserializer.deserialize(dataInputStream));
|
||||||
case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(dataInputStream));
|
case 0x02 -> new OnUpdateError(liveId, userId, (TdApi.Error) TdApi.Deserializer.deserialize(dataInputStream));
|
||||||
case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, dataInputStream.readLong());
|
case 0x03 -> new OnUserLoginCodeRequested(liveId, userId, dataInputStream.readLong());
|
||||||
case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, dataInputStream.readUTF());
|
case 0x04 -> new OnBotLoginCodeRequested(liveId, userId, dataInputStream.readUTF());
|
||||||
|
case 0x05 -> new OnOtherDeviceLoginRequested(liveId, userId, dataInputStream.readUTF());
|
||||||
default -> throw new IllegalStateException("Unexpected value: " + dataInputStream.readByte());
|
default -> throw new IllegalStateException("Unexpected value: " + dataInputStream.readByte());
|
||||||
};
|
};
|
||||||
}
|
}
|
@ -1,373 +1,22 @@
|
|||||||
package it.tdlight.reactiveapi;
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
import static java.util.Collections.unmodifiableSet;
|
|
||||||
import static java.util.Objects.requireNonNull;
|
|
||||||
import static java.util.concurrent.CompletableFuture.completedFuture;
|
|
||||||
import static java.util.concurrent.CompletableFuture.failedFuture;
|
|
||||||
import static reactor.core.publisher.Mono.fromCompletionStage;
|
|
||||||
|
|
||||||
import io.atomix.core.Atomix;
|
|
||||||
import io.atomix.core.idgenerator.AsyncAtomicIdGenerator;
|
|
||||||
import io.atomix.core.lock.AsyncAtomicLock;
|
|
||||||
import io.atomix.core.map.AsyncAtomicMap;
|
|
||||||
import io.atomix.protocols.raft.MultiRaftProtocol;
|
|
||||||
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
|
|
||||||
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
|
|
||||||
import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
|
||||||
import java.util.concurrent.CompletionException;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
import org.jetbrains.annotations.NotNull;
|
|
||||||
import org.reactivestreams.Publisher;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.scheduler.Schedulers;
|
|
||||||
|
|
||||||
public class ReactiveApi {
|
public interface ReactiveApi {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ReactiveApi.class);
|
Mono<Void> start();
|
||||||
|
|
||||||
@NotNull
|
Mono<CreateSessionResponse> createSession(CreateSessionRequest req);
|
||||||
private final String nodeId;
|
|
||||||
private final Atomix atomix;
|
|
||||||
private final AsyncAtomicIdGenerator nextSessionLiveId;
|
|
||||||
|
|
||||||
private final AsyncAtomicLock sessionModificationLock;
|
Mono<Map<Long, String>> getAllUsers();
|
||||||
private final AsyncAtomicMap<Long, String> userIdToNodeId;
|
|
||||||
/**
|
|
||||||
* User id -> session
|
|
||||||
*/
|
|
||||||
private final ConcurrentMap<Long, ReactiveApiPublisher> localLiveSessions = new ConcurrentHashMap<>();
|
|
||||||
private final DiskSessionsManager diskSessions;
|
|
||||||
|
|
||||||
public ReactiveApi(@NotNull String nodeId, Atomix atomix, DiskSessionsManager diskSessions) {
|
boolean is(String nodeId);
|
||||||
this.nodeId = nodeId;
|
|
||||||
this.atomix = atomix;
|
|
||||||
|
|
||||||
var raftProtocol = MultiRaftProtocol.builder().build();
|
|
||||||
this.nextSessionLiveId = atomix
|
|
||||||
.atomicIdGeneratorBuilder("session-live-id")
|
|
||||||
.withProtocol(raftProtocol)
|
|
||||||
.build()
|
|
||||||
.async();
|
|
||||||
this.sessionModificationLock = atomix
|
|
||||||
.atomicLockBuilder("session-modification")
|
|
||||||
.withProtocol(raftProtocol)
|
|
||||||
.build()
|
|
||||||
.async();
|
|
||||||
|
|
||||||
this.userIdToNodeId = atomix
|
|
||||||
.<Long, String>atomicMapBuilder("user-id-to-node-id")
|
|
||||||
//.withCacheEnabled(true)
|
|
||||||
//.withCacheSize(4096)
|
|
||||||
.withNullValues(false)
|
|
||||||
.withProtocol(raftProtocol)
|
|
||||||
.build()
|
|
||||||
.async();
|
|
||||||
|
|
||||||
this.diskSessions = diskSessions;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Mono<Void> start() {
|
|
||||||
Mono<Set<Long>> idsSavedIntoLocalConfiguration = Mono.fromCallable(() -> {
|
|
||||||
synchronized (diskSessions) {
|
|
||||||
return diskSessions.getSettings().userIdToSession().keySet();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
Mono<Set<Long>> distributedIds = this
|
|
||||||
.getAllUsers()
|
|
||||||
.flatMapIterable(Map::entrySet)
|
|
||||||
.filter(entry -> entry.getValue().equals(nodeId))
|
|
||||||
.map(Entry::getKey)
|
|
||||||
.collect(Collectors.toUnmodifiableSet());
|
|
||||||
|
|
||||||
record DiskChanges(Set<Long> normalIds, Set<Long> addedIds, Set<Long> removedIds) {}
|
|
||||||
|
|
||||||
var diskChangesMono = Mono.zip(idsSavedIntoLocalConfiguration, distributedIds).map(tuple -> {
|
|
||||||
var localSet = tuple.getT1();
|
|
||||||
var remoteSet = tuple.getT2();
|
|
||||||
|
|
||||||
var deletedUsers = new HashSet<>(remoteSet);
|
|
||||||
deletedUsers.removeAll(localSet);
|
|
||||||
|
|
||||||
var addedUsers = new HashSet<>(localSet);
|
|
||||||
addedUsers.removeAll(remoteSet);
|
|
||||||
|
|
||||||
var normalUsers = new HashSet<>(localSet);
|
|
||||||
normalUsers.removeAll(addedUsers);
|
|
||||||
|
|
||||||
for (long user : addedUsers) {
|
|
||||||
LOG.warn("Detected a new user id from the disk configuration file: {}", user);
|
|
||||||
}
|
|
||||||
for (long user : normalUsers) {
|
|
||||||
LOG.info("Detected a user id from the disk configuration file: {}", user);
|
|
||||||
}
|
|
||||||
for (long user : deletedUsers) {
|
|
||||||
LOG.warn("The user id {} has been deleted from the disk configuration file", user);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new DiskChanges(unmodifiableSet(normalUsers), unmodifiableSet(addedUsers), unmodifiableSet(deletedUsers));
|
|
||||||
}).cache();
|
|
||||||
|
|
||||||
var removeObsoleteDiskSessions = diskChangesMono
|
|
||||||
.flatMapIterable(diskChanges -> diskChanges.removedIds)
|
|
||||||
.flatMap(removedIds -> fromCompletionStage(() -> destroySession(removedIds, nodeId)))
|
|
||||||
.then();
|
|
||||||
|
|
||||||
var addedDiskSessionsFlux = diskChangesMono
|
|
||||||
.flatMapIterable(diskChanges -> diskChanges.addedIds)
|
|
||||||
.flatMap(this::getLocalDiskSession);
|
|
||||||
var normalDiskSessionsFlux = diskChangesMono
|
|
||||||
.flatMapIterable(diskChanges -> diskChanges.normalIds)
|
|
||||||
.flatMap(this::getLocalDiskSession);
|
|
||||||
|
|
||||||
var addNewDiskSessions = addedDiskSessionsFlux.flatMap(diskSessionAndId -> {
|
|
||||||
var id = diskSessionAndId.id;
|
|
||||||
var diskSession = diskSessionAndId.diskSession;
|
|
||||||
return createSession(new LoadSessionFromDiskRequest(id,
|
|
||||||
diskSession.token,
|
|
||||||
diskSession.phoneNumber,
|
|
||||||
true
|
|
||||||
));
|
|
||||||
}).then();
|
|
||||||
|
|
||||||
var loadExistingDiskSessions = normalDiskSessionsFlux.flatMap(diskSessionAndId -> {
|
|
||||||
var id = diskSessionAndId.id;
|
|
||||||
var diskSession = diskSessionAndId.diskSession;
|
|
||||||
return createSession(new LoadSessionFromDiskRequest(id,
|
|
||||||
diskSession.token,
|
|
||||||
diskSession.phoneNumber,
|
|
||||||
false
|
|
||||||
));
|
|
||||||
}).then();
|
|
||||||
|
|
||||||
var diskInitMono = Mono.when(removeObsoleteDiskSessions, loadExistingDiskSessions, addNewDiskSessions)
|
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
|
||||||
.doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk"));
|
|
||||||
|
|
||||||
// Listen for create-session signals
|
|
||||||
var subscriptionMono = fromCompletionStage(() -> atomix
|
|
||||||
.getEventService()
|
|
||||||
.subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> {
|
|
||||||
if (req instanceof LoadSessionFromDiskRequest) {
|
|
||||||
return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster"));
|
|
||||||
} else {
|
|
||||||
return createSession(req).toFuture();
|
|
||||||
}
|
|
||||||
}, CreateSessionResponse::serializeBytes));
|
|
||||||
|
|
||||||
return diskInitMono.then(subscriptionMono).then();
|
|
||||||
}
|
|
||||||
|
|
||||||
private CompletableFuture<Void> destroySession(long userId, String nodeId) {
|
|
||||||
LOG.debug("Received session delete request: user_id={}, node_id=\"{}\"", userId, nodeId);
|
|
||||||
|
|
||||||
// Lock sessions modification
|
|
||||||
return sessionModificationLock
|
|
||||||
.lock()
|
|
||||||
.thenCompose(lockVersion -> {
|
|
||||||
LOG.trace("Obtained session modification lock for session delete request: {} \"{}\"", userId, nodeId);
|
|
||||||
return userIdToNodeId
|
|
||||||
.remove(userId, nodeId)
|
|
||||||
.thenAccept(deleted -> LOG.debug("Deleted session {} \"{}\": {}", userId, nodeId, deleted));
|
|
||||||
})
|
|
||||||
.whenComplete((response, error) -> sessionModificationLock
|
|
||||||
.unlock()
|
|
||||||
.thenRun(() -> LOG.trace("Released session modification lock for session delete request: {} \"{}\"", userId, nodeId))
|
|
||||||
)
|
|
||||||
.whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex));
|
|
||||||
}
|
|
||||||
|
|
||||||
public Mono<CreateSessionResponse> createSession(CreateSessionRequest req) {
|
|
||||||
LOG.debug("Received create session request: {}", req);
|
|
||||||
|
|
||||||
Mono<CreateSessionResponse> unlockedSessionCreationMono = Mono.defer(() -> {
|
|
||||||
LOG.trace("Obtained session modification lock for session request: {}", req);
|
|
||||||
// Generate session id
|
|
||||||
return this
|
|
||||||
.nextFreeLiveId()
|
|
||||||
.flatMap(liveId -> {
|
|
||||||
// Create the session instance
|
|
||||||
ReactiveApiPublisher reactiveApiPublisher;
|
|
||||||
boolean loadedFromDisk;
|
|
||||||
long userId;
|
|
||||||
String botToken;
|
|
||||||
Long phoneNumber;
|
|
||||||
if (req instanceof CreateBotSessionRequest createBotSessionRequest) {
|
|
||||||
loadedFromDisk = false;
|
|
||||||
userId = createBotSessionRequest.userId();
|
|
||||||
botToken = createBotSessionRequest.token();
|
|
||||||
phoneNumber = null;
|
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken);
|
|
||||||
} else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
|
|
||||||
loadedFromDisk = false;
|
|
||||||
userId = createUserSessionRequest.userId();
|
|
||||||
botToken = null;
|
|
||||||
phoneNumber = createUserSessionRequest.phoneNumber();
|
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber);
|
|
||||||
} else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
|
|
||||||
loadedFromDisk = true;
|
|
||||||
userId = loadSessionFromDiskRequest.userId();
|
|
||||||
botToken = loadSessionFromDiskRequest.token();
|
|
||||||
phoneNumber = loadSessionFromDiskRequest.phoneNumber();
|
|
||||||
if (loadSessionFromDiskRequest.phoneNumber() != null) {
|
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber);
|
|
||||||
} else {
|
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return Mono.error(new UnsupportedOperationException("Unexpected value: " + req));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register the session instance to the local nodes map
|
|
||||||
var prev = localLiveSessions.put(liveId, reactiveApiPublisher);
|
|
||||||
if (prev != null) {
|
|
||||||
LOG.error("User id \"{}\" was already registered locally! {}", liveId, prev);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register the session instance to the distributed nodes map
|
|
||||||
return Mono
|
|
||||||
.fromCompletionStage(() -> userIdToNodeId.put(userId, nodeId).thenApply(Optional::ofNullable))
|
|
||||||
.flatMap(prevDistributed -> {
|
|
||||||
if (prevDistributed.isPresent() && prevDistributed.get().value() != null &&
|
|
||||||
!Objects.equals(this.nodeId, prevDistributed.get().value())) {
|
|
||||||
LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId,
|
|
||||||
prevDistributed.get().value());
|
|
||||||
}
|
|
||||||
|
|
||||||
var saveToDiskMono = Mono
|
|
||||||
.<Void>fromCallable(() -> {
|
|
||||||
// Save updated sessions configuration to disk
|
|
||||||
try {
|
|
||||||
synchronized (diskSessions) {
|
|
||||||
diskSessions.save();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new CompletionException("Failed to save disk sessions configuration", e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.subscribeOn(Schedulers.boundedElastic());
|
|
||||||
|
|
||||||
// Start the session instance
|
|
||||||
return Mono
|
|
||||||
.fromCallable(() -> {
|
|
||||||
synchronized (diskSessions) {
|
|
||||||
return Objects.requireNonNull(Paths.get(diskSessions.getSettings().path),
|
|
||||||
"Session " + userId + " path is missing");
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
|
||||||
.flatMap(baseSessionsPath -> {
|
|
||||||
String diskSessionFolderName = Long.toUnsignedString(userId);
|
|
||||||
Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName);
|
|
||||||
|
|
||||||
if (!loadedFromDisk) {
|
|
||||||
// Create the disk session configuration
|
|
||||||
var diskSession = new DiskSession(botToken, phoneNumber);
|
|
||||||
return Mono.<Void>fromCallable(() -> {
|
|
||||||
synchronized (diskSessions) {
|
|
||||||
diskSessions.getSettings().userIdToSession().put(userId, diskSession);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}).subscribeOn(Schedulers.boundedElastic()).then(saveToDiskMono).thenReturn(sessionPath);
|
|
||||||
} else {
|
|
||||||
return Mono.just(sessionPath);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.doOnNext(path -> reactiveApiPublisher.start(path,
|
|
||||||
() -> ReactiveApi.this.onPublisherClosed(userId)
|
|
||||||
))
|
|
||||||
.thenReturn(new CreateSessionResponse(liveId));
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
// Lock sessions creation
|
|
||||||
return Mono
|
|
||||||
.usingWhen(Mono.fromCompletionStage(sessionModificationLock::lock),
|
|
||||||
lockVersion -> unlockedSessionCreationMono,
|
|
||||||
lockVersion -> Mono
|
|
||||||
.fromCompletionStage(sessionModificationLock::unlock)
|
|
||||||
.doOnTerminate(() -> LOG.trace("Released session modification lock for session request: {}", req))
|
|
||||||
)
|
|
||||||
.doOnNext(resp -> LOG.debug("Handled session request {}, the response is: {}", req, resp))
|
|
||||||
.doOnError(ex -> LOG.debug("Handled session request {}, the response is: error", req, ex));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void onPublisherClosed(long userId) {
|
|
||||||
this.destroySession(userId, nodeId).whenComplete((result, ex) -> {
|
|
||||||
if (ex != null) {
|
|
||||||
LOG.error("Failed to close the session for user {} after it was closed itself", userId);
|
|
||||||
} else {
|
|
||||||
LOG.debug("Closed the session for user {} after it was closed itself", userId);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public Mono<Long> nextFreeLiveId() {
|
|
||||||
return Mono.fromCompletionStage(nextSessionLiveId::nextId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Atomix getAtomix() {
|
|
||||||
return atomix;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the list of current sessions
|
* May return empty
|
||||||
* @return map of user id -> node id
|
|
||||||
*/
|
*/
|
||||||
public Mono<Map<Long, String>> getAllUsers() {
|
Mono<Long> resolveUserLiveId(long userId);
|
||||||
return Flux.defer(() -> {
|
|
||||||
var it = userIdToNodeId.entrySet().iterator();
|
|
||||||
var hasNextMono = fromCompletionStage(it::hasNext);
|
|
||||||
var strictNextMono = fromCompletionStage(it::next)
|
|
||||||
.map(elem -> Map.entry(elem.getKey(), elem.getValue().value()));
|
|
||||||
|
|
||||||
var nextOrNothingMono = hasNextMono.flatMap(hasNext -> {
|
Mono<Void> close();
|
||||||
if (hasNext) {
|
|
||||||
return strictNextMono;
|
|
||||||
} else {
|
|
||||||
return Mono.empty();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return nextOrNothingMono.repeatWhen(s -> s.takeWhile(n -> n > 0));
|
|
||||||
}).collectMap(Entry::getKey, Entry::getValue);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean is(String nodeId) {
|
|
||||||
return this.nodeId.equals(nodeId);
|
|
||||||
}
|
|
||||||
|
|
||||||
private record DiskSessionAndId(DiskSession diskSession, long id) {}
|
|
||||||
|
|
||||||
private Mono<DiskSessionAndId> getLocalDiskSession(Long localId) {
|
|
||||||
return Mono.fromCallable(() -> {
|
|
||||||
synchronized (diskSessions) {
|
|
||||||
var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localId),
|
|
||||||
"Id not found: " + localId
|
|
||||||
);
|
|
||||||
try {
|
|
||||||
diskSession.validate();
|
|
||||||
} catch (Throwable ex) {
|
|
||||||
LOG.error("Failed to load disk session {}", localId, ex);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return new DiskSessionAndId(diskSession, localId);
|
|
||||||
}
|
|
||||||
}).subscribeOn(Schedulers.boundedElastic());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -11,4 +11,6 @@ public interface ReactiveApiClient {
|
|||||||
Flux<ClientBoundEvent> clientBoundEvents();
|
Flux<ClientBoundEvent> clientBoundEvents();
|
||||||
|
|
||||||
<T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout);
|
<T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout);
|
||||||
|
|
||||||
|
long getUserId();
|
||||||
}
|
}
|
||||||
|
@ -1,15 +1,18 @@
|
|||||||
package it.tdlight.reactiveapi;
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
import static it.tdlight.reactiveapi.AuthPhase.*;
|
import static it.tdlight.reactiveapi.AuthPhase.LOGGED_IN;
|
||||||
|
import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT;
|
||||||
import static java.util.Objects.requireNonNull;
|
import static java.util.Objects.requireNonNull;
|
||||||
|
|
||||||
|
import com.google.common.primitives.Longs;
|
||||||
import io.atomix.cluster.messaging.ClusterEventService;
|
import io.atomix.cluster.messaging.ClusterEventService;
|
||||||
|
import io.atomix.cluster.messaging.Subscription;
|
||||||
import io.atomix.core.Atomix;
|
import io.atomix.core.Atomix;
|
||||||
import it.tdlight.common.ReactiveTelegramClient;
|
import it.tdlight.common.ReactiveTelegramClient;
|
||||||
import it.tdlight.common.Response;
|
import it.tdlight.common.Response;
|
||||||
import it.tdlight.common.Signal;
|
import it.tdlight.common.Signal;
|
||||||
import it.tdlight.common.utils.LibraryVersion;
|
|
||||||
import it.tdlight.jni.TdApi;
|
import it.tdlight.jni.TdApi;
|
||||||
|
import it.tdlight.jni.TdApi.AuthorizationStateWaitOtherDeviceConfirmation;
|
||||||
import it.tdlight.jni.TdApi.CheckAuthenticationBotToken;
|
import it.tdlight.jni.TdApi.CheckAuthenticationBotToken;
|
||||||
import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey;
|
import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey;
|
||||||
import it.tdlight.jni.TdApi.Object;
|
import it.tdlight.jni.TdApi.Object;
|
||||||
@ -64,23 +67,26 @@ public abstract class ReactiveApiPublisher {
|
|||||||
private final AtomicReference<State> state = new AtomicReference<>(new State(LOGGED_OUT));
|
private final AtomicReference<State> state = new AtomicReference<>(new State(LOGGED_OUT));
|
||||||
protected final long userId;
|
protected final long userId;
|
||||||
protected final long liveId;
|
protected final long liveId;
|
||||||
|
private final String dynamicIdResolveSubject;
|
||||||
|
|
||||||
private final AtomicReference<Disposable> disposable = new AtomicReference<>();
|
private final AtomicReference<Disposable> disposable = new AtomicReference<>();
|
||||||
private final AtomicReference<Path> path = new AtomicReference<>();
|
private final AtomicReference<Path> path = new AtomicReference<>();
|
||||||
|
|
||||||
private ReactiveApiPublisher(Atomix atomix, long liveId, long userId) {
|
private ReactiveApiPublisher(Atomix atomix, long liveId, long userId) {
|
||||||
|
this.eventService = atomix.getEventService();
|
||||||
this.userId = userId;
|
this.userId = userId;
|
||||||
this.liveId = liveId;
|
this.liveId = liveId;
|
||||||
|
this.dynamicIdResolveSubject = SubjectNaming.getDynamicIdResolveSubject(userId);
|
||||||
this.rawTelegramClient = ClientManager.createReactive();
|
this.rawTelegramClient = ClientManager.createReactive();
|
||||||
this.telegramClient = Flux.<Signal>create(sink -> {
|
this.telegramClient = Flux.<Signal>create(sink -> this.registerTopics().thenAccept(subscription -> {
|
||||||
rawTelegramClient.createAndRegisterClient();
|
rawTelegramClient.createAndRegisterClient();
|
||||||
rawTelegramClient.setListener(sink::next);
|
rawTelegramClient.setListener(sink::next);
|
||||||
sink.onCancel(rawTelegramClient::cancel);
|
sink.onCancel(rawTelegramClient::cancel);
|
||||||
sink.onDispose(rawTelegramClient::dispose);
|
sink.onDispose(() -> {
|
||||||
|
subscription.close();
|
||||||
this.registerTopics();
|
rawTelegramClient.dispose();
|
||||||
}).share();
|
});
|
||||||
this.eventService = atomix.getEventService();
|
})).share();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ReactiveApiPublisher fromToken(Atomix atomix, Long liveId, long userId, String token) {
|
public static ReactiveApiPublisher fromToken(Atomix atomix, Long liveId, long userId, String token) {
|
||||||
@ -142,7 +148,7 @@ public abstract class ReactiveApiPublisher {
|
|||||||
|
|
||||||
// Send events to the client
|
// Send events to the client
|
||||||
.subscribeOn(Schedulers.parallel())
|
.subscribeOn(Schedulers.parallel())
|
||||||
.subscribe(clientBoundEvent -> eventService.broadcast("session-" + liveId + "-client-bound-events",
|
.subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events",
|
||||||
clientBoundEvent, ReactiveApiPublisher::serializeEvent));
|
clientBoundEvent, ReactiveApiPublisher::serializeEvent));
|
||||||
|
|
||||||
publishedResultingEvents
|
publishedResultingEvents
|
||||||
@ -242,7 +248,8 @@ public abstract class ReactiveApiPublisher {
|
|||||||
return onWaitCode();
|
return onWaitCode();
|
||||||
}
|
}
|
||||||
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> {
|
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> {
|
||||||
return new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId));
|
var link = ((AuthorizationStateWaitOtherDeviceConfirmation) updateAuthorizationState.authorizationState).link;
|
||||||
|
return new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId, link));
|
||||||
}
|
}
|
||||||
case TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR -> {
|
case TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR -> {
|
||||||
return new ClientBoundResultingEvent(new OnPasswordRequested(liveId, userId));
|
return new ClientBoundResultingEvent(new OnPasswordRequested(liveId, userId));
|
||||||
@ -290,6 +297,8 @@ public abstract class ReactiveApiPublisher {
|
|||||||
private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) {
|
private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) {
|
||||||
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
|
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
|
||||||
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
|
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
|
||||||
|
dataOutputStream.writeLong(clientBoundEvent.liveId());
|
||||||
|
dataOutputStream.writeLong(clientBoundEvent.userId());
|
||||||
if (clientBoundEvent instanceof OnUpdateData onUpdateData) {
|
if (clientBoundEvent instanceof OnUpdateData onUpdateData) {
|
||||||
dataOutputStream.writeByte(0x1);
|
dataOutputStream.writeByte(0x1);
|
||||||
onUpdateData.update().serialize(dataOutputStream);
|
onUpdateData.update().serialize(dataOutputStream);
|
||||||
@ -302,6 +311,9 @@ public abstract class ReactiveApiPublisher {
|
|||||||
} else if (clientBoundEvent instanceof OnBotLoginCodeRequested onBotLoginCodeRequested) {
|
} else if (clientBoundEvent instanceof OnBotLoginCodeRequested onBotLoginCodeRequested) {
|
||||||
dataOutputStream.writeByte(0x4);
|
dataOutputStream.writeByte(0x4);
|
||||||
dataOutputStream.writeUTF(onBotLoginCodeRequested.token());
|
dataOutputStream.writeUTF(onBotLoginCodeRequested.token());
|
||||||
|
} else if (clientBoundEvent instanceof OnOtherDeviceLoginRequested onOtherDeviceLoginRequested) {
|
||||||
|
dataOutputStream.writeByte(0x5);
|
||||||
|
dataOutputStream.writeUTF(onOtherDeviceLoginRequested.link());
|
||||||
} else {
|
} else {
|
||||||
throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent);
|
throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent);
|
||||||
}
|
}
|
||||||
@ -312,12 +324,19 @@ public abstract class ReactiveApiPublisher {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerTopics() {
|
private CompletableFuture<Subscription> registerTopics() {
|
||||||
// Start receiving requests
|
// Start receiving requests
|
||||||
eventService.subscribe("session-" + liveId + "-requests",
|
eventService.subscribe("session-" + liveId + "-requests",
|
||||||
ReactiveApiPublisher::deserializeRequest,
|
ReactiveApiPublisher::deserializeRequest,
|
||||||
this::handleRequest,
|
this::handleRequest,
|
||||||
ReactiveApiPublisher::serializeResponse);
|
ReactiveApiPublisher::serializeResponse);
|
||||||
|
|
||||||
|
// Start receiving request
|
||||||
|
return eventService.subscribe(dynamicIdResolveSubject,
|
||||||
|
b -> null,
|
||||||
|
r -> CompletableFuture.completedFuture(liveId),
|
||||||
|
Longs::toByteArray
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] serializeResponse(Response response) {
|
private static byte[] serializeResponse(Response response) {
|
||||||
@ -325,7 +344,7 @@ public abstract class ReactiveApiPublisher {
|
|||||||
var object = response.getObject();
|
var object = response.getObject();
|
||||||
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
|
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
|
||||||
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
|
try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
|
||||||
dataOutputStream.writeLong(id);
|
//dataOutputStream.writeLong(id);
|
||||||
object.serialize(dataOutputStream);
|
object.serialize(dataOutputStream);
|
||||||
return byteArrayOutputStream.toByteArray();
|
return byteArrayOutputStream.toByteArray();
|
||||||
}
|
}
|
||||||
|
8
src/main/java/it/tdlight/reactiveapi/SubjectNaming.java
Normal file
8
src/main/java/it/tdlight/reactiveapi/SubjectNaming.java
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
|
public class SubjectNaming {
|
||||||
|
|
||||||
|
public static String getDynamicIdResolveSubject(long userId) {
|
||||||
|
return "session-" + userId + "-dynamic-live-id-resolve";
|
||||||
|
}
|
||||||
|
}
|
34
src/main/java/it/tdlight/reactiveapi/TdError.java
Normal file
34
src/main/java/it/tdlight/reactiveapi/TdError.java
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
|
import it.tdlight.jni.TdApi.Error;
|
||||||
|
|
||||||
|
public class TdError extends Exception {
|
||||||
|
|
||||||
|
private final int code;
|
||||||
|
private final String message;
|
||||||
|
|
||||||
|
public TdError(int code, String message) {
|
||||||
|
super(code + " " + message);
|
||||||
|
this.code = code;
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TdError(int code, String message, Throwable cause) {
|
||||||
|
super(code + " " + message, cause);
|
||||||
|
this.code = code;
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getTdCode() {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTdMessage() {
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TdApi.Error getTdError() {
|
||||||
|
return new Error(code, message);
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user