From a33a7f676ae99037d76a46299e9441258bde6444 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 9 Dec 2021 18:48:06 +0100 Subject: [PATCH] Create sessions using reactor core --- src/main/java/it/tdlight/reactiveapi/Cli.java | 3 +- .../it/tdlight/reactiveapi/ReactiveApi.java | 142 ++++++++++-------- .../reactiveapi/ReactiveApiPublisher.java | 41 +++-- .../reactiveapi/SchedulerExecutor.java | 1 + 4 files changed, 99 insertions(+), 88 deletions(-) diff --git a/src/main/java/it/tdlight/reactiveapi/Cli.java b/src/main/java/it/tdlight/reactiveapi/Cli.java index 490c683..2510105 100644 --- a/src/main/java/it/tdlight/reactiveapi/Cli.java +++ b/src/main/java/it/tdlight/reactiveapi/Cli.java @@ -122,7 +122,8 @@ public class Cli { if (!invalid) { api .createSession(request) - .thenAccept(response -> LOG.info("Created a session with live id \"{}\"", response.sessionId())); + .doOnNext(response -> LOG.info("Created a session with live id \"{}\"", response.sessionId())) + .block(); } } else { invalid = true; diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java index ac9044c..d488ecf 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java @@ -42,8 +42,6 @@ public class ReactiveApi { @NotNull private final String nodeId; private final Atomix atomix; - private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.parallel()); - private static final SchedulerExecutor BOUNDED_ELASTIC_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic()); private final AsyncAtomicIdGenerator nextSessionLiveId; private final AsyncAtomicLock sessionModificationLock; @@ -138,21 +136,21 @@ public class ReactiveApi { var addNewDiskSessions = addedDiskSessionsFlux.flatMap(diskSessionAndId -> { var id = diskSessionAndId.id; var diskSession = diskSessionAndId.diskSession; - return fromCompletionStage(() -> createSession(new LoadSessionFromDiskRequest(id, + return createSession(new LoadSessionFromDiskRequest(id, diskSession.token, diskSession.phoneNumber, true - ))); + )); }).then(); var loadExistingDiskSessions = normalDiskSessionsFlux.flatMap(diskSessionAndId -> { var id = diskSessionAndId.id; var diskSession = diskSessionAndId.diskSession; - return fromCompletionStage(() -> createSession(new LoadSessionFromDiskRequest(id, + return createSession(new LoadSessionFromDiskRequest(id, diskSession.token, diskSession.phoneNumber, false - ))); + )); }).then(); var diskInitMono = Mono.when(removeObsoleteDiskSessions, loadExistingDiskSessions, addNewDiskSessions) @@ -166,7 +164,7 @@ public class ReactiveApi { if (req instanceof LoadSessionFromDiskRequest) { return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster")); } else { - return createSession(req); + return createSession(req).toFuture(); } }, CreateSessionResponse::serializeBytes)); @@ -174,7 +172,7 @@ public class ReactiveApi { } private CompletableFuture destroySession(long userId, String nodeId) { - LOG.debug("Received session delete request: userid={}, nodeid=\"{}\"", userId, nodeId); + LOG.debug("Received session delete request: user_id={}, node_id=\"{}\"", userId, nodeId); // Lock sessions modification return sessionModificationLock @@ -192,39 +190,35 @@ public class ReactiveApi { .whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex)); } - public CompletableFuture createSession(CreateSessionRequest req) { + public Mono createSession(CreateSessionRequest req) { LOG.debug("Received create session request: {}", req); - // Lock sessions creation - return sessionModificationLock - .lock() - .thenCompose(lockVersion -> { - LOG.trace("Obtained session modification lock for session request: {}", req); - // Generate session id - return this.nextFreeLiveId().thenCompose(liveId -> { + + Mono unlockedSessionCreationMono = Mono.defer(() -> { + LOG.trace("Obtained session modification lock for session request: {}", req); + // Generate session id + return this + .nextFreeLiveId() + .flatMap(liveId -> { // Create the session instance ReactiveApiPublisher reactiveApiPublisher; boolean loadedFromDisk; - boolean createNew; long userId; String botToken; Long phoneNumber; if (req instanceof CreateBotSessionRequest createBotSessionRequest) { loadedFromDisk = false; - createNew = true; userId = createBotSessionRequest.userId(); botToken = createBotSessionRequest.token(); phoneNumber = null; reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken); } else if (req instanceof CreateUserSessionRequest createUserSessionRequest) { loadedFromDisk = false; - createNew = true; userId = createUserSessionRequest.userId(); botToken = null; phoneNumber = createUserSessionRequest.phoneNumber(); reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber); } else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) { loadedFromDisk = true; - createNew = loadSessionFromDiskRequest.createNew(); userId = loadSessionFromDiskRequest.userId(); botToken = loadSessionFromDiskRequest.token(); phoneNumber = loadSessionFromDiskRequest.phoneNumber(); @@ -234,7 +228,7 @@ public class ReactiveApi { reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken); } } else { - return failedFuture(new UnsupportedOperationException("Unexpected value: " + req)); + return Mono.error(new UnsupportedOperationException("Unexpected value: " + req)); } // Register the session instance to the local nodes map @@ -244,57 +238,73 @@ public class ReactiveApi { } // Register the session instance to the distributed nodes map - return userIdToNodeId.put(userId, nodeId).thenComposeAsync(prevDistributed -> { - if (prevDistributed != null && prevDistributed.value() != null && - !Objects.equals(this.nodeId, prevDistributed.value())) { - LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId, prevDistributed.value()); - } - - Path baseSessionsPath; - synchronized (diskSessions) { - baseSessionsPath = Paths.get(diskSessions.getSettings().path); - } - String diskSessionFolderName = Long.toUnsignedString(userId); - Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName); - - CompletableFuture saveToDiskFuture; - if (!loadedFromDisk) { - // Create the disk session configuration - var diskSession = new DiskSession(botToken, phoneNumber); - synchronized (diskSessions) { - diskSessions.getSettings().userIdToSession().put(userId, diskSession); - } - - saveToDiskFuture = CompletableFuture.runAsync(() -> { - // Save updated sessions configuration to disk - try { - synchronized (diskSessions) { - diskSessions.save(); - } - } catch (IOException e) { - throw new CompletionException("Failed to save disk sessions configuration", e); + return Mono + .fromCompletionStage(() -> userIdToNodeId.put(userId, nodeId)) + .flatMap(prevDistributed -> { + if (prevDistributed != null && prevDistributed.value() != null && + !Objects.equals(this.nodeId, prevDistributed.value())) { + LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId, prevDistributed.value()); } - }, BOUNDED_ELASTIC_EXECUTOR); - } else { - saveToDiskFuture = completedFuture(null); - } - // Start the session instance - reactiveApiPublisher.start(sessionPath); + var saveToDiskMono = Mono + .fromCallable(() -> { + // Save updated sessions configuration to disk + try { + synchronized (diskSessions) { + diskSessions.save(); + return null; + } + } catch (IOException e) { + throw new CompletionException("Failed to save disk sessions configuration", e); + } + }) + .subscribeOn(Schedulers.boundedElastic()); - return saveToDiskFuture.thenApply(ignored -> new CreateSessionResponse(liveId)); - }, BOUNDED_ELASTIC_EXECUTOR); + // Start the session instance + return Mono + .fromCallable(() -> { + synchronized (diskSessions) { + return Paths.get(diskSessions.getSettings().path); + } + }) + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(baseSessionsPath -> { + String diskSessionFolderName = Long.toUnsignedString(userId); + Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName); + + if (!loadedFromDisk) { + // Create the disk session configuration + var diskSession = new DiskSession(botToken, phoneNumber); + return Mono.fromCallable(() -> { + synchronized (diskSessions) { + diskSessions.getSettings().userIdToSession().put(userId, diskSession); + return null; + } + }).subscribeOn(Schedulers.boundedElastic()).then(saveToDiskMono).thenReturn(sessionPath); + } else { + return Mono.just(sessionPath); + } + }) + .doOnNext(reactiveApiPublisher::start) + .thenReturn(new CreateSessionResponse(liveId)); + }); }); - }) - .whenComplete((response, error) -> sessionModificationLock - .unlock() - .thenRun(() -> LOG.trace("Released session modification lock for session request: {}", req)) + }); + + // Lock sessions creation + return Mono + .usingWhen(Mono.fromCompletionStage(sessionModificationLock::lock), + lockVersion -> unlockedSessionCreationMono, + lockVersion -> Mono + .fromCompletionStage(sessionModificationLock::unlock) + .doOnTerminate(() -> LOG.trace("Released session modification lock for session request: {}", req)) ) - .whenComplete((resp, ex) -> LOG.debug("Handled session request {}, the response is: {}", req, resp, ex)); + .doOnNext(resp -> LOG.debug("Handled session request {}, the response is: {}", req, resp)) + .doOnError(ex -> LOG.debug("Handled session request {}, the response is: error", req, ex)); } - public CompletableFuture nextFreeLiveId() { - return nextSessionLiveId.nextId(); + public Mono nextFreeLiveId() { + return Mono.fromCompletionStage(nextSessionLiveId::nextId); } public Atomix getAtomix() { @@ -327,7 +337,7 @@ public class ReactiveApi { return this.nodeId.equals(nodeId); } - private static record DiskSessionAndId(DiskSession diskSession, long id) {} + private record DiskSessionAndId(DiskSession diskSession, long id) {} private Mono getLocalDiskSession(Long localId) { return Mono.fromCallable(() -> { diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 8e0c9a0..3155d7d 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -55,7 +55,6 @@ public class ReactiveApiPublisher { private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class); private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofSeconds(10); - private final Atomix atomix; private final ClusterEventService eventService; private final ReactiveTelegramClient rawTelegramClient; private final Flux telegramClient; @@ -66,10 +65,9 @@ public class ReactiveApiPublisher { private final String botToken; private final Long phoneNumber; - private AtomicReference disposable = new AtomicReference<>(); + private final AtomicReference disposable = new AtomicReference<>(); private ReactiveApiPublisher(Atomix atomix, long liveId, long userId, String botToken, Long phoneNumber) { - this.atomix = atomix; this.userId = userId; this.liveId = liveId; this.botToken = botToken; @@ -119,7 +117,7 @@ public class ReactiveApiPublisher { .filter(s -> s instanceof ClientBoundResultingEvent) .cast(ClientBoundResultingEvent.class) .subscribeOn(Schedulers.parallel()) - .subscribe(clientBoundResultingEvent -> eventService.broadcast("session-" + liveId + "-clientbound-events", + .subscribe(clientBoundResultingEvent -> eventService.broadcast("session-" + liveId + "-client-bound-events", clientBoundResultingEvent.event(), ReactiveApiPublisher::serializeEvent )); @@ -141,11 +139,12 @@ public class ReactiveApiPublisher { var update = (TdApi.Update) signal.getUpdate(); return new ClientBoundResultingEvent(new OnUpdateData(liveId, userId, update)); } else { - LOG.trace("Signal has not been broadcasted because the session {} is not logged in: {}", userId, signal); + LOG.trace("Signal has not been broadcast because the session {} is not logged in: {}", userId, signal); return this.handleSpecialSignal(state, signal); } } + @SuppressWarnings("SwitchStatementWithTooFewBranches") @Nullable private ResultingEvent handleSpecialSignal(State state, Signal signal) { if (signal.isException()) { @@ -224,24 +223,24 @@ public class ReactiveApiPublisher { } private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { - try (var baos = new ByteArrayOutputStream()) { - try (var daos = new DataOutputStream(baos)) { + try (var byteArrayOutputStream = new ByteArrayOutputStream()) { + try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { if (clientBoundEvent instanceof OnUpdateData onUpdateData) { - daos.write(0x1); - onUpdateData.update().serialize(daos); + dataOutputStream.write(0x1); + onUpdateData.update().serialize(dataOutputStream); } else if (clientBoundEvent instanceof OnUpdateError onUpdateError) { - daos.write(0x2); - onUpdateError.error().serialize(daos); + dataOutputStream.write(0x2); + onUpdateError.error().serialize(dataOutputStream); } else if (clientBoundEvent instanceof OnUserLoginCodeRequested onUserLoginCodeRequested) { - daos.write(0x3); - daos.writeLong(onUserLoginCodeRequested.phoneNumber()); + dataOutputStream.write(0x3); + dataOutputStream.writeLong(onUserLoginCodeRequested.phoneNumber()); } else if (clientBoundEvent instanceof OnBotLoginCodeRequested onBotLoginCodeRequested) { - daos.write(0x4); - daos.writeUTF(onBotLoginCodeRequested.token()); + dataOutputStream.write(0x4); + dataOutputStream.writeUTF(onBotLoginCodeRequested.token()); } else { throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent); } - return baos.toByteArray(); + return byteArrayOutputStream.toByteArray(); } } catch (IOException ex) { throw new SerializationException(ex); @@ -259,11 +258,11 @@ public class ReactiveApiPublisher { private static byte[] serializeResponse(Response response) { var id = response.getId(); var object = response.getObject(); - try (var baos = new ByteArrayOutputStream()) { - try (var daos = new DataOutputStream(baos)) { - daos.writeLong(id); - object.serialize(daos); - return baos.toByteArray(); + try (var byteArrayOutputStream = new ByteArrayOutputStream()) { + try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { + dataOutputStream.writeLong(id); + object.serialize(dataOutputStream); + return byteArrayOutputStream.toByteArray(); } } catch (IOException ex) { throw new SerializationException(ex); diff --git a/src/main/java/it/tdlight/reactiveapi/SchedulerExecutor.java b/src/main/java/it/tdlight/reactiveapi/SchedulerExecutor.java index 1bc8140..1c004dd 100644 --- a/src/main/java/it/tdlight/reactiveapi/SchedulerExecutor.java +++ b/src/main/java/it/tdlight/reactiveapi/SchedulerExecutor.java @@ -4,6 +4,7 @@ import java.util.concurrent.Executor; import org.jetbrains.annotations.NotNull; import reactor.core.scheduler.Scheduler; +@SuppressWarnings("ClassCanBeRecord") public class SchedulerExecutor implements Executor { private final Scheduler scheduler;