2022-01-07 23:54:18 +01:00
|
|
|
package it.tdlight.reactiveapi;
|
|
|
|
|
|
|
|
import static java.util.Objects.requireNonNull;
|
|
|
|
|
|
|
|
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
|
|
|
|
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
|
|
|
|
import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.nio.file.Path;
|
|
|
|
import java.nio.file.Paths;
|
2022-01-08 18:13:40 +01:00
|
|
|
import java.time.Duration;
|
2022-01-07 23:54:18 +01:00
|
|
|
import java.util.Map.Entry;
|
|
|
|
import java.util.Objects;
|
|
|
|
import java.util.Set;
|
|
|
|
import java.util.concurrent.CompletionException;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.concurrent.ConcurrentMap;
|
2022-06-27 00:06:53 +02:00
|
|
|
import java.util.concurrent.locks.LockSupport;
|
2022-01-09 18:27:14 +01:00
|
|
|
import org.jetbrains.annotations.NotNull;
|
2022-01-07 23:54:18 +01:00
|
|
|
import org.jetbrains.annotations.Nullable;
|
|
|
|
import org.slf4j.Logger;
|
|
|
|
import org.slf4j.LoggerFactory;
|
2022-09-13 22:15:18 +02:00
|
|
|
import reactor.core.Disposable;
|
2022-01-07 23:54:18 +01:00
|
|
|
import reactor.core.publisher.Mono;
|
2022-09-13 22:15:18 +02:00
|
|
|
import reactor.core.publisher.Sinks.EmitFailureHandler;
|
2022-01-07 23:54:18 +01:00
|
|
|
import reactor.core.scheduler.Schedulers;
|
|
|
|
|
|
|
|
public class AtomixReactiveApi implements ReactiveApi {
|
|
|
|
|
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(AtomixReactiveApi.class);
|
|
|
|
|
2022-09-13 22:15:18 +02:00
|
|
|
private final AtomixReactiveApiMode mode;
|
2022-06-27 00:06:53 +02:00
|
|
|
|
2022-09-10 20:25:54 +02:00
|
|
|
private final KafkaSharedTdlibClients kafkaSharedTdlibClients;
|
2022-01-07 23:54:18 +01:00
|
|
|
@Nullable
|
2022-09-10 20:25:54 +02:00
|
|
|
private final KafkaSharedTdlibServers kafkaSharedTdlibServers;
|
2022-09-13 22:15:18 +02:00
|
|
|
private final ReactiveApiMultiClient client;
|
2022-01-07 23:54:18 +01:00
|
|
|
|
2022-06-27 00:06:53 +02:00
|
|
|
private final Set<ResultingEventTransformer> resultingEventTransformerSet;
|
2022-01-07 23:54:18 +01:00
|
|
|
/**
|
2022-06-27 00:06:53 +02:00
|
|
|
* user id -> session
|
2022-01-07 23:54:18 +01:00
|
|
|
*/
|
2022-06-27 00:06:53 +02:00
|
|
|
private final ConcurrentMap<Long, ReactiveApiPublisher> localSessions = new ConcurrentHashMap<>();
|
2022-01-07 23:54:18 +01:00
|
|
|
/**
|
|
|
|
* DiskSessions is null when nodeId is null
|
|
|
|
*/
|
|
|
|
@Nullable
|
|
|
|
private final DiskSessionsManager diskSessions;
|
2022-06-27 00:06:53 +02:00
|
|
|
private volatile boolean closeRequested;
|
2022-09-13 22:15:18 +02:00
|
|
|
private volatile Disposable requestsSub;
|
2022-01-07 23:54:18 +01:00
|
|
|
|
2022-09-13 22:15:18 +02:00
|
|
|
public enum AtomixReactiveApiMode {
|
|
|
|
CLIENT,
|
|
|
|
SERVER,
|
|
|
|
FULL
|
|
|
|
}
|
|
|
|
|
|
|
|
public AtomixReactiveApi(AtomixReactiveApiMode mode,
|
2022-01-13 01:59:26 +01:00
|
|
|
KafkaParameters kafkaParameters,
|
2022-01-09 18:27:14 +01:00
|
|
|
@Nullable DiskSessionsManager diskSessions,
|
|
|
|
@NotNull Set<ResultingEventTransformer> resultingEventTransformerSet) {
|
2022-09-13 22:15:18 +02:00
|
|
|
this.mode = mode;
|
2022-06-27 00:06:53 +02:00
|
|
|
var kafkaTDLibRequestProducer = new KafkaTdlibRequestProducer(kafkaParameters);
|
|
|
|
var kafkaTDLibResponseConsumer = new KafkaTdlibResponseConsumer(kafkaParameters);
|
|
|
|
var kafkaClientBoundConsumer = new KafkaClientBoundConsumer(kafkaParameters);
|
2022-09-10 20:25:54 +02:00
|
|
|
var kafkaTdlibClientsChannels = new KafkaTdlibClientsChannels(kafkaTDLibRequestProducer,
|
2022-06-27 00:06:53 +02:00
|
|
|
kafkaTDLibResponseConsumer,
|
|
|
|
kafkaClientBoundConsumer
|
|
|
|
);
|
2022-09-13 22:15:18 +02:00
|
|
|
if (mode != AtomixReactiveApiMode.SERVER) {
|
|
|
|
this.kafkaSharedTdlibClients = new KafkaSharedTdlibClients(kafkaTdlibClientsChannels);
|
|
|
|
this.client = new LiveAtomixReactiveApiClient(kafkaSharedTdlibClients);
|
2022-01-07 23:54:18 +01:00
|
|
|
} else {
|
2022-09-13 22:15:18 +02:00
|
|
|
this.kafkaSharedTdlibClients = null;
|
|
|
|
this.client = null;
|
|
|
|
}
|
|
|
|
if (mode != AtomixReactiveApiMode.CLIENT) {
|
2022-06-27 00:06:53 +02:00
|
|
|
var kafkaTDLibRequestConsumer = new KafkaTdlibRequestConsumer(kafkaParameters);
|
|
|
|
var kafkaTDLibResponseProducer = new KafkaTdlibResponseProducer(kafkaParameters);
|
|
|
|
var kafkaClientBoundProducer = new KafkaClientBoundProducer(kafkaParameters);
|
2022-09-10 20:25:54 +02:00
|
|
|
var kafkaTDLibServer = new KafkaTdlibServersChannels(kafkaTDLibRequestConsumer,
|
2022-06-27 00:06:53 +02:00
|
|
|
kafkaTDLibResponseProducer,
|
|
|
|
kafkaClientBoundProducer
|
|
|
|
);
|
2022-09-10 20:25:54 +02:00
|
|
|
this.kafkaSharedTdlibServers = new KafkaSharedTdlibServers(kafkaTDLibServer);
|
2022-09-13 22:15:18 +02:00
|
|
|
} else {
|
|
|
|
this.kafkaSharedTdlibServers = null;
|
2022-01-07 23:54:18 +01:00
|
|
|
}
|
2022-06-27 00:06:53 +02:00
|
|
|
this.resultingEventTransformerSet = resultingEventTransformerSet;
|
2022-01-07 23:54:18 +01:00
|
|
|
|
|
|
|
this.diskSessions = diskSessions;
|
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Mono<Void> start() {
|
2022-06-27 00:06:53 +02:00
|
|
|
var idsSavedIntoLocalConfiguration = Mono
|
|
|
|
.<Set<Entry<Long, DiskSession>>>fromCallable(() -> {
|
|
|
|
if (diskSessions == null) {
|
|
|
|
return Set.of();
|
|
|
|
}
|
|
|
|
synchronized (diskSessions) {
|
|
|
|
return diskSessions.getSettings().userIdToSession().entrySet();
|
|
|
|
}
|
|
|
|
})
|
2022-01-07 23:54:18 +01:00
|
|
|
.subscribeOn(Schedulers.boundedElastic())
|
2022-06-27 00:06:53 +02:00
|
|
|
.flatMapIterable(a -> a)
|
|
|
|
.map(a -> new DiskSessionAndId(a.getValue(), a.getKey()));
|
|
|
|
|
2022-09-13 22:15:18 +02:00
|
|
|
var loadSessions = idsSavedIntoLocalConfiguration
|
2022-06-27 00:06:53 +02:00
|
|
|
.filter(diskSessionAndId -> {
|
|
|
|
try {
|
|
|
|
diskSessionAndId.diskSession().validate();
|
|
|
|
} catch (Throwable ex) {
|
|
|
|
LOG.error("Failed to load disk session {}", diskSessionAndId.id, ex);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
2022-01-07 23:54:18 +01:00
|
|
|
})
|
2022-06-27 00:06:53 +02:00
|
|
|
.flatMap(diskSessionAndId -> {
|
|
|
|
var id = diskSessionAndId.id;
|
|
|
|
var diskSession = diskSessionAndId.diskSession;
|
|
|
|
return createSession(new LoadSessionFromDiskRequest(id,
|
|
|
|
diskSession.token,
|
|
|
|
diskSession.phoneNumber,
|
|
|
|
true
|
|
|
|
));
|
|
|
|
})
|
|
|
|
.then()
|
|
|
|
.doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk"));
|
2022-09-13 22:15:18 +02:00
|
|
|
|
|
|
|
return loadSessions.then(Mono.fromRunnable(() -> {
|
|
|
|
if (kafkaSharedTdlibServers != null) {
|
|
|
|
requestsSub = kafkaSharedTdlibServers.requests()
|
|
|
|
.doOnNext(req -> localSessions.get(req.data().userId()).handleRequest(req.data()))
|
|
|
|
.subscribeOn(Schedulers.parallel())
|
|
|
|
.subscribe();
|
|
|
|
}
|
|
|
|
}));
|
2022-01-09 20:20:20 +01:00
|
|
|
}
|
|
|
|
|
2022-01-07 23:54:18 +01:00
|
|
|
@Override
|
|
|
|
public Mono<CreateSessionResponse> createSession(CreateSessionRequest req) {
|
|
|
|
LOG.debug("Received create session request: {}", req);
|
|
|
|
|
2022-09-13 22:15:18 +02:00
|
|
|
if (mode == AtomixReactiveApiMode.CLIENT) {
|
2022-01-07 23:54:18 +01:00
|
|
|
return Mono.error(new UnsupportedOperationException("This is a client, it can't have own sessions"));
|
|
|
|
}
|
|
|
|
|
2022-06-27 00:06:53 +02:00
|
|
|
// Create the session instance
|
|
|
|
ReactiveApiPublisher reactiveApiPublisher;
|
|
|
|
boolean loadedFromDisk;
|
|
|
|
long userId;
|
|
|
|
String botToken;
|
|
|
|
Long phoneNumber;
|
|
|
|
if (req instanceof CreateBotSessionRequest createBotSessionRequest) {
|
|
|
|
loadedFromDisk = false;
|
|
|
|
userId = createBotSessionRequest.userId();
|
|
|
|
botToken = createBotSessionRequest.token();
|
|
|
|
phoneNumber = null;
|
2022-09-10 20:25:54 +02:00
|
|
|
reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaSharedTdlibServers, resultingEventTransformerSet,
|
2022-06-27 00:06:53 +02:00
|
|
|
userId,
|
|
|
|
botToken
|
|
|
|
);
|
|
|
|
} else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
|
|
|
|
loadedFromDisk = false;
|
|
|
|
userId = createUserSessionRequest.userId();
|
|
|
|
botToken = null;
|
|
|
|
phoneNumber = createUserSessionRequest.phoneNumber();
|
2022-09-10 20:25:54 +02:00
|
|
|
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaSharedTdlibServers, resultingEventTransformerSet,
|
2022-06-27 00:06:53 +02:00
|
|
|
userId,
|
|
|
|
phoneNumber
|
|
|
|
);
|
|
|
|
} else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
|
|
|
|
loadedFromDisk = true;
|
|
|
|
userId = loadSessionFromDiskRequest.userId();
|
|
|
|
botToken = loadSessionFromDiskRequest.token();
|
|
|
|
phoneNumber = loadSessionFromDiskRequest.phoneNumber();
|
|
|
|
if (loadSessionFromDiskRequest.phoneNumber() != null) {
|
2022-09-10 20:25:54 +02:00
|
|
|
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(kafkaSharedTdlibServers,
|
2022-06-27 00:06:53 +02:00
|
|
|
resultingEventTransformerSet,
|
|
|
|
userId,
|
|
|
|
phoneNumber
|
|
|
|
);
|
2022-01-07 23:54:18 +01:00
|
|
|
} else {
|
2022-09-10 20:25:54 +02:00
|
|
|
reactiveApiPublisher = ReactiveApiPublisher.fromToken(kafkaSharedTdlibServers,
|
2022-06-27 00:06:53 +02:00
|
|
|
resultingEventTransformerSet,
|
|
|
|
userId,
|
|
|
|
botToken
|
|
|
|
);
|
2022-01-07 23:54:18 +01:00
|
|
|
}
|
2022-06-27 00:06:53 +02:00
|
|
|
} else {
|
|
|
|
return Mono.error(new UnsupportedOperationException("Unexpected value: " + req));
|
|
|
|
}
|
2022-01-07 23:54:18 +01:00
|
|
|
|
2022-06-27 00:06:53 +02:00
|
|
|
// Register the session instance to the local nodes map
|
|
|
|
var prev = localSessions.put(userId, reactiveApiPublisher);
|
|
|
|
if (prev != null) {
|
|
|
|
LOG.error("User id \"{}\" was already registered locally! {}", userId, prev);
|
|
|
|
}
|
2022-01-07 23:54:18 +01:00
|
|
|
|
2022-06-27 00:06:53 +02:00
|
|
|
var saveToDiskMono = Mono
|
|
|
|
.<Void>fromCallable(() -> {
|
|
|
|
// Save updated sessions configuration to disk
|
|
|
|
try {
|
|
|
|
Objects.requireNonNull(diskSessions);
|
2022-01-09 20:20:20 +01:00
|
|
|
|
2022-06-27 00:06:53 +02:00
|
|
|
synchronized (diskSessions) {
|
|
|
|
diskSessions.save();
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
} catch (IOException e) {
|
|
|
|
throw new CompletionException("Failed to save disk sessions configuration", e);
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.subscribeOn(Schedulers.boundedElastic());
|
2022-01-07 23:54:18 +01:00
|
|
|
|
2022-06-27 00:06:53 +02:00
|
|
|
// Start the session instance
|
|
|
|
return Mono
|
|
|
|
.fromCallable(() -> {
|
|
|
|
Objects.requireNonNull(diskSessions);
|
|
|
|
synchronized (diskSessions) {
|
|
|
|
return Objects.requireNonNull(Paths.get(diskSessions.getSettings().path),
|
|
|
|
"Session " + userId + " path is missing");
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.subscribeOn(Schedulers.boundedElastic())
|
|
|
|
.flatMap(baseSessionsPath -> {
|
|
|
|
String diskSessionFolderName = "id" + Long.toUnsignedString(userId);
|
|
|
|
Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName);
|
|
|
|
|
|
|
|
if (!loadedFromDisk) {
|
|
|
|
// Create the disk session configuration
|
|
|
|
var diskSession = new DiskSession(botToken, phoneNumber);
|
|
|
|
return Mono.<Void>fromCallable(() -> {
|
|
|
|
Objects.requireNonNull(diskSessions);
|
|
|
|
synchronized (diskSessions) {
|
|
|
|
diskSessions.getSettings().userIdToSession().put(userId, diskSession);
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic()).then(saveToDiskMono).thenReturn(sessionPath);
|
2022-01-07 23:54:18 +01:00
|
|
|
} else {
|
2022-06-27 00:06:53 +02:00
|
|
|
return Mono.just(sessionPath);
|
2022-01-07 23:54:18 +01:00
|
|
|
}
|
2022-06-27 00:06:53 +02:00
|
|
|
})
|
|
|
|
.doOnNext(path -> reactiveApiPublisher.start(path, () -> {
|
|
|
|
localSessions.remove(userId);
|
|
|
|
LOG.debug("Closed the session for user {} after it was closed itself", userId);
|
|
|
|
}))
|
|
|
|
.thenReturn(new CreateSessionResponse(userId));
|
2022-01-09 20:20:20 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-09-13 22:15:18 +02:00
|
|
|
public ReactiveApiMultiClient client() {
|
|
|
|
return client;
|
2022-01-09 20:20:20 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2022-06-27 00:06:53 +02:00
|
|
|
public Mono<Void> close() {
|
|
|
|
closeRequested = true;
|
|
|
|
Mono<?> kafkaServerProducersStopper;
|
2022-09-10 20:25:54 +02:00
|
|
|
if (kafkaSharedTdlibServers != null) {
|
|
|
|
kafkaServerProducersStopper = Mono.fromRunnable(kafkaSharedTdlibServers::close).subscribeOn(Schedulers.boundedElastic());
|
2022-06-27 00:06:53 +02:00
|
|
|
} else {
|
|
|
|
kafkaServerProducersStopper = Mono.empty();
|
|
|
|
}
|
2022-09-13 22:15:18 +02:00
|
|
|
Mono<?> kafkaClientProducersStopper;
|
|
|
|
if (kafkaSharedTdlibClients != null) {
|
|
|
|
kafkaClientProducersStopper = Mono
|
|
|
|
.fromRunnable(kafkaSharedTdlibClients::close)
|
|
|
|
.subscribeOn(Schedulers.boundedElastic());
|
|
|
|
} else {
|
|
|
|
kafkaClientProducersStopper = Mono.empty();
|
|
|
|
}
|
|
|
|
if (requestsSub != null) {
|
|
|
|
requestsSub.dispose();
|
|
|
|
}
|
2022-06-27 00:06:53 +02:00
|
|
|
return Mono.when(kafkaServerProducersStopper, kafkaClientProducersStopper);
|
2022-01-09 20:20:20 +01:00
|
|
|
}
|
|
|
|
|
2022-01-07 23:54:18 +01:00
|
|
|
@Override
|
2022-06-27 00:06:53 +02:00
|
|
|
public void waitForExit() {
|
|
|
|
var nanos = Duration.ofSeconds(1).toNanos();
|
|
|
|
while (!closeRequested && !Thread.interrupted()) {
|
|
|
|
LockSupport.parkNanos(nanos);
|
|
|
|
}
|
2022-01-07 23:54:18 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
private record DiskSessionAndId(DiskSession diskSession, long id) {}
|
|
|
|
|
2022-01-09 20:20:20 +01:00
|
|
|
private Mono<DiskSessionAndId> getLocalDiskSession(Long localUserId) {
|
2022-01-07 23:54:18 +01:00
|
|
|
return Mono.fromCallable(() -> {
|
|
|
|
Objects.requireNonNull(diskSessions);
|
|
|
|
synchronized (diskSessions) {
|
2022-01-09 20:20:20 +01:00
|
|
|
var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localUserId),
|
|
|
|
"Id not found: " + localUserId
|
2022-01-07 23:54:18 +01:00
|
|
|
);
|
|
|
|
try {
|
|
|
|
diskSession.validate();
|
|
|
|
} catch (Throwable ex) {
|
2022-01-09 20:20:20 +01:00
|
|
|
LOG.error("Failed to load disk session {}", localUserId, ex);
|
2022-01-07 23:54:18 +01:00
|
|
|
return null;
|
|
|
|
}
|
2022-01-09 20:20:20 +01:00
|
|
|
return new DiskSessionAndId(diskSession, localUserId);
|
2022-01-07 23:54:18 +01:00
|
|
|
}
|
|
|
|
}).subscribeOn(Schedulers.boundedElastic());
|
|
|
|
}
|
|
|
|
}
|