Rewrite local sessions management
This commit is contained in:
parent
2dc4a35d9f
commit
07c6bd1140
|
@ -1,11 +1,12 @@
|
||||||
package it.tdlight.reactiveapi;
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
import io.atomix.core.Atomix;
|
import io.atomix.core.Atomix;
|
||||||
import io.atomix.core.AtomixBuilder;
|
|
||||||
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
|
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
|
||||||
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
|
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import net.minecrell.terminalconsole.SimpleTerminalConsole;
|
import net.minecrell.terminalconsole.SimpleTerminalConsole;
|
||||||
import org.jline.reader.LineReader;
|
import org.jline.reader.LineReader;
|
||||||
|
@ -34,13 +35,19 @@ public class Cli {
|
||||||
|
|
||||||
var console = new SimpleTerminalConsole() {
|
var console = new SimpleTerminalConsole() {
|
||||||
|
|
||||||
private static final Set<String> commands = Set.of("exit", "stop", "createsession", "help", "man", "?");
|
private static final Set<String> commands = Set.of("exit",
|
||||||
|
"stop",
|
||||||
|
"createsession",
|
||||||
|
"help",
|
||||||
|
"man",
|
||||||
|
"?",
|
||||||
|
"sessions",
|
||||||
|
"localsessions"
|
||||||
|
);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected LineReader buildReader(LineReaderBuilder builder) {
|
protected LineReader buildReader(LineReaderBuilder builder) {
|
||||||
var reader = super.buildReader(builder);
|
return super.buildReader(builder);
|
||||||
reader.addCommandsInBuffer(commands);
|
|
||||||
return reader;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -59,18 +66,40 @@ public class Cli {
|
||||||
commandArgs = "";
|
commandArgs = "";
|
||||||
}
|
}
|
||||||
switch (commandName) {
|
switch (commandName) {
|
||||||
case "exit", "stop" -> acceptInputs.set(false);
|
case "exit", "stop" -> shutdown();
|
||||||
case "createsession" -> createSession(api, commandArgs);
|
case "createsession" -> createSession(api, commandArgs);
|
||||||
case "help", "?", "man" -> LOG.info("Commands: {}", commands);
|
case "help", "?", "man" -> LOG.info("Commands: {}", commands);
|
||||||
|
case "sessions" -> printSessions(api, false);
|
||||||
|
case "localsessions" -> printSessions(api, true);
|
||||||
default -> LOG.info("Unknown command \"{}\"", command);
|
default -> LOG.info("Unknown command \"{}\"", command);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void printSessions(ReactiveApi api, boolean onlyLocal) {
|
||||||
|
api.getAllUsers().subscribe(sessions -> {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
sb.append("Sessions:\n");
|
||||||
|
for (var userEntry : sessions.entrySet()) {
|
||||||
|
var userId = userEntry.getKey();
|
||||||
|
var nodeId = userEntry.getValue();
|
||||||
|
if (!onlyLocal || api.is(nodeId)) {
|
||||||
|
sb.append(" - session #IDU").append(userId);
|
||||||
|
if (!onlyLocal) {
|
||||||
|
sb.append(": ").append(nodeId);
|
||||||
|
}
|
||||||
|
sb.append("\n");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.info(sb.toString());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void shutdown() {
|
protected void shutdown() {
|
||||||
acceptInputs.set(false);
|
acceptInputs.set(false);
|
||||||
if (alreadyShutDown.compareAndSet(false, true)) {
|
if (alreadyShutDown.compareAndSet(false, true)) {
|
||||||
api.getAtomix().stop().join();
|
api.getAtomix().stop().join();
|
||||||
|
System.exit(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -91,8 +120,9 @@ public class Cli {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (!invalid) {
|
if (!invalid) {
|
||||||
CreateSessionResponse response = api.createSession(request).join();
|
api
|
||||||
LOG.info("Created a session with session id \"{}\"", response.sessionId());
|
.createSession(request)
|
||||||
|
.thenAccept(response -> LOG.info("Created a session with live id \"{}\"", response.sessionId()));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
invalid = true;
|
invalid = true;
|
||||||
|
|
|
@ -28,7 +28,6 @@ public sealed interface CreateSessionRequest permits CreateUserSessionRequest, C
|
||||||
case 2 -> {
|
case 2 -> {
|
||||||
var dis = new DataInputStream(new ByteArrayInputStream(bytes, 1 + Long.BYTES, bytes.length - (1 + Long.BYTES)));
|
var dis = new DataInputStream(new ByteArrayInputStream(bytes, 1 + Long.BYTES, bytes.length - (1 + Long.BYTES)));
|
||||||
try {
|
try {
|
||||||
var pathName = dis.readUTF();
|
|
||||||
var isBot = dis.readBoolean();
|
var isBot = dis.readBoolean();
|
||||||
String token;
|
String token;
|
||||||
Long phoneNumber;
|
Long phoneNumber;
|
||||||
|
@ -39,7 +38,7 @@ public sealed interface CreateSessionRequest permits CreateUserSessionRequest, C
|
||||||
token = null;
|
token = null;
|
||||||
phoneNumber = dis.readLong();
|
phoneNumber = dis.readLong();
|
||||||
}
|
}
|
||||||
yield new LoadSessionFromDiskRequest(userId, pathName, token, phoneNumber);
|
yield new LoadSessionFromDiskRequest(userId, token, phoneNumber, dis.readBoolean());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new SerializationException(e);
|
throw new SerializationException(e);
|
||||||
}
|
}
|
||||||
|
@ -52,7 +51,7 @@ public sealed interface CreateSessionRequest permits CreateUserSessionRequest, C
|
||||||
|
|
||||||
final record CreateBotSessionRequest(long userId, String token) implements CreateSessionRequest {}
|
final record CreateBotSessionRequest(long userId, String token) implements CreateSessionRequest {}
|
||||||
|
|
||||||
final record LoadSessionFromDiskRequest(long userId, String pathName, String token, Long phoneNumber) implements
|
final record LoadSessionFromDiskRequest(long userId, String token, Long phoneNumber, boolean createNew) implements
|
||||||
CreateSessionRequest {
|
CreateSessionRequest {
|
||||||
|
|
||||||
public LoadSessionFromDiskRequest {
|
public LoadSessionFromDiskRequest {
|
||||||
|
|
|
@ -9,17 +9,14 @@ import org.jetbrains.annotations.Nullable;
|
||||||
@JsonInclude(Include.NON_NULL)
|
@JsonInclude(Include.NON_NULL)
|
||||||
public class DiskSession {
|
public class DiskSession {
|
||||||
|
|
||||||
public long userId;
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public String token;
|
public String token;
|
||||||
@Nullable
|
@Nullable
|
||||||
public Long phoneNumber;
|
public Long phoneNumber;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public DiskSession(@JsonProperty(required = true, value = "userId") long userId,
|
public DiskSession(@JsonProperty("token") @Nullable String token,
|
||||||
@JsonProperty("token") @Nullable String token,
|
|
||||||
@JsonProperty("phoneNumber") @Nullable Long phoneNumber) {
|
@JsonProperty("phoneNumber") @Nullable Long phoneNumber) {
|
||||||
this.userId = userId;
|
|
||||||
this.token = token;
|
this.token = token;
|
||||||
this.phoneNumber = phoneNumber;
|
this.phoneNumber = phoneNumber;
|
||||||
this.validate();
|
this.validate();
|
||||||
|
|
|
@ -2,7 +2,6 @@ package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import org.jetbrains.annotations.NotNull;
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
|
@ -15,12 +14,16 @@ public class DiskSessions {
|
||||||
* key: session folder name
|
* key: session folder name
|
||||||
*/
|
*/
|
||||||
@NotNull
|
@NotNull
|
||||||
public Map<String, DiskSession> sessions;
|
private Map<Long, DiskSession> sessions;
|
||||||
|
|
||||||
@JsonCreator
|
@JsonCreator
|
||||||
public DiskSessions(@JsonProperty(required = true, value = "path") @NotNull String path,
|
public DiskSessions(@JsonProperty(required = true, value = "path") @NotNull String path,
|
||||||
@JsonProperty(required = true, value = "sessions") @NotNull Map<String, DiskSession> sessions) {
|
@JsonProperty(required = true, value = "sessions") @NotNull Map<Long, DiskSession> userIdToSession) {
|
||||||
this.path = path;
|
this.path = path;
|
||||||
this.sessions = sessions;
|
this.sessions = userIdToSession;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<Long, DiskSession> userIdToSession() {
|
||||||
|
return sessions;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,9 +6,11 @@ import io.atomix.cluster.Node;
|
||||||
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
|
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
|
||||||
import io.atomix.core.Atomix;
|
import io.atomix.core.Atomix;
|
||||||
import io.atomix.core.AtomixBuilder;
|
import io.atomix.core.AtomixBuilder;
|
||||||
|
import io.atomix.core.profile.ConsensusProfileConfig;
|
||||||
import io.atomix.core.profile.Profile;
|
import io.atomix.core.profile.Profile;
|
||||||
import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup;
|
import io.atomix.protocols.backup.partition.PrimaryBackupPartitionGroup;
|
||||||
import io.atomix.protocols.raft.partition.RaftPartitionGroup;
|
import io.atomix.protocols.raft.partition.RaftPartitionGroup;
|
||||||
|
import io.atomix.storage.StorageLevel;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
|
@ -56,8 +58,10 @@ public class Entrypoint {
|
||||||
|
|
||||||
atomixBuilder.withClusterId(clusterSettings.id);
|
atomixBuilder.withClusterId(clusterSettings.id);
|
||||||
|
|
||||||
|
String nodeId;
|
||||||
if (instanceSettings.client) {
|
if (instanceSettings.client) {
|
||||||
atomixBuilder.withMemberId(instanceSettings.id).withAddress(instanceSettings.clientAddress);
|
atomixBuilder.withMemberId(instanceSettings.id).withAddress(instanceSettings.clientAddress);
|
||||||
|
nodeId = null;
|
||||||
} else {
|
} else {
|
||||||
// Find node settings
|
// Find node settings
|
||||||
var nodeSettingsOptional = clusterSettings.nodes
|
var nodeSettingsOptional = clusterSettings.nodes
|
||||||
|
@ -74,6 +78,7 @@ public class Entrypoint {
|
||||||
var nodeSettings = nodeSettingsOptional.get();
|
var nodeSettings = nodeSettingsOptional.get();
|
||||||
|
|
||||||
atomixBuilder.withMemberId(instanceSettings.id).withAddress(nodeSettings.address);
|
atomixBuilder.withMemberId(instanceSettings.id).withAddress(nodeSettings.address);
|
||||||
|
nodeId = nodeSettings.id;
|
||||||
}
|
}
|
||||||
|
|
||||||
var bootstrapDiscoveryProviderNodes = new ArrayList<Node>();
|
var bootstrapDiscoveryProviderNodes = new ArrayList<Node>();
|
||||||
|
@ -92,18 +97,32 @@ public class Entrypoint {
|
||||||
.builder("system")
|
.builder("system")
|
||||||
.withNumPartitions(1)
|
.withNumPartitions(1)
|
||||||
.withMembers(systemPartitionGroupMembers)
|
.withMembers(systemPartitionGroupMembers)
|
||||||
|
.withDataDirectory(Paths.get(".data-" + instanceSettings.id).toFile())
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
atomixBuilder.withPartitionGroups(PrimaryBackupPartitionGroup.builder("data").withNumPartitions(32).build());
|
/*atomixBuilder.withPartitionGroups(RaftPartitionGroup
|
||||||
|
.builder("raft")
|
||||||
|
.withNumPartitions(3)
|
||||||
|
.withFlushOnCommit()
|
||||||
|
.withStorageLevel(StorageLevel.MAPPED)
|
||||||
|
.withDataDirectory(Paths.get(".data-" + instanceSettings.id).toFile())
|
||||||
|
.build());
|
||||||
|
*/
|
||||||
|
|
||||||
atomixBuilder.withShutdownHook(false);
|
atomixBuilder.withShutdownHook(false);
|
||||||
atomixBuilder.withTypeRegistrationRequired();
|
atomixBuilder.withTypeRegistrationRequired();
|
||||||
|
|
||||||
if (instanceSettings.client) {
|
if (instanceSettings.client) {
|
||||||
atomixBuilder.addProfile(Profile.consensus(systemPartitionGroupMembers));
|
|
||||||
atomixBuilder.addProfile(Profile.dataGrid(32));
|
|
||||||
} else {
|
|
||||||
atomixBuilder.addProfile(Profile.client());
|
atomixBuilder.addProfile(Profile.client());
|
||||||
|
} else {
|
||||||
|
var prof = Profile.consensus(systemPartitionGroupMembers);
|
||||||
|
var profCfg = (ConsensusProfileConfig) prof.config();
|
||||||
|
//profCfg.setDataGroup("raft");
|
||||||
|
profCfg.setDataPath(".data-" + instanceSettings.id);
|
||||||
|
//profCfg.setPartitions(3);
|
||||||
|
//profCfg.setManagementGroup("system");
|
||||||
|
atomixBuilder.addProfile(prof);
|
||||||
|
//atomixBuilder.addProfile(Profile.dataGrid(32));
|
||||||
}
|
}
|
||||||
|
|
||||||
atomixBuilder.withCompatibleSerialization(false);
|
atomixBuilder.withCompatibleSerialization(false);
|
||||||
|
@ -114,11 +133,13 @@ public class Entrypoint {
|
||||||
|
|
||||||
atomix.start().join();
|
atomix.start().join();
|
||||||
|
|
||||||
var api = new ReactiveApi(atomix, diskSessions);
|
var api = new ReactiveApi(nodeId, atomix, diskSessions);
|
||||||
|
|
||||||
LOG.info("Starting ReactiveApi...");
|
LOG.info("Starting ReactiveApi...");
|
||||||
|
|
||||||
api.start();
|
api.start().block();
|
||||||
|
|
||||||
|
LOG.info("Started ReactiveApi");
|
||||||
|
|
||||||
return api;
|
return api;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,199 +1,348 @@
|
||||||
package it.tdlight.reactiveapi;
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
|
import static java.util.Collections.unmodifiableSet;
|
||||||
|
import static java.util.Objects.requireNonNull;
|
||||||
import static java.util.concurrent.CompletableFuture.completedFuture;
|
import static java.util.concurrent.CompletableFuture.completedFuture;
|
||||||
import static java.util.concurrent.CompletableFuture.failedFuture;
|
import static java.util.concurrent.CompletableFuture.failedFuture;
|
||||||
|
import static reactor.core.publisher.Mono.fromCompletionStage;
|
||||||
|
|
||||||
import io.atomix.core.Atomix;
|
import io.atomix.core.Atomix;
|
||||||
import io.atomix.core.idgenerator.AsyncAtomicIdGenerator;
|
import io.atomix.core.idgenerator.AsyncAtomicIdGenerator;
|
||||||
import io.atomix.core.lock.AsyncAtomicLock;
|
import io.atomix.core.lock.AsyncAtomicLock;
|
||||||
import io.atomix.core.map.AsyncAtomicMap;
|
import io.atomix.core.map.AsyncAtomicMap;
|
||||||
|
import io.atomix.protocols.raft.MultiRaftProtocol;
|
||||||
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
|
import it.tdlight.reactiveapi.CreateSessionRequest.CreateBotSessionRequest;
|
||||||
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
|
import it.tdlight.reactiveapi.CreateSessionRequest.CreateUserSessionRequest;
|
||||||
import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
|
import it.tdlight.reactiveapi.CreateSessionRequest.LoadSessionFromDiskRequest;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.UUID;
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CompletionException;
|
import java.util.concurrent.CompletionException;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
import org.reactivestreams.Publisher;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
public class ReactiveApi {
|
public class ReactiveApi {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ReactiveApi.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ReactiveApi.class);
|
||||||
|
|
||||||
|
@NotNull
|
||||||
|
private final String nodeId;
|
||||||
private final Atomix atomix;
|
private final Atomix atomix;
|
||||||
private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.parallel());
|
private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.parallel());
|
||||||
private static final SchedulerExecutor BOUNDED_ELASTIC_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic());
|
private static final SchedulerExecutor BOUNDED_ELASTIC_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic());
|
||||||
private final AsyncAtomicIdGenerator nextSessionId;
|
private final AsyncAtomicIdGenerator nextSessionLiveId;
|
||||||
|
|
||||||
private final AsyncAtomicLock sessionModificationLock;
|
private final AsyncAtomicLock sessionModificationLock;
|
||||||
private final AsyncAtomicMap<Long, Long> sessionIdToUserId;
|
private final AsyncAtomicMap<Long, String> userIdToNodeId;
|
||||||
private final ConcurrentMap<Long, ReactiveApiPublisher> localNodeSessions = new ConcurrentHashMap<>();
|
/**
|
||||||
|
* User id -> session
|
||||||
|
*/
|
||||||
|
private final ConcurrentMap<Long, ReactiveApiPublisher> localLiveSessions = new ConcurrentHashMap<>();
|
||||||
private final DiskSessionsManager diskSessions;
|
private final DiskSessionsManager diskSessions;
|
||||||
|
|
||||||
public ReactiveApi(Atomix atomix, DiskSessionsManager diskSessions) {
|
public ReactiveApi(@NotNull String nodeId, Atomix atomix, DiskSessionsManager diskSessions) {
|
||||||
|
this.nodeId = nodeId;
|
||||||
this.atomix = atomix;
|
this.atomix = atomix;
|
||||||
this.nextSessionId = atomix.getAtomicIdGenerator("session-id").async();
|
|
||||||
this.sessionIdToUserId = atomix.<Long, Long>getAtomicMap("session-id-to-user-id").async();
|
var raftProtocol = MultiRaftProtocol.builder().build();
|
||||||
this.sessionModificationLock = atomix.getAtomicLock("session-modification").async();
|
this.nextSessionLiveId = atomix
|
||||||
|
.atomicIdGeneratorBuilder("session-live-id")
|
||||||
|
.withProtocol(raftProtocol)
|
||||||
|
.build()
|
||||||
|
.async();
|
||||||
|
this.sessionModificationLock = atomix
|
||||||
|
.atomicLockBuilder("session-modification")
|
||||||
|
.withProtocol(raftProtocol)
|
||||||
|
.build()
|
||||||
|
.async();
|
||||||
|
|
||||||
|
this.userIdToNodeId = atomix
|
||||||
|
.<Long, String>atomicMapBuilder("user-id-to-node-id")
|
||||||
|
//.withCacheEnabled(true)
|
||||||
|
//.withCacheSize(4096)
|
||||||
|
.withNullValues(false)
|
||||||
|
.withProtocol(raftProtocol)
|
||||||
|
.build()
|
||||||
|
.async();
|
||||||
|
|
||||||
this.diskSessions = diskSessions;
|
this.diskSessions = diskSessions;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() {
|
public Mono<Void> start() {
|
||||||
CompletableFuture.runAsync(() -> {
|
Mono<Set<Long>> idsSavedIntoLocalConfiguration = Mono.fromCallable(() -> {
|
||||||
List<CompletableFuture<CreateSessionResponse>> requests = new ArrayList<>();
|
|
||||||
synchronized (diskSessions) {
|
synchronized (diskSessions) {
|
||||||
for (Entry<String, DiskSession> entry : diskSessions.getSettings().sessions.entrySet()) {
|
return diskSessions.getSettings().userIdToSession().keySet();
|
||||||
try {
|
|
||||||
entry.getValue().validate();
|
|
||||||
} catch (Throwable ex) {
|
|
||||||
LOG.error("Failed to load disk session {}", entry.getKey(), ex);
|
|
||||||
}
|
|
||||||
var sessionFolderName = entry.getKey();
|
|
||||||
var diskSession = entry.getValue();
|
|
||||||
requests.add(createSession(new LoadSessionFromDiskRequest(diskSession.userId,
|
|
||||||
sessionFolderName,
|
|
||||||
diskSession.token,
|
|
||||||
diskSession.phoneNumber
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
CompletableFuture
|
});
|
||||||
.allOf(requests.toArray(CompletableFuture<?>[]::new))
|
Mono<Set<Long>> distributedIds = this
|
||||||
.thenAccept(responses -> LOG.info("Loaded all saved sessions from disk"));
|
.getAllUsers()
|
||||||
}, BOUNDED_ELASTIC_EXECUTOR);
|
.flatMapIterable(Map::entrySet)
|
||||||
|
.filter(entry -> entry.getValue().equals(nodeId))
|
||||||
|
.map(Entry::getKey)
|
||||||
|
.collect(Collectors.toUnmodifiableSet());
|
||||||
|
|
||||||
|
record DiskChanges(Set<Long> normalIds, Set<Long> addedIds, Set<Long> removedIds) {}
|
||||||
|
|
||||||
|
var diskChangesMono = Mono.zip(idsSavedIntoLocalConfiguration, distributedIds).map(tuple -> {
|
||||||
|
var localSet = tuple.getT1();
|
||||||
|
var remoteSet = tuple.getT2();
|
||||||
|
|
||||||
|
var deletedUsers = new HashSet<>(remoteSet);
|
||||||
|
deletedUsers.removeAll(localSet);
|
||||||
|
|
||||||
|
var addedUsers = new HashSet<>(localSet);
|
||||||
|
addedUsers.removeAll(remoteSet);
|
||||||
|
|
||||||
|
var normalUsers = new HashSet<>(localSet);
|
||||||
|
normalUsers.removeAll(addedUsers);
|
||||||
|
|
||||||
|
for (long user : addedUsers) {
|
||||||
|
LOG.warn("Detected a new user id from the disk configuration file: {}", user);
|
||||||
|
}
|
||||||
|
for (long user : normalUsers) {
|
||||||
|
LOG.info("Detected a user id from the disk configuration file: {}", user);
|
||||||
|
}
|
||||||
|
for (long user : deletedUsers) {
|
||||||
|
LOG.warn("The user id {} has been deleted from the disk configuration file", user);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new DiskChanges(unmodifiableSet(normalUsers), unmodifiableSet(addedUsers), unmodifiableSet(remoteSet));
|
||||||
|
}).cache();
|
||||||
|
|
||||||
|
var removeObsoleteDiskSessions = diskChangesMono
|
||||||
|
.flatMapIterable(diskChanges -> diskChanges.removedIds)
|
||||||
|
.flatMap(removedIds -> fromCompletionStage(() -> destroySession(removedIds, nodeId)))
|
||||||
|
.then();
|
||||||
|
|
||||||
|
var addedDiskSessionsFlux = diskChangesMono
|
||||||
|
.flatMapIterable(diskChanges -> diskChanges.addedIds)
|
||||||
|
.flatMap(this::getLocalDiskSession);
|
||||||
|
var normalDiskSessionsFlux = diskChangesMono
|
||||||
|
.flatMapIterable(diskChanges -> diskChanges.normalIds)
|
||||||
|
.flatMap(this::getLocalDiskSession);
|
||||||
|
|
||||||
|
var addNewDiskSessions = addedDiskSessionsFlux.flatMap(diskSessionAndId -> {
|
||||||
|
var id = diskSessionAndId.id;
|
||||||
|
var diskSession = diskSessionAndId.diskSession;
|
||||||
|
return fromCompletionStage(() -> createSession(new LoadSessionFromDiskRequest(id,
|
||||||
|
diskSession.token,
|
||||||
|
diskSession.phoneNumber,
|
||||||
|
true
|
||||||
|
)));
|
||||||
|
}).then();
|
||||||
|
|
||||||
|
var loadExistingDiskSessions = normalDiskSessionsFlux.flatMap(diskSessionAndId -> {
|
||||||
|
var id = diskSessionAndId.id;
|
||||||
|
var diskSession = diskSessionAndId.diskSession;
|
||||||
|
return fromCompletionStage(() -> createSession(new LoadSessionFromDiskRequest(id,
|
||||||
|
diskSession.token,
|
||||||
|
diskSession.phoneNumber,
|
||||||
|
false
|
||||||
|
)));
|
||||||
|
}).then();
|
||||||
|
|
||||||
|
var diskInitMono = Mono.when(removeObsoleteDiskSessions, loadExistingDiskSessions, addNewDiskSessions)
|
||||||
|
.subscribeOn(Schedulers.boundedElastic())
|
||||||
|
.doOnTerminate(() -> LOG.info("Loaded all saved sessions from disk"));
|
||||||
|
|
||||||
// Listen for create-session signals
|
// Listen for create-session signals
|
||||||
atomix.getEventService().subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> {
|
var subscriptionMono = fromCompletionStage(() -> atomix
|
||||||
if (req instanceof LoadSessionFromDiskRequest) {
|
.getEventService()
|
||||||
return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster"));
|
.subscribe("create-session", CreateSessionRequest::deserializeBytes, req -> {
|
||||||
} else {
|
if (req instanceof LoadSessionFromDiskRequest) {
|
||||||
return createSession(req);
|
return failedFuture(new IllegalArgumentException("Can't pass a local request through the cluster"));
|
||||||
}
|
} else {
|
||||||
}, CreateSessionResponse::serializeBytes);
|
return createSession(req);
|
||||||
|
}
|
||||||
|
}, CreateSessionResponse::serializeBytes));
|
||||||
|
|
||||||
|
return diskInitMono.then(subscriptionMono).then();
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<Void> destroySession(long userId, String nodeId) {
|
||||||
|
LOG.debug("Received session delete request: userid={}, nodeid=\"{}\"", userId, nodeId);
|
||||||
|
|
||||||
|
// Lock sessions modification
|
||||||
|
return sessionModificationLock
|
||||||
|
.lock()
|
||||||
|
.thenCompose(lockVersion -> {
|
||||||
|
LOG.trace("Obtained session modification lock for session delete request: {} \"{}\"", userId, nodeId);
|
||||||
|
return userIdToNodeId
|
||||||
|
.remove(userId, nodeId)
|
||||||
|
.thenAccept(deleted -> LOG.debug("Deleted session {} \"{}\": {}", userId, nodeId, deleted));
|
||||||
|
})
|
||||||
|
.whenComplete((response, error) -> sessionModificationLock
|
||||||
|
.unlock()
|
||||||
|
.thenRun(() -> LOG.trace("Released session modification lock for session delete request: {} \"{}\"", userId, nodeId))
|
||||||
|
)
|
||||||
|
.whenComplete((resp, ex) -> LOG.debug("Handled session delete request {} \"{}\", the response is: {}", userId, nodeId, resp, ex));
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<CreateSessionResponse> createSession(CreateSessionRequest req) {
|
public CompletableFuture<CreateSessionResponse> createSession(CreateSessionRequest req) {
|
||||||
|
LOG.debug("Received create session request: {}", req);
|
||||||
// Lock sessions creation
|
// Lock sessions creation
|
||||||
return sessionModificationLock.lock().thenCompose(lockVersion -> {
|
return sessionModificationLock
|
||||||
// Generate session id
|
.lock()
|
||||||
return this.nextFreeId().thenCompose(sessionId -> {
|
.thenCompose(lockVersion -> {
|
||||||
// Create the session instance
|
LOG.trace("Obtained session modification lock for session request: {}", req);
|
||||||
ReactiveApiPublisher reactiveApiPublisher;
|
// Generate session id
|
||||||
boolean loadedFromDisk;
|
return this.nextFreeLiveId().thenCompose(liveId -> {
|
||||||
long userId;
|
// Create the session instance
|
||||||
String botToken;
|
ReactiveApiPublisher reactiveApiPublisher;
|
||||||
Long phoneNumber;
|
boolean loadedFromDisk;
|
||||||
if (req instanceof CreateBotSessionRequest createBotSessionRequest) {
|
boolean createNew;
|
||||||
loadedFromDisk = false;
|
long userId;
|
||||||
userId = createBotSessionRequest.userId();
|
String botToken;
|
||||||
botToken = createBotSessionRequest.token();
|
Long phoneNumber;
|
||||||
phoneNumber = null;
|
if (req instanceof CreateBotSessionRequest createBotSessionRequest) {
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, sessionId, userId, botToken);
|
loadedFromDisk = false;
|
||||||
} else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
|
createNew = true;
|
||||||
loadedFromDisk = false;
|
userId = createBotSessionRequest.userId();
|
||||||
userId = createUserSessionRequest.userId();
|
botToken = createBotSessionRequest.token();
|
||||||
botToken = null;
|
phoneNumber = null;
|
||||||
phoneNumber = createUserSessionRequest.phoneNumber();
|
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken);
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, sessionId, userId, phoneNumber);
|
} else if (req instanceof CreateUserSessionRequest createUserSessionRequest) {
|
||||||
} else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
|
loadedFromDisk = false;
|
||||||
loadedFromDisk = true;
|
createNew = true;
|
||||||
userId = loadSessionFromDiskRequest.userId();
|
userId = createUserSessionRequest.userId();
|
||||||
botToken = loadSessionFromDiskRequest.token();
|
botToken = null;
|
||||||
phoneNumber = loadSessionFromDiskRequest.phoneNumber();
|
phoneNumber = createUserSessionRequest.phoneNumber();
|
||||||
if (loadSessionFromDiskRequest.phoneNumber() != null) {
|
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber);
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, sessionId, userId, phoneNumber);
|
} else if (req instanceof LoadSessionFromDiskRequest loadSessionFromDiskRequest) {
|
||||||
} else {
|
loadedFromDisk = true;
|
||||||
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, sessionId, userId, botToken);
|
createNew = loadSessionFromDiskRequest.createNew();
|
||||||
}
|
userId = loadSessionFromDiskRequest.userId();
|
||||||
} else {
|
botToken = loadSessionFromDiskRequest.token();
|
||||||
return failedFuture(new UnsupportedOperationException("Unexpected value: " + req));
|
phoneNumber = loadSessionFromDiskRequest.phoneNumber();
|
||||||
}
|
if (loadSessionFromDiskRequest.phoneNumber() != null) {
|
||||||
|
reactiveApiPublisher = ReactiveApiPublisher.fromPhoneNumber(atomix, liveId, userId, phoneNumber);
|
||||||
// Register the session instance to the local nodes map
|
} else {
|
||||||
var prev = localNodeSessions.put(sessionId, reactiveApiPublisher);
|
reactiveApiPublisher = ReactiveApiPublisher.fromToken(atomix, liveId, userId, botToken);
|
||||||
if (prev != null) {
|
|
||||||
LOG.error("Session id \"{}\" was already registered locally!", sessionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register the session instance to the distributed nodes map
|
|
||||||
return sessionIdToUserId.put(sessionId, req.userId()).thenComposeAsync(prevDistributed -> {
|
|
||||||
if (prevDistributed != null) {
|
|
||||||
LOG.error("Session id \"{}\" was already registered in the cluster!", sessionId);
|
|
||||||
}
|
|
||||||
|
|
||||||
CompletableFuture<?> saveToDiskFuture;
|
|
||||||
if (!loadedFromDisk) {
|
|
||||||
// Load existing session paths
|
|
||||||
HashSet<String> alreadyExistingPaths = new HashSet<>();
|
|
||||||
synchronized (diskSessions) {
|
|
||||||
for (var entry : diskSessions.getSettings().sessions.entrySet()) {
|
|
||||||
var path = entry.getKey();
|
|
||||||
var diskSessionSettings = entry.getValue();
|
|
||||||
if (diskSessionSettings.userId == userId) {
|
|
||||||
LOG.warn("User id \"{}\" session already exists in path: \"{}\"", userId, path);
|
|
||||||
}
|
|
||||||
alreadyExistingPaths.add(entry.getKey());
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
return failedFuture(new UnsupportedOperationException("Unexpected value: " + req));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a new disk session folder name
|
// Register the session instance to the local nodes map
|
||||||
String diskSessionFolderName;
|
var prev = localLiveSessions.put(liveId, reactiveApiPublisher);
|
||||||
do {
|
if (prev != null) {
|
||||||
diskSessionFolderName = UUID.randomUUID().toString();
|
LOG.error("User id \"{}\" was already registered locally! {}", liveId, prev);
|
||||||
} while (alreadyExistingPaths.contains(diskSessionFolderName));
|
|
||||||
|
|
||||||
// Create the disk session configuration
|
|
||||||
var diskSession = new DiskSession(userId, botToken, phoneNumber);
|
|
||||||
Path path;
|
|
||||||
synchronized (diskSessions) {
|
|
||||||
diskSessions.getSettings().sessions.put(diskSessionFolderName, diskSession);
|
|
||||||
path = Paths.get(diskSessions.getSettings().path).resolve(diskSessionFolderName);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the session instance
|
// Register the session instance to the distributed nodes map
|
||||||
reactiveApiPublisher.start(path);
|
return userIdToNodeId.put(userId, nodeId).thenComposeAsync(prevDistributed -> {
|
||||||
|
if (prevDistributed != null && prevDistributed.value() != null &&
|
||||||
|
!Objects.equals(this.nodeId, prevDistributed.value())) {
|
||||||
|
LOG.error("Session id \"{}\" is already registered in the node \"{}\"!", liveId, prevDistributed.value());
|
||||||
|
}
|
||||||
|
|
||||||
saveToDiskFuture = CompletableFuture.runAsync(() -> {
|
Path baseSessionsPath;
|
||||||
// Save updated sessions configuration to disk
|
synchronized (diskSessions) {
|
||||||
try {
|
baseSessionsPath = Paths.get(diskSessions.getSettings().path);
|
||||||
|
}
|
||||||
|
String diskSessionFolderName = Long.toUnsignedString(userId);
|
||||||
|
Path sessionPath = baseSessionsPath.resolve(diskSessionFolderName);
|
||||||
|
|
||||||
|
CompletableFuture<?> saveToDiskFuture;
|
||||||
|
if (!loadedFromDisk) {
|
||||||
|
// Create the disk session configuration
|
||||||
|
var diskSession = new DiskSession(botToken, phoneNumber);
|
||||||
synchronized (diskSessions) {
|
synchronized (diskSessions) {
|
||||||
diskSessions.save();
|
diskSessions.getSettings().userIdToSession().put(userId, diskSession);
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
|
||||||
throw new CompletionException("Failed to save disk sessions configuration", e);
|
|
||||||
}
|
|
||||||
}, BOUNDED_ELASTIC_EXECUTOR);
|
|
||||||
} else {
|
|
||||||
saveToDiskFuture = completedFuture(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
return saveToDiskFuture.thenApply(ignored -> new CreateSessionResponse(sessionId));
|
saveToDiskFuture = CompletableFuture.runAsync(() -> {
|
||||||
}, BOUNDED_ELASTIC_EXECUTOR);
|
// Save updated sessions configuration to disk
|
||||||
});
|
try {
|
||||||
});
|
synchronized (diskSessions) {
|
||||||
|
diskSessions.save();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new CompletionException("Failed to save disk sessions configuration", e);
|
||||||
|
}
|
||||||
|
}, BOUNDED_ELASTIC_EXECUTOR);
|
||||||
|
} else {
|
||||||
|
saveToDiskFuture = completedFuture(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the session instance
|
||||||
|
reactiveApiPublisher.start(sessionPath);
|
||||||
|
|
||||||
|
return saveToDiskFuture.thenApply(ignored -> new CreateSessionResponse(liveId));
|
||||||
|
}, BOUNDED_ELASTIC_EXECUTOR);
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.whenComplete((response, error) -> sessionModificationLock
|
||||||
|
.unlock()
|
||||||
|
.thenRun(() -> LOG.trace("Released session modification lock for session request: {}", req))
|
||||||
|
)
|
||||||
|
.whenComplete((resp, ex) -> LOG.debug("Handled session request {}, the response is: {}", req, resp, ex));
|
||||||
}
|
}
|
||||||
|
|
||||||
public CompletableFuture<Long> nextFreeId() {
|
public CompletableFuture<Long> nextFreeLiveId() {
|
||||||
return nextSessionId.nextId().thenCompose(id -> sessionIdToUserId.containsKey(id).thenCompose(exists -> {
|
return nextSessionLiveId.nextId();
|
||||||
if (exists) {
|
|
||||||
return nextFreeId();
|
|
||||||
} else {
|
|
||||||
return completedFuture(id);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Atomix getAtomix() {
|
public Atomix getAtomix() {
|
||||||
return atomix;
|
return atomix;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the list of current sessions
|
||||||
|
* @return map of user id -> node id
|
||||||
|
*/
|
||||||
|
public Mono<Map<Long, String>> getAllUsers() {
|
||||||
|
return Flux.defer(() -> {
|
||||||
|
var it = userIdToNodeId.entrySet().iterator();
|
||||||
|
var hasNextMono = fromCompletionStage(it::hasNext);
|
||||||
|
var strictNextMono = fromCompletionStage(it::next)
|
||||||
|
.map(elem -> Map.entry(elem.getKey(), elem.getValue().value()));
|
||||||
|
|
||||||
|
var nextOrNothingMono = hasNextMono.flatMap(hasNext -> {
|
||||||
|
if (hasNext) {
|
||||||
|
return strictNextMono;
|
||||||
|
} else {
|
||||||
|
return Mono.empty();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return nextOrNothingMono.repeatWhen(s -> s.takeWhile(n -> n > 0));
|
||||||
|
}).collectMap(Entry::getKey, Entry::getValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean is(String nodeId) {
|
||||||
|
return this.nodeId.equals(nodeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static record DiskSessionAndId(DiskSession diskSession, long id) {}
|
||||||
|
|
||||||
|
private Mono<DiskSessionAndId> getLocalDiskSession(Long localId) {
|
||||||
|
return Mono.fromCallable(() -> {
|
||||||
|
synchronized (diskSessions) {
|
||||||
|
var diskSession = requireNonNull(diskSessions.getSettings().userIdToSession().get(localId),
|
||||||
|
"Id not found: " + localId
|
||||||
|
);
|
||||||
|
try {
|
||||||
|
diskSession.validate();
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
LOG.error("Failed to load disk session {}", localId, ex);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return new DiskSessionAndId(diskSession, localId);
|
||||||
|
}
|
||||||
|
}).subscribeOn(Schedulers.boundedElastic());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,44 +1,51 @@
|
||||||
package it.tdlight.reactiveapi;
|
package it.tdlight.reactiveapi;
|
||||||
|
|
||||||
import io.atomix.core.Atomix;
|
import io.atomix.core.Atomix;
|
||||||
import it.tdlight.jni.TdApi;
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.DataInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.StringJoiner;
|
||||||
import org.apache.commons.lang3.SerializationException;
|
import org.slf4j.Logger;
|
||||||
import reactor.core.publisher.Flux;
|
import org.slf4j.LoggerFactory;
|
||||||
import reactor.core.publisher.FluxSink.OverflowStrategy;
|
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
public class ReactiveApiPublisher {
|
public class ReactiveApiPublisher {
|
||||||
|
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(ReactiveApiPublisher.class);
|
||||||
private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic());
|
private static final SchedulerExecutor SCHEDULER_EXECUTOR = new SchedulerExecutor(Schedulers.boundedElastic());
|
||||||
|
|
||||||
private final Atomix atomix;
|
private final Atomix atomix;
|
||||||
private final long userId;
|
private final long userId;
|
||||||
private final long sessionId;
|
private final long liveId;
|
||||||
private final String botToken;
|
private final String botToken;
|
||||||
private final Long phoneNumber;
|
private final Long phoneNumber;
|
||||||
|
|
||||||
private ReactiveApiPublisher(Atomix atomix, long sessionId, long userId, String botToken, Long phoneNumber) {
|
private ReactiveApiPublisher(Atomix atomix, long liveId, long userId, String botToken, Long phoneNumber) {
|
||||||
this.atomix = atomix;
|
this.atomix = atomix;
|
||||||
this.userId = userId;
|
this.userId = userId;
|
||||||
this.sessionId = sessionId;
|
this.liveId = liveId;
|
||||||
this.botToken = botToken;
|
this.botToken = botToken;
|
||||||
this.phoneNumber = phoneNumber;
|
this.phoneNumber = phoneNumber;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ReactiveApiPublisher fromToken(Atomix atomix, Long sessionId, long userId, String token) {
|
public static ReactiveApiPublisher fromToken(Atomix atomix, Long liveId, long userId, String token) {
|
||||||
return new ReactiveApiPublisher(atomix, sessionId, userId, token, null);
|
return new ReactiveApiPublisher(atomix, liveId, userId, token, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, Long sessionId, long userId, long phoneNumber) {
|
public static ReactiveApiPublisher fromPhoneNumber(Atomix atomix, Long liveId, long userId, long phoneNumber) {
|
||||||
return new ReactiveApiPublisher(atomix, sessionId, userId, null, phoneNumber);
|
return new ReactiveApiPublisher(atomix, liveId, userId, null, phoneNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start(Path path) {
|
public void start(Path path) {
|
||||||
|
LOG.info("Starting session \"{}\" in path \"{}\"", this, path);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return new StringJoiner(", ", ReactiveApiPublisher.class.getSimpleName() + "[", "]")
|
||||||
|
.add("userId=" + userId)
|
||||||
|
.add("liveId=" + liveId)
|
||||||
|
.add("botToken='" + botToken + "'")
|
||||||
|
.add("phoneNumber=" + phoneNumber)
|
||||||
|
.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@
|
||||||
<PatternLayout disableAnsi="false"
|
<PatternLayout disableAnsi="false"
|
||||||
pattern="%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} %highlight{${LOG_LEVEL_PATTERN:-%5p}}{FATAL=red blink, ERROR=red, WARN=yellow bold, INFO=green, DEBUG=green bold, TRACE=blue} %style{%processId}{magenta} [%15.15t] %style{%-20.20c{1}}{cyan} : %m%n%ex"/>
|
pattern="%d{${LOG_DATEFORMAT_PATTERN:-yyyy-MM-dd HH:mm:ss.SSS}} %highlight{${LOG_LEVEL_PATTERN:-%5p}}{FATAL=red blink, ERROR=red, WARN=yellow bold, INFO=green, DEBUG=green bold, TRACE=blue} %style{%processId}{magenta} [%15.15t] %style{%-20.20c{1}}{cyan} : %m%n%ex"/>
|
||||||
</TerminalConsole>
|
</TerminalConsole>
|
||||||
|
|
||||||
<Async name="Async">
|
<Async name="Async">
|
||||||
<AppenderRef ref="ConsoleAppender"/>
|
<AppenderRef ref="ConsoleAppender"/>
|
||||||
<LinkedTransferQueue/>
|
<LinkedTransferQueue/>
|
||||||
|
@ -14,10 +13,9 @@
|
||||||
<Loggers>
|
<Loggers>
|
||||||
<Root level="INFO">
|
<Root level="INFO">
|
||||||
<filters>
|
<filters>
|
||||||
<MarkerFilter marker="NETWORK_PACKETS" onMatch="DENY"
|
<MarkerFilter marker="NETWORK_PACKETS" onMatch="DENY" onMismatch="NEUTRAL"/>
|
||||||
onMismatch="NEUTRAL"/>
|
|
||||||
</filters>
|
</filters>
|
||||||
<AppenderRef ref="Async"/>
|
<AppenderRef ref="Async"/>
|
||||||
</Root>
|
</Root>
|
||||||
</Loggers>
|
</Loggers>
|
||||||
</Configuration>
|
</Configuration>
|
||||||
|
|
Loading…
Reference in New Issue
Block a user