Update reactor, fix scheduling

This commit is contained in:
Andrea Cavalli 2021-09-09 12:10:50 +02:00
parent 497c4f75e1
commit 84bf1eee85
5 changed files with 8 additions and 13 deletions

View File

@ -112,12 +112,12 @@
<dependency> <dependency>
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId> <artifactId>reactor-core</artifactId>
<version>3.4.8</version> <version>3.4.9</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId> <artifactId>reactor-tools</artifactId>
<version>3.4.8</version> <version>3.4.9</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.akaita.java</groupId> <groupId>com.akaita.java</groupId>

View File

@ -120,6 +120,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
// Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false. // Try to emit false, so that if it has not been closed from tdlib, now it is explicitly false.
closedFromTd.tryEmitValue(false); closedFromTd.tryEmitValue(false);
}) })
.subscribeOn(Schedulers.parallel()); .subscribeOn(Schedulers.boundedElastic());
} }
} }

View File

@ -97,7 +97,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
.doOnNext(s -> logger.trace("Received update from tdlib: {}", s.getClass().getSimpleName())) .doOnNext(s -> logger.trace("Received update from tdlib: {}", s.getClass().getSimpleName()))
.doOnError(ex -> logger.info("TdMiddle verticle error", ex)) .doOnError(ex -> logger.info("TdMiddle verticle error", ex))
.doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")) .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped"))
.subscribeOn(Schedulers.parallel()); .subscribeOn(Schedulers.boundedElastic());
} }
@Override @Override

View File

@ -107,7 +107,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} }
return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local); return onSuccessfulStartRequest(td, botAddress, botAlias, botId, local);
}) })
.flatMap(voidMono -> voidMono.hide().subscribeOn(Schedulers.boundedElastic()))
.doOnSuccess(s -> logger.trace("Stated verticle")) .doOnSuccess(s -> logger.trace("Stated verticle"))
); );
} }

View File

@ -38,8 +38,7 @@ public class BinlogUtils {
) )
// Open file // Open file
.map(x -> new BinlogAsyncFile(vertxFilesystem, path)) .map(x -> new BinlogAsyncFile(vertxFilesystem, path))
.single() .single();
.publishOn(Schedulers.boundedElastic());
} }
public static Mono<Void> saveBinlog(BinlogAsyncFile binlog, Buffer data) { public static Mono<Void> saveBinlog(BinlogAsyncFile binlog, Buffer data) {
@ -66,8 +65,7 @@ public class BinlogUtils {
.then(retrieveBinlog(vertxFilesystem, binlogPath)) .then(retrieveBinlog(vertxFilesystem, binlogPath))
) )
.single() .single()
.then() .then();
.publishOn(Schedulers.boundedElastic());
} }
public static Mono<Void> cleanSessionPath(FileSystem vertxFilesystem, public static Mono<Void> cleanSessionPath(FileSystem vertxFilesystem,
@ -87,8 +85,7 @@ public class BinlogUtils {
.flatMap(file -> vertxFilesystem.rxDeleteRecursive(file, true).as(MonoUtils::toMono)) .flatMap(file -> vertxFilesystem.rxDeleteRecursive(file, true).as(MonoUtils::toMono))
.onErrorResume(ex -> Mono.empty()) .onErrorResume(ex -> Mono.empty())
.then() .then()
) );
.publishOn(Schedulers.boundedElastic());
} }
public static String humanReadableByteCountBin(long bytes) { public static String humanReadableByteCountBin(long bytes) {
@ -126,7 +123,6 @@ public class BinlogUtils {
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis()); var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts); tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts);
}) })
.then() .then();
.publishOn(Schedulers.boundedElastic());
} }
} }