tdlib-session-container/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java

335 lines
11 KiB
Java
Raw Normal View History

2022-01-07 23:54:18 +01:00
package it.tdlight.reactiveapi;
import static java.util.Objects.requireNonNull;
2022-10-04 12:43:24 +02:00
import it.tdlight.jni.TdApi.Object;
2022-01-07 23:54:18 +01:00
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
2022-10-04 12:43:24 +02:00
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnRequest;
import it.tdlight.reactiveapi.Event.OnResponse;
2022-01-07 23:54:18 +01:00
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
2022-01-08 18:13:40 +01:00
import java.time.Duration;
2022-09-22 15:46:31 +02:00
import java.util.HashMap;
2022-01-07 23:54:18 +01:00
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
2022-06-27 00:06:53 +02:00
import java.util.concurrent.locks.LockSupport;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
2022-01-09 18:27:14 +01:00
import org.jetbrains.annotations.NotNull;
2022-01-07 23:54:18 +01:00
import org.jetbrains.annotations.Nullable;
2022-09-13 22:15:18 +02:00
import reactor.core.Disposable;
2022-01-07 23:54:18 +01:00
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class AtomixReactiveApi implements ReactiveApi {
private static final Logger LOG = LogManager.getLogger(AtomixReactiveApi.class);
2022-01-07 23:54:18 +01:00
2022-09-13 22:15:18 +02:00
private final AtomixReactiveApiMode mode;
2022-06-27 00:06:53 +02:00
2022-10-06 00:36:00 +02:00
private final TdlibChannelsSharedReceive sharedTdlibClients;
2022-01-07 23:54:18 +01:00
@Nullable
2022-10-06 00:36:00 +02:00
private final TdlibChannelsSharedHost sharedTdlibServers;
2022-09-13 22:15:18 +02:00
private final ReactiveApiMultiClient client;
2022-01-07 23:54:18 +01:00
2022-06-27 00:06:53 +02:00
private final Set<ResultingEventTransformer> resultingEventTransformerSet;
2022-01-07 23:54:18 +01:00
/**
2022-06-27 00:06:53 +02:00
* user id -> session
2022-01-07 23:54:18 +01:00
*/
2022-06-27 00:06:53 +02:00
private final ConcurrentMap<Long, ReactiveApiPublisher> localSessions = new ConcurrentHashMap<>();
2022-01-07 23:54:18 +01:00
/**
* DiskSessions is null when nodeId is null
*/
@Nullable
private final DiskSessionsManager diskSessions;
2022-06-27 00:06:53 +02:00
private volatile boolean closeRequested;
2022-09-13 22:15:18 +02:00
private volatile Disposable requestsSub;
2022-01-07 23:54:18 +01:00
2022-09-13 22:15:18 +02:00
/**
 * Selects which channel sides an {@code AtomixReactiveApi} instance creates in its constructor.
 */
public enum AtomixReactiveApiMode {
/** Only the client-side channels are created; {@code createSession} rejects every request. */
CLIENT,
/** Only the server-side channels are created; the client-side fields are left {@code null}. */
SERVER,
/** Both the client-side and the server-side channels are created. */
FULL
}
/**
 * Builds the API, creating only the channel sides required by the given mode.
 *
 * @param mode which sides (client, server or both) this instance runs
 * @param channelsParameters parameters used to obtain the channel factory and the list of lanes
 * @param diskSessions manager of the sessions persisted on disk, may be {@code null}
 * @param resultingEventTransformerSet transformers handed to every session publisher
 */
public AtomixReactiveApi(AtomixReactiveApiMode mode,
    ChannelsParameters channelsParameters,
    @Nullable DiskSessionsManager diskSessions,
    @NotNull Set<ResultingEventTransformer> resultingEventTransformerSet) {
  this.mode = mode;
  ChannelFactory factory = ChannelFactory.getFactoryFromParameters(channelsParameters);

  // Client side: everything except a pure server needs it
  if (mode == AtomixReactiveApiMode.SERVER) {
    this.sharedTdlibClients = null;
    this.client = null;
  } else {
    EventProducer<OnRequest<?>> requestProducer = ChannelProducerTdlibRequest.create(factory);
    EventConsumer<OnResponse<Object>> responseConsumer = ChannelConsumerTdlibResponse.create(factory);
    HashMap<String, EventConsumer<ClientBoundEvent>> consumersByLane = new HashMap<>();
    for (String laneName : channelsParameters.getAllLanes()) {
      consumersByLane.put(laneName, ChannelConsumerClientBoundEvent.create(factory, laneName));
    }
    var clientChannels = new TdlibChannelsClients(requestProducer, responseConsumer, consumersByLane);
    var sharedReceive = new TdlibChannelsSharedReceive(clientChannels);
    this.sharedTdlibClients = sharedReceive;
    this.client = new LiveAtomixReactiveApiClient(sharedReceive);
  }

  // Server side: everything except a pure client needs it
  if (mode == AtomixReactiveApiMode.CLIENT) {
    this.sharedTdlibServers = null;
  } else {
    EventConsumer<OnRequest<Object>> requestConsumer = ChannelConsumerTdlibRequest.create(factory);
    EventProducer<OnResponse<Object>> responseProducer = ChannelProducerTdlibResponse.create(factory);
    var producersByLane = new HashMap<String, EventProducer<ClientBoundEvent>>();
    for (String laneName : channelsParameters.getAllLanes()) {
      producersByLane.put(laneName, ChannelProducerClientBoundEvent.create(factory, laneName));
    }
    var serverChannels = new TdlibChannelsServers(requestConsumer, responseProducer, producersByLane);
    this.sharedTdlibServers = new TdlibChannelsSharedHost(channelsParameters.getAllLanes(), serverChannels);
  }

  this.resultingEventTransformerSet = resultingEventTransformerSet;
  this.diskSessions = diskSessions;
}
@Override
public Mono<Void> start() {
2022-06-27 00:06:53 +02:00
var idsSavedIntoLocalConfiguration = Mono
.<Set<Entry<Long, DiskSession>>>fromCallable(() -> {
if (diskSessions == null) {
return Set.of();
}
synchronized (diskSessions) {
return diskSessions.getSettings().userIdToSession().entrySet();
}
})
2022-01-07 23:54:18 +01:00
.subscribeOn(Schedulers.boundedElastic())
2022-06-27 00:06:53 +02:00
.flatMapIterable(a -> a)
.map(a -> new DiskSessionAndId(a.getValue(), a.getKey()));
2022-09-13 22:15:18 +02:00
var loadSessions = idsSavedIntoLocalConfiguration
2022-06-27 00:06:53 +02:00
.filter(diskSessionAndId -> {
try {
diskSessionAndId.diskSession().validate();
} catch (Throwable ex) {
LOG.error("Failed to load disk session {}", diskSessionAndId.id, ex);
return false;
}
return true;
2022-01-07 23:54:18 +01:00
})
2022-06-27 00:06:53 +02:00
.flatMap(diskSessionAndId -> {
var id = diskSessionAndId.id;
var diskSession = diskSessionAndId.diskSession;
return createSession(new LoadSessionFromDiskRequest(id,
diskSession.token,
diskSession.phoneNumber,
2022-09-22 15:46:31 +02:00
diskSession.lane,
2022-06-27 00:06:53 +02:00
true
));
})
.then()
.doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk"));
2022-09-13 22:15:18 +02:00
2022-09-22 02:26:22 +02:00
return loadSessions.<Void>then(Mono.fromRunnable(() -> {
2022-10-05 02:26:30 +02:00
if (sharedTdlibServers != null) {
requestsSub = sharedTdlibServers.requests()
2022-10-11 00:24:51 +02:00
.doOnNext(req -> {
var publisher = localSessions.get(req.data().userId());
if (publisher != null) {
publisher.handleRequest(req.data());
} else {
2022-10-12 18:31:44 +02:00
LOG.debug("Dropped request because no session is found: {}", req);
2022-10-11 00:24:51 +02:00
}
})
2022-09-13 22:15:18 +02:00
.subscribeOn(Schedulers.parallel())
2022-10-06 00:36:00 +02:00
.subscribe(n -> {}, ex -> LOG.error("Requests channel broke unexpectedly", ex));
2022-09-13 22:15:18 +02:00
}
})).transform(ReactorUtils::subscribeOnceUntilUnsubscribe);
2022-01-09 20:20:20 +01:00
}
2022-01-07 23:54:18 +01:00
@Override
public Mono<CreateSessionResponse> createSession(CreateSessionRequest req) {
LOG.debug("Received create session request: {}", req);
2022-09-13 22:15:18 +02:00
if (mode == AtomixReactiveApiMode.CLIENT) {
2022-01-07 23:54:18 +01:00
return Mono.error(new UnsupportedOperationException("This is a client, it can't have own sessions"));
}
2022-06-27 00:06:53 +02:00
// Create the session instance
ReactiveApiPublisher reactiveApiPublisher;
boolean loadedFromDisk;
long userId;
String botToken;
2022-09-22 15:46:31 +02:00
String lane;
2022-06-27 00:06:53 +02:00
Long phoneNumber;
if (req instanceof CreateBotSessionRequest createBotSessionRequest) {
loadedFromDisk = false;
userId = createBotSessionRequest.userId();
botToken = createBotSessionRequest.token();
phoneNumber = null;
2022-09-22 15:46:31 +02:00
lane = createBotSessionRequest.lane();
2022-10-05 02:26:30 +02:00
reactiveApiPublisher = ReactiveApiPublisher.fromToken(sharedTdlibServers, resultingEventTransformerSet,
2022-06-27 00:06:53 +02:00
userId,
2022-09-22 15:46:31 +02:00
botToken,
lane
2022-06-27 00:06:53 +02:00
);
} else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
loadedFromDisk = false;
userId = createUserSessionRequest.userId();
botToken = null;
phoneNumber = createUserSessionRequest.phoneNumber();
2022-09-22 15:46:31 +02:00
lane = createUserSessionRequest.lane();
2022-10-05 02:26:30 +02:00
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(sharedTdlibServers, resultingEventTransformerSet,
2022-06-27 00:06:53 +02:00
userId,
2022-09-22 15:46:31 +02:00
phoneNumber,
lane
2022-06-27 00:06:53 +02:00
);
} else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
loadedFromDisk = true;
userId = loadSessionFromDiskRequest.userId();
botToken = loadSessionFromDiskRequest.token();
phoneNumber = loadSessionFromDiskRequest.phoneNumber();
2022-09-22 15:46:31 +02:00
lane = loadSessionFromDiskRequest.lane();
2022-06-27 00:06:53 +02:00
if (loadSessionFromDiskRequest.phoneNumber() != null) {
2022-10-05 02:26:30 +02:00
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(sharedTdlibServers,
2022-06-27 00:06:53 +02:00
resultingEventTransformerSet,
userId,
2022-09-22 15:46:31 +02:00
phoneNumber,
lane
2022-06-27 00:06:53 +02:00
);
2022-01-07 23:54:18 +01:00
} else {
2022-10-05 02:26:30 +02:00
reactiveApiPublisher = ReactiveApiPublisher.fromToken(sharedTdlibServers,
2022-06-27 00:06:53 +02:00
resultingEventTransformerSet,
userId,
2022-09-22 15:46:31 +02:00
botToken,
lane
2022-06-27 00:06:53 +02:00
);
2022-01-07 23:54:18 +01:00
}
2022-06-27 00:06:53 +02:00
} else {
return Mono.error(new UnsupportedOperationException("Unexpected value: " + req));
}
2022-01-07 23:54:18 +01:00
2022-06-27 00:06:53 +02:00
// Register the session instance to the local nodes map
var prev = localSessions.put(userId, reactiveApiPublisher);
if (prev != null) {
LOG.error("User id \"{}\" was already registered locally! {}", userId, prev);
}
2022-01-07 23:54:18 +01:00
2022-06-27 00:06:53 +02:00
var saveToDiskMono = Mono
.<Void>fromCallable(() -> {
// Save updated sessions configuration to disk
try {
Objects.requireNonNull(diskSessions);
2022-01-09 20:20:20 +01:00
2022-06-27 00:06:53 +02:00
synchronized (diskSessions) {
diskSessions.save();
return null;
}
} catch (IOException e) {
throw new CompletionException("Failed to save disk sessions configuration", e);
}
})
.subscribeOn(Schedulers.boundedElastic());
2022-01-07 23:54:18 +01:00
2022-06-27 00:06:53 +02:00
// Start the session instance
return Mono
.fromCallable(() -> {
Objects.requireNonNull(diskSessions);
synchronized (diskSessions) {
return Objects.requireNonNull(Paths.get(diskSessions.getSettings().path),
"Session " + userId + " path is missing");
}
})
.subscribeOn(Schedulers.boundedElastic())
.flatMap(baseSessionsPath -> {
String diskSessionFolderName = "id" + Long.toUnsignedString(userId);
Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName);
if (!loadedFromDisk) {
// Create the disk session configuration
2022-09-22 15:46:31 +02:00
var diskSession = new DiskSession(botToken, phoneNumber, lane);
2022-06-27 00:06:53 +02:00
return Mono.<Void>fromCallable(() -> {
Objects.requireNonNull(diskSessions);
synchronized (diskSessions) {
diskSessions.getSettings().userIdToSession().put(userId, diskSession);
return null;
}
}).subscribeOn(Schedulers.boundedElastic()).then(saveToDiskMono).thenReturn(sessionPath);
2022-01-07 23:54:18 +01:00
} else {
2022-06-27 00:06:53 +02:00
return Mono.just(sessionPath);
2022-01-07 23:54:18 +01:00
}
2022-06-27 00:06:53 +02:00
})
.doOnNext(path -> reactiveApiPublisher.start(path, () -> {
localSessions.remove(userId);
LOG.debug("Closed the session for user {} after it was closed itself", userId);
}))
.thenReturn(new CreateSessionResponse(userId));
2022-01-09 20:20:20 +01:00
}
@Override
2022-09-13 22:15:18 +02:00
public ReactiveApiMultiClient client() {
return client;
2022-01-09 20:20:20 +01:00
}
@Override
2022-06-27 00:06:53 +02:00
public Mono<Void> close() {
closeRequested = true;
2022-10-05 02:26:30 +02:00
Mono<?> serverProducersStopper;
if (sharedTdlibServers != null) {
serverProducersStopper = Mono.fromRunnable(sharedTdlibServers::close).subscribeOn(Schedulers.boundedElastic());
2022-06-27 00:06:53 +02:00
} else {
2022-10-05 02:26:30 +02:00
serverProducersStopper = Mono.empty();
2022-06-27 00:06:53 +02:00
}
2022-10-05 02:26:30 +02:00
Mono<?> clientProducersStopper;
if (sharedTdlibClients != null) {
clientProducersStopper = Mono
.fromRunnable(sharedTdlibClients::close)
2022-09-13 22:15:18 +02:00
.subscribeOn(Schedulers.boundedElastic());
} else {
2022-10-05 02:26:30 +02:00
clientProducersStopper = Mono.empty();
2022-09-13 22:15:18 +02:00
}
if (requestsSub != null) {
requestsSub.dispose();
}
2022-10-05 02:26:30 +02:00
return Mono.when(serverProducersStopper, clientProducersStopper);
2022-01-09 20:20:20 +01:00
}
2022-01-07 23:54:18 +01:00
@Override
2022-06-27 00:06:53 +02:00
public void waitForExit() {
var nanos = Duration.ofSeconds(1).toNanos();
while (!closeRequested && !Thread.interrupted()) {
LockSupport.parkNanos(nanos);
}
2022-01-07 23:54:18 +01:00
}
/** Pairs a session stored on disk with the user id it is registered under. */
private record DiskSessionAndId(DiskSession diskSession, long id) {}
2022-01-09 20:20:20 +01:00
private Mono<DiskSessionAndId> getLocalDiskSession(Long localUserId) {
2022-01-07 23:54:18 +01:00
return Mono.fromCallable(() -> {
Objects.requireNonNull(diskSessions);
synchronized (diskSessions) {
2022-01-09 20:20:20 +01:00
var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localUserId),
"Id not found: " + localUserId
2022-01-07 23:54:18 +01:00
);
try {
diskSession.validate();
} catch (Throwable ex) {
2022-01-09 20:20:20 +01:00
LOG.error("Failed to load disk session {}", localUserId, ex);
2022-01-07 23:54:18 +01:00
return null;
}
2022-01-09 20:20:20 +01:00
return new DiskSessionAndId(diskSession, localUserId);
2022-01-07 23:54:18 +01:00
}
}).subscribeOn(Schedulers.boundedElastic());
}
}