Fix out-of-order updates

This commit is contained in:
Andrea Cavalli 2021-01-25 03:36:15 +01:00
parent 620914b3cf
commit 5074d985b3
8 changed files with 31 additions and 30 deletions

View File

@ -216,8 +216,7 @@
<version>3.8.1</version>
<configuration>
<release>11</release>
<source>11</source>
<target>11</target>
<useIncrementalCompilation>false</useIncrementalCompilation>
</configuration>
</plugin>
<plugin>

View File

@ -95,7 +95,7 @@ public class EventBusFlux {
})
.then(Mono.empty())
)
.flatMap(item -> Mono.<Message<T>>create(itemSink -> {
.flatMapSequential(item -> Mono.<Message<T>>create(itemSink -> {
var responseHandler = MonoUtils.toHandler(itemSink);
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, responseHandler);
}))
@ -292,7 +292,7 @@ public class EventBusFlux {
}
});
var pingSubscription = Flux.interval(Duration.ofSeconds(10)).flatMap(n -> Mono.create(pingSink ->
var pingSubscription = Flux.interval(Duration.ofSeconds(10)).flatMapSequential(n -> Mono.create(pingSink ->
eventBus.<byte[]>request(subscriptionAddress + ".ping", EMPTY, deliveryOptions, pingMsg -> {
if (pingMsg.succeeded()) {
pingSink.success(pingMsg.result().body());

View File

@ -165,11 +165,12 @@ public class TDLibRemoteClient implements AutoCloseable {
var mediaPath = getMediaDirectory(req.id());
var blPath = getSessionBinlogDirectory(req.id());
Schedulers.boundedElastic().schedule(() -> {
BinlogUtils
.chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate())
.then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath, mediaPath))
.then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono))
.subscribeOn(Schedulers.single())
.publishOn(Schedulers.single())
.subscribe(
v -> {},
ex -> {
@ -179,6 +180,7 @@ public class TDLibRemoteClient implements AutoCloseable {
() -> msg.reply(new byte[0])
);
});
});
return startBotConsumer.rxCompletionHandler().as(MonoUtils::toMono);
})
.then();

View File

@ -75,8 +75,8 @@ public class AsyncTdEasy {
// todo: use Duration.ZERO instead of 10ms interval
this.incomingUpdates = td.receive()
.flatMap(this::preprocessUpdates)
.flatMap(update -> Mono.from(this.getState()).single().map(state -> new AsyncTdUpdateObj(state, update)))
.flatMapSequential(this::preprocessUpdates)
.flatMapSequential(update -> Mono.from(this.getState()).single().map(state -> new AsyncTdUpdateObj(state, update)))
.map(upd -> (TdApi.Update) upd.getUpdate())
.doOnError(ex -> {
if (ex instanceof TdError) {

View File

@ -191,8 +191,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.doOnSubscribe(s -> Schedulers.boundedElastic().schedule(() -> {
cluster.getEventBus().<byte[]>send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout);
}))
.flatMap(updates -> Mono.fromCallable((Callable<Object>) updates::body).publishOn(Schedulers.boundedElastic()))
.flatMap(updates -> {
.flatMapSequential(updates -> Mono.fromCallable((Callable<Object>) updates::body).publishOn(Schedulers.boundedElastic()))
.flatMapSequential(updates -> {
var result = (TdResultList) updates;
if (result.succeeded()) {
return Flux.fromIterable(result.value());
@ -200,7 +200,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return Mono.fromCallable(() -> TdResult.failed(result.error()).orElseThrow());
}
})
.flatMap(this::interceptUpdate)
.flatMapSequential(this::interceptUpdate)
.doOnError(crash::tryEmitError)
.doOnTerminate(updatesStreamEnd::tryEmitEmpty)
.publishOn(Schedulers.single());

View File

@ -141,7 +141,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
executeConsumer.handler(sink::next);
executeConsumer.endHandler(h -> sink.complete());
})
.flatMap(msg -> {
.flatMapSequential(msg -> {
logger.trace("Received execute request {}", msg.body());
var request = overrideRequest(msg.body().getRequest(), botId);
return td
@ -324,7 +324,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}
return false;
})
.flatMap(update -> Mono.fromCallable(() -> {
.flatMapSequential(update -> Mono.fromCallable(() -> {
if (update.getConstructor() == TdApi.Error.CONSTRUCTOR) {
var error = (Error) update;
throw new TdError(error.code, error.message);
@ -351,14 +351,14 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.sender(botAddress + ".updates", opts);
var pipeFlux = updatesFlux
.flatMap(updatesList -> updatesSender
.flatMapSequential(updatesList -> updatesSender
.rxWrite(updatesList)
.as(MonoUtils::toMono)
.thenReturn(updatesList)
)
.flatMap(updatesList -> Flux
.flatMapSequential(updatesList -> Flux
.fromIterable(updatesList.value())
.flatMap(item -> {
.flatMapSequential(item -> {
if (item instanceof Update) {
var tdUpdate = (Update) item;
if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {

View File

@ -117,7 +117,7 @@ public class BinlogUtils {
readBinlogConsumer.handler(sink::next);
readBinlogConsumer.endHandler(h -> sink.complete());
})
.flatMap(req -> BinlogUtils
.flatMapSequential(req -> BinlogUtils
.retrieveBinlog(vertx.fileSystem(), TDLibRemoteClient.getSessionBinlogDirectory(botId))
.flatMap(BinlogAsyncFile::readFullyBytes)
.single()

View File

@ -407,7 +407,7 @@ public class MonoUtils {
}
public Flux<T> readAsFlux() {
return sink.asFlux();
return sink.asFlux().publishOn(Schedulers.parallel());
}
public ReactiveReactorReadStream<T> readAsStream() {
@ -552,7 +552,7 @@ public class MonoUtils {
}
public Flux<T> readAsFlux() {
return flux;
return flux.publishOn(Schedulers.parallel());
}
public ReactiveReactorReadStream<T> readAsStream() {