Create sessions using reactor core

This commit is contained in:
Andrea Cavalli 2021-12-09 18:48:06 +01:00
parent da61270350
commit a33a7f676a
4 changed files with 99 additions and 88 deletions

View File

@ -122,7 +122,8 @@ public class Cli {
if (!invalid) { if (!invalid) {
api api
.createSession(request) .createSession(request)
.thenAccept(response -> LOG.info("Created a session with live id \"{}\"", response.sessionId())); .doOnNext(response -> LOG.info("Created a session with live id \"{}\"", response.sessionId()))
.block();
} }
} else { } else {
invalid = true; invalid = true;

View File

@ -42,8 +42,6 @@ public class ReactiveApi {
@NotNull @NotNull
private final String nodeId; private final String nodeId;
private final Atomix atomix; private final Atomix atomix;
private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.parallel());
private static final SchedulerExecutor BOUNDED_ELASTIC_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic());
private final AsyncAtomicIdGenerator nextSessionLiveId; private final AsyncAtomicIdGenerator nextSessionLiveId;
private final AsyncAtomicLock sessionModificationLock; private final AsyncAtomicLock sessionModificationLock;
@ -138,21 +136,21 @@ public class ReactiveApi {
var addNewDiskSessions = addedDiskSessionsFlux.flatMap(diskSessionAndId -> { var addNewDiskSessions = addedDiskSessionsFlux.flatMap(diskSessionAndId -> {
var id = diskSessionAndId.id; var id = diskSessionAndId.id;
var diskSession = diskSessionAndId.diskSession; var diskSession = diskSessionAndId.diskSession;
return fromCompletionStage(() -> createSession(new LoadSessionFromDiskRequest(id, return createSession(new LoadSessionFromDiskRequest(id,
diskSession.token, diskSession.token,
diskSession.phoneNumber, diskSession.phoneNumber,
true true
))); ));
}).then(); }).then();
var loadExistingDiskSessions = normalDiskSessionsFlux.flatMap(diskSessionAndId -> { var loadExistingDiskSessions = normalDiskSessionsFlux.flatMap(diskSessionAndId -> {
var id = diskSessionAndId.id; var id = diskSessionAndId.id;
var diskSession = diskSessionAndId.diskSession; var diskSession = diskSessionAndId.diskSession;
return fromCompletionStage(() -> createSession(new LoadSessionFromDiskRequest(id, return createSession(new LoadSessionFromDiskRequest(id,
diskSession.token, diskSession.token,
diskSession.phoneNumber, diskSession.phoneNumber,
false false
))); ));
}).then(); }).then();
var diskInitMono = Mono.when(removeObsoleteDiskSessions, loadExistingDiskSessions, addNewDiskSessions) var diskInitMono = Mono.when(removeObsoleteDiskSessions, loadExistingDiskSessions, addNewDiskSessions)
@ -166,7 +164,7 @@ public class ReactiveApi {
if (req instanceof LoadSessionFromDiskRequest) { if (req instanceof LoadSessionFromDiskRequest) {
return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster")); return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster"));
} else { } else {
return createSession(req); return createSession(req).toFuture();
} }
}, CreateSessionResponse::serializeBytes)); }, CreateSessionResponse::serializeBytes));
@ -174,7 +172,7 @@ public class ReactiveApi {
} }
private CompletableFuture<Void> destroySession(long userId, String nodeId) { private CompletableFuture<Void> destroySession(long userId, String nodeId) {
LOG.debug("Received session delete request: userid={}, nodeid=\"{}\"", userId, nodeId); LOG.debug("Received session delete request: user_id={}, node_id=\"{}\"", userId, nodeId);
// Lock sessions modification // Lock sessions modification
return sessionModificationLock return sessionModificationLock
@ -192,39 +190,35 @@ public class ReactiveApi {
.whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex)); .whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex));
} }
public CompletableFuture<CreateSessionResponse> createSession(CreateSessionRequest req) { public Mono<CreateSessionResponse> createSession(CreateSessionRequest req) {
LOG.debug("Received create session request: {}", req); LOG.debug("Received create session request: {}", req);
// Lock sessions creation
return sessionModificationLock Mono<CreateSessionResponse> unlockedSessionCreationMono = Mono.defer(() -> {
.lock() LOG.trace("Obtained session modification lock for session request: {}", req);
.thenCompose(lockVersion -> { // Generate session id
LOG.trace("Obtained session modification lock for session request: {}", req); return this
// Generate session id .nextFreeLiveId()
return this.nextFreeLiveId().thenCompose(liveId -> { .flatMap(liveId -> {
// Create the session instance // Create the session instance
ReactiveApiPublisher reactiveApiPublisher; ReactiveApiPublisher reactiveApiPublisher;
boolean loadedFromDisk; boolean loadedFromDisk;
boolean createNew;
long userId; long userId;
String botToken; String botToken;
Long phoneNumber; Long phoneNumber;
if (req instanceof CreateBotSessionRequest createBotSessionRequest) { if (req instanceof CreateBotSessionRequest createBotSessionRequest) {
loadedFromDisk = false; loadedFromDisk = false;
createNew = true;
userId = createBotSessionRequest.userId(); userId = createBotSessionRequest.userId();
botToken = createBotSessionRequest.token(); botToken = createBotSessionRequest.token();
phoneNumber = null; phoneNumber = null;
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken); reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken);
} else if (req instanceof CreateUserSessionRequest createUserSessionRequest) { } else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
loadedFromDisk = false; loadedFromDisk = false;
createNew = true;
userId = createUserSessionRequest.userId(); userId = createUserSessionRequest.userId();
botToken = null; botToken = null;
phoneNumber = createUserSessionRequest.phoneNumber(); phoneNumber = createUserSessionRequest.phoneNumber();
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber); reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber);
} else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) { } else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
loadedFromDisk = true; loadedFromDisk = true;
createNew = loadSessionFromDiskRequest.createNew();
userId = loadSessionFromDiskRequest.userId(); userId = loadSessionFromDiskRequest.userId();
botToken = loadSessionFromDiskRequest.token(); botToken = loadSessionFromDiskRequest.token();
phoneNumber = loadSessionFromDiskRequest.phoneNumber(); phoneNumber = loadSessionFromDiskRequest.phoneNumber();
@ -234,7 +228,7 @@ public class ReactiveApi {
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken); reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken);
} }
} else { } else {
return failedFuture(new UnsupportedOperationException("Unexpected value: " + req)); return Mono.error(new UnsupportedOperationException("Unexpected value: " + req));
} }
// Register the session instance to the local nodes map // Register the session instance to the local nodes map
@ -244,57 +238,73 @@ public class ReactiveApi {
} }
// Register the session instance to the distributed nodes map // Register the session instance to the distributed nodes map
return userIdToNodeId.put(userId, nodeId).thenComposeAsync(prevDistributed -> { return Mono
if (prevDistributed != null && prevDistributed.value() != null && .fromCompletionStage(() -> userIdToNodeId.put(userId, nodeId))
!Objects.equals(this.nodeId, prevDistributed.value())) { .flatMap(prevDistributed -> {
LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId, prevDistributed.value()); if (prevDistributed != null && prevDistributed.value() != null &&
} !Objects.equals(this.nodeId, prevDistributed.value())) {
LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId, prevDistributed.value());
Path baseSessionsPath;
synchronized (diskSessions) {
baseSessionsPath = Paths.get(diskSessions.getSettings().path);
}
String diskSessionFolderName = Long.toUnsignedString(userId);
Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName);
CompletableFuture<?> saveToDiskFuture;
if (!loadedFromDisk) {
// Create the disk session configuration
var diskSession = new DiskSession(botToken, phoneNumber);
synchronized (diskSessions) {
diskSessions.getSettings().userIdToSession().put(userId, diskSession);
}
saveToDiskFuture = CompletableFuture.runAsync(() -> {
// Save updated sessions configuration to disk
try {
synchronized (diskSessions) {
diskSessions.save();
}
} catch (IOException e) {
throw new CompletionException("Failed to save disk sessions configuration", e);
} }
}, BOUNDED_ELASTIC_EXECUTOR);
} else {
saveToDiskFuture = completedFuture(null);
}
// Start the session instance var saveToDiskMono = Mono
reactiveApiPublisher.start(sessionPath); .<Void>fromCallable(() -> {
// Save updated sessions configuration to disk
try {
synchronized (diskSessions) {
diskSessions.save();
return null;
}
} catch (IOException e) {
throw new CompletionException("Failed to save disk sessions configuration", e);
}
})
.subscribeOn(Schedulers.boundedElastic());
return saveToDiskFuture.thenApply(ignored -> new CreateSessionResponse(liveId)); // Start the session instance
}, BOUNDED_ELASTIC_EXECUTOR); return Mono
.fromCallable(() -> {
synchronized (diskSessions) {
return Paths.get(diskSessions.getSettings().path);
}
})
.subscribeOn(Schedulers.boundedElastic())
.flatMap(baseSessionsPath -> {
String diskSessionFolderName = Long.toUnsignedString(userId);
Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName);
if (!loadedFromDisk) {
// Create the disk session configuration
var diskSession = new DiskSession(botToken, phoneNumber);
return Mono.<Void>fromCallable(() -> {
synchronized (diskSessions) {
diskSessions.getSettings().userIdToSession().put(userId, diskSession);
return null;
}
}).subscribeOn(Schedulers.boundedElastic()).then(saveToDiskMono).thenReturn(sessionPath);
} else {
return Mono.just(sessionPath);
}
})
.doOnNext(reactiveApiPublisher::start)
.thenReturn(new CreateSessionResponse(liveId));
});
}); });
}) });
.whenComplete((response, error) -> sessionModificationLock
.unlock() // Lock sessions creation
.thenRun(() -> LOG.trace("Released session modification lock for session request: {}", req)) return Mono
.usingWhen(Mono.fromCompletionStage(sessionModificationLock::lock),
lockVersion -> unlockedSessionCreationMono,
lockVersion -> Mono
.fromCompletionStage(sessionModificationLock::unlock)
.doOnTerminate(() -> LOG.trace("Released session modification lock for session request: {}", req))
) )
.whenComplete((resp, ex) -> LOG.debug("Handled session request {}, the response is: {}", req, resp, ex)); .doOnNext(resp -> LOG.debug("Handled session request {}, the response is: {}", req, resp))
.doOnError(ex -> LOG.debug("Handled session request {}, the response is: error", req, ex));
} }
public CompletableFuture<Long> nextFreeLiveId() { public Mono<Long> nextFreeLiveId() {
return nextSessionLiveId.nextId(); return Mono.fromCompletionStage(nextSessionLiveId::nextId);
} }
public Atomix getAtomix() { public Atomix getAtomix() {
@ -327,7 +337,7 @@ public class ReactiveApi {
return this.nodeId.equals(nodeId); return this.nodeId.equals(nodeId);
} }
private static record DiskSessionAndId(DiskSession diskSession, long id) {} private record DiskSessionAndId(DiskSession diskSession, long id) {}
private Mono<DiskSessionAndId> getLocalDiskSession(Long localId) { private Mono<DiskSessionAndId> getLocalDiskSession(Long localId) {
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {

View File

@ -55,7 +55,6 @@ public class ReactiveApiPublisher {
private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class); private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class);
private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofSeconds(10); private static final Duration SPECIAL_RAW_TIMEOUT_DURATION = Duration.ofSeconds(10);
private final Atomix atomix;
private final ClusterEventService eventService; private final ClusterEventService eventService;
private final ReactiveTelegramClient rawTelegramClient; private final ReactiveTelegramClient rawTelegramClient;
private final Flux<Signal> telegramClient; private final Flux<Signal> telegramClient;
@ -66,10 +65,9 @@ public class ReactiveApiPublisher {
private final String botToken; private final String botToken;
private final Long phoneNumber; private final Long phoneNumber;
private AtomicReference<Disposable> disposable = new AtomicReference<>(); private final AtomicReference<Disposable> disposable = new AtomicReference<>();
private ReactiveApiPublisher(Atomix atomix, long liveId, long userId, String botToken, Long phoneNumber) { private ReactiveApiPublisher(Atomix atomix, long liveId, long userId, String botToken, Long phoneNumber) {
this.atomix = atomix;
this.userId = userId; this.userId = userId;
this.liveId = liveId; this.liveId = liveId;
this.botToken = botToken; this.botToken = botToken;
@ -119,7 +117,7 @@ public class ReactiveApiPublisher {
.filter(s -> s instanceof ClientBoundResultingEvent) .filter(s -> s instanceof ClientBoundResultingEvent)
.cast(ClientBoundResultingEvent.class) .cast(ClientBoundResultingEvent.class)
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(clientBoundResultingEvent -> eventService.broadcast("session-" + liveId + "-clientbound-events", .subscribe(clientBoundResultingEvent -> eventService.broadcast("session-" + liveId + "-client-bound-events",
clientBoundResultingEvent.event(), clientBoundResultingEvent.event(),
ReactiveApiPublisher::serializeEvent ReactiveApiPublisher::serializeEvent
)); ));
@ -141,11 +139,12 @@ public class ReactiveApiPublisher {
var update = (TdApi.Update) signal.getUpdate(); var update = (TdApi.Update) signal.getUpdate();
return new ClientBoundResultingEvent(new OnUpdateData(liveId, userId, update)); return new ClientBoundResultingEvent(new OnUpdateData(liveId, userId, update));
} else { } else {
LOG.trace("Signal has not been broadcasted because the session {} is not logged in: {}", userId, signal); LOG.trace("Signal has not been broadcast because the session {} is not logged in: {}", userId, signal);
return this.handleSpecialSignal(state, signal); return this.handleSpecialSignal(state, signal);
} }
} }
@SuppressWarnings("SwitchStatementWithTooFewBranches")
@Nullable @Nullable
private ResultingEvent handleSpecialSignal(State state, Signal signal) { private ResultingEvent handleSpecialSignal(State state, Signal signal) {
if (signal.isException()) { if (signal.isException()) {
@ -224,24 +223,24 @@ public class ReactiveApiPublisher {
} }
private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) { private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) {
try (var baos = new ByteArrayOutputStream()) { try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
try (var daos = new DataOutputStream(baos)) { try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
if (clientBoundEvent instanceof OnUpdateData onUpdateData) { if (clientBoundEvent instanceof OnUpdateData onUpdateData) {
daos.write(0x1); dataOutputStream.write(0x1);
onUpdateData.update().serialize(daos); onUpdateData.update().serialize(dataOutputStream);
} else if (clientBoundEvent instanceof OnUpdateError onUpdateError) { } else if (clientBoundEvent instanceof OnUpdateError onUpdateError) {
daos.write(0x2); dataOutputStream.write(0x2);
onUpdateError.error().serialize(daos); onUpdateError.error().serialize(dataOutputStream);
} else if (clientBoundEvent instanceof OnUserLoginCodeRequested onUserLoginCodeRequested) { } else if (clientBoundEvent instanceof OnUserLoginCodeRequested onUserLoginCodeRequested) {
daos.write(0x3); dataOutputStream.write(0x3);
daos.writeLong(onUserLoginCodeRequested.phoneNumber()); dataOutputStream.writeLong(onUserLoginCodeRequested.phoneNumber());
} else if (clientBoundEvent instanceof OnBotLoginCodeRequested onBotLoginCodeRequested) { } else if (clientBoundEvent instanceof OnBotLoginCodeRequested onBotLoginCodeRequested) {
daos.write(0x4); dataOutputStream.write(0x4);
daos.writeUTF(onBotLoginCodeRequested.token()); dataOutputStream.writeUTF(onBotLoginCodeRequested.token());
} else { } else {
throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent); throw new UnsupportedOperationException("Unexpected value: " + clientBoundEvent);
} }
return baos.toByteArray(); return byteArrayOutputStream.toByteArray();
} }
} catch (IOException ex) { } catch (IOException ex) {
throw new SerializationException(ex); throw new SerializationException(ex);
@ -259,11 +258,11 @@ public class ReactiveApiPublisher {
private static byte[] serializeResponse(Response response) { private static byte[] serializeResponse(Response response) {
var id = response.getId(); var id = response.getId();
var object = response.getObject(); var object = response.getObject();
try (var baos = new ByteArrayOutputStream()) { try (var byteArrayOutputStream = new ByteArrayOutputStream()) {
try (var daos = new DataOutputStream(baos)) { try (var dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
daos.writeLong(id); dataOutputStream.writeLong(id);
object.serialize(daos); object.serialize(dataOutputStream);
return baos.toByteArray(); return byteArrayOutputStream.toByteArray();
} }
} catch (IOException ex) { } catch (IOException ex) {
throw new SerializationException(ex); throw new SerializationException(ex);

View File

@ -4,6 +4,7 @@ import java.util.concurrent.Executor;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
@SuppressWarnings("ClassCanBeRecord")
public class SchedulerExecutor implements Executor { public class SchedulerExecutor implements Executor {
private final Scheduler scheduler; private final Scheduler scheduler;