Rewrite requests management
This commit is contained in:
parent
f0d5706d77
commit
da61270350
@ -11,6 +11,7 @@ import it.tdlight.jni.TdApi;
|
|||||||
import it.tdlight.jni.TdApi.CheckAuthenticationBotToken;
|
import it.tdlight.jni.TdApi.CheckAuthenticationBotToken;
|
||||||
import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey;
|
import it.tdlight.jni.TdApi.CheckDatabaseEncryptionKey;
|
||||||
import it.tdlight.jni.TdApi.Close;
|
import it.tdlight.jni.TdApi.Close;
|
||||||
|
import it.tdlight.jni.TdApi.Function;
|
||||||
import it.tdlight.jni.TdApi.Object;
|
import it.tdlight.jni.TdApi.Object;
|
||||||
import it.tdlight.jni.TdApi.PhoneNumberAuthenticationSettings;
|
import it.tdlight.jni.TdApi.PhoneNumberAuthenticationSettings;
|
||||||
import it.tdlight.jni.TdApi.SetAuthenticationPhoneNumber;
|
import it.tdlight.jni.TdApi.SetAuthenticationPhoneNumber;
|
||||||
@ -23,6 +24,8 @@ import it.tdlight.reactiveapi.Event.OnUpdateData;
|
|||||||
import it.tdlight.reactiveapi.Event.OnUpdateError;
|
import it.tdlight.reactiveapi.Event.OnUpdateError;
|
||||||
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
|
import it.tdlight.reactiveapi.Event.OnUserLoginCodeRequested;
|
||||||
import it.tdlight.reactiveapi.Event.Request;
|
import it.tdlight.reactiveapi.Event.Request;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
|
||||||
import it.tdlight.tdlight.ClientManager;
|
import it.tdlight.tdlight.ClientManager;
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
@ -35,11 +38,15 @@ import java.time.Instant;
|
|||||||
import java.util.StringJoiner;
|
import java.util.StringJoiner;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import javax.annotation.Nullable;
|
||||||
import org.apache.commons.lang3.SerializationException;
|
import org.apache.commons.lang3.SerializationException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import reactor.core.Disposable;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.publisher.Sinks;
|
||||||
|
import reactor.core.publisher.Sinks.Many;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
public class ReactiveApiPublisher {
|
public class ReactiveApiPublisher {
|
||||||
@ -52,12 +59,15 @@ public class ReactiveApiPublisher {
|
|||||||
private final ClusterEventService eventService;
|
private final ClusterEventService eventService;
|
||||||
private final ReactiveTelegramClient rawTelegramClient;
|
private final ReactiveTelegramClient rawTelegramClient;
|
||||||
private final Flux<Signal> telegramClient;
|
private final Flux<Signal> telegramClient;
|
||||||
|
|
||||||
private final AtomicReference<State> state = new AtomicReference<>(new State(LOGGED_OUT));
|
private final AtomicReference<State> state = new AtomicReference<>(new State(LOGGED_OUT));
|
||||||
private final long userId;
|
private final long userId;
|
||||||
private final long liveId;
|
private final long liveId;
|
||||||
private final String botToken;
|
private final String botToken;
|
||||||
private final Long phoneNumber;
|
private final Long phoneNumber;
|
||||||
|
|
||||||
|
private AtomicReference<Disposable> disposable = new AtomicReference<>();
|
||||||
|
|
||||||
private ReactiveApiPublisher(Atomix atomix, long liveId, long userId, String botToken, Long phoneNumber) {
|
private ReactiveApiPublisher(Atomix atomix, long liveId, long userId, String botToken, Long phoneNumber) {
|
||||||
this.atomix = atomix;
|
this.atomix = atomix;
|
||||||
this.userId = userId;
|
this.userId = userId;
|
||||||
@ -86,36 +96,70 @@ public class ReactiveApiPublisher {
|
|||||||
|
|
||||||
public void start(Path path) {
|
public void start(Path path) {
|
||||||
LOG.info("Starting session \"{}\" in path \"{}\"", this, path);
|
LOG.info("Starting session \"{}\" in path \"{}\"", this, path);
|
||||||
telegramClient.subscribeOn(Schedulers.parallel()).subscribe(this::onSignal);
|
var publishedResultingEvents = telegramClient
|
||||||
|
.subscribeOn(Schedulers.parallel())
|
||||||
|
.mapNotNull(this::onSignal)
|
||||||
|
.publish();
|
||||||
|
|
||||||
|
publishedResultingEvents
|
||||||
|
.filter(s -> s instanceof TDLibBoundResultingEvent<?>)
|
||||||
|
.map(s -> ((TDLibBoundResultingEvent<?>) s).action())
|
||||||
|
.flatMap(function -> Mono
|
||||||
|
.from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION))
|
||||||
|
.doOnNext(resp -> {
|
||||||
|
if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) {
|
||||||
|
LOG.error("Received error for special request {}: {}", function, resp);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.doOnError(ex -> LOG.error("Failed to receive the response for special request {}", function, ex))
|
||||||
|
)
|
||||||
|
.subscribeOn(Schedulers.parallel())
|
||||||
|
.subscribe();
|
||||||
|
publishedResultingEvents
|
||||||
|
.filter(s -> s instanceof ClientBoundResultingEvent)
|
||||||
|
.cast(ClientBoundResultingEvent.class)
|
||||||
|
.subscribeOn(Schedulers.parallel())
|
||||||
|
.subscribe(clientBoundResultingEvent -> eventService.broadcast("session-" + liveId + "-clientbound-events",
|
||||||
|
clientBoundResultingEvent.event(),
|
||||||
|
ReactiveApiPublisher::serializeEvent
|
||||||
|
));
|
||||||
|
|
||||||
|
|
||||||
|
var prev = this.disposable.getAndSet(publishedResultingEvents.connect());
|
||||||
|
if (prev != null) {
|
||||||
|
LOG.error("The API started twice!");
|
||||||
|
prev.dispose();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onSignal(Signal signal) {
|
@Nullable
|
||||||
|
private ResultingEvent onSignal(Signal signal) {
|
||||||
// Update the state
|
// Update the state
|
||||||
var state = this.state.updateAndGet(oldState -> oldState.withSignal(signal));
|
var state = this.state.updateAndGet(oldState -> oldState.withSignal(signal));
|
||||||
|
|
||||||
if (state.authPhase() == LOGGED_IN) {
|
if (state.authPhase() == LOGGED_IN) {
|
||||||
var update = (TdApi.Update) signal.getUpdate();
|
var update = (TdApi.Update) signal.getUpdate();
|
||||||
var event = new OnUpdateData(liveId, userId, update);
|
return new ClientBoundResultingEvent(new OnUpdateData(liveId, userId, update));
|
||||||
sendEvent(event);
|
|
||||||
} else {
|
} else {
|
||||||
LOG.trace("Signal has not been broadcasted because the session {} is not logged in: {}", userId, signal);
|
LOG.trace("Signal has not been broadcasted because the session {} is not logged in: {}", userId, signal);
|
||||||
this.handleSpecialSignal(state, signal);
|
return this.handleSpecialSignal(state, signal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleSpecialSignal(State state, Signal signal) {
|
@Nullable
|
||||||
|
private ResultingEvent handleSpecialSignal(State state, Signal signal) {
|
||||||
if (signal.isException()) {
|
if (signal.isException()) {
|
||||||
LOG.error("Received an error signal", signal.getException());
|
LOG.error("Received an error signal", signal.getException());
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) {
|
if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) {
|
||||||
var error = ((TdApi.Error) signal.getUpdate());
|
var error = ((TdApi.Error) signal.getUpdate());
|
||||||
LOG.error("Received a TDLib error signal! Error {}: {}", error.code, error.message);
|
LOG.error("Received a TDLib error signal! Error {}: {}", error.code, error.message);
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
if (!signal.isUpdate()) {
|
if (!signal.isUpdate()) {
|
||||||
LOG.error("Received a signal that's not an update: {}", signal);
|
LOG.error("Received a signal that's not an update: {}", signal);
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
var update = signal.getUpdate();
|
var update = signal.getUpdate();
|
||||||
switch (state.authPhase()) {
|
switch (state.authPhase()) {
|
||||||
@ -125,7 +169,9 @@ public class ReactiveApiPublisher {
|
|||||||
case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> {
|
case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> {
|
||||||
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
|
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
|
||||||
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
||||||
case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> sendSpecialRaw(new SetTdlibParameters());
|
case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> {
|
||||||
|
return new TDLibBoundResultingEvent<>(new SetTdlibParameters());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -135,8 +181,9 @@ public class ReactiveApiPublisher {
|
|||||||
case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> {
|
case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> {
|
||||||
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
|
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
|
||||||
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
||||||
case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR ->
|
case TdApi.AuthorizationStateWaitTdlibParameters.CONSTRUCTOR -> {
|
||||||
sendSpecialRaw(new CheckDatabaseEncryptionKey());
|
return new TDLibBoundResultingEvent<>(new CheckDatabaseEncryptionKey());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -146,21 +193,26 @@ public class ReactiveApiPublisher {
|
|||||||
case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> {
|
case TdApi.UpdateAuthorizationState.CONSTRUCTOR -> {
|
||||||
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
|
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
|
||||||
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
switch (updateAuthorizationState.authorizationState.getConstructor()) {
|
||||||
case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR ->
|
case TdApi.AuthorizationStateWaitCode.CONSTRUCTOR -> {
|
||||||
sendEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber));
|
return new ClientBoundResultingEvent(new OnUserLoginCodeRequested(liveId, userId, phoneNumber));
|
||||||
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR ->
|
}
|
||||||
sendEvent(new OnOtherDeviceLoginRequested(liveId, userId));
|
case TdApi.AuthorizationStateWaitOtherDeviceConfirmation.CONSTRUCTOR -> {
|
||||||
case TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR ->
|
return new ClientBoundResultingEvent(new OnOtherDeviceLoginRequested(liveId, userId));
|
||||||
sendEvent(new OnPasswordRequested(liveId, userId));
|
}
|
||||||
|
case TdApi.AuthorizationStateWaitPassword.CONSTRUCTOR -> {
|
||||||
|
return new ClientBoundResultingEvent(new OnPasswordRequested(liveId, userId));
|
||||||
|
}
|
||||||
case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR -> {
|
case TdApi.AuthorizationStateWaitPhoneNumber.CONSTRUCTOR -> {
|
||||||
if (botToken != null) {
|
if (botToken != null) {
|
||||||
sendSpecialRaw(new CheckAuthenticationBotToken(botToken));
|
return new TDLibBoundResultingEvent<>(new CheckAuthenticationBotToken(botToken));
|
||||||
} else {
|
} else {
|
||||||
var authSettings = new PhoneNumberAuthenticationSettings();
|
var authSettings = new PhoneNumberAuthenticationSettings();
|
||||||
authSettings.allowFlashCall = false;
|
authSettings.allowFlashCall = false;
|
||||||
authSettings.allowSmsRetrieverApi = false;
|
authSettings.allowSmsRetrieverApi = false;
|
||||||
authSettings.isCurrentPhoneNumber = false;
|
authSettings.isCurrentPhoneNumber = false;
|
||||||
sendSpecialRaw(new SetAuthenticationPhoneNumber("+" + phoneNumber, authSettings));
|
return new TDLibBoundResultingEvent<>(new SetAuthenticationPhoneNumber("+" + phoneNumber,
|
||||||
|
authSettings
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -168,24 +220,7 @@ public class ReactiveApiPublisher {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
return null;
|
||||||
|
|
||||||
private void sendEvent(ClientBoundEvent clientBoundEvent) {
|
|
||||||
eventService.broadcast("session-" + liveId + "-clientbound-events",
|
|
||||||
clientBoundEvent,
|
|
||||||
ReactiveApiPublisher::serializeEvent
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void sendSpecialRaw(TdApi.Function<?> function) {
|
|
||||||
Mono
|
|
||||||
.from(rawTelegramClient.send(function, SPECIAL_RAW_TIMEOUT_DURATION))
|
|
||||||
.subscribeOn(Schedulers.parallel())
|
|
||||||
.subscribe(resp -> {
|
|
||||||
if (resp.getConstructor() == TdApi.Error.CONSTRUCTOR) {
|
|
||||||
LOG.error("Received error for special request {}: {}", function, resp);
|
|
||||||
}
|
|
||||||
}, ex -> LOG.error("Failed to receive the response for special request {}", function, ex));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) {
|
private static byte[] serializeEvent(ClientBoundEvent clientBoundEvent) {
|
||||||
|
13
src/main/java/it/tdlight/reactiveapi/ResultingEvent.java
Normal file
13
src/main/java/it/tdlight/reactiveapi/ResultingEvent.java
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
|
import it.tdlight.jni.TdApi;
|
||||||
|
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
|
||||||
|
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
|
||||||
|
|
||||||
|
public sealed interface ResultingEvent permits ClientBoundResultingEvent, TDLibBoundResultingEvent {
|
||||||
|
|
||||||
|
record ClientBoundResultingEvent(ClientBoundEvent event) implements ResultingEvent {}
|
||||||
|
|
||||||
|
record TDLibBoundResultingEvent<T extends TdApi.Object>(TdApi.Function<T> action) implements ResultingEvent {}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user