Replace verticles with normal class

This commit is contained in:
Andrea Cavalli 2021-01-13 19:46:46 +01:00
parent ed7d12e9ba
commit a3a2893fb8
6 changed files with 109 additions and 132 deletions

View File

@ -56,11 +56,23 @@ public class EventBusFlux {
MessageConsumer<byte[]> cancel = eventBus.consumer(subscriptionAddress + ".cancel");
var subscription = flux.subscribe(item -> {
eventBus.send(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions);
var request = eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
});
}, error -> {
eventBus.send(subscriptionAddress + ".signal", SignalMessage.<T>onError(error), signalDeliveryOptions);
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onError(error), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
});
}, () -> {
eventBus.send(subscriptionAddress + ".signal", SignalMessage.<T>onComplete(), signalDeliveryOptions);
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onComplete(), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
});
});
cancel.handler(msg3 -> {

View File

@ -4,7 +4,6 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.core.shareddata.AsyncMap;
import it.tdlight.common.Init;
@ -37,7 +36,7 @@ public class TDLibRemoteClient implements AutoCloseable {
private final int port;
private final Set<String> membersAddresses;
private final Many<TdClusterManager> clusterManager = Sinks.many().replay().latest();
private final Scheduler vertxStatusScheduler = Schedulers.newSingle("VertxStatus", false);
private final Scheduler deploymentScheduler = Schedulers.single();
public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set<String> membersAddresses) {
this.securityInfo = securityInfo;
@ -155,7 +154,9 @@ public class TDLibRemoteClient implements AutoCloseable {
})
.doOnError(ex -> {
logger.error(ex.getLocalizedMessage(), ex);
}).subscribe(i -> {}, e -> {}, () -> startedEventHandler.handle(null));
}).subscribe(i -> {}, e -> {
logger.error("Remote client error", e);
}, () -> startedEventHandler.handle(null));
} catch (IOException ex) {
logger.error("Remote client error", ex);
}
@ -230,56 +231,43 @@ public class TDLibRemoteClient implements AutoCloseable {
private void deployBot(TdClusterManager clusterManager, String botAddress, Handler<AsyncResult<String>> deploymentHandler) {
AsyncTdMiddleEventBusServer verticle = new AsyncTdMiddleEventBusServer(clusterManager);
verticle.onBeforeStop(handler -> {
vertxStatusScheduler.schedule(() -> {
clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
var deploymentLock = lockAcquisitionResult.result();
verticle.onAfterStop(handler2 -> {
vertxStatusScheduler.schedule(() -> {
deploymentLock.release();
handler2.complete();
});
});
clusterManager.getSharedData().getClusterWideMap("runningBotAddresses", (AsyncResult<AsyncMap<String, String>> mapResult) -> {
if (mapResult.succeeded()) {
var runningBotAddresses = mapResult.result();
runningBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> {
if (putResult.succeeded()) {
if (putResult.result() != null) {
handler.complete();
} else {
handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed");
}
clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
var deploymentLock = lockAcquisitionResult.result();
verticle.onAfterStop(handler2 -> {
deploymentLock.release();
handler2.complete();
});
clusterManager.getSharedData().getClusterWideMap("runningBotAddresses", (AsyncResult<AsyncMap<String, String>> mapResult) -> {
if (mapResult.succeeded()) {
var runningBotAddresses = mapResult.result();
runningBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> {
if (putResult.succeeded()) {
if (putResult.result() != null) {
handler.complete();
} else {
handler.fail(putResult.cause());
handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed");
}
});
} else {
handler.fail(mapResult.cause());
}
});
} else {
handler.fail(lockAcquisitionResult.cause());
}
});
} else {
handler.fail(putResult.cause());
}
});
} else {
handler.fail(mapResult.cause());
}
});
} else {
handler.fail(lockAcquisitionResult.cause());
}
});
});
clusterManager
.getVertx()
.deployVerticle(verticle,
clusterManager
.newDeploymentOpts()
.setConfig(new JsonObject()
.put("botAddress", botAddress)
.put("botAlias", botAddress)
.put("local", false)),
(deployed) -> {
if (deployed.failed()) {
logger.error("Can't deploy bot \"" + botAddress + "\"", deployed.cause());
}
deploymentHandler.handle(deployed);
}
);
verticle.start(botAddress, botAddress, false).doOnError(error -> {
logger.error("Can't deploy bot \"" + botAddress + "\"", error);
}).subscribeOn(deploymentScheduler).subscribe(v -> {}, err -> {
deploymentHandler.handle(Future.failedFuture(err));
}, () -> {
deploymentHandler.handle(Future.succeededFuture());
});
}
private void putAllAsync(AsyncMap<Object, Object> sharedMap,
@ -307,6 +295,5 @@ public class TDLibRemoteClient implements AutoCloseable {
@Override
public void close() {
clusterManager.asFlux().blockFirst();
vertxStatusScheduler.dispose();
}
}

View File

@ -58,7 +58,7 @@ public class AsyncTdEasy {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdEasy.class);
private final Scheduler scheduler = Schedulers.newSingle("TdEasyUpdates");
private final Scheduler scheduler = Schedulers.single();
private final ReplayProcessor<AuthorizationState> authState = ReplayProcessor.create(1);
private final ReplayProcessor<Boolean> requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false);
private final ReplayProcessor<TdEasySettings> settings = ReplayProcessor.cacheLast();

View File

@ -39,6 +39,8 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle {
@ -47,6 +49,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
public static final boolean OUTPUT_REQUESTS = false;
public static final byte[] EMPTY = new byte[0];
private final Scheduler tdMiddleScheduler = Schedulers.single();
private final Many<Boolean> tdClosed = Sinks.many().replay().latestOrDefault(false);
private final DeliveryOptions deliveryOptions;
private final DeliveryOptions deliveryOptionsWithTimeout;
@ -146,6 +149,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
if (msg.succeeded()) {
this.listen()
.timeout(Duration.ofSeconds(30))
.subscribeOn(tdMiddleScheduler)
.subscribe(v -> {}, future::fail, future::complete);
} else {
future.fail(msg.cause());
@ -241,7 +245,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
tdClosed.tryEmitNext(true);
}
}
}));
})).subscribeOn(tdMiddleScheduler);
}
@Override
@ -297,6 +301,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
}).switchIfEmpty(Mono.fromSupplier(() -> {
return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty"));
}));
})).subscribeOn(tdMiddleScheduler);
}
}

View File

@ -1,6 +1,5 @@
package it.tdlight.tdlibsession.td.middle.direct;
import io.vertx.core.json.JsonObject;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.Object;
@ -10,7 +9,6 @@ import it.tdlight.tdlibsession.td.middle.AsyncTdMiddle;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient;
import it.tdlight.tdlibsession.td.middle.server.AsyncTdMiddleEventBusServer;
import it.tdlight.utils.MonoUtils;
import java.util.Objects;
import org.warp.commonutils.error.InitializationException;
import reactor.core.publisher.Flux;
@ -35,14 +33,7 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle {
}
public Mono<AsyncTdMiddleLocal> start() {
return Mono.<String>create(sink -> {
masterClusterManager
.getVertx()
.deployVerticle(srv,
masterClusterManager.newDeploymentOpts().setConfig(new JsonObject().put("botAddress", botAddress).put("botAlias", botAlias).put("local", true)),
MonoUtils.toHandler(sink)
);
}).onErrorMap(InitializationException::new).flatMap(_x -> {
return srv.start(botAddress, botAlias, true).onErrorMap(InitializationException::new).flatMap(_x -> {
try {
return AsyncTdMiddleEventBusClient.getAndDeployInstance(masterClusterManager, botAlias, botAddress, true).doOnNext(cli -> {
this.cli.onNext(cli);

View File

@ -2,7 +2,6 @@ package it.tdlight.tdlibsession.td.middle.server;
import static it.tdlight.tdlibsession.td.middle.client.AsyncTdMiddleEventBusClient.OUTPUT_REQUESTS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
@ -37,7 +36,7 @@ import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
public class AsyncTdMiddleEventBusServer {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdMiddleEventBusServer.class);
@ -69,7 +68,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) {
this.cluster = clusterManager;
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 1000);
this.tdSrvPoll = Schedulers.newSingle("TdSrvPoll");
this.tdSrvPoll = Schedulers.single();
if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) {
cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec());
cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec());
@ -79,57 +78,52 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
}
@Override
public void start(Promise<Void> startPromise) {
var botAddress = config().getString("botAddress");
if (botAddress == null || botAddress.isEmpty()) {
throw new IllegalArgumentException("botAddress is not set!");
}
this.botAddress = botAddress;
var botAlias = config().getString("botAlias");
if (botAlias == null || botAlias.isEmpty()) {
throw new IllegalArgumentException("botAlias is not set!");
}
this.botAlias = botAlias;
var local = config().getBoolean("local");
if (local == null) {
throw new IllegalArgumentException("local is not set!");
}
this.local = local;
this.td = new AsyncTdDirectImpl(botAlias);
public Mono<Void> start(String botAddress, String botAlias, boolean local) {
return Mono.<Void>create(sink -> {
if (botAddress == null || botAddress.isEmpty()) {
sink.error(new IllegalArgumentException("botAddress is not set!"));
}
this.botAddress = botAddress;
if (botAlias == null || botAlias.isEmpty()) {
sink.error(new IllegalArgumentException("botAlias is not set!"));
}
this.botAlias = botAlias;
this.local = local;
this.td = new AsyncTdDirectImpl(botAlias);
AtomicBoolean alreadyDeployed = new AtomicBoolean(false);
this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message<byte[]> msg) -> {
if (alreadyDeployed.compareAndSet(false, true)) {
this.listen().then(this.pipe()).then(Mono.<Void>create(registrationSink -> {
this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message<byte[]> workingMsg) -> {
workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local));
});
this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
})).subscribeOn(this.tdSrvPoll)
.subscribe(v -> {}, ex -> {
logger.info(botAddress + " server deployed and started. succeeded: false");
logger.error(ex.getLocalizedMessage(), ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {
logger.info(botAddress + " server deployed and started. succeeded: true");
msg.reply(EMPTY);
AtomicBoolean alreadyDeployed = new AtomicBoolean(false);
this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message<byte[]> msg) -> {
if (alreadyDeployed.compareAndSet(false, true)) {
this.listen().then(this.pipe()).then(Mono.<Void>create(registrationSink -> {
this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message<byte[]> workingMsg) -> {
workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local));
});
} else {
msg.reply(EMPTY);
}
});
startConsumer.completionHandler(h -> {
logger.info(botAddress + " server deployed. succeeded: " + h.succeeded());
if (h.succeeded()) {
logger.debug("Sending " + botAddress + ".readyToStart");
cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000), msg -> {
startPromise.complete(h.result());
});
} else {
startPromise.fail(h.cause());
}
});
this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
})).subscribeOn(this.tdSrvPoll)
.subscribe(v -> {}, ex -> {
logger.info(botAddress + " server deployed and started. succeeded: false");
logger.error(ex.getLocalizedMessage(), ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {
logger.info(botAddress + " server deployed and started. succeeded: true");
msg.reply(EMPTY);
});
} else {
msg.reply(EMPTY);
}
});
startConsumer.completionHandler(h -> {
logger.info(botAddress + " server deployed. succeeded: " + h.succeeded());
if (h.succeeded()) {
logger.debug("Sending " + botAddress + ".readyToStart");
cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000), msg -> {
sink.success();
});
} else {
sink.error(h.cause());
}
});
}).subscribeOn(tdSrvPoll);
}
public void onBeforeStop(Consumer<Promise<Void>> r) {
@ -140,11 +134,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
this.onAfterStopListeners.add(r);
}
@Override
public void stop(Promise<Void> stopPromise) {
stopPromise.complete();
}
private void runAll(List<Consumer<Promise<Void>>> actions, Handler<AsyncResult<Void>> resultHandler) {
if (actions.isEmpty()) {
resultHandler.handle(Future.succeededFuture());
@ -202,8 +191,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
});
executeConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
});
}).subscribeOn(tdSrvPoll);
}
private void undeploy(Runnable whenUndeployed) {
@ -237,12 +225,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
logger.error("An afterStop listener failed: " + onAfterStopHandler.cause());
}
vertx.undeploy(deploymentID(), undeployed -> {
if (undeployed.failed()) {
logger.error("Error when undeploying td verticle", undeployed.cause());
}
whenUndeployed.run();
});
whenUndeployed.run();
});
}).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> {
logger.error("Error when stopping", ex);
@ -260,7 +243,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.replace(" ", "")
.replace(" = ", "="));
}
}).bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100))
}).bufferTimeout(tdOptions.getEventsSize(), local ? Duration.ofMillis(1) : Duration.ofMillis(100))
.windowTimeout(1, Duration.ofSeconds(5))
.flatMap(w -> w.defaultIfEmpty(Collections.emptyList()))
.map(TdResultList::new).doFinally(s -> {