Try to avoid insidious unexpected blocking code from hazelcast, netty, vert.x
This commit is contained in:
parent
57918a3ab9
commit
620914b3cf
@ -181,7 +181,9 @@ public class TdClusterManager {
|
|||||||
} else {
|
} else {
|
||||||
sink.success(Vertx.vertx(vertxOptions));
|
sink.success(Vertx.vertx(vertxOptions));
|
||||||
}
|
}
|
||||||
}).flatMap(vertx -> Mono
|
})
|
||||||
|
.publishOn(Schedulers.boundedElastic())
|
||||||
|
.flatMap(vertx -> Mono
|
||||||
.fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx))
|
.fromCallable(() -> new TdClusterManager(mgr, vertxOptions, vertx))
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
.subscribeOn(Schedulers.boundedElastic())
|
||||||
);
|
);
|
||||||
|
@ -24,6 +24,7 @@ import it.tdlight.utils.MonoUtils.SinkRWStream;
|
|||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
@ -45,7 +46,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
|
|
||||||
private final One<BinlogAsyncFile> binlog = Sinks.one();
|
private final One<BinlogAsyncFile> binlog = Sinks.one();
|
||||||
|
|
||||||
SinkRWStream<Message<TdResultList>> updates = MonoUtils.unicastBackpressureSinkStreak();
|
private final One<SinkRWStream<Message<TdResultList>>> updates = Sinks.one();
|
||||||
// This will only result in a successful completion, never completes in other ways
|
// This will only result in a successful completion, never completes in other ways
|
||||||
private final Empty<Void> updatesStreamEnd = Sinks.one();
|
private final Empty<Void> updatesStreamEnd = Sinks.one();
|
||||||
// This will only result in a crash, never completes in other ways
|
// This will only result in a crash, never completes in other ways
|
||||||
@ -63,25 +64,32 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000);
|
this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Mono<AsyncTdMiddleEventBusClient> initialize() {
|
||||||
|
return MonoUtils.<Message<TdResultList>>unicastBackpressureSinkStream()
|
||||||
|
.flatMap(updates -> MonoUtils.emitValue(this.updates, updates))
|
||||||
|
.thenReturn(this);
|
||||||
|
}
|
||||||
|
|
||||||
public static Mono<AsyncTdMiddle> getAndDeployInstance(TdClusterManager clusterManager,
|
public static Mono<AsyncTdMiddle> getAndDeployInstance(TdClusterManager clusterManager,
|
||||||
int botId,
|
int botId,
|
||||||
String botAlias,
|
String botAlias,
|
||||||
boolean local,
|
boolean local,
|
||||||
JsonObject implementationDetails,
|
JsonObject implementationDetails,
|
||||||
Path binlogsArchiveDirectory) {
|
Path binlogsArchiveDirectory) {
|
||||||
var instance = new AsyncTdMiddleEventBusClient(clusterManager);
|
return new AsyncTdMiddleEventBusClient(clusterManager)
|
||||||
return retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId)
|
.initialize()
|
||||||
.flatMap(binlog -> binlog
|
.flatMap(instance -> retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId)
|
||||||
.getLastModifiedTime()
|
.flatMap(binlog -> binlog
|
||||||
.filter(modTime -> modTime == 0)
|
.getLastModifiedTime()
|
||||||
.doOnNext(v -> LoggerFactory
|
.filter(modTime -> modTime == 0)
|
||||||
.getLogger(AsyncTdMiddleEventBusClient.class)
|
.doOnNext(v -> LoggerFactory
|
||||||
.error("Can't retrieve binlog of bot " + botId + " " + botAlias + ". Creating a new one..."))
|
.getLogger(AsyncTdMiddleEventBusClient.class)
|
||||||
.thenReturn(binlog)
|
.error("Can't retrieve binlog of bot " + botId + " " + botAlias + ". Creating a new one..."))
|
||||||
)
|
.thenReturn(binlog)).<AsyncTdMiddle>flatMap(binlog -> instance
|
||||||
.<AsyncTdMiddle>flatMap(binlog -> instance
|
.start(botId, botAlias, local, implementationDetails, binlog)
|
||||||
.start(botId, botAlias, local, implementationDetails, binlog)
|
.thenReturn(instance)
|
||||||
.thenReturn(instance)
|
)
|
||||||
|
.single()
|
||||||
)
|
)
|
||||||
.single();
|
.single();
|
||||||
}
|
}
|
||||||
@ -124,29 +132,42 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
implementationDetails
|
implementationDetails
|
||||||
);
|
);
|
||||||
return setupUpdatesListener()
|
return setupUpdatesListener()
|
||||||
.then(Mono.defer(() -> local ? Mono.empty()
|
.then(Mono.defer(() -> {
|
||||||
: cluster.getEventBus().<byte[]>rxRequest("bots.start-bot", msg).as(MonoUtils::toMono)))
|
if (local) {
|
||||||
|
return Mono.empty();
|
||||||
|
}
|
||||||
|
return cluster.getEventBus()
|
||||||
|
.<byte[]>rxRequest("bots.start-bot", msg).as(MonoUtils::toMono)
|
||||||
|
.publishOn(Schedulers.boundedElastic());
|
||||||
|
}))
|
||||||
.then();
|
.then();
|
||||||
})
|
})
|
||||||
.publishOn(Schedulers.single());
|
.publishOn(Schedulers.single());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("CallingSubscribeInNonBlockingScope")
|
|
||||||
private Mono<Void> setupUpdatesListener() {
|
private Mono<Void> setupUpdatesListener() {
|
||||||
MessageConsumer<TdResultList> updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().<TdResultList>consumer(
|
return MonoUtils
|
||||||
botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate());
|
.fromBlockingMaybe(() -> {
|
||||||
updateConsumer.endHandler(h -> {
|
MessageConsumer<TdResultList> updateConsumer = MessageConsumer.newInstance(cluster.getEventBus().<TdResultList>consumer(
|
||||||
logger.error("<<<<<<<<<<<<<<<<EndHandler?>>>>>>>>>>>>>");
|
botAddress + ".updates").setMaxBufferedMessages(5000).getDelegate());
|
||||||
});
|
updateConsumer.endHandler(h -> {
|
||||||
|
logger.error("<<<<<<<<<<<<<<<<EndHandler?>>>>>>>>>>>>>");
|
||||||
|
});
|
||||||
|
|
||||||
// Here the updates will be piped from the server to the client
|
// Here the updates will be piped from the server to the client
|
||||||
updateConsumer
|
updates
|
||||||
.rxPipeTo(updates.writeAsStream()).as(MonoUtils::toMono)
|
.asMono()
|
||||||
.subscribeOn(Schedulers.single())
|
.timeout(Duration.ofSeconds(5))
|
||||||
.subscribe();
|
.flatMap(updates -> updateConsumer
|
||||||
|
.rxPipeTo(updates.writeAsStream()).as(MonoUtils::toMono)
|
||||||
|
)
|
||||||
|
.publishOn(Schedulers.newSingle("td-client-updates-pipe"))
|
||||||
|
.subscribe();
|
||||||
|
|
||||||
// Return when the registration of all the consumers has been done across the cluster
|
return updateConsumer;
|
||||||
return updateConsumer.rxCompletionHandler().as(MonoUtils::toMono);
|
})
|
||||||
|
// Return when the registration of all the consumers has been done across the cluster
|
||||||
|
.flatMap(updateConsumer -> updateConsumer.rxCompletionHandler().as(MonoUtils::toMono));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -158,7 +179,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
.then()
|
.then()
|
||||||
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
|
.doOnSuccess(s -> logger.trace("Sent ready-to-receive, received reply"))
|
||||||
.doOnSuccess(s -> logger.trace("About to read updates flux"))
|
.doOnSuccess(s -> logger.trace("About to read updates flux"))
|
||||||
.thenMany(updates.readAsFlux())
|
.then(updates.asMono().timeout(Duration.ofSeconds(5)))
|
||||||
|
.flatMapMany(SinkRWStream::readAsFlux)
|
||||||
// Cast to fix bug of reactivex
|
// Cast to fix bug of reactivex
|
||||||
.cast(io.vertx.core.eventbus.Message.class)
|
.cast(io.vertx.core.eventbus.Message.class)
|
||||||
.timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> {
|
.timeout(Duration.ofMinutes(1), Mono.fromCallable(() -> {
|
||||||
@ -166,9 +188,12 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
ex.setStackTrace(new StackTraceElement[0]);
|
ex.setStackTrace(new StackTraceElement[0]);
|
||||||
throw ex;
|
throw ex;
|
||||||
}))
|
}))
|
||||||
.doOnSubscribe(s -> cluster.getEventBus().<byte[]>send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout))
|
.doOnSubscribe(s -> Schedulers.boundedElastic().schedule(() -> {
|
||||||
|
cluster.getEventBus().<byte[]>send(botAddress + ".ready-to-receive", EMPTY, deliveryOptionsWithTimeout);
|
||||||
|
}))
|
||||||
|
.flatMap(updates -> Mono.fromCallable((Callable<Object>) updates::body).publishOn(Schedulers.boundedElastic()))
|
||||||
.flatMap(updates -> {
|
.flatMap(updates -> {
|
||||||
var result = (TdResultList) updates.body();
|
var result = (TdResultList) updates;
|
||||||
if (result.succeeded()) {
|
if (result.succeeded()) {
|
||||||
return Flux.fromIterable(result.value());
|
return Flux.fromIterable(result.value());
|
||||||
} else {
|
} else {
|
||||||
@ -192,6 +217,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
.doOnNext(l -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(l.body().binlog().length)))
|
.doOnNext(l -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(l.body().binlog().length)))
|
||||||
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.body().binlog()))
|
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.body().binlog()))
|
||||||
.doOnSuccess(s -> logger.info("Overwritten binlog from server"))
|
.doOnSuccess(s -> logger.info("Overwritten binlog from server"))
|
||||||
|
.publishOn(Schedulers.boundedElastic())
|
||||||
.thenReturn(update);
|
.thenReturn(update);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -210,13 +236,13 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
.then(cluster.getEventBus().<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono))
|
.then(cluster.getEventBus().<TdResultMessage>rxRequest(botAddress + ".execute", req, deliveryOptions).as(MonoUtils::toMono))
|
||||||
.onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex))
|
.onErrorMap(ex -> ResponseError.newResponseError(request, botAlias, ex))
|
||||||
.<TdResult<T>>flatMap(resp -> Mono
|
.<TdResult<T>>flatMap(resp -> Mono
|
||||||
.fromCallable(() -> {
|
.<TdResult<T>>fromCallable(() -> {
|
||||||
if (resp.body() == null) {
|
if (resp.body() == null) {
|
||||||
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty"));
|
throw ResponseError.newResponseError(request, botAlias, new TdError(500, "Response is empty"));
|
||||||
} else {
|
} else {
|
||||||
return resp.body().toTdResult();
|
return resp.body().toTdResult();
|
||||||
}
|
}
|
||||||
})
|
}).publishOn(Schedulers.boundedElastic())
|
||||||
)
|
)
|
||||||
.doOnSuccess(s -> logger.trace("Executed request"))
|
.doOnSuccess(s -> logger.trace("Executed request"))
|
||||||
)
|
)
|
||||||
|
@ -128,7 +128,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private Mono<Void> listen(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {
|
private Mono<Void> listen(AsyncTdDirectImpl td, String botAddress, String botAlias, int botId, boolean local) {
|
||||||
return Mono.<Void>create(registrationSink -> {
|
return Mono.<Void>create(registrationSink -> Schedulers.boundedElastic().schedule(() -> {
|
||||||
logger.trace("Preparing listeners");
|
logger.trace("Preparing listeners");
|
||||||
|
|
||||||
MessageConsumer<ExecuteObject> executeConsumer = vertx.eventBus().consumer(botAddress + ".execute");
|
MessageConsumer<ExecuteObject> executeConsumer = vertx.eventBus().consumer(botAddress + ".execute");
|
||||||
@ -249,7 +249,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
|||||||
.subscribeOn(io.reactivex.schedulers.Schedulers.single())
|
.subscribeOn(io.reactivex.schedulers.Schedulers.single())
|
||||||
.doOnComplete(() -> logger.trace("Finished preparing listeners"))
|
.doOnComplete(() -> logger.trace("Finished preparing listeners"))
|
||||||
.subscribe(registrationSink::success, registrationSink::error);
|
.subscribe(registrationSink::success, registrationSink::error);
|
||||||
});
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -282,13 +282,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
|||||||
.then(readBinlogConsumer
|
.then(readBinlogConsumer
|
||||||
.asMono()
|
.asMono()
|
||||||
.timeout(Duration.ofSeconds(10), Mono.empty())
|
.timeout(Duration.ofSeconds(10), Mono.empty())
|
||||||
.doOnNext(ec -> Mono
|
.doOnNext(ec -> Schedulers.boundedElastic().schedule(() -> Mono
|
||||||
// ReadBinLog will live for another 30 minutes.
|
// ReadBinLog will live for another 30 minutes.
|
||||||
// Since every consumer of ReadBinLog is identical, this should not pose a problem.
|
// Since every consumer of ReadBinLog is identical, this should not pose a problem.
|
||||||
.delay(Duration.ofMinutes(30))
|
.delay(Duration.ofMinutes(30))
|
||||||
.then(ec.rxUnregister().as(MonoUtils::toMono))
|
.then(ec.rxUnregister().as(MonoUtils::toMono))
|
||||||
.subscribeOn(Schedulers.single())
|
.subscribeOn(Schedulers.single())
|
||||||
.subscribe()
|
.subscribe())
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
.then(readyToReceiveConsumer
|
.then(readyToReceiveConsumer
|
||||||
|
@ -19,6 +19,7 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
|
import reactor.core.scheduler.Schedulers;
|
||||||
import reactor.util.function.Tuple2;
|
import reactor.util.function.Tuple2;
|
||||||
import reactor.util.function.Tuples;
|
import reactor.util.function.Tuples;
|
||||||
|
|
||||||
@ -39,7 +40,8 @@ public class BinlogUtils {
|
|||||||
// Open file
|
// Open file
|
||||||
.flatMap(x -> vertxFilesystem.rxOpen(path, openOptions).as(MonoUtils::toMono))
|
.flatMap(x -> vertxFilesystem.rxOpen(path, openOptions).as(MonoUtils::toMono))
|
||||||
.map(file -> new BinlogAsyncFile(vertxFilesystem, path, file))
|
.map(file -> new BinlogAsyncFile(vertxFilesystem, path, file))
|
||||||
.single();
|
.single()
|
||||||
|
.publishOn(Schedulers.boundedElastic());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Mono<Void> saveBinlog(BinlogAsyncFile binlog, byte[] data) {
|
public static Mono<Void> saveBinlog(BinlogAsyncFile binlog, byte[] data) {
|
||||||
@ -66,7 +68,8 @@ public class BinlogUtils {
|
|||||||
.then(retrieveBinlog(vertxFilesystem, binlogPath))
|
.then(retrieveBinlog(vertxFilesystem, binlogPath))
|
||||||
)
|
)
|
||||||
.single()
|
.single()
|
||||||
.then();
|
.then()
|
||||||
|
.publishOn(Schedulers.boundedElastic());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Mono<Void> cleanSessionPath(FileSystem vertxFilesystem,
|
public static Mono<Void> cleanSessionPath(FileSystem vertxFilesystem,
|
||||||
@ -86,7 +89,8 @@ public class BinlogUtils {
|
|||||||
.flatMap(file -> vertxFilesystem.rxDeleteRecursive(file, true).as(MonoUtils::toMono))
|
.flatMap(file -> vertxFilesystem.rxDeleteRecursive(file, true).as(MonoUtils::toMono))
|
||||||
.onErrorResume(ex -> Mono.empty())
|
.onErrorResume(ex -> Mono.empty())
|
||||||
.then()
|
.then()
|
||||||
);
|
)
|
||||||
|
.publishOn(Schedulers.boundedElastic());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String humanReadableByteCountBin(long bytes) {
|
public static String humanReadableByteCountBin(long bytes) {
|
||||||
@ -123,6 +127,7 @@ public class BinlogUtils {
|
|||||||
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
|
var opts = new DeliveryOptions().setLocalOnly(local).setSendTimeout(Duration.ofSeconds(10).toMillis());
|
||||||
tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts);
|
tuple.getT1().reply(new EndSessionMessage(botId, tuple.getT2()), opts);
|
||||||
})
|
})
|
||||||
.then();
|
.then()
|
||||||
|
.publishOn(Schedulers.boundedElastic());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -31,7 +31,6 @@ import org.slf4j.Logger;
|
|||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.warp.commonutils.concurrency.future.CompletableFutureUtils;
|
import org.warp.commonutils.concurrency.future.CompletableFutureUtils;
|
||||||
import reactor.core.CoreSubscriber;
|
import reactor.core.CoreSubscriber;
|
||||||
import reactor.core.Disposable;
|
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.publisher.MonoSink;
|
import reactor.core.publisher.MonoSink;
|
||||||
@ -303,7 +302,7 @@ public class MonoUtils {
|
|||||||
return fromEmitResultFuture(sink.tryEmitEmpty());
|
return fromEmitResultFuture(sink.tryEmitEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> SinkRWStream<T> unicastBackpressureSinkStreak() {
|
public static <T> Mono<SinkRWStream<T>> unicastBackpressureSinkStream() {
|
||||||
Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
|
Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
|
||||||
return asStream(sink, null, null, 1);
|
return asStream(sink, null, null, 1);
|
||||||
}
|
}
|
||||||
@ -311,7 +310,7 @@ public class MonoUtils {
|
|||||||
/**
|
/**
|
||||||
* Create a sink that can be written from a writeStream
|
* Create a sink that can be written from a writeStream
|
||||||
*/
|
*/
|
||||||
public static <T> SinkRWStream<T> unicastBackpressureStream(int maxBackpressureQueueSize) {
|
public static <T> Mono<SinkRWStream<T>> unicastBackpressureStream(int maxBackpressureQueueSize) {
|
||||||
Queue<T> boundedQueue = Queues.<T>get(maxBackpressureQueueSize).get();
|
Queue<T> boundedQueue = Queues.<T>get(maxBackpressureQueueSize).get();
|
||||||
var queueSize = Flux
|
var queueSize = Flux
|
||||||
.interval(Duration.ZERO, Duration.ofMillis(500))
|
.interval(Duration.ZERO, Duration.ofMillis(500))
|
||||||
@ -321,16 +320,16 @@ public class MonoUtils {
|
|||||||
return asStream(sink, queueSize, termination, maxBackpressureQueueSize);
|
return asStream(sink, queueSize, termination, maxBackpressureQueueSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> SinkRWStream<T> unicastBackpressureErrorStream() {
|
public static <T> Mono<SinkRWStream<T>> unicastBackpressureErrorStream() {
|
||||||
Many<T> sink = Sinks.many().unicast().onBackpressureError();
|
Many<T> sink = Sinks.many().unicast().onBackpressureError();
|
||||||
return asStream(sink, null, null, 1);
|
return asStream(sink, null, null, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> SinkRWStream<T> asStream(Many<T> sink,
|
public static <T> Mono<SinkRWStream<T>> asStream(Many<T> sink,
|
||||||
@Nullable Flux<Integer> backpressureSize,
|
@Nullable Flux<Integer> backpressureSize,
|
||||||
@Nullable Empty<Void> termination,
|
@Nullable Empty<Void> termination,
|
||||||
int maxBackpressureQueueSize) {
|
int maxBackpressureQueueSize) {
|
||||||
return new SinkRWStream<>(sink, backpressureSize, termination, maxBackpressureQueueSize);
|
return SinkRWStream.create(sink, backpressureSize, termination, maxBackpressureQueueSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Future<Void> toVertxFuture(Mono<Void> toTransform) {
|
private static Future<Void> toVertxFuture(Mono<Void> toTransform) {
|
||||||
@ -347,46 +346,64 @@ public class MonoUtils {
|
|||||||
public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {
|
public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {
|
||||||
|
|
||||||
private final Many<T> sink;
|
private final Many<T> sink;
|
||||||
private final @Nullable Disposable drainSubscription;
|
private final Flux<Integer> backpressureSize;
|
||||||
|
private final Empty<Void> termination;
|
||||||
private Handler<Throwable> exceptionHandler = e -> {};
|
private Handler<Throwable> exceptionHandler = e -> {};
|
||||||
private Handler<Void> drainHandler = h -> {};
|
private Handler<Void> drainHandler = h -> {};
|
||||||
private final int maxBackpressureQueueSize;
|
private final int maxBackpressureQueueSize;
|
||||||
private volatile int writeQueueMaxSize;
|
private volatile int writeQueueMaxSize;
|
||||||
private volatile boolean writeQueueFull = false;
|
private volatile boolean writeQueueFull = false;
|
||||||
|
|
||||||
public SinkRWStream(Many<T> sink,
|
private SinkRWStream(Many<T> sink,
|
||||||
@Nullable Flux<Integer> backpressureSize,
|
@Nullable Flux<Integer> backpressureSize,
|
||||||
@Nullable Empty<Void> termination,
|
@Nullable Empty<Void> termination,
|
||||||
int maxBackpressureQueueSize) {
|
int maxBackpressureQueueSize) {
|
||||||
this.maxBackpressureQueueSize = maxBackpressureQueueSize;
|
this.maxBackpressureQueueSize = maxBackpressureQueueSize;
|
||||||
this.writeQueueMaxSize = this.maxBackpressureQueueSize;
|
this.writeQueueMaxSize = this.maxBackpressureQueueSize;
|
||||||
|
this.backpressureSize = backpressureSize;
|
||||||
|
this.termination = termination;
|
||||||
this.sink = sink;
|
this.sink = sink;
|
||||||
|
}
|
||||||
|
|
||||||
if (backpressureSize != null) {
|
public Mono<SinkRWStream<T>> initialize() {
|
||||||
AtomicBoolean drained = new AtomicBoolean(true);
|
return Mono.fromCallable(() -> {
|
||||||
this.drainSubscription = backpressureSize
|
if (backpressureSize != null) {
|
||||||
.subscribeOn(Schedulers.single())
|
AtomicBoolean drained = new AtomicBoolean(true);
|
||||||
.subscribe(size -> {
|
var drainSubscription = backpressureSize
|
||||||
writeQueueFull = size >= this.writeQueueMaxSize;
|
.publishOn(Schedulers.boundedElastic())
|
||||||
|
.subscribe(size -> {
|
||||||
|
writeQueueFull = size >= this.writeQueueMaxSize;
|
||||||
|
|
||||||
boolean newDrained = size <= this.writeQueueMaxSize / 2;
|
boolean newDrained = size <= this.writeQueueMaxSize / 2;
|
||||||
boolean oldDrained = drained.getAndSet(newDrained);
|
boolean oldDrained = drained.getAndSet(newDrained);
|
||||||
if (newDrained && !oldDrained) {
|
if (newDrained && !oldDrained) {
|
||||||
drainHandler.handle(null);
|
drainHandler.handle(null);
|
||||||
}
|
}
|
||||||
}, ex -> {
|
}, ex -> {
|
||||||
exceptionHandler.handle(ex);
|
exceptionHandler.handle(ex);
|
||||||
}, () -> {
|
}, () -> {
|
||||||
if (!drained.get()) {
|
if (!drained.get()) {
|
||||||
drainHandler.handle(null);
|
drainHandler.handle(null);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (termination != null) {
|
if (termination != null) {
|
||||||
termination.asMono().subscribeOn(Schedulers.single()).doOnTerminate(drainSubscription::dispose).subscribe();
|
termination
|
||||||
|
.asMono()
|
||||||
|
.doOnTerminate(drainSubscription::dispose)
|
||||||
|
.publishOn(Schedulers.boundedElastic())
|
||||||
|
.subscribe();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
this.drainSubscription = null;
|
return this;
|
||||||
}
|
}).publishOn(Schedulers.boundedElastic());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> Mono<SinkRWStream<T>> create(Many<T> sink,
|
||||||
|
@Nullable Flux<Integer> backpressureSize,
|
||||||
|
@Nullable Empty<Void> termination,
|
||||||
|
int maxBackpressureQueueSize) {
|
||||||
|
return new SinkRWStream<T>(sink, backpressureSize, termination, maxBackpressureQueueSize).initialize();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Flux<T> readAsFlux() {
|
public Flux<T> readAsFlux() {
|
||||||
@ -423,7 +440,7 @@ public class MonoUtils {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
|
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
|
||||||
sink.asFlux().subscribeWith(new CoreSubscriber<T>() {
|
sink.asFlux().publishOn(Schedulers.boundedElastic()).subscribeWith(new CoreSubscriber<T>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onSubscribe(@NotNull Subscription s) {
|
public void onSubscribe(@NotNull Subscription s) {
|
||||||
@ -500,24 +517,6 @@ public class MonoUtils {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void end(Handler<AsyncResult<Void>> handler) {
|
public void end(Handler<AsyncResult<Void>> handler) {
|
||||||
/*
|
|
||||||
MonoUtils.emitCompleteFuture(sink).recover(error -> {
|
|
||||||
if (error instanceof EmissionException) {
|
|
||||||
var sinkError = (EmissionException) error;
|
|
||||||
switch (sinkError.getReason()) {
|
|
||||||
case FAIL_CANCELLED:
|
|
||||||
case FAIL_ZERO_SUBSCRIBER:
|
|
||||||
case FAIL_TERMINATED:
|
|
||||||
return Future.succeededFuture();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Future.failedFuture(error);
|
|
||||||
}).onComplete(h -> {
|
|
||||||
if (drainSubscription != null) {
|
|
||||||
drainSubscription.dispose();
|
|
||||||
}
|
|
||||||
}).onComplete(handler);
|
|
||||||
*/
|
|
||||||
MonoUtils.emitCompleteFuture(sink).onComplete(handler);
|
MonoUtils.emitCompleteFuture(sink).onComplete(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -578,7 +577,7 @@ public class MonoUtils {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
|
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
|
||||||
flux.subscribeWith(new CoreSubscriber<T>() {
|
flux.publishOn(Schedulers.boundedElastic()).subscribeWith(new CoreSubscriber<T>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onSubscribe(@NotNull Subscription s) {
|
public void onSubscribe(@NotNull Subscription s) {
|
||||||
|
Loading…
Reference in New Issue
Block a user