Implement periodic restarter

This commit is contained in:
Andrea Cavalli 2022-01-09 20:20:20 +01:00
parent 5b9fec980e
commit 172c770524
10 changed files with 336 additions and 34 deletions

View File

@ -55,7 +55,7 @@ public class AtomixReactiveApi implements ReactiveApi {
private final AsyncAtomicLock sessionModificationLock;
private final AsyncAtomicMap<Long, String> userIdToNodeId;
/**
* User id -> session
* live id -> session
*/
private final ConcurrentMap<Long, ReactiveApiPublisher> localLiveSessions = new ConcurrentHashMap<>();
/**
@ -193,9 +193,9 @@ public class AtomixReactiveApi implements ReactiveApi {
.doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk"));
// Listen for create-session signals
Mono<Subscription> subscriptionMono;
Mono<Subscription> createSessionSubscriptionMono;
if (nodeId != null) {
subscriptionMono = fromCompletionStage(() -> atomix
createSessionSubscriptionMono = fromCompletionStage(() -> atomix
.getEventService()
.subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> {
if (req instanceof LoadSessionFromDiskRequest) {
@ -205,9 +205,24 @@ public class AtomixReactiveApi implements ReactiveApi {
}
}, CreateSessionResponse::serializeBytes));
} else {
subscriptionMono = Mono.empty();
createSessionSubscriptionMono = Mono.empty();
}
return diskInitMono.then(subscriptionMono).then();
// Listen for revive-session signals
Mono<Subscription> reviveSessionSubscriptionMono;
if (nodeId != null) {
reviveSessionSubscriptionMono = fromCompletionStage(() -> atomix
.getEventService()
.subscribe("revive-session", (Long userId) -> this.getLocalDiskSession(userId).flatMap(sessionAndId -> {
var diskSession = sessionAndId.diskSession();
var request = new LoadSessionFromDiskRequest(userId, diskSession.token, diskSession.phoneNumber, false);
return this.createSession(request);
}).onErrorResume(ex -> Mono.empty()).then(Mono.empty()).toFuture()));
} else {
reviveSessionSubscriptionMono = Mono.empty();
}
return diskInitMono.then(Mono.when(createSessionSubscriptionMono, reviveSessionSubscriptionMono));
}
private CompletableFuture<Void> destroySession(long userId, String nodeId) {
@ -229,6 +244,13 @@ public class AtomixReactiveApi implements ReactiveApi {
.whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex));
}
/**
* Send a request to the cluster to load that user id from disk
*/
public Mono<Void> tryReviveSession(long userId) {
return Mono.fromRunnable(() -> atomix.getEventService().broadcast("revive-session", userId));
}
@Override
public Mono<CreateSessionResponse> createSession(CreateSessionRequest req) {
LOG.debug("Received create session request: {}", req);
@ -412,6 +434,15 @@ public class AtomixReactiveApi implements ReactiveApi {
}).collectMap(Entry::getKey, Entry::getValue);
}
@Override
public Set<UserIdAndLiveId> getLocalLiveSessionIds() {
return localLiveSessions
.values()
.stream()
.map(reactiveApiPublisher -> new UserIdAndLiveId(reactiveApiPublisher.userId, reactiveApiPublisher.liveId))
.collect(Collectors.toUnmodifiableSet());
}
@Override
public boolean is(String nodeId) {
if (this.nodeId == null) {
@ -440,6 +471,21 @@ public class AtomixReactiveApi implements ReactiveApi {
});
}
@Override
public ReactiveApiClient dynamicClient(long userId) {
return new DynamicAtomixReactiveApiClient(this, userId);
}
@Override
public ReactiveApiClient liveClient(long liveId, long userId) {
return new LiveAtomixReactiveApiClient(atomix, liveId, userId);
}
@Override
public ReactiveApiMultiClient multiClient() {
return new AtomixReactiveApiMultiClient(this);
}
@Override
public Mono<Void> close() {
return Mono.fromCompletionStage(this.atomix::stop);
@ -447,20 +493,20 @@ public class AtomixReactiveApi implements ReactiveApi {
private record DiskSessionAndId(DiskSession diskSession, long id) {}
private Mono<DiskSessionAndId> getLocalDiskSession(Long localId) {
private Mono<DiskSessionAndId> getLocalDiskSession(Long localUserId) {
return Mono.fromCallable(() -> {
Objects.requireNonNull(diskSessions);
synchronized (diskSessions) {
var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localId),
"Id not found: " + localId
var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localUserId),
"Id not found: " + localUserId
);
try {
diskSession.validate();
} catch (Throwable ex) {
LOG.error("Failed to load disk session {}", localId, ex);
LOG.error("Failed to load disk session {}", localUserId, ex);
return null;
}
return new DiskSessionAndId(diskSession, localId);
return new DiskSessionAndId(diskSession, localUserId);
}
}).subscribeOn(Schedulers.boundedElastic());
}

View File

@ -0,0 +1,77 @@
package it.tdlight.reactiveapi;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.cluster.messaging.Subscription;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.Request;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable {
private final ReactiveApi api;
private final ClusterEventService eventService;
private final Flux<ClientBoundEvent> clientBoundEvents;
AtomixReactiveApiMultiClient(AtomixReactiveApi api) {
this.api = api;
this.eventService = api.getAtomix().getEventService();
clientBoundEvents = Flux
.<ClientBoundEvent>push(sink -> {
var subscriptionFuture = eventService.subscribe("session-client-bound-events",
LiveAtomixReactiveApiClient::deserializeEvent,
s -> {
sink.next(s);
return CompletableFuture.completedFuture(null);
},
(a) -> null
);
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
}, OverflowStrategy.ERROR)
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
.share();
}
@Override
public Flux<ClientBoundEvent> clientBoundEvents() {
return clientBoundEvents;
}
@Override
public <T extends TdApi.Object> Mono<T> request(long userId, long liveId, TdApi.Function<T> request, Instant timeout) {
return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
new Request<>(liveId, request, timeout),
LiveAtomixReactiveApiClient::serializeRequest,
LiveAtomixReactiveApiClient::deserializeResponse,
Duration.between(Instant.now(), timeout)
))
.<T>handle((item, sink) -> {
if (item instanceof TdApi.Error error) {
sink.error(new TdError(error.code, error.message));
} else {
//noinspection unchecked
sink.next((T) item);
}
})
.onErrorMap(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) {
return new TdError(404, "Bot #IDU" + userId + " (live id: " + liveId + ") is not found on the cluster");
} else {
return ex;
}
});
}
@Override
public void close() {
}
}

View File

@ -1,16 +1,16 @@
package it.tdlight.reactiveapi;
import static java.util.Collections.unmodifiableSet;
import io.atomix.core.Atomix;
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import net.minecrell.terminalconsole.SimpleTerminalConsole;
import org.jline.reader.LineReader;
import org.jline.reader.LineReaderBuilder;
@ -93,6 +93,16 @@ public class Cli {
private void printSessions(ReactiveApi api, boolean onlyLocal) {
api.getAllUsers().subscribe(sessions -> {
var userIdToLiveId = api
.getLocalLiveSessionIds()
.stream()
.collect(Collectors.toMap(UserIdAndLiveId::userId, k -> Set.of(k.liveId()), (a, b) -> {
var r = new LongOpenHashSet(a.size() + b.size());
r.addAll(a);
r.addAll(b);
return unmodifiableSet(r);
}));
StringBuilder sb = new StringBuilder();
sb.append("Sessions:\n");
for (var userEntry : sessions.entrySet()) {
@ -102,6 +112,14 @@ public class Cli {
sb.append(" - session #IDU").append(userId);
if (!onlyLocal) {
sb.append(": ").append(nodeId);
} else {
sb
.append(": liveId=")
.append(userIdToLiveId
.get(userId)
.stream()
.map(Object::toString)
.collect(Collectors.joining(", ", "(", ")")));
}
sb.append("\n");
}

View File

@ -32,7 +32,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
private final Flux<Long> liveIdChange;
private final Mono<Long> liveIdResolution;
public DynamicAtomixReactiveApiClient(AtomixReactiveApi api, long userId) {
DynamicAtomixReactiveApiClient(AtomixReactiveApi api, long userId) {
this.api = api;
this.eventService = api.getAtomix().getEventService();
this.userId = userId;
@ -76,7 +76,13 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
LiveAtomixReactiveApiClient::serializeRequest,
LiveAtomixReactiveApiClient::deserializeResponse,
Duration.between(Instant.now(), timeout)
)))
)).onErrorMap(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) {
return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster");
} else {
return ex;
}
}))
.<T>handle((item, sink) -> {
if (item instanceof TdApi.Error error) {
sink.error(new TdError(error.code, error.message));
@ -84,13 +90,6 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
//noinspection unchecked
sink.next((T) item);
}
})
.onErrorMap(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) {
return new TdError(404, "Bot #IDU" + this.userId + " is not found on the cluster");
} else {
return ex;
}
});
}

View File

@ -34,7 +34,7 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
private final Flux<ClientBoundEvent> clientBoundEvents;
public LiveAtomixReactiveApiClient(Atomix atomix, long liveId, long userId) {
LiveAtomixReactiveApiClient(Atomix atomix, long liveId, long userId) {
this.eventService = atomix.getEventService();
this.liveId = liveId;
this.userId = userId;

View File

@ -0,0 +1,129 @@
package it.tdlight.reactiveapi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.AuthorizationStateClosing;
import it.tdlight.jni.TdApi.AuthorizationStateLoggingOut;
import it.tdlight.jni.TdApi.AuthorizationStateReady;
import it.tdlight.jni.TdApi.Close;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.jni.TdApi.UpdateNewMessage;
import it.tdlight.reactiveapi.Event.OnUpdateData;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class PeriodicRestarter {
private static final Logger LOG = LoggerFactory.getLogger(PeriodicRestarter.class);
private final ReactiveApi api;
private final Duration interval;
private final ReactiveApiMultiClient multiClient;
/**
* Live id -> x
*/
private final ConcurrentMap<Long, Disposable> closeManagedByPeriodicRestarter = new ConcurrentHashMap<>();
/**
* Live id -> x
*/
private final ConcurrentMap<Long, Boolean> closingByPeriodicRestarter = new ConcurrentHashMap<>();
/**
* Live id -> x
*/
private final ConcurrentMap<Long, Boolean> sessionAuthReady = new ConcurrentHashMap<>();
public PeriodicRestarter(ReactiveApi api, Duration interval) {
this.api = api;
this.interval = interval;
this.multiClient = api.multiClient();
}
public Mono<Void> start() {
return Mono.fromRunnable(() -> {
LOG.info("Starting periodic restarter...");
multiClient.clientBoundEvents().doOnNext(event -> {
if (event instanceof OnUpdateData onUpdate) {
if (onUpdate.update() instanceof UpdateAuthorizationState updateAuthorizationState) {
if (updateAuthorizationState.authorizationState instanceof AuthorizationStateReady) {
// Session is now ready
this.sessionAuthReady.put(event.liveId(), true);
onSessionReady(event.liveId(), event.userId());
} else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateLoggingOut) {
// Session is not ready anymore
this.sessionAuthReady.remove(event.liveId(), false);
} else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosing) {
// Session is not ready anymore
this.sessionAuthReady.remove(event.liveId(), false);
} else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosed) {
// Session is not ready anymore
this.sessionAuthReady.remove(event.liveId(), false);
Boolean prev = closingByPeriodicRestarter.remove(event.liveId());
var disposable = closeManagedByPeriodicRestarter.remove(event.userId());
boolean managed = prev != null && prev;
// Check if the live session is managed by the periodic restarter
if (managed) {
LOG.info("The session #IDU{} (liveId: {}) is being started", event.userId(), event.liveId());
// Restart the session
api.tryReviveSession(event.userId()).subscribeOn(Schedulers.parallel()).subscribe();
}
// Dispose restarter anyway
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}
}
} else if (onUpdate.update() instanceof UpdateNewMessage) {
var wasReady = this.sessionAuthReady.getOrDefault(event.liveId(), false);
if (!wasReady) {
this.sessionAuthReady.put(event.liveId(), true);
onSessionReady(event.liveId(), event.userId());
}
}
}
}).subscribeOn(Schedulers.parallel()).subscribe();
LOG.info("Started periodic restarter");
});
}
private void onSessionReady(long liveId, long userId) {
LOG.info("The session #IDU{} (liveId: {}) will be restarted at {}",
userId,
liveId,
Instant.now().plus(interval)
);
// Restart after x time
var disposable = Schedulers
.parallel()
.schedule(() -> {
LOG.info("The session #IDU{} (liveId: {}) is being stopped", userId, liveId);
closingByPeriodicRestarter.put(liveId, true);
// Request restart
multiClient
.request(userId, liveId, new Close(), Instant.now().plus(Duration.ofSeconds(15)))
.subscribeOn(Schedulers.parallel())
.subscribe();
}, interval.toMillis(), TimeUnit.MILLISECONDS);
closeManagedByPeriodicRestarter.put(liveId, disposable);
}
public Mono<Void> stop() {
return Mono.fromRunnable(() -> {
LOG.info("Stopping periodic restarter...");
multiClient.close();
LOG.info("Stopped periodic restarter");
});
}
}

View File

@ -1,16 +1,25 @@
package it.tdlight.reactiveapi;
import java.util.List;
import java.util.Map;
import java.util.Set;
import reactor.core.publisher.Mono;
public interface ReactiveApi {
Mono<Void> start();
/**
* Send a request to the cluster to load that user id from disk
*/
Mono<Void> tryReviveSession(long userId);
Mono<CreateSessionResponse> createSession(CreateSessionRequest req);
Mono<Map<Long, String>> getAllUsers();
Set<UserIdAndLiveId> getLocalLiveSessionIds();
boolean is(String nodeId);
/**
@ -18,5 +27,11 @@ public interface ReactiveApi {
*/
Mono<Long> resolveUserLiveId(long userId);
ReactiveApiMultiClient multiClient();
ReactiveApiClient dynamicClient(long userId);
ReactiveApiClient liveClient(long liveId, long userId);
Mono<Void> close();
}

View File

@ -0,0 +1,16 @@
package it.tdlight.reactiveapi;
import it.tdlight.jni.TdApi;
import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import java.time.Instant;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface ReactiveApiMultiClient {
Flux<ClientBoundEvent> clientBoundEvents();
<T extends TdApi.Object> Mono<T> request(long userId, long liveId, TdApi.Function<T> request, Instant timeout);
void close();
}

View File

@ -12,6 +12,7 @@ import it.tdlight.common.ReactiveTelegramClient;
import it.tdlight.common.Response;
import it.tdlight.common.Signal;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.AuthorizationStateWaitOtherDeviceConfirmation;
import it.tdlight.jni.TdApi.AuthorizationStateWaitPassword;
import it.tdlight.jni.TdApi.CheckAuthenticationBotToken;
@ -54,6 +55,7 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@ -139,10 +141,9 @@ public abstract class ReactiveApiPublisher {
.filter(s -> s instanceof TDLibBoundResultingEvent<?>)
.map(s -> ((TDLibBoundResultingEvent<?>) s).action())
//.limitRate(4)
.onBackpressureBuffer()
.limitRate(4)
// Buffer up to 64 requests to avoid halting the event loop, throw an error if too many requests are buffered
//.onBackpressureBuffer(64, BufferOverflowStrategy.ERROR)
.onBackpressureBuffer(64, BufferOverflowStrategy.ERROR)
// Send requests to tdlib
.flatMap(function -> Mono
@ -176,9 +177,6 @@ public abstract class ReactiveApiPublisher {
.cast(ClientBoundResultingEvent.class)
.map(ClientBoundResultingEvent::event)
// Buffer requests
.onBackpressureBuffer()
// Send events to the client
.subscribeOn(Schedulers.parallel())
.subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events",
@ -189,9 +187,6 @@ public abstract class ReactiveApiPublisher {
.filter(s -> s instanceof ClusterBoundResultingEvent)
.cast(ClusterBoundResultingEvent.class)
// Buffer requests
.onBackpressureBuffer()
// Send events to the cluster
.subscribeOn(Schedulers.parallel())
.subscribe(clusterBoundEvent -> {
@ -250,7 +245,10 @@ public abstract class ReactiveApiPublisher {
if (signal.isClosed()) {
signal.getClosed();
LOG.info("Received a closed signal");
return List.of(new ResultingEventPublisherClosed());
return List.of(new ClientBoundResultingEvent(new OnUpdateData(liveId,
userId,
new TdApi.UpdateAuthorizationState(new AuthorizationStateClosed())
)), new ResultingEventPublisherClosed());
}
if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) {
var error = ((TdApi.Error) signal.getUpdate());
@ -405,6 +403,7 @@ public abstract class ReactiveApiPublisher {
}
private static byte[] serializeResponse(Response response) {
if (response == null) return null;
var id = response.getId();
var object = response.getObject();
try (var byteArrayOutputStream = new ByteArrayOutputStream()) {

View File

@ -0,0 +1,3 @@
package it.tdlight.reactiveapi;
public record UserIdAndLiveId(long userId, long liveId) {}