Deployment shared lock not freed

This commit is contained in:
Andrea Cavalli 2020-10-28 12:27:10 +01:00
parent aaf6d79b2b
commit cc16dbca1a
2 changed files with 79 additions and 49 deletions

View File

@ -6,12 +6,13 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class RemoteClientBotAddresses { public class RemoteClientBotAddresses {
private final Set<String> addresses; private final LinkedHashSet<String> addresses;
private final Path addressesFilePath; private final Path addressesFilePath;
public RemoteClientBotAddresses(Path addressesFilePath) throws IOException { public RemoteClientBotAddresses(Path addressesFilePath) throws IOException {
@ -19,7 +20,11 @@ public class RemoteClientBotAddresses {
if (Files.notExists(addressesFilePath)) { if (Files.notExists(addressesFilePath)) {
Files.createFile(addressesFilePath); Files.createFile(addressesFilePath);
} }
addresses = Files.readAllLines(addressesFilePath, StandardCharsets.UTF_8).stream().filter(address -> !address.isBlank()).collect(Collectors.toSet()); addresses = Files
.readAllLines(addressesFilePath, StandardCharsets.UTF_8)
.stream()
.filter(address -> !address.isBlank())
.collect(Collectors.toCollection(LinkedHashSet::new));
} }
public synchronized void putAddress(String address) throws IOException { public synchronized void putAddress(String address) throws IOException {

View File

@ -130,57 +130,16 @@ public class TDLibRemoteClient implements AutoCloseable {
var deploymentLock = lockAcquisitionResult.result(); var deploymentLock = lockAcquisitionResult.result();
putAllAsync(deployableBotAddresses, botAddresses.values(), (AsyncResult<Void> putAllResult) -> { putAllAsync(deployableBotAddresses, botAddresses.values(), (AsyncResult<Void> putAllResult) -> {
if (putAllResult.succeeded()) { if (putAllResult.succeeded()) {
clusterManager listenForDeployRequests(botAddresses,
.getEventBus() clusterManager,
.consumer("tdlib.remoteclient.clients.deploy", (Message<String> msg) -> { sink,
var botAddress = msg.body(); deployableBotAddresses
if (botAddresses.has(botAddress)) { );
deployBot(clusterManager, botAddress, deploymentResult -> {
if (deploymentResult.failed()) {
msg.fail(500, "Failed to deploy existing bot \"" + botAddress + "\": " + deploymentResult.cause().getLocalizedMessage());
sink.error(deploymentResult.cause());
} else {
sink.next(botAddress);
}
deploymentLock.release();
});
} else {
logger.info("Deploying new bot at address \"" + botAddress + "\"");
deployableBotAddresses.putIfAbsent(botAddress, netInterface, putResult -> {
if (putResult.succeeded()) {
if (putResult.result() == null) {
try {
botAddresses.putAddress(botAddress);
} catch (IOException e) {
logger.error("Can't save bot address \"" + botAddress + "\" to addresses file", e);
}
deployBot(clusterManager, botAddress, deploymentResult -> {
if (deploymentResult.failed()) {
msg.fail(500, "Failed to deploy new bot \"" + botAddress + "\": " + deploymentResult.cause().getLocalizedMessage());
sink.error(deploymentResult.cause());
} else {
sink.next(botAddress);
}
deploymentLock.release();
});
} else {
logger.error("Can't add new bot address \"" + botAddress + "\" because it's already present! Value: \"" + putResult.result() + "\"");
sink.error(new UnsupportedOperationException("Can't add new bot address \"" + botAddress + "\" because it's already present! Value: \"" + putResult.result() + "\""));
deploymentLock.release();
}
} else {
logger.error("Can't update shared map", putResult.cause());
sink.error(putResult.cause());
deploymentLock.release();
}
});
}
});
} else { } else {
logger.error("Can't update shared map", putAllResult.cause()); logger.error("Can't update shared map", putAllResult.cause());
sink.error(putAllResult.cause()); sink.error(putAllResult.cause());
deploymentLock.release();
} }
deploymentLock.release();
}); });
} else { } else {
logger.error("Can't obtain deployment lock", lockAcquisitionResult.cause()); logger.error("Can't obtain deployment lock", lockAcquisitionResult.cause());
@ -202,6 +161,72 @@ public class TDLibRemoteClient implements AutoCloseable {
} }
} }
private void listenForDeployRequests(RemoteClientBotAddresses botAddresses,
TdClusterManager clusterManager,
reactor.core.publisher.FluxSink<Object> sink,
AsyncMap<Object, Object> deployableBotAddresses) {
clusterManager.getEventBus().consumer("tdlib.remoteclient.clients.deploy", (Message<String> msg) -> {
clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
var deploymentLock = lockAcquisitionResult.result();
var botAddress = msg.body();
if (botAddresses.has(botAddress)) {
deployBot(clusterManager, botAddress, deploymentResult -> {
if (deploymentResult.failed()) {
msg.fail(500,
"Failed to deploy existing bot \"" + botAddress + "\": " + deploymentResult
.cause()
.getLocalizedMessage()
);
sink.error(deploymentResult.cause());
} else {
sink.next(botAddress);
}
deploymentLock.release();
});
} else {
deployableBotAddresses.putIfAbsent(botAddress, netInterface, putResult -> {
if (putResult.succeeded()) {
if (putResult.result() == null) {
logger.info("Deploying new bot at address \"" + botAddress + "\"");
try {
botAddresses.putAddress(botAddress);
} catch (IOException e) {
logger.error("Can't save bot address \"" + botAddress + "\" to addresses file", e);
}
deployBot(clusterManager, botAddress, deploymentResult -> {
if (deploymentResult.failed()) {
msg.fail(500,
"Failed to deploy new bot \"" + botAddress + "\": " + deploymentResult
.cause()
.getLocalizedMessage()
);
sink.error(deploymentResult.cause());
} else {
sink.next(botAddress);
}
deploymentLock.release();
});
} else {
// Ignore this bot because it's present on another cluster
deploymentLock.release();
}
} else {
logger.error("Can't update shared map", putResult.cause());
sink.error(putResult.cause());
deploymentLock.release();
}
});
}
} else {
logger.error("Can't obtain deployment lock", lockAcquisitionResult.cause());
sink.error(lockAcquisitionResult.cause());
}
});
});
}
private void deployBot(TdClusterManager clusterManager, String botAddress, Handler<AsyncResult<String>> deploymentHandler) { private void deployBot(TdClusterManager clusterManager, String botAddress, Handler<AsyncResult<String>> deploymentHandler) {
AsyncTdMiddleEventBusServer verticle = new AsyncTdMiddleEventBusServer(clusterManager); AsyncTdMiddleEventBusServer verticle = new AsyncTdMiddleEventBusServer(clusterManager);
AtomicReference<Lock> deploymentLock = new AtomicReference<>(); AtomicReference<Lock> deploymentLock = new AtomicReference<>();