diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/RemoteClientBotAddresses.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/RemoteClientBotAddresses.java index 94facfa..ee0b39c 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/RemoteClientBotAddresses.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/RemoteClientBotAddresses.java @@ -6,12 +6,13 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Set; import java.util.stream.Collectors; public class RemoteClientBotAddresses { - private final Set addresses; + private final LinkedHashSet addresses; private final Path addressesFilePath; public RemoteClientBotAddresses(Path addressesFilePath) throws IOException { @@ -19,7 +20,11 @@ public class RemoteClientBotAddresses { if (Files.notExists(addressesFilePath)) { Files.createFile(addressesFilePath); } - addresses = Files.readAllLines(addressesFilePath, StandardCharsets.UTF_8).stream().filter(address -> !address.isBlank()).collect(Collectors.toSet()); + addresses = Files + .readAllLines(addressesFilePath, StandardCharsets.UTF_8) + .stream() + .filter(address -> !address.isBlank()) + .collect(Collectors.toCollection(LinkedHashSet::new)); } public synchronized void putAddress(String address) throws IOException { diff --git a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java index a0dbf48..3a39ccb 100644 --- a/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java +++ b/src/main/java/it/tdlight/tdlibsession/remoteclient/TDLibRemoteClient.java @@ -130,57 +130,16 @@ public class TDLibRemoteClient implements AutoCloseable { var deploymentLock = lockAcquisitionResult.result(); putAllAsync(deployableBotAddresses, botAddresses.values(), (AsyncResult putAllResult) -> { if (putAllResult.succeeded()) { - clusterManager - .getEventBus() - .consumer("tdlib.remoteclient.clients.deploy", (Message msg) -> { - var botAddress = msg.body(); - if (botAddresses.has(botAddress)) { - deployBot(clusterManager, botAddress, deploymentResult -> { - if (deploymentResult.failed()) { - msg.fail(500, "Failed to deploy existing bot \"" + botAddress + "\": " + deploymentResult.cause().getLocalizedMessage()); - sink.error(deploymentResult.cause()); - } else { - sink.next(botAddress); - } - deploymentLock.release(); - }); - } else { - logger.info("Deploying new bot at address \"" + botAddress + "\""); - deployableBotAddresses.putIfAbsent(botAddress, netInterface, putResult -> { - if (putResult.succeeded()) { - if (putResult.result() == null) { - try { - botAddresses.putAddress(botAddress); - } catch (IOException e) { - logger.error("Can't save bot address \"" + botAddress + "\" to addresses file", e); - } - deployBot(clusterManager, botAddress, deploymentResult -> { - if (deploymentResult.failed()) { - msg.fail(500, "Failed to deploy new bot \"" + botAddress + "\": " + deploymentResult.cause().getLocalizedMessage()); - sink.error(deploymentResult.cause()); - } else { - sink.next(botAddress); - } - deploymentLock.release(); - }); - } else { - logger.error("Can't add new bot address \"" + botAddress + "\" because it's already present! Value: \"" + putResult.result() + "\""); - sink.error(new UnsupportedOperationException("Can't add new bot address \"" + botAddress + "\" because it's already present! Value: \"" + putResult.result() + "\"")); - deploymentLock.release(); - } - } else { - logger.error("Can't update shared map", putResult.cause()); - sink.error(putResult.cause()); - deploymentLock.release(); - } - }); - } - }); + listenForDeployRequests(botAddresses, + clusterManager, + sink, + deployableBotAddresses + ); } else { logger.error("Can't update shared map", putAllResult.cause()); sink.error(putAllResult.cause()); - deploymentLock.release(); } + deploymentLock.release(); }); } else { logger.error("Can't obtain deployment lock", lockAcquisitionResult.cause()); @@ -202,6 +161,72 @@ public class TDLibRemoteClient implements AutoCloseable { } } + private void listenForDeployRequests(RemoteClientBotAddresses botAddresses, + TdClusterManager clusterManager, + reactor.core.publisher.FluxSink sink, + AsyncMap deployableBotAddresses) { + clusterManager.getEventBus().consumer("tdlib.remoteclient.clients.deploy", (Message msg) -> { + + clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> { + if (lockAcquisitionResult.succeeded()) { + var deploymentLock = lockAcquisitionResult.result(); + var botAddress = msg.body(); + if (botAddresses.has(botAddress)) { + deployBot(clusterManager, botAddress, deploymentResult -> { + if (deploymentResult.failed()) { + msg.fail(500, + "Failed to deploy existing bot \"" + botAddress + "\": " + deploymentResult + .cause() + .getLocalizedMessage() + ); + sink.error(deploymentResult.cause()); + } else { + sink.next(botAddress); + } + deploymentLock.release(); + }); + } else { + deployableBotAddresses.putIfAbsent(botAddress, netInterface, putResult -> { + if (putResult.succeeded()) { + if (putResult.result() == null) { + logger.info("Deploying new bot at address \"" + botAddress + "\""); + try { + botAddresses.putAddress(botAddress); + } catch (IOException e) { + logger.error("Can't save bot address \"" + botAddress + "\" to addresses file", e); + } + deployBot(clusterManager, botAddress, deploymentResult -> { + if (deploymentResult.failed()) { + msg.fail(500, + "Failed to deploy new bot \"" + botAddress + "\": " + deploymentResult + .cause() + .getLocalizedMessage() + ); + sink.error(deploymentResult.cause()); + } else { + sink.next(botAddress); + } + deploymentLock.release(); + }); + } else { + // Ignore this bot because it's present on another cluster + deploymentLock.release(); + } + } else { + logger.error("Can't update shared map", putResult.cause()); + sink.error(putResult.cause()); + deploymentLock.release(); + } + }); + } + } else { + logger.error("Can't obtain deployment lock", lockAcquisitionResult.cause()); + sink.error(lockAcquisitionResult.cause()); + } + }); + }); + } + private void deployBot(TdClusterManager clusterManager, String botAddress, Handler> deploymentHandler) { AsyncTdMiddleEventBusServer verticle = new AsyncTdMiddleEventBusServer(clusterManager); AtomicReference deploymentLock = new AtomicReference<>();