Update EventBusFlux.java, SignalMessage.java, and 11 more files...

This commit is contained in:
Andrea Cavalli 2021-01-13 17:22:14 +01:00
parent 7d6122e777
commit d3f813d8cb
11 changed files with 620 additions and 392 deletions

View File

@ -0,0 +1,164 @@
package it.tdlight.tdlibsession;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.eventbus.MessageConsumer;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class EventBusFlux {
private static final Logger logger = LoggerFactory.getLogger(EventBusFlux.class);
private static final byte[] EMPTY = new byte[0];
public static <T> void registerFluxCodec(EventBus eventBus, MessageCodec<T, T> itemsCodec) {
var signalsCodec = new SignalMessageCodec<T>(itemsCodec);
try {
eventBus.registerCodec(signalsCodec);
} catch (IllegalStateException ex) {
if (!ex.getMessage().contains("Already a codec registered with name")) {
throw ex;
}
}
}
public static <T> Mono<Void> serve(Flux<T> flux,
EventBus eventBus,
String fluxAddress,
DeliveryOptions baseDeliveryOptions,
MessageCodec<T, T> itemsCodec,
Duration connectionTimeout) {
var signalsCodec = new SignalMessageCodec<T>(itemsCodec);
var deliveryOptions = new DeliveryOptions(baseDeliveryOptions)
.setSendTimeout(connectionTimeout.toMillis());
var signalDeliveryOptions = new DeliveryOptions(deliveryOptions)
.setCodecName(signalsCodec.name());
AtomicInteger subscriptionsCount = new AtomicInteger();
return Mono.create(sink -> {
MessageConsumer<byte[]> subscribe = eventBus.consumer(fluxAddress + ".subscribe");
subscribe.handler(msg -> {
if (subscriptionsCount.incrementAndGet() > 1) {
subscriptionsCount.decrementAndGet();
logger.error("Another client tried to connect to the same flux. Rejecting the request.");
msg.fail(500, "This flux is already in use!");
return;
}
long subscriptionId = 0;
var subscriptionAddress = fluxAddress + "." + subscriptionId;
MessageConsumer<byte[]> dispose = eventBus.consumer(subscriptionAddress + ".dispose");
MessageConsumer<byte[]> cancel = eventBus.consumer(subscriptionAddress + ".cancel");
var subscription = flux.subscribe(item -> {
eventBus.send(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions);
}, error -> {
eventBus.send(subscriptionAddress + ".signal", SignalMessage.<T>onError(error), signalDeliveryOptions);
}, () -> {
eventBus.send(subscriptionAddress + ".signal", SignalMessage.<T>onComplete(), signalDeliveryOptions);
});
cancel.handler(msg3 -> {
if (!subscription.isDisposed()) {
subscription.dispose();
}
msg3.reply(EMPTY, deliveryOptions);
});
dispose.handler(msg2 -> {
if (!subscription.isDisposed()) {
subscription.dispose();
}
cancel.unregister(v -> {
if (v.failed()) {
logger.error("Failed to unregister cancel", v.cause());
}
dispose.unregister(v2 -> {
if (v.failed()) {
logger.error("Failed to unregister dispose", v2.cause());
}
subscribe.unregister(v3 -> {
if (v2.failed()) {
logger.error("Failed to unregister subscribe", v3.cause());
}
msg2.reply(EMPTY);
});
});
});
});
cancel.completionHandler(h -> {
if (h.succeeded()) {
dispose.completionHandler(h2 -> {
if (h2.succeeded()) {
sink.success();
} else {
sink.error(h.cause());
}
});
} else {
sink.error(h.cause());
}
});
msg.reply((Long) subscriptionId);
});
subscribe.completionHandler(h -> {
if (h.failed()) {
sink.error(h.cause());
}
});
});
}
public static <T> Flux<T> connect(EventBus eventBus,
String fluxAddress,
DeliveryOptions baseDeliveryOptions,
MessageCodec<T, T> itemsCodec,
Duration connectionTimeout) {
return Flux.<T>create(emitter -> {
var deliveryOptions = new DeliveryOptions(baseDeliveryOptions)
.setSendTimeout(connectionTimeout.toMillis());
eventBus.<Long>request(fluxAddress + ".subscribe", EMPTY, deliveryOptions, msg -> {
if (msg.succeeded()) {
long subscriptionId = msg.result().body();
var subscriptionAddress = fluxAddress + "." + subscriptionId;
var signalConsumer = eventBus.<SignalMessage<T>>consumer(subscriptionAddress + ".signal");
signalConsumer.handler(msg2 -> {
var signal = msg2.body();
switch (signal.getSignalType()) {
case ITEM:
emitter.next(signal.getItem());
break;
case ERROR:
emitter.error(new Exception(signal.getErrorMessage()));
break;
case COMPLETE:
emitter.complete();
break;
}
msg2.reply(EMPTY);
});
signalConsumer.completionHandler(h -> {
if (h.failed()) {
emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause()));
}
});
emitter.onDispose(() -> eventBus.send(subscriptionAddress + ".dispose", EMPTY, deliveryOptions));
emitter.onCancel(() -> eventBus.send(subscriptionAddress + ".cancel", EMPTY, deliveryOptions));
} else {
emitter.error(new IllegalStateException("Subscription failed", msg.cause()));
}
});
});
}
}

View File

@ -0,0 +1,82 @@
package it.tdlight.tdlibsession;
import java.util.Objects;
import java.util.StringJoiner;
class SignalMessage<T> {
private final SignalType signalType;
private final T item;
private final String errorMessage;
private SignalMessage(SignalType signalType, T item, String errorMessage) {
this.signalType = signalType;
this.item = item;
this.errorMessage = errorMessage;
}
public static <T> SignalMessage<T> onNext(T item) {
return new SignalMessage<>(SignalType.ITEM, Objects.requireNonNull(item), null);
}
public static <T> SignalMessage<T> onError(Throwable throwable) {
return new SignalMessage<T>(SignalType.ERROR, null, Objects.requireNonNull(throwable.getMessage()));
}
static <T> SignalMessage<T> onDecodedError(String throwable) {
return new SignalMessage<T>(SignalType.ERROR, null, Objects.requireNonNull(throwable));
}
public static <T> SignalMessage<T> onComplete() {
return new SignalMessage<T>(SignalType.COMPLETE, null, null);
}
public SignalType getSignalType() {
return signalType;
}
public String getErrorMessage() {
return Objects.requireNonNull(errorMessage);
}
public T getItem() {
return Objects.requireNonNull(item);
}
@Override
public String toString() {
return new StringJoiner(", ", SignalMessage.class.getSimpleName() + "[", "]")
.add("signalType=" + signalType)
.add("item=" + item)
.add("errorMessage='" + errorMessage + "'")
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SignalMessage<?> that = (SignalMessage<?>) o;
if (signalType != that.signalType) {
return false;
}
if (!Objects.equals(item, that.item)) {
return false;
}
return Objects.equals(errorMessage, that.errorMessage);
}
@Override
public int hashCode() {
int result = signalType != null ? signalType.hashCode() : 0;
result = 31 * result + (item != null ? item.hashCode() : 0);
result = 31 * result + (errorMessage != null ? errorMessage.hashCode() : 0);
return result;
}
}

View File

@ -0,0 +1,99 @@
package it.tdlight.tdlibsession;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.unimi.dsi.fastutil.io.FastByteArrayInputStream;
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class SignalMessageCodec<T> implements MessageCodec<SignalMessage<T>, SignalMessage<T>> {
private final String codecName;
private final MessageCodec<T, T> typeCodec;
public SignalMessageCodec(MessageCodec<T, T> typeCodec) {
super();
this.codecName = "SignalCodec-" + typeCodec.name();
this.typeCodec = typeCodec;
}
@Override
public void encodeToWire(Buffer buffer, SignalMessage<T> t) {
try (var bos = new FastByteArrayOutputStream()) {
try (var dos = new DataOutputStream(bos)) {
switch (t.getSignalType()) {
case ITEM:
dos.writeByte(0x01);
break;
case ERROR:
dos.writeByte(0x02);
break;
case COMPLETE:
dos.writeByte(0x03);
break;
default:
throw new UnsupportedOperationException();
}
}
bos.trim();
buffer.appendBytes(bos.array);
switch (t.getSignalType()) {
case ITEM:
typeCodec.encodeToWire(buffer, t.getItem());
break;
case ERROR:
var stringBytes = t.getErrorMessage().getBytes(StandardCharsets.UTF_8);
buffer.appendInt(stringBytes.length);
buffer.appendBytes(stringBytes);
break;
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
@Override
public SignalMessage<T> decodeFromWire(int pos, Buffer buffer) {
try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) {
try (var dis = new DataInputStream(fis)) {
switch (dis.readByte()) {
case 0x01:
return SignalMessage.onNext(typeCodec.decodeFromWire(pos + 1, buffer));
case 0x02:
var size = buffer.getInt(pos + 1);
return SignalMessage.onDecodedError(new String(buffer.getBytes(pos + 2, pos + 2 + size),
StandardCharsets.UTF_8
));
case 0x03:
return SignalMessage.onComplete();
default:
throw new UnsupportedOperationException();
}
}
} catch (IOException ex) {
ex.printStackTrace();
}
return null;
}
@Override
public SignalMessage<T> transform(SignalMessage<T> t) {
// If a message is sent *locally* across the event bus.
// This sends message just as is
return t;
}
@Override
public String name() {
return codecName;
}
@Override
public byte systemCodecID() {
// Always -1
return -1;
}
}

View File

@ -0,0 +1,5 @@
package it.tdlight.tdlibsession;
enum SignalType {
COMPLETE, ERROR, ITEM
}

View File

@ -7,7 +7,6 @@ import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JksOptions;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.Lock;
import it.tdlight.common.Init;
import it.tdlight.common.utils.CantLoadLibrary;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
@ -19,13 +18,14 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class TDLibRemoteClient implements AutoCloseable {
@ -37,6 +37,7 @@ public class TDLibRemoteClient implements AutoCloseable {
private final int port;
private final Set<String> membersAddresses;
private final Many<TdClusterManager> clusterManager = Sinks.many().replay().latest();
private final Scheduler vertxStatusScheduler = Schedulers.newSingle("VertxStatus", false);
public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set<String> membersAddresses) {
this.securityInfo = securityInfo;
@ -120,12 +121,11 @@ public class TDLibRemoteClient implements AutoCloseable {
.doOnError(clusterManager::tryEmitError)
.flatMapMany(clusterManager -> {
return Flux.create(sink -> {
var sharedData = clusterManager.getSharedData();
sharedData.getClusterWideMap("deployableBotAddresses", mapResult -> {
clusterManager.getSharedData().getClusterWideMap("deployableBotAddresses", mapResult -> {
if (mapResult.succeeded()) {
var deployableBotAddresses = mapResult.result();
sharedData.getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> {
clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
var deploymentLock = lockAcquisitionResult.result();
putAllAsync(deployableBotAddresses, botAddresses.values(), (AsyncResult<Void> putAllResult) -> {
@ -229,41 +229,41 @@ public class TDLibRemoteClient implements AutoCloseable {
private void deployBot(TdClusterManager clusterManager, String botAddress, Handler<AsyncResult<String>> deploymentHandler) {
AsyncTdMiddleEventBusServer verticle = new AsyncTdMiddleEventBusServer(clusterManager);
AtomicReference<Lock> deploymentLock = new AtomicReference<>();
verticle.onBeforeStop(handler -> {
clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
deploymentLock.set(lockAcquisitionResult.result());
var sharedData = clusterManager.getSharedData();
sharedData.getClusterWideMap("runningBotAddresses", (AsyncResult<AsyncMap<String, String>> mapResult) -> {
if (mapResult.succeeded()) {
var runningBotAddresses = mapResult.result();
runningBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> {
if (putResult.succeeded()) {
if (putResult.result() != null) {
handler.complete();
} else {
handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed");
}
} else {
handler.fail(putResult.cause());
}
vertxStatusScheduler.schedule(() -> {
clusterManager.getSharedData().getLockWithTimeout("deployment", 15000, lockAcquisitionResult -> {
if (lockAcquisitionResult.succeeded()) {
var deploymentLock = lockAcquisitionResult.result();
verticle.onAfterStop(handler2 -> {
vertxStatusScheduler.schedule(() -> {
deploymentLock.release();
handler2.complete();
});
} else {
handler.fail(mapResult.cause());
}
});
} else {
handler.fail(lockAcquisitionResult.cause());
}
});
clusterManager.getSharedData().getClusterWideMap("runningBotAddresses", (AsyncResult<AsyncMap<String, String>> mapResult) -> {
if (mapResult.succeeded()) {
var runningBotAddresses = mapResult.result();
runningBotAddresses.removeIfPresent(botAddress, netInterface, putResult -> {
if (putResult.succeeded()) {
if (putResult.result() != null) {
handler.complete();
} else {
handler.fail("Can't destroy bot with address \"" + botAddress + "\" because it has been already destroyed");
}
} else {
handler.fail(putResult.cause());
}
});
} else {
handler.fail(mapResult.cause());
}
});
} else {
handler.fail(lockAcquisitionResult.cause());
}
});
});
});
verticle.onAfterStop(handler -> {
if (deploymentLock.get() != null) {
deploymentLock.get().release();
}
handler.complete();
});
clusterManager
.getVertx()
.deployVerticle(verticle,
@ -307,5 +307,6 @@ public class TDLibRemoteClient implements AutoCloseable {
@Override
public void close() {
clusterManager.asFlux().blockFirst();
vertxStatusScheduler.dispose();
}
}

View File

@ -51,12 +51,14 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class AsyncTdEasy {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdEasy.class);
private final Scheduler scheduler = Schedulers.newSingle("TdEasyUpdates");
private final ReplayProcessor<AuthorizationState> authState = ReplayProcessor.create(1);
private final ReplayProcessor<Boolean> requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false);
private final ReplayProcessor<TdEasySettings> settings = ReplayProcessor.cacheLast();
@ -70,13 +72,9 @@ public class AsyncTdEasy {
this.td = td;
this.logName = logName;
var sch = Schedulers.newSingle("TdEasyUpdates");
// todo: use Duration.ZERO instead of 10ms interval
this.incomingUpdatesCo = td.receive()
.filterWhen(update -> Mono.from(requestedDefinitiveExit).map(requestedDefinitiveExit -> !requestedDefinitiveExit))
.subscribeOn(sch)
.publishOn(sch)
.flatMap(this::preprocessUpdates)
.flatMap(update -> Mono.from(this.getState()).single().map(state -> new AsyncTdUpdateObj(state, update)))
.filter(upd -> upd.getState().getConstructor() == AuthorizationStateReady.CONSTRUCTOR)
@ -88,6 +86,7 @@ public class AsyncTdEasy {
}).doOnComplete(() -> {
authState.onNext(new AuthorizationStateClosed());
})
.subscribeOn(scheduler)
.publish().refCount(1);
}
@ -104,11 +103,12 @@ public class AsyncTdEasy {
}
// Register fatal error handler
fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).subscribe();
fatalError.asMono().flatMap(settings.getFatalErrorHandler()::onFatalError).subscribeOn(scheduler).subscribe();
return true;
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(scheduler)
.flatMap(_v -> {
this.settings.onNext(settings);
return Mono.empty();
@ -119,7 +119,7 @@ public class AsyncTdEasy {
* Get TDLib state
*/
public Flux<AuthorizationState> getState() {
return Flux.from(authState);
return Flux.from(authState).subscribeOn(scheduler);
}
/**
@ -130,21 +130,21 @@ public class AsyncTdEasy {
}
private Flux<TdApi.Update> getIncomingUpdates(boolean includePreAuthUpdates) {
return Flux.from(incomingUpdatesCo);
return Flux.from(incomingUpdatesCo).subscribeOn(scheduler);
}
/**
* Get generic error updates from TDLib (When they are not linked to a precise request).
*/
public Flux<TdApi.Error> getIncomingErrors() {
return Flux.from(globalErrors);
return Flux.from(globalErrors).subscribeOn(scheduler);
}
/**
* Receives fatal errors from TDLib.
*/
public Mono<FatalErrorType> getFatalErrors() {
return fatalError.asMono();
return Mono.from(fatalError.asMono()).subscribeOn(scheduler);
}
/**
@ -156,7 +156,7 @@ public class AsyncTdEasy {
}
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(TdApi.Function obj, boolean synchronous) {
return td.execute(obj, synchronous);
return td.<T>execute(obj, synchronous).subscribeOn(scheduler);
}
/**
@ -164,7 +164,7 @@ public class AsyncTdEasy {
* @param i level
*/
public Mono<Void> setVerbosityLevel(int i) {
return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true));
return thenOrFatalError(sendDirectly(new TdApi.SetLogVerbosityLevel(i), true)).subscribeOn(scheduler);
}
/**
@ -172,7 +172,7 @@ public class AsyncTdEasy {
* @param name option name
*/
public Mono<Void> clearOption(String name) {
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false));
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueEmpty()), false)).subscribeOn(scheduler);
}
/**
@ -181,7 +181,7 @@ public class AsyncTdEasy {
* @param value option value
*/
public Mono<Void> setOptionString(String name, String value) {
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false));
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueString(value)), false)).subscribeOn(scheduler);
}
/**
@ -190,7 +190,7 @@ public class AsyncTdEasy {
* @param value option value
*/
public Mono<Void> setOptionInteger(String name, long value) {
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false));
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueInteger(value)), false)).subscribeOn(scheduler);
}
/**
@ -199,7 +199,7 @@ public class AsyncTdEasy {
* @param value option value
*/
public Mono<Void> setOptionBoolean(String name, boolean value) {
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false));
return thenOrFatalError(sendDirectly(new TdApi.SetOption(name, new TdApi.OptionValueBoolean(value)), false)).subscribeOn(scheduler);
}
/**
@ -218,7 +218,7 @@ public class AsyncTdEasy {
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
+ value.getClass().getSimpleName()));
}
});
}).subscribeOn(scheduler);
}
/**
@ -237,7 +237,7 @@ public class AsyncTdEasy {
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
+ value.getClass().getSimpleName()));
}
});
}).subscribeOn(scheduler);
}
/**
@ -256,7 +256,7 @@ public class AsyncTdEasy {
return Mono.error(new UnsupportedOperationException("The option " + name + " is of type "
+ value.getClass().getSimpleName()));
}
});
}).subscribeOn(scheduler);
}
/**
@ -267,7 +267,7 @@ public class AsyncTdEasy {
* @return The request response.
*/
public <T extends Object> Mono<TdResult<T>> execute(TdApi.Function request) {
return td.execute(request, true);
return td.<T>execute(request, true).subscribeOn(scheduler);
}
/**
@ -305,7 +305,8 @@ public class AsyncTdEasy {
.doOnNext(ok -> {
logger.info("Received AuthorizationStateClosed after TdApi.Close");
})
.then();
.then()
.subscribeOn(scheduler);
}
/**
@ -325,25 +326,27 @@ public class AsyncTdEasy {
}
private Mono<Update> catchErrors(Object obj) {
if (obj.getConstructor() == Error.CONSTRUCTOR) {
var error = (Error) obj;
return Mono.<Update>fromCallable(() -> {
if (obj.getConstructor() == Error.CONSTRUCTOR) {
var error = (Error) obj;
switch (error.message) {
case "PHONE_CODE_INVALID":
globalErrors.onNext(error);
return Mono.just(new UpdateAuthorizationState(new AuthorizationStateWaitCode()));
case "PASSWORD_HASH_INVALID":
globalErrors.onNext(error);
return Mono.just(new UpdateAuthorizationState(new AuthorizationStateWaitPassword()));
default:
globalErrors.onNext(error);
break;
switch (error.message) {
case "PHONE_CODE_INVALID":
globalErrors.onNext(error);
return new UpdateAuthorizationState(new AuthorizationStateWaitCode());
case "PASSWORD_HASH_INVALID":
globalErrors.onNext(error);
return new UpdateAuthorizationState(new AuthorizationStateWaitPassword());
default:
globalErrors.onNext(error);
break;
}
analyzeFatalErrors(error);
return null;
} else {
return (Update) obj;
}
analyzeFatalErrors(error);
return Mono.empty();
} else {
return Mono.just((Update) obj);
}
}).subscribeOn(scheduler);
}
private void analyzeFatalErrors(Object obj) {
@ -370,7 +373,7 @@ public class AsyncTdEasy {
}
public Mono<Boolean> isBot() {
return Mono.from(settings).single().map(TdEasySettings::isBotTokenSet);
return Mono.from(settings).single().map(TdEasySettings::isBotTokenSet).subscribeOn(scheduler);
}
private Publisher<TdApi.Update> preprocessUpdates(TdApi.Object updateObj) {
@ -535,7 +538,8 @@ public class AsyncTdEasy {
return Mono.empty();
}
})
.then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj));
.then(Mono.justOrEmpty(updateObj.getConstructor() == Error.CONSTRUCTOR ? null : (Update) updateObj))
.subscribeOn(scheduler);
}
public <T extends TdApi.Object> Mono<Void> thenOrFatalError(Mono<TdResult<T>> optionalMono) {

View File

@ -40,13 +40,11 @@ public class TdClusterManager {
private final ClusterManager mgr;
private final VertxOptions vertxOptions;
private final Vertx vertx;
private final EventBus eb;
public TdClusterManager(ClusterManager mgr, VertxOptions vertxOptions, Vertx vertx, EventBus eventBus) {
public TdClusterManager(ClusterManager mgr, VertxOptions vertxOptions, Vertx vertx) {
this.mgr = mgr;
this.vertxOptions = vertxOptions;
this.vertx = vertx;
this.eb = eventBus;
}
public static Mono<TdClusterManager> ofMaster(JksOptions keyStoreOptions, JksOptions trustStoreOptions, boolean onlyLocal, String masterHostname, String netInterface, int port, Set<String> nodesAddresses) {
@ -150,7 +148,7 @@ public class TdClusterManager {
sink.success(Vertx.vertx(vertxOptions));
}
})
.map(vertx -> new TdClusterManager(mgr, vertxOptions, vertx, vertx.eventBus()));
.map(vertx -> new TdClusterManager(mgr, vertxOptions, vertx));
}
public Vertx getVertx() {
@ -158,7 +156,7 @@ public class TdClusterManager {
}
public EventBus getEventBus() {
return eb;
return vertx.eventBus();
}
public VertxOptions getVertxOptions() {
@ -178,7 +176,7 @@ public class TdClusterManager {
*/
public <T> boolean registerDefaultCodec(Class<T> objectClass, MessageCodec<T, T> messageCodec) {
try {
eb.registerDefaultCodec(objectClass, messageCodec);
vertx.eventBus().registerDefaultCodec(objectClass, messageCodec);
return true;
} catch (IllegalStateException ex) {
if (ex.getMessage().startsWith("Already a default codec registered for class")) {
@ -204,9 +202,9 @@ public class TdClusterManager {
*/
public <T> MessageConsumer<T> consumer(String address, boolean localOnly) {
if (localOnly) {
return eb.localConsumer(address);
return vertx.eventBus().localConsumer(address);
} else {
return eb.consumer(address);
return vertx.eventBus().consumer(address);
}
}
@ -221,9 +219,9 @@ public class TdClusterManager {
*/
public <T> MessageConsumer<T> consumer(String address, boolean localOnly, Handler<Message<T>> handler) {
if (localOnly) {
return eb.localConsumer(address, handler);
return vertx.eventBus().localConsumer(address, handler);
} else {
return eb.consumer(address, handler);
return vertx.eventBus().consumer(address, handler);
}
}

View File

@ -6,19 +6,13 @@ import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
public class TdOptionalList {
private final boolean isSet;
public class TdResultList {
private final List<TdResult<TdApi.Object>> values;
public TdOptionalList(boolean isSet, List<TdResult<TdApi.Object>> values) {
this.isSet = isSet;
public TdResultList(List<TdResult<TdApi.Object>> values) {
this.values = values;
}
public boolean isSet() {
return isSet;
}
public List<TdResult<TdApi.Object>> getValues() {
return values;
}
@ -32,26 +26,18 @@ public class TdOptionalList {
return false;
}
TdOptionalList that = (TdOptionalList) o;
TdResultList that = (TdResultList) o;
if (isSet != that.isSet) {
return false;
}
return Objects.equals(values, that.values);
}
@Override
public int hashCode() {
int result = (isSet ? 1 : 0);
result = 31 * result + (values != null ? values.hashCode() : 0);
return result;
return values != null ? values.hashCode() : 0;
}
@Override
public String toString() {
return new StringJoiner(", ", TdOptionalList.class.getSimpleName() + "[", "]")
.add("isSet=" + isSet)
.add("values=" + values)
.toString();
return new StringJoiner(", ", TdResultList.class.getSimpleName() + "[", "]").add("values=" + values).toString();
}
}

View File

@ -13,30 +13,26 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
public class TdOptListMessageCodec implements MessageCodec<TdOptionalList, TdOptionalList> {
public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdResultList> {
public TdOptListMessageCodec() {
public TdResultListMessageCodec() {
super();
}
@Override
public void encodeToWire(Buffer buffer, TdOptionalList ts) {
public void encodeToWire(Buffer buffer, TdResultList ts) {
try (var bos = new FastByteArrayOutputStream()) {
try (var dos = new DataOutputStream(bos)) {
if (ts.isSet()) {
var t = ts.getValues();
dos.writeInt(t.size());
for (TdResult<TdApi.Object> t1 : t) {
if (t1.succeeded()) {
dos.writeBoolean(true);
t1.result().serialize(dos);
} else {
dos.writeBoolean(false);
t1.cause().serialize(dos);
}
var t = ts.getValues();
dos.writeInt(t.size());
for (TdResult<TdApi.Object> t1 : t) {
if (t1.succeeded()) {
dos.writeBoolean(true);
t1.result().serialize(dos);
} else {
dos.writeBoolean(false);
t1.cause().serialize(dos);
}
} else {
dos.writeInt(-1);
}
}
bos.trim();
@ -47,32 +43,28 @@ public class TdOptListMessageCodec implements MessageCodec<TdOptionalList, TdOpt
}
@Override
public TdOptionalList decodeFromWire(int pos, Buffer buffer) {
public TdResultList decodeFromWire(int pos, Buffer buffer) {
try (var fis = new FastByteArrayInputStream(buffer.getBytes(pos, buffer.length()))) {
try (var dis = new DataInputStream(fis)) {
var size = dis.readInt();
if (size < 0) {
return new TdOptionalList(false, Collections.emptyList());
} else {
ArrayList<TdResult<TdApi.Object>> list = new ArrayList<>();
for (int i = 0; i < size; i++) {
if (dis.readBoolean()) {
list.add(TdResult.succeeded((TdApi.Object) TdApi.Deserializer.deserialize(dis)));
} else {
list.add(TdResult.failed((Error) TdApi.Deserializer.deserialize(dis)));
}
ArrayList<TdResult<TdApi.Object>> list = new ArrayList<>();
for (int i = 0; i < size; i++) {
if (dis.readBoolean()) {
list.add(TdResult.succeeded((TdApi.Object) TdApi.Deserializer.deserialize(dis)));
} else {
list.add(TdResult.failed((Error) TdApi.Deserializer.deserialize(dis)));
}
return new TdOptionalList(true, list);
}
return new TdResultList(list);
}
} catch (IOException | UnsupportedOperationException ex) {
ex.printStackTrace();
return new TdOptionalList(false, Collections.emptyList());
return new TdResultList(Collections.emptyList());
}
}
@Override
public TdOptionalList transform(TdOptionalList ts) {
public TdResultList transform(TdResultList ts) {
return ts;
}

View File

@ -15,6 +15,7 @@ import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.EventBusFlux;
import it.tdlight.tdlibsession.td.ResponseError;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.TdResultMessage;
@ -23,22 +24,21 @@ import it.tdlight.tdlibsession.td.middle.ExecuteObject;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.TdExecuteObjectMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdOptListMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdOptionalList;
import it.tdlight.tdlibsession.td.middle.TdResultList;
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec;
import it.tdlight.utils.MonoUtils;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.warp.commonutils.error.InitializationException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle {
@ -47,12 +47,10 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
public static final boolean OUTPUT_REQUESTS = false;
public static final byte[] EMPTY = new byte[0];
private final ReplayProcessor<Boolean> tdClosed = ReplayProcessor.cacheLastOrDefault(false);
private final Many<Boolean> tdClosed = Sinks.many().replay().latestOrDefault(false);
private final DeliveryOptions deliveryOptions;
private final DeliveryOptions deliveryOptionsWithTimeout;
private ReplayProcessor<Flux<TdApi.Object>> incomingUpdatesCo = ReplayProcessor.cacheLast();
private TdClusterManager cluster;
private String botAddress;
@ -64,7 +62,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
@SuppressWarnings({"unchecked", "rawtypes"})
public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) {
cluster = clusterManager;
if (cluster.registerDefaultCodec(TdOptionalList.class, new TdOptListMessageCodec())) {
if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) {
cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec());
cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec());
for (Class<?> value : ConstructorDetector.getTDConstructorsUnsafe().values()) {
@ -146,7 +144,9 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
.getEventBus()
.request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> {
if (msg.succeeded()) {
this.listen().then(this.pipe()).timeout(Duration.ofSeconds(30)).subscribe(v -> {}, future::fail, future::complete);
this.listen()
.timeout(Duration.ofSeconds(30))
.subscribe(v -> {}, future::fail, future::complete);
} else {
future.fail(msg.cause());
}
@ -173,7 +173,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
@Override
public void stop(Promise<Void> stopPromise) {
readyToStartConsumer.unregister(result -> {
tdClosed.onNext(true);
tdClosed.tryEmitNext(true);
stopPromise.complete();
});
}
@ -183,55 +183,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
return Mono.empty();
}
private Mono<Void> pipe() {
var updates = this.requestUpdatesBatchFromNetwork()
.repeatWhen(nFlux -> nFlux.takeWhile(n -> n > 0)) // Repeat when there is one batch with a flux of updates
.takeUntilOther(tdClosed.distinct().filter(tdClosed -> tdClosed)) // Stop when closed
.flatMap(batch -> batch)
.onErrorResume(error -> {
logger.error("Bot updates request failed! Marking as closed.", error);
if (error.getMessage().contains("Timed out")) {
return Flux.just(new Error(444, "CONNECTION_KILLED"));
} else {
return Flux.just(new Error(406, "INVALID_UPDATE"));
}
})
.flatMap(update -> {
return Mono.<TdApi.Object>create(sink -> {
if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
var state = (UpdateAuthorizationState) update;
if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
// Send tdClosed early to avoid errors
tdClosed.onNext(true);
this.getVertx().undeploy(this.deploymentID(), undeployed -> {
if (undeployed.failed()) {
logger.error("Error when undeploying td verticle", undeployed.cause());
}
sink.success(update);
});
} else {
sink.success(update);
}
} else {
sink.success(update);
}
});
})
.log("TdMiddle", Level.FINEST)
.takeUntilOther(tdClosed.distinct().filter(tdClosed -> tdClosed)) // Stop when closed
.publish()
.autoConnect(1);
updates.subscribe(t -> incomingUpdatesCo.onNext(Flux.just(t)),
incomingUpdatesCo::onError,
incomingUpdatesCo::onComplete
);
return Mono.empty();
}
private static class UpdatesBatchResult {
public final Flux<TdApi.Object> updatesFlux;
public final boolean completed;
@ -250,63 +201,47 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
}
private Mono<Flux<TdApi.Object>> requestUpdatesBatchFromNetwork() {
return Mono
.from(tdClosed.distinct())
.single()
.filter(tdClosed -> !tdClosed)
.flatMap(_x -> Mono.<Flux<TdApi.Object>>create(sink -> {
cluster.getEventBus().<TdOptionalList>request(botAddress + ".getNextUpdatesBlock",
EMPTY,
deliveryOptionsWithTimeout,
msg -> {
if (msg.failed()) {
//if (System.currentTimeMillis() - initTime <= 30000) {
// // The serve has not been started
// sink.success(Flux.empty());
//} else {
// // Timeout
sink.error(msg.cause());
//}
} else {
var result = msg.result();
if (result.body() == null) {
sink.success();
} else {
var resultBody = msg.result().body();
if (resultBody.isSet()) {
List<TdResult<TdApi.Object>> updates = resultBody.getValues();
for (TdResult<TdApi.Object> updateObj : updates) {
if (updateObj.succeeded()) {
if (OUTPUT_REQUESTS) {
System.out.println(" <- " + updateObj.result()
.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "="));
}
} else {
logger.error("Received an errored update",
ResponseError.newResponseError("incoming update", botAlias, updateObj.cause())
);
}
}
sink.success(Flux.fromIterable(updates).filter(TdResult::succeeded).map(TdResult::result));
} else {
// the stream has ended
sink.success();
}
}
}
}
);
}));
}
@Override
public Flux<TdApi.Object> receive() {
return incomingUpdatesCo.filter(Objects::nonNull).flatMap(v -> v);
var fluxCodec = new TdResultListMessageCodec();
EventBusFlux.registerFluxCodec(cluster.getEventBus(), fluxCodec);
return Mono.from(tdClosed.asFlux()).single().filter(tdClosed -> !tdClosed).flatMapMany(_closed -> EventBusFlux
.<TdResultList>connect(cluster.getEventBus(),
botAddress + ".updates",
deliveryOptions,
fluxCodec,
Duration.ofMillis(deliveryOptionsWithTimeout.getSendTimeout())
)
.filter(Objects::nonNull)
.flatMap(block -> Flux.fromIterable(block.getValues()))
.filter(Objects::nonNull)
.onErrorResume(error -> {
logger.error("Bot updates request failed! Marking as closed.", error);
if (error.getMessage().contains("Timed out")) {
return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED")));
} else {
return Flux.just(TdResult.failed(new Error(406, "INVALID_UPDATE")));
}
}).flatMap(item -> Mono.fromCallable(item::orElseThrow))
.filter(Objects::nonNull)
.doOnNext(item -> {
if (OUTPUT_REQUESTS) {
System.out.println(" <- " + item.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "=")
);
}
if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
var state = (UpdateAuthorizationState) item;
if (state.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
// Send tdClosed early to avoid errors
tdClosed.tryEmitNext(true);
}
}
}));
}
@Override
@ -321,7 +256,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
.replace(" = ", "="));
}
return Mono.from(tdClosed.distinct()).single().filter(tdClosed -> !tdClosed).<TdResult<T>>flatMap((_x) -> Mono.create(sink -> {
return Mono.from(tdClosed.asFlux()).single().filter(tdClosed -> !tdClosed).<TdResult<T>>flatMap((_x) -> Mono.create(sink -> {
try {
cluster
.getEventBus()

View File

@ -11,6 +11,7 @@ import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import it.tdlight.common.ConstructorDetector;
import it.tdlight.jni.TdApi;
import it.tdlight.tdlibsession.EventBusFlux;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
@ -19,8 +20,8 @@ import it.tdlight.tdlibsession.td.middle.ExecuteObject;
import it.tdlight.tdlibsession.td.middle.TdClusterManager;
import it.tdlight.tdlibsession.td.middle.TdExecuteObjectMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdOptListMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdOptionalList;
import it.tdlight.tdlibsession.td.middle.TdResultList;
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec;
import it.tdlight.utils.MonoUtils;
import java.time.Duration;
@ -32,11 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Empty;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
@ -59,6 +56,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
protected AsyncTdDirectImpl td;
private final Scheduler tdSrvPoll;
private final Scheduler vertxStatusScheduler = Schedulers.newSingle("VertxStatus", false);
/**
* Value is not important, emits when a request is received
*/
@ -66,7 +64,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private final List<Consumer<Promise<Void>>> onAfterStopListeners = new CopyOnWriteArrayList<>();
private MessageConsumer<?> startConsumer;
private MessageConsumer<byte[]> isWorkingConsumer;
private MessageConsumer<byte[]> getNextUpdatesBlockConsumer;
private MessageConsumer<ExecuteObject> executeConsumer;
@SuppressWarnings({"unchecked", "rawtypes"})
@ -74,7 +71,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
this.cluster = clusterManager;
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 1000);
this.tdSrvPoll = Schedulers.newSingle("TdSrvPoll");
if (cluster.registerDefaultCodec(TdOptionalList.class, new TdOptListMessageCodec())) {
if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) {
cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec());
cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec());
for (Class<?> value : ConstructorDetector.getTDConstructorsUnsafe().values()) {
@ -85,55 +82,57 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) {
var botAddress = config().getString("botAddress");
if (botAddress == null || botAddress.isEmpty()) {
throw new IllegalArgumentException("botAddress is not set!");
}
this.botAddress = botAddress;
var botAlias = config().getString("botAlias");
if (botAlias == null || botAlias.isEmpty()) {
throw new IllegalArgumentException("botAlias is not set!");
}
this.botAlias = botAlias;
var local = config().getBoolean("local");
if (local == null) {
throw new IllegalArgumentException("local is not set!");
}
this.local = local;
this.td = new AsyncTdDirectImpl(botAlias);
vertxStatusScheduler.schedule(() -> {
var botAddress = config().getString("botAddress");
if (botAddress == null || botAddress.isEmpty()) {
throw new IllegalArgumentException("botAddress is not set!");
}
this.botAddress = botAddress;
var botAlias = config().getString("botAlias");
if (botAlias == null || botAlias.isEmpty()) {
throw new IllegalArgumentException("botAlias is not set!");
}
this.botAlias = botAlias;
var local = config().getBoolean("local");
if (local == null) {
throw new IllegalArgumentException("local is not set!");
}
this.local = local;
this.td = new AsyncTdDirectImpl(botAlias);
AtomicBoolean alreadyDeployed = new AtomicBoolean(false);
this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message<byte[]> msg) -> {
if (alreadyDeployed.compareAndSet(false, true)) {
this.listen().then(this.pipe()).then(Mono.<Void>create(registrationSink -> {
this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message<byte[]> workingMsg) -> {
workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local));
});
this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
})).subscribeOn(this.tdSrvPoll)
.subscribe(v -> {}, ex -> {
logger.info(botAddress + " server deployed and started. succeeded: false");
logger.error(ex.getLocalizedMessage(), ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {
logger.info(botAddress + " server deployed and started. succeeded: true");
AtomicBoolean alreadyDeployed = new AtomicBoolean(false);
this.startConsumer = cluster.getEventBus().consumer(botAddress + ".start", (Message<byte[]> msg) -> {
if (alreadyDeployed.compareAndSet(false, true)) {
this.listen().then(this.pipe()).then(Mono.<Void>create(registrationSink -> {
this.isWorkingConsumer = cluster.getEventBus().consumer(botAddress + ".isWorking", (Message<byte[]> workingMsg) -> {
workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local));
});
this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
})).subscribeOn(this.tdSrvPoll)
.subscribe(v -> {}, ex -> {
logger.info(botAddress + " server deployed and started. succeeded: false");
logger.error(ex.getLocalizedMessage(), ex);
msg.fail(500, ex.getLocalizedMessage());
}, () -> {
logger.info(botAddress + " server deployed and started. succeeded: true");
msg.reply(EMPTY);
});
} else {
msg.reply(EMPTY);
});
} else {
msg.reply(EMPTY);
}
}
});
startConsumer.completionHandler(h -> {
logger.info(botAddress + " server deployed. succeeded: " + h.succeeded());
if (h.succeeded()) {
logger.debug("Sending " + botAddress + ".readyToStart");
cluster.getEventBus().request(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000), msg -> {
startPromise.complete(h.result());
});
} else {
startPromise.fail(h.cause());
}
});
});
startConsumer.completionHandler(h -> {
logger.info(botAddress + " server deployed. succeeded: " + h.succeeded());
if (h.succeeded()) {
startPromise.complete(h.result());
} else {
startPromise.fail(h.cause());
}
});
logger.debug("Sending " + botAddress + ".readyToStart");
cluster.getEventBus().send(botAddress + ".readyToStart", EMPTY, cluster.newDeliveryOpts().setSendTimeout(30000));
}
public void onBeforeStop(Consumer<Promise<Void>> r) {
@ -146,23 +145,19 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
@Override
public void stop(Promise<Void> stopPromise) {
runAll(onBeforeStopListeners, onBeforeStopHandler -> {
if (onBeforeStopHandler.failed()) {
logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause());
}
Mono.create(sink -> this.isWorkingConsumer.unregister(result -> {
if (result.failed()) {
logger.error("Can't unregister consumer", result.cause());
vertxStatusScheduler.schedule(() -> {
runAll(onBeforeStopListeners, onBeforeStopHandler -> {
if (onBeforeStopHandler.failed()) {
logger.error("A beforeStop listener failed: "+ onBeforeStopHandler.cause());
}
this.startConsumer.unregister(result2 -> {
if (result2.failed()) {
logger.error("Can't unregister consumer", result2.cause());
}
this.getNextUpdatesBlockConsumer.unregister(result3 -> {
if (result3.failed()) {
logger.error("Can't unregister consumer", result3.cause());
Mono.create(sink -> this.isWorkingConsumer.unregister(result -> {
if (result.failed()) {
logger.error("Can't unregister consumer", result.cause());
}
this.startConsumer.unregister(result2 -> {
if (result2.failed()) {
logger.error("Can't unregister consumer", result2.cause());
}
this.executeConsumer.unregister(result4 -> {
@ -173,20 +168,19 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
sink.success();
});
});
});
})).doFinally(signalType -> {
logger.info("TdMiddle verticle \"" + botAddress + "\" stopped");
})).doFinally(signalType -> {
logger.info("TdMiddle verticle \"" + botAddress + "\" stopped");
runAll(onAfterStopListeners, onAfterStopHandler -> {
if (onAfterStopHandler.failed()) {
logger.error("An afterStop listener failed: " + onAfterStopHandler.cause());
}
stopPromise.complete();
});
}).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> {
logger.error("Error when stopping", ex);
}, () -> {});
runAll(onAfterStopListeners, onAfterStopHandler -> {
if (onAfterStopHandler.failed()) {
logger.error("An afterStop listener failed: " + onAfterStopHandler.cause());
}
stopPromise.complete();
});
}).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> {
logger.error("Error when stopping", ex);
}, () -> {});
});
});
}
@ -261,66 +255,34 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
private Mono<Void> pipe() {
return Mono.create(registeredSink -> {
Many<Boolean> getNextUpdatesBlockTrigger = Sinks.many().replay().latestOrDefault(true);
Flux<Message<byte[]>> getNextUpdatesBlockFlux = Flux.create(sink -> {
this.getNextUpdatesBlockConsumer = cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message<byte[]> msg) -> {
getNextUpdatesBlockTrigger.tryEmitNext(true);
sink.next(msg);
var updatesFlux = td.receive(tdOptions).doOnNext(update -> {
if (OUTPUT_REQUESTS) {
System.out.println("<=: " + update
.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "="));
}
}).bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100))
.windowTimeout(1, Duration.ofSeconds(5))
.flatMap(w -> w.defaultIfEmpty(Collections.emptyList()))
.map(TdResultList::new).doOnTerminate(() -> {
System.out.println("<=: end (1)");
}).doOnComplete(() -> {
System.out.println("<=: end (2)");
}).doFinally(s -> {
System.out.println("<=: end (3)");
this.undeploy(() -> {});
});
registeredSink.success();
});
Empty<Boolean> needClose = Sinks.empty();
Flux<TdOptionalList> updatesFlux = Flux.mergeSequential(td.receive(tdOptions)
.doOnSubscribe(s -> {
// After 60 seconds of not receiving request for updates, dispose td flux assuming the middle client died.
getNextUpdatesBlockTrigger.asFlux().timeout(Duration.ofSeconds(30), timeout -> {
needClose.tryEmitEmpty();
s.cancel();
}).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> {
logger.error("Error when signalling that the next update block request has been received", ex);
}, () -> {
needClose.tryEmitEmpty();
});
})
.doOnNext(update -> {
if (OUTPUT_REQUESTS) {
System.out.println("<=: " + update
.toString()
.replace("\n", " ")
.replace("\t", "")
.replace(" ", "")
.replace(" = ", "="));
}
})
.bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100))
.windowTimeout(1, Duration.ofSeconds(5))
.flatMap(w -> w.defaultIfEmpty(Collections.emptyList()))
.map(updatesGroup -> new TdOptionalList(true, updatesGroup)),
Mono.fromSupplier(() -> {
return new TdOptionalList(false, Collections.emptyList());
})
);
Flux.zip(updatesFlux, getNextUpdatesBlockFlux)
.subscribeOn(this.tdSrvPoll)
.subscribe(tuple -> {
var results = tuple.getT1();
var messageHandle = tuple.getT2();
if (!results.isSet()) {
System.out.println("<=: end (1)");
}
messageHandle.reply(results, cluster.newDeliveryOpts().setLocalOnly(local));
}, error -> logger.error("Error when receiving or forwarding updates", error), () -> {
needClose.tryEmitEmpty();
});
needClose.asMono().subscribeOn(this.tdSrvPoll).subscribe(v_ -> {}, e -> {}, () -> {
this.undeploy(() -> {});
});
});
var fluxCodec = new TdResultListMessageCodec();
EventBusFlux.registerFluxCodec(cluster.getEventBus(), fluxCodec);
return EventBusFlux.<TdResultList>serve(updatesFlux,
cluster.getEventBus(),
botAddress + ".updates",
cluster.newDeliveryOpts().setLocalOnly(local),
fluxCodec,
Duration.ofSeconds(30)
).subscribeOn(tdSrvPoll);
}
}