Improve reliability

This commit is contained in:
Andrea Cavalli 2021-01-22 12:25:04 +01:00
parent fab2b6b56f
commit cc56dd4598
3 changed files with 101 additions and 76 deletions

View File

@ -40,6 +40,12 @@ public class EventBusFlux {
}
/**
* If the flux is fast and you are on a network, please do this:
*
* <pre>flux
.bufferTimeout(Duration.ofMillis(100))
.windowTimeout(1, Duration.ofSeconds(5))
.flatMap(w -> w.defaultIfEmpty(Collections.emptyList()))</pre>
*
* @return tuple. T1 = flux served, T2 = error that caused cancelling of the subscription
*/
@ -72,7 +78,7 @@ public class EventBusFlux {
long subscriptionId = 0;
var subscriptionAddress = fluxAddress + "." + subscriptionId;
MessageConsumer<byte[]> subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady");
MessageConsumer<byte[]> subscriptionReady = eventBus.consumer(subscriptionAddress + ".subscriptionReady");
MessageConsumer<byte[]> dispose = eventBus.consumer(subscriptionAddress + ".dispose");
MessageConsumer<byte[]> ping = eventBus.consumer(subscriptionAddress + ".ping");
MessageConsumer<byte[]> cancel = eventBus.consumer(subscriptionAddress + ".cancel");
@ -105,9 +111,17 @@ public class EventBusFlux {
logger.error("Error when sending a signal of flux \"" + fluxAddress + "\"", error);
}
fatalErrorSink.tryEmitValue(error);
disposeFlux(atomicSubscription.get(), fatalErrorSink, ping, cancel, dispose, fluxAddress, () -> {
logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error");
});
disposeFlux(atomicSubscription.get(),
fatalErrorSink,
subscriptionReady,
ping,
cancel,
dispose,
fluxAddress,
() -> {
logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error");
}
);
}, () -> {
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onComplete(), signalDeliveryOptions, msg2 -> {
logger.info("Completed flux \"" + fluxAddress + "\"");
@ -134,6 +148,7 @@ public class EventBusFlux {
dispose.handler(msg2 -> {
disposeFlux(subscription,
fatalErrorSink,
subscriptionReady,
ping,
cancel,
dispose,
@ -191,13 +206,14 @@ public class EventBusFlux {
sink.success();
}
});
});
}).subscribeOn(Schedulers.single()).share();
return Tuples.of(servedMono, fatalErrorSink.asMono());
}
private static void disposeFlux(@Nullable Disposable subscription,
One<Throwable> fatalErrorSink,
MessageConsumer<byte[]> subscriptionReady,
MessageConsumer<byte[]> ping,
MessageConsumer<byte[]> cancel,
MessageConsumer<byte[]> dispose,
@ -208,20 +224,25 @@ public class EventBusFlux {
if (subscription != null) {
subscription.dispose();
}
ping.unregister(v0 -> {
subscriptionReady.unregister(v0 -> {
if (v0.failed()) {
logger.error("Failed to unregister ping", v0.cause());
logger.error("Failed to unregister subscriptionReady", v0.cause());
}
cancel.unregister(v1 -> {
ping.unregister(v1 -> {
if (v1.failed()) {
logger.error("Failed to unregister cancel", v1.cause());
logger.error("Failed to unregister ping", v1.cause());
}
dispose.unregister(v2 -> {
cancel.unregister(v2 -> {
if (v2.failed()) {
logger.error("Failed to unregister dispose", v2.cause());
logger.error("Failed to unregister cancel", v2.cause());
}
logger.debug("Disposed flux \"" + fluxAddress + "\"");
after.run();
dispose.unregister(v3 -> {
if (v3.failed()) {
logger.error("Failed to unregister dispose", v3.cause());
}
logger.debug("Disposed flux \"" + fluxAddress + "\"");
after.run();
});
});
});
});
@ -259,7 +280,7 @@ public class EventBusFlux {
});
signalConsumer.completionHandler(h -> {
if (h.succeeded()) {
eventBus.<Long>request(fluxAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> {
eventBus.<Long>request(subscriptionAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to tell that the subscription is ready");
}

View File

@ -59,7 +59,7 @@ public class AsyncTdEasy {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdEasy.class);
private final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy", false);
private static final Scheduler scheduler = Schedulers.newSingle("AsyncTdEasy", false);
private final ReplayProcessor<AuthorizationState> authState = ReplayProcessor.create(1);
private final ReplayProcessor<Boolean> requestedDefinitiveExit = ReplayProcessor.cacheLastOrDefault(false);
private final ReplayProcessor<TdEasySettings> settings = ReplayProcessor.cacheLast();
@ -141,7 +141,6 @@ public class AsyncTdEasy {
return true;
})
.subscribeOn(Schedulers.boundedElastic())
.publishOn(scheduler)
.flatMap(_v -> {
this.settings.onNext(settings);
return Mono.empty();

View File

@ -7,7 +7,6 @@ import io.vertx.core.AsyncResult;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import it.tdlight.common.ConstructorDetector;
import it.tdlight.jni.TdApi;
@ -32,7 +31,6 @@ import java.net.ConnectException;
import java.time.Duration;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.warp.commonutils.error.InitializationException;
@ -41,6 +39,7 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Schedulers;
public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle {
@ -52,6 +51,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
private final Many<Boolean> tdCloseRequested = Sinks.many().replay().latestOrDefault(false);
private final Many<Boolean> tdClosed = Sinks.many().replay().latestOrDefault(false);
private final One<Error> tdCrash = Sinks.one();
private final DeliveryOptions deliveryOptions;
private final DeliveryOptions deliveryOptionsWithTimeout;
@ -61,7 +61,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
private String botAlias;
private boolean local;
private long initTime;
private MessageConsumer<byte[]> readyToStartConsumer;
@SuppressWarnings({"unchecked", "rawtypes"})
public AsyncTdMiddleEventBusClient(TdClusterManager clusterManager) {
@ -131,38 +130,39 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
logger.debug("Waiting for " + botAddress + ".readyToStart");
AtomicBoolean alreadyReceived = new AtomicBoolean(false);
this.readyToStartConsumer = cluster.getEventBus().consumer(botAddress + ".readyToStart", (Message<byte[]> pingMsg) -> {
// Reply instantly
pingMsg.reply(new byte[0]);
if (!alreadyReceived.getAndSet(true)) {
logger.debug("Received ping reply (succeeded)");
logger.debug("Requesting " + botAddress + ".start");
cluster
.getEventBus()
.request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> {
if (startMsg.succeeded()) {
logger.debug("Requesting " + botAddress + ".isWorking");
cluster
.getEventBus()
.request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> {
if (msg.succeeded()) {
this.listen()
.timeout(Duration.ofSeconds(30))
.subscribeOn(Schedulers.single())
.subscribe(v -> {}, future::fail, future::complete);
} else {
future.fail(msg.cause());
}
});
} else {
future.fail(startMsg.cause());
}
});
} else {
// Already received
}
var readyToStartConsumer = cluster.getEventBus().<byte[]>consumer(botAddress + ".readyToStart");
readyToStartConsumer.handler((Message<byte[]> pingMsg) -> {
logger.debug("Received ping reply (succeeded)");
readyToStartConsumer.unregister(unregistered -> {
// Reply instantly
pingMsg.reply(new byte[0]);
if (unregistered.succeeded()) {
logger.debug("Requesting " + botAddress + ".start");
cluster
.getEventBus()
.request(botAddress + ".start", EMPTY, deliveryOptionsWithTimeout, startMsg -> {
if (startMsg.succeeded()) {
logger.debug("Requesting " + botAddress + ".isWorking");
cluster
.getEventBus()
.request(botAddress + ".isWorking", EMPTY, deliveryOptionsWithTimeout, msg -> {
if (msg.succeeded()) {
this.listen()
.timeout(Duration.ofSeconds(30))
.subscribeOn(Schedulers.single())
.subscribe(v -> {}, future::fail, future::complete);
} else {
future.fail(msg.cause());
}
});
} else {
future.fail(startMsg.cause());
}
});
} else {
logger.error("Failed to unregister readyToStartConsumer", unregistered.cause());
}
});
});
} catch (Exception ex) {
future.fail(ex);
@ -178,29 +178,26 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
@Override
public void stop(Promise<Void> stopPromise) {
logger.debug("Stopping AsyncTdMiddle client verticle...");
readyToStartConsumer.unregister(result -> {
if (result.failed()) {
logger.error("Failed to unregister readyToStart consumer");
tdCloseRequested.asFlux().take(1).single().flatMap(closeRequested -> {
if (!closeRequested) {
return tdCrash.asMono().switchIfEmpty(Mono
.fromRunnable(() -> logger.warn("Verticle is being stopped before closing TDLib with Close()! Sending Close() before stopping..."))
.then(this.execute(new TdApi.Close(), false)
).then()
.cast(TdApi.Error.class)
).doOnTerminate(() -> {
logger.debug("Close() sent to td");
markCloseRequested();
}).then();
} else {
logger.debug("Unregistered readyToStart consumer");
return Mono.empty();
}
tdCloseRequested.asFlux().take(1).single().flatMap(closeRequested -> {
if (!closeRequested) {
logger.warn("Verticle is being stopped before closing TDLib with Close()! Sending Close() before stopping...");
return this.execute(new TdApi.Close(), false).doOnTerminate(() -> {
logger.debug("Close() sent to td");
markCloseRequested();
}).then();
} else {
return Mono.empty();
}
}).thenMany(tdClosed.asFlux()).filter(closed -> closed).take(1).subscribe(v -> {}, cause -> {
logger.debug("Failed to stop AsyncTdMiddle client verticle");
stopPromise.fail(cause);
}, () -> {
logger.debug("Stopped AsyncTdMiddle client verticle");
stopPromise.complete();
});
}).thenMany(tdClosed.asFlux()).filter(closed -> closed).take(1).subscribe(v -> {}, cause -> {
logger.debug("Failed to stop AsyncTdMiddle client verticle");
stopPromise.fail(cause);
}, () -> {
logger.debug("Stopped AsyncTdMiddle client verticle");
stopPromise.complete();
});
}
@ -241,14 +238,17 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
.flatMap(block -> Flux.fromIterable(block.getValues()))
.filter(Objects::nonNull)
.onErrorResume(error -> {
TdApi.Error theError;
if (error instanceof ConnectException) {
return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED")));
theError = new Error(444, "CONNECTION_KILLED");
} else if (error.getMessage().contains("Timed out")) {
return Flux.just(TdResult.failed(new Error(444, "CONNECTION_KILLED")));
theError = new Error(444, "CONNECTION_KILLED");
} else {
theError = new Error(406, "INVALID_UPDATE");
logger.error("Bot updates request failed! Marking as closed.", error);
return Flux.just(TdResult.failed(new Error(406, "INVALID_UPDATE")));
}
tdCrash.tryEmitValue(theError);
return Flux.just(TdResult.failed(theError));
}).flatMap(item -> Mono.fromCallable(item::orElseThrow))
.filter(Objects::nonNull)
.doOnNext(item -> {
@ -289,6 +289,9 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
private void markClosed() {
if (tdCrash.tryEmitEmpty().isFailure()) {
logger.debug("TDLib already crashed");
}
if (tdClosed.tryEmitNext(true).isFailure()) {
logger.error("Failed to set tdClosed");
if (tdClosed.tryEmitComplete().isFailure()) {
@ -309,7 +312,8 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
.replace(" = ", "="));
}
return tdCloseRequested.asFlux().take(1).single().filter(close -> !close).<TdResult<T>>flatMap((_x) -> Mono.create(sink -> {
var crashMono = tdCrash.asMono().<TdResult<T>>map(TdResult::failed);
var executeMono = tdCloseRequested.asFlux().take(1).single().filter(close -> !close).<TdResult<T>>flatMap((_x) -> Mono.create(sink -> {
try {
cluster
.getEventBus()
@ -351,5 +355,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}).switchIfEmpty(Mono.fromSupplier(() -> {
return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty"));
}));
return Mono.firstWithValue(crashMono, executeMono);
}
}