Fix scheduling

This commit is contained in:
Andrea Cavalli 2022-01-11 19:59:27 +01:00
parent 37d3355ca4
commit 3cd57bf61f
6 changed files with 26 additions and 42 deletions

25
pom.xml
View File

@ -275,29 +275,4 @@
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
<profiles>
<profile>
<id>standalone</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j18-impl</artifactId>
<version>2.17.1</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project> </project>

View File

@ -490,7 +490,7 @@ public class AtomixReactiveApi implements ReactiveApi {
@Override @Override
public Mono<Void> close() { public Mono<Void> close() {
return Mono.fromCompletionStage(this.atomix::stop); return Mono.fromCompletionStage(this.atomix::stop).timeout(Duration.ofSeconds(8), Mono.empty());
} }
private record DiskSessionAndId(DiskSession diskSession, long id) {} private record DiskSessionAndId(DiskSession diskSession, long id) {}

View File

@ -14,6 +14,7 @@ import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable { public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, AutoCloseable {
@ -38,6 +39,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
); );
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
}, OverflowStrategy.ERROR) }, OverflowStrategy.ERROR)
.subscribeOn(Schedulers.boundedElastic())
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
.flatMapIterable(list -> list) .flatMapIterable(list -> list)
.takeUntil(s -> closed) .takeUntil(s -> closed)
@ -61,7 +63,7 @@ public class AtomixReactiveApiMultiClient implements ReactiveApiMultiClient, Aut
LiveAtomixReactiveApiClient::deserializeResponse, LiveAtomixReactiveApiClient::deserializeResponse,
Duration.between(Instant.now(), timeout) Duration.between(Instant.now(), timeout)
); );
}).<T>handle((item, sink) -> { }).subscribeOn(Schedulers.boundedElastic()).<T>handle((item, sink) -> {
if (item instanceof TdApi.Error error) { if (item instanceof TdApi.Error error) {
sink.error(new TdError(error.code, error.message)); sink.error(new TdError(error.code, error.message));
} else { } else {

View File

@ -50,6 +50,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
); );
sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close)); sink.onDispose(() -> subscriptionFuture.thenAccept(Subscription::close));
}, OverflowStrategy.ERROR) }, OverflowStrategy.ERROR)
.subscribeOn(Schedulers.boundedElastic())
.onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR) .onBackpressureBuffer(0xFFFF, BufferOverflowStrategy.ERROR)
.flatMapIterable(list -> list) .flatMapIterable(list -> list)
.filter(e -> e.userId() == userId) .filter(e -> e.userId() == userId)
@ -78,7 +79,7 @@ public class DynamicAtomixReactiveApiClient implements ReactiveApiClient, AutoCl
LiveAtomixReactiveApiClient::serializeRequest, LiveAtomixReactiveApiClient::serializeRequest,
LiveAtomixReactiveApiClient::deserializeResponse, LiveAtomixReactiveApiClient::deserializeResponse,
Duration.between(Instant.now(), timeout) Duration.between(Instant.now(), timeout)
)).onErrorMap(ex -> { )).subscribeOn(Schedulers.boundedElastic()).onErrorMap(ex -> {
if (ex instanceof MessagingException.NoRemoteHandler) { if (ex instanceof MessagingException.NoRemoteHandler) {
return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster"); return new TdError(404, "Bot #IDU" + this.userId + " (liveId: " + liveId + ") is not found on the cluster");
} else { } else {

View File

@ -27,6 +27,7 @@ import reactor.core.publisher.BufferOverflowStrategy;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class LiveAtomixReactiveApiClient implements ReactiveApiClient { public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
@ -65,19 +66,22 @@ public class LiveAtomixReactiveApiClient implements ReactiveApiClient {
@Override @Override
public <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) { public <T extends TdApi.Object> Mono<T> request(TdApi.Function<T> request, Instant timeout) {
return Mono.fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests", return Mono
new Request<>(liveId, request, timeout), .fromCompletionStage(() -> eventService.send("session-" + liveId + "-requests",
LiveAtomixReactiveApiClient::serializeRequest, new Request<>(liveId, request, timeout),
LiveAtomixReactiveApiClient::deserializeResponse, LiveAtomixReactiveApiClient::serializeRequest,
Duration.between(Instant.now(), timeout) LiveAtomixReactiveApiClient::deserializeResponse,
)).handle((item, sink) -> { Duration.between(Instant.now(), timeout)
if (item instanceof TdApi.Error error) { ))
sink.error(new TdError(error.code, error.message)); .subscribeOn(Schedulers.boundedElastic())
} else { .handle((item, sink) -> {
//noinspection unchecked if (item instanceof TdApi.Error error) {
sink.next((T) item); sink.error(new TdError(error.code, error.message));
} } else {
}); //noinspection unchecked
sink.next((T) item);
}
});
} }
@Override @Override

View File

@ -97,7 +97,7 @@ public abstract class ReactiveApiPublisher {
subscription.close(); subscription.close();
rawTelegramClient.dispose(); rawTelegramClient.dispose();
}); });
})).share(); })).publishOn(Schedulers.parallel()).share();
} }
public static ReactiveApiPublisher fromToken(Atomix atomix, public static ReactiveApiPublisher fromToken(Atomix atomix,
@ -181,6 +181,7 @@ public abstract class ReactiveApiPublisher {
// Send events to the client // Send events to the client
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.boundedElastic())
.subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events", .subscribe(clientBoundEvent -> eventService.broadcast("session-client-bound-events",
clientBoundEvent, ReactiveApiPublisher::serializeEvents)); clientBoundEvent, ReactiveApiPublisher::serializeEvents));
@ -466,6 +467,7 @@ public abstract class ReactiveApiPublisher {
} }
}) })
.map(responseObj -> new Response(liveId, responseObj)) .map(responseObj -> new Response(liveId, responseObj))
.publishOn(Schedulers.boundedElastic())
.toFuture(); .toFuture();
} }