From 172c770524e04cad8517208df9ac1fbac6a37e72 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 9 Jan 2022 20:20:20 +0100 Subject: [PATCH] Implement periodic restarter --- .../reactiveapi/AtomixReactiveApi.java | 66 +++++++-- .../AtomixReactiveApiMultiClient.java | 77 +++++++++++ src/main/java/it/tdlight/reactiveapi/Cli.java | 26 +++- .../DynamicAtomixReactiveApiClient.java | 17 ++- .../LiveAtomixReactiveApiClient.java | 2 +- .../reactiveapi/PeriodicRestarter.java | 129 ++++++++++++++++++ .../it/tdlight/reactiveapi/ReactiveApi.java | 15 ++ .../reactiveapi/ReactiveApiMultiClient.java | 16 +++ .../reactiveapi/ReactiveApiPublisher.java | 19 ++- .../tdlight/reactiveapi/UserIdAndLiveId.java | 3 + 10 files changed, 336 insertions(+), 34 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java create mode 100644 src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java create mode 100644 src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java create mode 100644 src/main/java/it/tdlight/reactiveapi/UserIdAndLiveId.java diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index e3142ac..f193079 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -55,7 +55,7 @@ public class AtomixReactiveApi implements ReactiveApi { private final AsyncAtomicLock sessionModificationLock; private final AsyncAtomicMap userIdToNodeId; /** - * User id -> session + * live id -> session */ private final ConcurrentMap localLiveSessions = new ConcurrentHashMap<>(); /** @@ -193,9 +193,9 @@ public class AtomixReactiveApi implements ReactiveApi { .doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk")); // Listen for create-session signals - Mono subscriptionMono; + Mono createSessionSubscriptionMono; if (nodeId != null) { - subscriptionMono = fromCompletionStage(() -> atomix + createSessionSubscriptionMono = fromCompletionStage(() -> atomix .getEventService() .subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> { if (req instanceof LoadSessionFromDiskRequest) { @@ -205,9 +205,24 @@ public class AtomixReactiveApi implements ReactiveApi { } }, CreateSessionResponse::serializeBytes)); } else { - subscriptionMono = Mono.empty(); + createSessionSubscriptionMono = Mono.empty(); } - return diskInitMono.then(subscriptionMono).then(); + + // Listen for revive-session signals + Mono reviveSessionSubscriptionMono; + if (nodeId != null) { + reviveSessionSubscriptionMono = fromCompletionStage(() -> atomix + .getEventService() + .subscribe("revive-session", (Long userId) -> this.getLocalDiskSession(userId).flatMap(sessionAndId -> { + var diskSession = sessionAndId.diskSession(); + var request = new LoadSessionFromDiskRequest(userId, diskSession.token, diskSession.phoneNumber, false); + return this.createSession(request); + }).onErrorResume(ex -> Mono.empty()).then(Mono.empty()).toFuture())); + } else { + reviveSessionSubscriptionMono = Mono.empty(); + } + + return diskInitMono.then(Mono.when(createSessionSubscriptionMono, reviveSessionSubscriptionMono)); } private CompletableFuture destroySession(long userId, String nodeId) { @@ -229,6 +244,13 @@ public class AtomixReactiveApi implements ReactiveApi { .whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex)); } + /** + * Send a request to the cluster to load that user id from disk + */ + public Mono tryReviveSession(long userId) { + return Mono.fromRunnable(() -> atomix.getEventService().broadcast("revive-session", userId)); + } + @Override public Mono createSession(CreateSessionRequest req) { LOG.debug("Received create session request: {}", req); @@ -412,6 +434,15 @@ public class AtomixReactiveApi implements ReactiveApi { }).collectMap(Entry::getKey, Entry::getValue); } + @Override + public Set getLocalLiveSessionIds() { + return localLiveSessions + .values() + .stream() + .map(reactiveApiPublisher -> new UserIdAndLiveId(reactiveApiPublisher.userId, reactiveApiPublisher.liveId)) + .collect(Collectors.toUnmodifiableSet()); + } + @Override public boolean is(String nodeId) { if (this.nodeId == null) { @@ -440,6 +471,21 @@ public class AtomixReactiveApi implements ReactiveApi { }); } + @Override + public ReactiveApiClient dynamicClient(long userId) { + return new DynamicAtomixReactiveApiClient(this, userId); + } + + @Override + public ReactiveApiClient liveClient(long liveId, long userId) { + return new LiveAtomixReactiveApiClient(atomix, liveId, userId); + } + + @Override + public ReactiveApiMultiClient multiClient() { + return new AtomixReactiveApiMultiClient(this); + } + @Override public Mono close() { return Mono.fromCompletionStage(this.atomix::stop); @@ -447,20 +493,20 @@ public class AtomixReactiveApi implements ReactiveApi { private record DiskSessionAndId(DiskSession diskSession, long id) {} - private Mono getLocalDiskSession(Long localId) { + private Mono getLocalDiskSession(Long localUserId) { return Mono.fromCallable(() -> { Objects.requireNonNull(diskSessions); synchronized (diskSessions) { - var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localId), - "Id not found: " + localId + var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localUserId), + "Id not found: " + localUserId ); try { diskSession.validate(); } catch (Throwable ex) { - LOG.error("Failed to load disk session {}", localId, ex); + LOG.error("Failed to load disk session {}", localUserId, ex); return null; } - return new DiskSessionAndId(diskSession, localId); + return new DiskSessionAndId(diskSession, localUserId); } }).subscribeOn(Schedulers.boundedElastic()); } diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java new file mode 100644 index 0000000..9592827 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApiMultiClient.java @@ -0,0 +1,77 @@ +package it.tdlight.reactiveapi; + +import io.atomix.cluster.messaging.ClusterEventService; +import io.atomix.cluster.messaging.MessagingException; +import io.atomix.cluster.messaging.Subscription; +import it.tdlight.jni.TdApi; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import it.tdlight.reactiveapi.Event.Request; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.CompletableFuture; +import reactor.core.publisher.BufferOverflowStrategy; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink.OverflowStrategy; +import reactor.core.publisher.Mono; + +public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable { + + private final ReactiveApi api; + private final ClusterEventService eventService; + + private final Flux clientBoundEvents; + + AtomixReactiveApiMultiClient(AtomixReactiveApi api) { + this.api = api; + this.eventService = api.getAtomix().getEventService(); + + clientBoundEvents = Flux + .push(sink -> { + var subscriptionFuture = eventService.subscribe("session-client-bound-events", + LiveAtomixReactiveApiClient::deserializeEvent, + s -> { + sink.next(s); + return CompletableFuture.completedFuture(null); + }, + (a) -> null + ); + sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); + }, OverflowStrategy.ERROR) + .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) + .share(); + } + + @Override + public Flux clientBoundEvents() { + return clientBoundEvents; + } + + @Override + public Mono request(long userId, long liveId, TdApi.Function request, Instant timeout) { + return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", + new Request<>(liveId, request, timeout), + LiveAtomixReactiveApiClient::serializeRequest, + LiveAtomixReactiveApiClient::deserializeResponse, + Duration.between(Instant.now(), timeout) + )) + .handle((item, sink) -> { + if (item instanceof TdApi.Error error) { + sink.error(new TdError(error.code, error.message)); + } else { + //noinspection unchecked + sink.next((T) item); + } + }) + .onErrorMap(ex -> { + if (ex instanceof MessagingException.NoRemoteHandler) { + return new TdError(404, "Bot #IDU" + userId + " (live id: " + liveId + ") is not found on the cluster"); + } else { + return ex; + } + }); + } + + @Override + public void close() { + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/Cli.java b/src/main/java/it/tdlight/reactiveapi/Cli.java index c55ad67..a3f27db 100644 --- a/src/main/java/it/tdlight/reactiveapi/Cli.java +++ b/src/main/java/it/tdlight/reactiveapi/Cli.java @@ -1,16 +1,16 @@ package it.tdlight.reactiveapi; +import static java.util.Collections.unmodifiableSet; + import io.atomix.core.Atomix; import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest; import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest; +import it.unimi.dsi.fastutil.longs.LongOpenHashSet; import java.io.IOException; -import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.LockSupport; +import java.util.stream.Collectors; import net.minecrell.terminalconsole.SimpleTerminalConsole; import org.jline.reader.LineReader; import org.jline.reader.LineReaderBuilder; @@ -93,6 +93,16 @@ public class Cli { private void printSessions(ReactiveApi api, boolean onlyLocal) { api.getAllUsers().subscribe(sessions -> { + var userIdToLiveId = api + .getLocalLiveSessionIds() + .stream() + .collect(Collectors.toMap(UserIdAndLiveId::userId, k -> Set.of(k.liveId()), (a, b) -> { + var r = new LongOpenHashSet(a.size() + b.size()); + r.addAll(a); + r.addAll(b); + return unmodifiableSet(r); + })); + StringBuilder sb = new StringBuilder(); sb.append("Sessions:\n"); for (var userEntry : sessions.entrySet()) { @@ -102,6 +112,14 @@ public class Cli { sb.append(" - session #IDU").append(userId); if (!onlyLocal) { sb.append(": ").append(nodeId); + } else { + sb + .append(": liveId=") + .append(userIdToLiveId + .get(userId) + .stream() + .map(Object::toString) + .collect(Collectors.joining(", ", "(", ")"))); } sb.append("\n"); } diff --git a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java index 301855e..d981a8d 100644 --- a/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/DynamicAtomixReactiveApiClient.java @@ -32,7 +32,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl private final Flux liveIdChange; private final Mono liveIdResolution; - public DynamicAtomixReactiveApiClient(AtomixReactiveApi api, long userId) { + DynamicAtomixReactiveApiClient(AtomixReactiveApi api, long userId) { this.api = api; this.eventService = api.getAtomix().getEventService(); this.userId = userId; @@ -76,7 +76,13 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl LiveAtomixReactiveApiClient::serializeRequest, LiveAtomixReactiveApiClient::deserializeResponse, Duration.between(Instant.now(), timeout) - ))) + )).onErrorMap(ex -> { + if (ex instanceof MessagingException.NoRemoteHandler) { + return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster"); + } else { + return ex; + } + })) .handle((item, sink) -> { if (item instanceof TdApi.Error error) { sink.error(new TdError(error.code, error.message)); @@ -84,13 +90,6 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl //noinspection unchecked sink.next((T) item); } - }) - .onErrorMap(ex -> { - if (ex instanceof MessagingException.NoRemoteHandler) { - return new TdError(404, "Bot #IDU" + this.userId + " is not found on the cluster"); - } else { - return ex; - } }); } diff --git a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java index 46f762b..2fc1d4e 100644 --- a/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java +++ b/src/main/java/it/tdlight/reactiveapi/LiveAtomixReactiveApiClient.java @@ -34,7 +34,7 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient { private final Flux clientBoundEvents; - public LiveAtomixReactiveApiClient(Atomix atomix, long liveId, long userId) { + LiveAtomixReactiveApiClient(Atomix atomix, long liveId, long userId) { this.eventService = atomix.getEventService(); this.liveId = liveId; this.userId = userId; diff --git a/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java new file mode 100644 index 0000000..13a7189 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/PeriodicRestarter.java @@ -0,0 +1,129 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi.AuthorizationStateClosed; +import it.tdlight.jni.TdApi.AuthorizationStateClosing; +import it.tdlight.jni.TdApi.AuthorizationStateLoggingOut; +import it.tdlight.jni.TdApi.AuthorizationStateReady; +import it.tdlight.jni.TdApi.Close; +import it.tdlight.jni.TdApi.UpdateAuthorizationState; +import it.tdlight.jni.TdApi.UpdateNewMessage; +import it.tdlight.reactiveapi.Event.OnUpdateData; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class PeriodicRestarter { + + private static final Logger LOG = LoggerFactory.getLogger(PeriodicRestarter.class); + + private final ReactiveApi api; + private final Duration interval; + private final ReactiveApiMultiClient multiClient; + + /** + * Live id -> x + */ + private final ConcurrentMap closeManagedByPeriodicRestarter = new ConcurrentHashMap<>(); + + /** + * Live id -> x + */ + private final ConcurrentMap closingByPeriodicRestarter = new ConcurrentHashMap<>(); + + /** + * Live id -> x + */ + private final ConcurrentMap sessionAuthReady = new ConcurrentHashMap<>(); + + public PeriodicRestarter(ReactiveApi api, Duration interval) { + this.api = api; + this.interval = interval; + + this.multiClient = api.multiClient(); + + } + + public Mono start() { + return Mono.fromRunnable(() -> { + LOG.info("Starting periodic restarter..."); + multiClient.clientBoundEvents().doOnNext(event -> { + if (event instanceof OnUpdateData onUpdate) { + if (onUpdate.update() instanceof UpdateAuthorizationState updateAuthorizationState) { + if (updateAuthorizationState.authorizationState instanceof AuthorizationStateReady) { + // Session is now ready + this.sessionAuthReady.put(event.liveId(), true); + onSessionReady(event.liveId(), event.userId()); + } else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateLoggingOut) { + // Session is not ready anymore + this.sessionAuthReady.remove(event.liveId(), false); + } else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosing) { + // Session is not ready anymore + this.sessionAuthReady.remove(event.liveId(), false); + } else if (updateAuthorizationState.authorizationState instanceof AuthorizationStateClosed) { + // Session is not ready anymore + this.sessionAuthReady.remove(event.liveId(), false); + Boolean prev = closingByPeriodicRestarter.remove(event.liveId()); + var disposable = closeManagedByPeriodicRestarter.remove(event.userId()); + boolean managed = prev != null && prev; + // Check if the live session is managed by the periodic restarter + if (managed) { + LOG.info("The session #IDU{} (liveId: {}) is being started", event.userId(), event.liveId()); + // Restart the session + api.tryReviveSession(event.userId()).subscribeOn(Schedulers.parallel()).subscribe(); + } + // Dispose restarter anyway + if (disposable != null && !disposable.isDisposed()) { + disposable.dispose(); + } + } + } else if (onUpdate.update() instanceof UpdateNewMessage) { + var wasReady = this.sessionAuthReady.getOrDefault(event.liveId(), false); + if (!wasReady) { + this.sessionAuthReady.put(event.liveId(), true); + onSessionReady(event.liveId(), event.userId()); + } + } + } + }).subscribeOn(Schedulers.parallel()).subscribe(); + LOG.info("Started periodic restarter"); + }); + } + + private void onSessionReady(long liveId, long userId) { + LOG.info("The session #IDU{} (liveId: {}) will be restarted at {}", + userId, + liveId, + Instant.now().plus(interval) + ); + + // Restart after x time + var disposable = Schedulers + .parallel() + .schedule(() -> { + LOG.info("The session #IDU{} (liveId: {}) is being stopped", userId, liveId); + closingByPeriodicRestarter.put(liveId, true); + // Request restart + multiClient + .request(userId, liveId, new Close(), Instant.now().plus(Duration.ofSeconds(15))) + .subscribeOn(Schedulers.parallel()) + .subscribe(); + + }, interval.toMillis(), TimeUnit.MILLISECONDS); + closeManagedByPeriodicRestarter.put(liveId, disposable); + } + + public Mono stop() { + return Mono.fromRunnable(() -> { + LOG.info("Stopping periodic restarter..."); + multiClient.close(); + LOG.info("Stopped periodic restarter"); + }); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java index cf80d4b..fcd2266 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApi.java @@ -1,16 +1,25 @@ package it.tdlight.reactiveapi; +import java.util.List; import java.util.Map; +import java.util.Set; import reactor.core.publisher.Mono; public interface ReactiveApi { Mono start(); + /** + * Send a request to the cluster to load that user id from disk + */ + Mono tryReviveSession(long userId); + Mono createSession(CreateSessionRequest req); Mono> getAllUsers(); + Set getLocalLiveSessionIds(); + boolean is(String nodeId); /** @@ -18,5 +27,11 @@ public interface ReactiveApi { */ Mono resolveUserLiveId(long userId); + ReactiveApiMultiClient multiClient(); + + ReactiveApiClient dynamicClient(long userId); + + ReactiveApiClient liveClient(long liveId, long userId); + Mono close(); } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java new file mode 100644 index 0000000..63dc7d6 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiMultiClient.java @@ -0,0 +1,16 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi; +import it.tdlight.reactiveapi.Event.ClientBoundEvent; +import java.time.Instant; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface ReactiveApiMultiClient { + + Flux clientBoundEvents(); + + Mono request(long userId, long liveId, TdApi.Function request, Instant timeout); + + void close(); +} diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index ae5490a..308b17d 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -12,6 +12,7 @@ import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.common.Response; import it.tdlight.common.Signal; import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.AuthorizationStateWaitOtherDeviceConfirmation; import it.tdlight.jni.TdApi.AuthorizationStateWaitPassword; import it.tdlight.jni.TdApi.CheckAuthenticationBotToken; @@ -54,6 +55,7 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; +import reactor.core.publisher.BufferOverflowStrategy; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -139,10 +141,9 @@ public abstract class ReactiveApiPublisher { .filter(s -> s instanceof TDLibBoundResultingEvent) .map(s -> ((TDLibBoundResultingEvent) s).action()) - //.limitRate(4) - .onBackpressureBuffer() + .limitRate(4) // Buffer up to 64 requests to avoid halting the event loop, throw an error if too many requests are buffered - //.onBackpressureBuffer(64, BufferOverflowStrategy.ERROR) + .onBackpressureBuffer(64, BufferOverflowStrategy.ERROR) // Send requests to tdlib .flatMap(function -> Mono @@ -176,9 +177,6 @@ public abstract class ReactiveApiPublisher { .cast(ClientBoundResultingEvent.class) .map(ClientBoundResultingEvent::event) - // Buffer requests - .onBackpressureBuffer() - // Send events to the client .subscribeOn(Schedulers.parallel()) .subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events", @@ -189,9 +187,6 @@ public abstract class ReactiveApiPublisher { .filter(s -> s instanceof ClusterBoundResultingEvent) .cast(ClusterBoundResultingEvent.class) - // Buffer requests - .onBackpressureBuffer() - // Send events to the cluster .subscribeOn(Schedulers.parallel()) .subscribe(clusterBoundEvent -> { @@ -250,7 +245,10 @@ public abstract class ReactiveApiPublisher { if (signal.isClosed()) { signal.getClosed(); LOG.info("Received a closed signal"); - return List.of(new ResultingEventPublisherClosed()); + return List.of(new ClientBoundResultingEvent(new OnUpdateData(liveId, + userId, + new TdApi.UpdateAuthorizationState(new AuthorizationStateClosed()) + )), new ResultingEventPublisherClosed()); } if (signal.isUpdate() && signal.getUpdate().getConstructor() == TdApi.Error.CONSTRUCTOR) { var error = ((TdApi.Error) signal.getUpdate()); @@ -405,6 +403,7 @@ public abstract class ReactiveApiPublisher { } private static byte[] serializeResponse(Response response) { + if (response == null) return null; var id = response.getId(); var object = response.getObject(); try (var byteArrayOutputStream = new ByteArrayOutputStream()) { diff --git a/src/main/java/it/tdlight/reactiveapi/UserIdAndLiveId.java b/src/main/java/it/tdlight/reactiveapi/UserIdAndLiveId.java new file mode 100644 index 0000000..16c6e18 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/UserIdAndLiveId.java @@ -0,0 +1,3 @@ +package it.tdlight.reactiveapi; + +public record UserIdAndLiveId(long userId, long liveId) {}