Code cleanup
This commit is contained in:
parent
b8dc06dbd6
commit
a760bf6e14
|
@ -1,347 +0,0 @@
|
||||||
package it.tdlight.tdlibsession;
|
|
||||||
|
|
||||||
import io.vertx.core.eventbus.DeliveryOptions;
|
|
||||||
import io.vertx.core.eventbus.ReplyException;
|
|
||||||
import io.vertx.reactivex.core.eventbus.EventBus;
|
|
||||||
import io.vertx.reactivex.core.eventbus.Message;
|
|
||||||
import io.vertx.core.eventbus.MessageCodec;
|
|
||||||
import io.vertx.reactivex.core.eventbus.MessageConsumer;
|
|
||||||
import it.tdlight.utils.MonoUtils;
|
|
||||||
import java.net.ConnectException;
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import org.jetbrains.annotations.Nullable;
|
|
||||||
import org.warp.commonutils.log.Logger;
|
|
||||||
import org.warp.commonutils.log.LoggerFactory;
|
|
||||||
import reactor.core.Disposable;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
import reactor.core.publisher.Mono;
|
|
||||||
import reactor.core.publisher.Sinks;
|
|
||||||
import reactor.core.publisher.Sinks.One;
|
|
||||||
import reactor.core.scheduler.Schedulers;
|
|
||||||
import reactor.util.function.Tuple2;
|
|
||||||
import reactor.util.function.Tuples;
|
|
||||||
|
|
||||||
public class EventBusFlux {
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(EventBusFlux.class);
|
|
||||||
|
|
||||||
private static final byte[] EMPTY = new byte[0];
|
|
||||||
|
|
||||||
public static <T> void registerFluxCodec(EventBus eventBus, MessageCodec<T, T> itemsCodec) {
|
|
||||||
var signalsCodec = new SignalMessageCodec<T>(itemsCodec);
|
|
||||||
try {
|
|
||||||
eventBus.registerCodec(signalsCodec);
|
|
||||||
} catch (IllegalStateException ex) {
|
|
||||||
if (!ex.getMessage().contains("Already a codec registered with name")) {
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If the flux is fast and you are on a network, please do this:
|
|
||||||
*
|
|
||||||
* <pre>flux
|
|
||||||
.bufferTimeout(Duration.ofMillis(100))
|
|
||||||
.windowTimeout(1, Duration.ofSeconds(5))
|
|
||||||
.flatMap(w -> w.defaultIfEmpty(Collections.emptyList()))</pre>
|
|
||||||
*
|
|
||||||
* @return tuple. T1 = flux served, T2 = error that caused cancelling of the subscription
|
|
||||||
*/
|
|
||||||
public static <T> Tuple2<Mono<Void>, Mono<Throwable>> serve(Flux<T> flux,
|
|
||||||
EventBus eventBus,
|
|
||||||
String fluxAddress,
|
|
||||||
DeliveryOptions baseDeliveryOptions,
|
|
||||||
MessageCodec<T, T> itemsCodec,
|
|
||||||
Duration connectionTimeout) {
|
|
||||||
var signalsCodec = new SignalMessageCodec<T>(itemsCodec);
|
|
||||||
EventBusFlux.registerFluxCodec(eventBus, itemsCodec);
|
|
||||||
var deliveryOptions = new DeliveryOptions(baseDeliveryOptions)
|
|
||||||
.setSendTimeout(connectionTimeout.toMillis());
|
|
||||||
var signalDeliveryOptions = new DeliveryOptions(deliveryOptions)
|
|
||||||
.setCodecName(signalsCodec.name());
|
|
||||||
AtomicInteger subscriptionsCount = new AtomicInteger();
|
|
||||||
One<Throwable> fatalErrorSink = Sinks.one();
|
|
||||||
var servedMono = Mono.<Void>create(sink -> {
|
|
||||||
MessageConsumer<byte[]> subscribe = eventBus.consumer(fluxAddress + ".subscribe");
|
|
||||||
|
|
||||||
subscribe.handler(msg -> {
|
|
||||||
subscribe.unregister(subscribeUnregistered -> {
|
|
||||||
if (subscribeUnregistered.succeeded()) {
|
|
||||||
if (subscriptionsCount.incrementAndGet() > 1) {
|
|
||||||
subscriptionsCount.decrementAndGet();
|
|
||||||
logger.error("Another client tried to connect to the same flux. Rejecting the request.");
|
|
||||||
msg.fail(500, "This flux is already in use!");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
long subscriptionId = 0;
|
|
||||||
var subscriptionAddress = fluxAddress + "." + subscriptionId;
|
|
||||||
|
|
||||||
MessageConsumer<byte[]> subscriptionReady = eventBus.consumer(subscriptionAddress + ".subscriptionReady");
|
|
||||||
MessageConsumer<byte[]> dispose = eventBus.consumer(subscriptionAddress + ".dispose");
|
|
||||||
MessageConsumer<byte[]> ping = eventBus.consumer(subscriptionAddress + ".ping");
|
|
||||||
MessageConsumer<byte[]> cancel = eventBus.consumer(subscriptionAddress + ".cancel");
|
|
||||||
|
|
||||||
subscriptionReady.<Long>handler(subscriptionReadyMsg -> {
|
|
||||||
subscriptionReady.unregister(subscriptionReadyUnregistered -> {
|
|
||||||
if (subscriptionReadyUnregistered.succeeded()) {
|
|
||||||
AtomicReference<Disposable> atomicSubscription = new AtomicReference<>(null);
|
|
||||||
var subscription = flux
|
|
||||||
.onErrorResume(error -> Mono
|
|
||||||
.<Message<T>>create(errorSink -> {
|
|
||||||
var responseHandler = MonoUtils.toHandler(errorSink);
|
|
||||||
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onError(error), signalDeliveryOptions, responseHandler);
|
|
||||||
})
|
|
||||||
.then(Mono.empty())
|
|
||||||
)
|
|
||||||
.flatMapSequential(item -> Mono.<Message<T>>create(itemSink -> {
|
|
||||||
var responseHandler = MonoUtils.toHandler(itemSink);
|
|
||||||
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, responseHandler);
|
|
||||||
}))
|
|
||||||
.subscribeOn(Schedulers.parallel())
|
|
||||||
.subscribe(response -> {}, error -> {
|
|
||||||
if (error instanceof ReplyException) {
|
|
||||||
var errorMessageCode = ((ReplyException) error).failureCode();
|
|
||||||
// -1 == NO_HANDLERS
|
|
||||||
if (errorMessageCode == -1) {
|
|
||||||
logger.error("Can't send a signal of flux \"" + fluxAddress + "\" because the connection was lost");
|
|
||||||
} else {
|
|
||||||
logger.error("Error when sending a signal of flux \"" + fluxAddress + "\": {}", error.toString());
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.error("Error when sending a signal of flux \"" + fluxAddress + "\"", error);
|
|
||||||
}
|
|
||||||
fatalErrorSink.tryEmitValue(error);
|
|
||||||
disposeFlux(atomicSubscription.get(),
|
|
||||||
fatalErrorSink,
|
|
||||||
subscriptionReady,
|
|
||||||
ping,
|
|
||||||
cancel,
|
|
||||||
dispose,
|
|
||||||
fluxAddress,
|
|
||||||
() -> {
|
|
||||||
logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error");
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}, () -> {
|
|
||||||
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onComplete(), signalDeliveryOptions, msg2 -> {
|
|
||||||
logger.info("Completed flux \"" + fluxAddress + "\"");
|
|
||||||
if (msg2.failed()) {
|
|
||||||
logger.error("Failed to send onComplete signal", msg2.cause());
|
|
||||||
fatalErrorSink.tryEmitValue(msg2.cause());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
atomicSubscription.set(subscription);
|
|
||||||
|
|
||||||
ping.handler(msg2 -> {
|
|
||||||
logger.trace("Client is still alive");
|
|
||||||
msg2.reply(EMPTY, deliveryOptions);
|
|
||||||
});
|
|
||||||
|
|
||||||
cancel.handler(msg2 -> {
|
|
||||||
logger.trace("Cancelling flux \"" + fluxAddress + "\"");
|
|
||||||
subscription.dispose();
|
|
||||||
logger.debug("Cancelled flux \"" + fluxAddress + "\"");
|
|
||||||
msg2.reply(EMPTY, deliveryOptions);
|
|
||||||
});
|
|
||||||
|
|
||||||
dispose.handler(msg2 -> {
|
|
||||||
disposeFlux(subscription,
|
|
||||||
fatalErrorSink,
|
|
||||||
subscriptionReady,
|
|
||||||
ping,
|
|
||||||
cancel,
|
|
||||||
dispose,
|
|
||||||
fluxAddress,
|
|
||||||
() -> msg2.reply(EMPTY)
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
ping.completionHandler(h0 -> {
|
|
||||||
if (h0.succeeded()) {
|
|
||||||
cancel.completionHandler(h1 -> {
|
|
||||||
if (h1.succeeded()) {
|
|
||||||
dispose.completionHandler(h2 -> {
|
|
||||||
if (h2.succeeded()) {
|
|
||||||
subscriptionReadyMsg.reply((Long) subscriptionId);
|
|
||||||
} else {
|
|
||||||
logger.error("Failed to register dispose", h1.cause());
|
|
||||||
subscriptionReadyMsg.fail(500, "Failed to register dispose");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
logger.error("Failed to register cancel", h1.cause());
|
|
||||||
subscriptionReadyMsg.fail(500, "Failed to register cancel");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
logger.error("Failed to register ping", h0.cause());
|
|
||||||
subscriptionReadyMsg.fail(500, "Failed to register ping");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
logger.error("Failed to unregister \"subscription ready\"");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
subscriptionReady.completionHandler(srh -> {
|
|
||||||
if (srh.succeeded()) {
|
|
||||||
msg.reply((Long) subscriptionId);
|
|
||||||
} else {
|
|
||||||
logger.error("Failed to register \"subscription ready\"", srh.cause());
|
|
||||||
msg.fail(500, "Failed to register \"subscription ready\"");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
logger.error("Failed to unregister subscribe consumer");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
subscribe.completionHandler(h -> {
|
|
||||||
if (h.failed()) {
|
|
||||||
sink.error(h.cause());
|
|
||||||
} else {
|
|
||||||
sink.success();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}).publishOn(Schedulers.parallel()).share();
|
|
||||||
|
|
||||||
return Tuples.of(servedMono, fatalErrorSink.asMono());
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void disposeFlux(@Nullable Disposable subscription,
|
|
||||||
One<Throwable> fatalErrorSink,
|
|
||||||
MessageConsumer<byte[]> subscriptionReady,
|
|
||||||
MessageConsumer<byte[]> ping,
|
|
||||||
MessageConsumer<byte[]> cancel,
|
|
||||||
MessageConsumer<byte[]> dispose,
|
|
||||||
String fluxAddress,
|
|
||||||
Runnable after) {
|
|
||||||
logger.trace("Disposing flux \"" + fluxAddress + "\"");
|
|
||||||
fatalErrorSink.tryEmitEmpty();
|
|
||||||
if (subscription != null) {
|
|
||||||
subscription.dispose();
|
|
||||||
}
|
|
||||||
subscriptionReady.unregister(v0 -> {
|
|
||||||
if (v0.failed()) {
|
|
||||||
logger.error("Failed to unregister subscriptionReady", v0.cause());
|
|
||||||
}
|
|
||||||
ping.unregister(v1 -> {
|
|
||||||
if (v1.failed()) {
|
|
||||||
logger.error("Failed to unregister ping", v1.cause());
|
|
||||||
}
|
|
||||||
cancel.unregister(v2 -> {
|
|
||||||
if (v2.failed()) {
|
|
||||||
logger.error("Failed to unregister cancel", v2.cause());
|
|
||||||
}
|
|
||||||
dispose.unregister(v3 -> {
|
|
||||||
if (v3.failed()) {
|
|
||||||
logger.error("Failed to unregister dispose", v3.cause());
|
|
||||||
}
|
|
||||||
logger.debug("Disposed flux \"" + fluxAddress + "\"");
|
|
||||||
after.run();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> Flux<T> connect(EventBus eventBus,
|
|
||||||
String fluxAddress,
|
|
||||||
DeliveryOptions baseDeliveryOptions,
|
|
||||||
MessageCodec<T, T> itemsCodec,
|
|
||||||
Duration connectionTimeout) {
|
|
||||||
EventBusFlux.registerFluxCodec(eventBus, itemsCodec);
|
|
||||||
return Flux.<T>create(emitter -> {
|
|
||||||
var deliveryOptions = new DeliveryOptions(baseDeliveryOptions)
|
|
||||||
.setSendTimeout(connectionTimeout.toMillis());
|
|
||||||
eventBus.<Long>request(fluxAddress + ".subscribe", EMPTY, deliveryOptions, msg -> {
|
|
||||||
if (msg.succeeded()) {
|
|
||||||
long subscriptionId = msg.result().body();
|
|
||||||
var subscriptionAddress = fluxAddress + "." + subscriptionId;
|
|
||||||
|
|
||||||
var signalConsumer = eventBus.<SignalMessage<T>>consumer(subscriptionAddress + ".signal");
|
|
||||||
signalConsumer.handler(msg2 -> {
|
|
||||||
var signal = msg2.body();
|
|
||||||
switch (signal.getSignalType()) {
|
|
||||||
case ITEM:
|
|
||||||
emitter.next(signal.getItem());
|
|
||||||
break;
|
|
||||||
case ERROR:
|
|
||||||
emitter.error(new Exception(signal.getErrorMessage()));
|
|
||||||
break;
|
|
||||||
case COMPLETE:
|
|
||||||
emitter.complete();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
msg2.reply(EMPTY);
|
|
||||||
});
|
|
||||||
signalConsumer.completionHandler(h -> {
|
|
||||||
if (h.succeeded()) {
|
|
||||||
eventBus.<Long>request(subscriptionAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> {
|
|
||||||
if (msg2.failed()) {
|
|
||||||
logger.error("Failed to tell that the subscription is ready");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause()));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
var pingSubscription = Flux.interval(Duration.ofSeconds(10)).flatMapSequential(n -> Mono.create(pingSink ->
|
|
||||||
eventBus.<byte[]>request(subscriptionAddress + ".ping", EMPTY, deliveryOptions, pingMsg -> {
|
|
||||||
if (pingMsg.succeeded()) {
|
|
||||||
pingSink.success(pingMsg.result().body());
|
|
||||||
} else {
|
|
||||||
var pingError = pingMsg.cause();
|
|
||||||
if (pingError instanceof ReplyException) {
|
|
||||||
var pingReplyException = (ReplyException) pingError;
|
|
||||||
// -1 = NO_HANDLERS
|
|
||||||
if (pingReplyException.failureCode() == -1) {
|
|
||||||
pingSink.error(new ConnectException( "Can't send a ping to flux \"" + fluxAddress + "\" because the connection was lost"));
|
|
||||||
} else {
|
|
||||||
pingSink.error(new ConnectException("Ping failed: " + pingReplyException.toString()));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pingSink.error(new IllegalStateException("Ping failed: " + pingError.getMessage()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})))
|
|
||||||
.publishOn(Schedulers.boundedElastic())
|
|
||||||
.onBackpressureBuffer()
|
|
||||||
.subscribeOn(Schedulers.parallel())
|
|
||||||
.subscribe(v -> {}, emitter::error);
|
|
||||||
|
|
||||||
emitter.onDispose(() -> {
|
|
||||||
if (!pingSubscription.isDisposed()) {
|
|
||||||
pingSubscription.dispose();
|
|
||||||
}
|
|
||||||
eventBus.request(subscriptionAddress + ".dispose", EMPTY, deliveryOptions, msg2 -> {
|
|
||||||
if (msg.failed()) {
|
|
||||||
logger.error("Failed to tell that the subscription is disposed");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
emitter.onCancel(() -> {
|
|
||||||
if (!pingSubscription.isDisposed()) {
|
|
||||||
pingSubscription.dispose();
|
|
||||||
}
|
|
||||||
eventBus.request(subscriptionAddress + ".cancel", EMPTY, deliveryOptions, msg2 -> {
|
|
||||||
if (msg.failed()) {
|
|
||||||
logger.error("Failed to tell that the subscription is cancelled");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
emitter.error(new IllegalStateException("Subscription failed", msg.cause()));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -171,7 +171,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
||||||
.<byte[]>rxRequest(botAddress + ".ping", EMPTY, pingDeliveryOptions)
|
.<byte[]>rxRequest(botAddress + ".ping", EMPTY, pingDeliveryOptions)
|
||||||
.as(MonoUtils::toMono);
|
.as(MonoUtils::toMono);
|
||||||
})
|
})
|
||||||
.flatMap(msg -> Mono.fromCallable(() -> msg.body()).subscribeOn(Schedulers.boundedElastic()))
|
.flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic()))
|
||||||
.repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true))
|
.repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true))
|
||||||
.takeUntilOther(Mono.firstWithSignal(this.updatesStreamEnd.asMono().doOnTerminate(() -> {
|
.takeUntilOther(Mono.firstWithSignal(this.updatesStreamEnd.asMono().doOnTerminate(() -> {
|
||||||
logger.trace("About to kill pinger because updates stream ended");
|
logger.trace("About to kill pinger because updates stream ended");
|
||||||
|
@ -295,7 +295,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
||||||
case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
|
case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
|
||||||
return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib"))
|
return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib"))
|
||||||
.then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono))
|
.then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono))
|
||||||
.flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel()))
|
.flatMap(latestBinlogMsg -> Mono.fromCallable(latestBinlogMsg::body).subscribeOn(Schedulers.parallel()))
|
||||||
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length())))
|
.doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length())))
|
||||||
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog()))
|
.flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog()))
|
||||||
.doOnSuccess(s -> logger.info("Overwritten binlog from server"))
|
.doOnSuccess(s -> logger.info("Overwritten binlog from server"))
|
||||||
|
|
|
@ -62,54 +62,6 @@ public class MonoUtils {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> Handler<AsyncResult<T>> toHandler(SynchronousSink<T> sink) {
|
|
||||||
return event -> {
|
|
||||||
if (event.succeeded()) {
|
|
||||||
if (event.result() == null) {
|
|
||||||
sink.complete();
|
|
||||||
} else {
|
|
||||||
sink.next(Objects.requireNonNull(event.result()));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
sink.error(event.cause());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> Handler<AsyncResult<T>> toHandler(MonoSink<T> sink) {
|
|
||||||
return event -> {
|
|
||||||
if (event.succeeded()) {
|
|
||||||
if (event.result() == null) {
|
|
||||||
sink.success();
|
|
||||||
} else {
|
|
||||||
sink.success(Objects.requireNonNull(event.result()));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
sink.error(event.cause());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> SynchronousSink<T> toSink(Context context, Promise<T> promise) {
|
|
||||||
return PromiseSink.of(context, promise);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> Mono<T> executeAsFuture(Consumer<Handler<AsyncResult<T>>> action) {
|
|
||||||
return Mono.<T>fromFuture(() -> {
|
|
||||||
return CompletableFutureUtils.getCompletableFuture(() -> {
|
|
||||||
var resultFuture = new CompletableFuture<T>();
|
|
||||||
action.accept(handler -> {
|
|
||||||
if (handler.failed()) {
|
|
||||||
resultFuture.completeExceptionally(handler.cause());
|
|
||||||
} else {
|
|
||||||
resultFuture.complete(handler.result());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return resultFuture;
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> Mono<T> fromBlockingMaybe(Callable<T> callable) {
|
public static <T> Mono<T> fromBlockingMaybe(Callable<T> callable) {
|
||||||
return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic());
|
return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic());
|
||||||
}
|
}
|
||||||
|
@ -125,30 +77,6 @@ public class MonoUtils {
|
||||||
return fromBlockingMaybe(callable).single();
|
return fromBlockingMaybe(callable).single();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> CoreSubscriber<? super T> toSubscriber(Promise<T> promise) {
|
|
||||||
return new CoreSubscriber<T>() {
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(Subscription s) {
|
|
||||||
s.request(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(T t) {
|
|
||||||
promise.complete(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
promise.fail(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
promise.tryComplete();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <R extends TdApi.Object> void orElseThrowFuture(TdResult<R> value, SynchronousSink<CompletableFuture<R>> sink) {
|
public static <R extends TdApi.Object> void orElseThrowFuture(TdResult<R> value, SynchronousSink<CompletableFuture<R>> sink) {
|
||||||
if (value.succeeded()) {
|
if (value.succeeded()) {
|
||||||
sink.next(CompletableFuture.completedFuture(value.result()));
|
sink.next(CompletableFuture.completedFuture(value.result()));
|
||||||
|
@ -203,42 +131,8 @@ public class MonoUtils {
|
||||||
})).retry();
|
})).retry();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> Mono<T> fromFuture(CompletableFuture<T> future) {
|
|
||||||
return Mono.create(sink -> {
|
|
||||||
future.whenComplete((result, error) -> {
|
|
||||||
if (error != null) {
|
|
||||||
sink.error(error);
|
|
||||||
} else if (result != null) {
|
|
||||||
sink.success(result);
|
|
||||||
} else {
|
|
||||||
sink.success();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> Mono<T> fromFuture(Supplier<CompletableFuture<T>> future) {
|
|
||||||
return Mono.create(sink -> {
|
|
||||||
CompletableFutureUtils.getCompletableFuture(future).whenComplete((result, error) -> {
|
|
||||||
if (error != null) {
|
|
||||||
sink.error(error.getCause());
|
|
||||||
} else if (result != null) {
|
|
||||||
sink.success(result);
|
|
||||||
} else {
|
|
||||||
sink.success();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T extends Object> CompletableFuture<T> toFuture(Mono<T> mono) {
|
|
||||||
var cf = new CompletableFuture<T>();
|
|
||||||
mono.subscribe(cf::complete, cf::completeExceptionally, () -> cf.complete(null));
|
|
||||||
return cf;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> Mono<T> toMono(Future<T> future) {
|
public static <T> Mono<T> toMono(Future<T> future) {
|
||||||
return Mono.<T>create(sink -> future.onComplete(result -> {
|
return Mono.create(sink -> future.onComplete(result -> {
|
||||||
if (result.succeeded()) {
|
if (result.succeeded()) {
|
||||||
sink.success(result.result());
|
sink.success(result.result());
|
||||||
} else {
|
} else {
|
||||||
|
@ -325,65 +219,11 @@ public class MonoUtils {
|
||||||
return fromEmitResultFuture(sink.tryEmitEmpty());
|
return fromEmitResultFuture(sink.tryEmitEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> Mono<SinkRWStream<T>> unicastBackpressureSinkStream(Scheduler scheduler) {
|
|
||||||
Many<T> sink = Sinks.many().unicast().onBackpressureBuffer();
|
|
||||||
return asStream(sink, scheduler, null, null, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a sink that can be written from a writeStream
|
|
||||||
*/
|
|
||||||
public static <T> Mono<SinkRWStream<T>> unicastBackpressureStream(Scheduler scheduler, int maxBackpressureQueueSize) {
|
|
||||||
Queue<T> boundedQueue = Queues.<T>get(maxBackpressureQueueSize).get();
|
|
||||||
var queueSize = Flux
|
|
||||||
.interval(Duration.ZERO, Duration.ofMillis(500))
|
|
||||||
.map(n -> boundedQueue.size());
|
|
||||||
Empty<Void> termination = Sinks.empty();
|
|
||||||
Many<T> sink = Sinks.many().unicast().onBackpressureBuffer(boundedQueue, termination::tryEmitEmpty);
|
|
||||||
return asStream(sink, scheduler, queueSize, termination, maxBackpressureQueueSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> Mono<SinkRWStream<T>> unicastBackpressureErrorStream(Scheduler scheduler) {
|
|
||||||
Many<T> sink = Sinks.many().unicast().onBackpressureError();
|
|
||||||
return asStream(sink, scheduler, null, null, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> Mono<SinkRWStream<T>> asStream(Many<T> sink,
|
|
||||||
Scheduler scheduler,
|
|
||||||
@Nullable Flux<Integer> backpressureSize,
|
|
||||||
@Nullable Empty<Void> termination,
|
|
||||||
int maxBackpressureQueueSize) {
|
|
||||||
return SinkRWStream.create(sink, scheduler, backpressureSize, termination, maxBackpressureQueueSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Future<Void> toVertxFuture(Mono<Void> toTransform) {
|
|
||||||
var promise = Promise.<Void>promise();
|
|
||||||
toTransform.subscribeOn(Schedulers.parallel()).subscribe(next -> {}, promise::fail, promise::complete);
|
|
||||||
return promise.future();
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
public static <T> Mono<T> castVoid(Mono<Void> mono) {
|
public static <T> Mono<T> castVoid(Mono<Void> mono) {
|
||||||
return (Mono) mono;
|
return (Mono) mono;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This method fails to guarantee that the consumer gets registered on all clusters before returning.
|
|
||||||
* Use fromConsumerAdvanced if you want better stability.
|
|
||||||
*/
|
|
||||||
@Deprecated
|
|
||||||
public static <T> Flux<T> fromConsumerUnsafe(MessageConsumer<T> messageConsumer) {
|
|
||||||
return Flux.<Message<T>>create(sink -> {
|
|
||||||
messageConsumer.endHandler(e -> sink.complete());
|
|
||||||
sink.onDispose(messageConsumer::unregister);
|
|
||||||
})
|
|
||||||
//.startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono)))
|
|
||||||
.flatMapSequential(msg -> Mono
|
|
||||||
.fromCallable(msg::body)
|
|
||||||
.subscribeOn(Schedulers.parallel())
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> Flux<T> fromMessageConsumer(Mono<Void> onRegistered, MessageConsumer<T> messageConsumer) {
|
public static <T> Flux<T> fromMessageConsumer(Mono<Void> onRegistered, MessageConsumer<T> messageConsumer) {
|
||||||
return fromReplyableMessageConsumer(onRegistered, messageConsumer).map(Message::body);
|
return fromReplyableMessageConsumer(onRegistered, messageConsumer).map(Message::body);
|
||||||
}
|
}
|
||||||
|
@ -447,472 +287,4 @@ public class MonoUtils {
|
||||||
.defaultIfEmpty(false);
|
.defaultIfEmpty(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class SinkRWStream<T> implements io.vertx.core.streams.WriteStream<T>, io.vertx.core.streams.ReadStream<T> {
|
|
||||||
|
|
||||||
private final Many<T> sink;
|
|
||||||
private final Scheduler scheduler;
|
|
||||||
private final Flux<Integer> backpressureSize;
|
|
||||||
private final Empty<Void> termination;
|
|
||||||
private Handler<Throwable> exceptionHandler = e -> {};
|
|
||||||
private Handler<Void> drainHandler = h -> {};
|
|
||||||
private final int maxBackpressureQueueSize;
|
|
||||||
private volatile int writeQueueMaxSize;
|
|
||||||
private volatile boolean writeQueueFull = false;
|
|
||||||
|
|
||||||
private SinkRWStream(Many<T> sink,
|
|
||||||
Scheduler scheduler,
|
|
||||||
@Nullable Flux<Integer> backpressureSize,
|
|
||||||
@Nullable Empty<Void> termination,
|
|
||||||
int maxBackpressureQueueSize) {
|
|
||||||
this.maxBackpressureQueueSize = maxBackpressureQueueSize;
|
|
||||||
this.writeQueueMaxSize = this.maxBackpressureQueueSize;
|
|
||||||
this.backpressureSize = backpressureSize;
|
|
||||||
this.termination = termination;
|
|
||||||
this.sink = sink;
|
|
||||||
this.scheduler = scheduler;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Mono<SinkRWStream<T>> initialize() {
|
|
||||||
return Mono.fromCallable(() -> {
|
|
||||||
if (backpressureSize != null) {
|
|
||||||
AtomicBoolean drained = new AtomicBoolean(true);
|
|
||||||
var drainSubscription = backpressureSize
|
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
|
||||||
.subscribe(size -> {
|
|
||||||
writeQueueFull = size >= this.writeQueueMaxSize;
|
|
||||||
|
|
||||||
boolean newDrained = size <= this.writeQueueMaxSize / 2;
|
|
||||||
boolean oldDrained = drained.getAndSet(newDrained);
|
|
||||||
if (newDrained && !oldDrained) {
|
|
||||||
drainHandler.handle(null);
|
|
||||||
}
|
|
||||||
}, ex -> {
|
|
||||||
exceptionHandler.handle(ex);
|
|
||||||
}, () -> {
|
|
||||||
if (!drained.get()) {
|
|
||||||
drainHandler.handle(null);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
if (termination != null) {
|
|
||||||
termination
|
|
||||||
.asMono()
|
|
||||||
.doOnTerminate(drainSubscription::dispose)
|
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
|
||||||
.subscribe();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return this;
|
|
||||||
}).subscribeOn(Schedulers.boundedElastic());
|
|
||||||
}
|
|
||||||
|
|
||||||
public static <T> Mono<SinkRWStream<T>> create(Many<T> sink,
|
|
||||||
Scheduler scheduler,
|
|
||||||
@Nullable Flux<Integer> backpressureSize,
|
|
||||||
@Nullable Empty<Void> termination,
|
|
||||||
int maxBackpressureQueueSize) {
|
|
||||||
return new SinkRWStream<T>(sink, scheduler, backpressureSize, termination, maxBackpressureQueueSize).initialize();
|
|
||||||
}
|
|
||||||
|
|
||||||
public Flux<T> readAsFlux() {
|
|
||||||
return sink.asFlux();
|
|
||||||
}
|
|
||||||
|
|
||||||
public ReactiveReactorReadStream<T> readAsStream() {
|
|
||||||
return new ReactiveReactorReadStream<>(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Many<T> writeAsSink() {
|
|
||||||
return sink;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ReactiveSinkWriteStream<T> writeAsStream() {
|
|
||||||
return new ReactiveSinkWriteStream<>(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public SinkRWStream<T> exceptionHandler(Handler<Throwable> handler) {
|
|
||||||
exceptionHandler = handler;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Read stream section
|
|
||||||
//
|
|
||||||
|
|
||||||
private Handler<Void> readEndHandler = v -> {};
|
|
||||||
|
|
||||||
private Subscription readCoreSubscription;
|
|
||||||
|
|
||||||
private final AtomicBoolean fetchMode = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
|
|
||||||
sink.asFlux().subscribeOn(scheduler).subscribe(new CoreSubscriber<T>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(@NotNull Subscription s) {
|
|
||||||
readCoreSubscription = s;
|
|
||||||
if (!fetchMode.get()) {
|
|
||||||
readCoreSubscription.request(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(T t) {
|
|
||||||
handler.handle(t);
|
|
||||||
if (!fetchMode.get()) {
|
|
||||||
readCoreSubscription.request(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
exceptionHandler.handle(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
readEndHandler.handle(null);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> pause() {
|
|
||||||
fetchMode.set(true);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> resume() {
|
|
||||||
if (fetchMode.compareAndSet(true, false)) {
|
|
||||||
readCoreSubscription.request(1);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> fetch(long amount) {
|
|
||||||
if (fetchMode.get()) {
|
|
||||||
if (amount > 0) {
|
|
||||||
readCoreSubscription.request(amount);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> endHandler(@io.vertx.codegen.annotations.Nullable Handler<Void> endHandler) {
|
|
||||||
this.readEndHandler = endHandler;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Write stream section
|
|
||||||
//
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Future<Void> write(T data) {
|
|
||||||
return MonoUtils.emitNextFuture(sink, data);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(T data, Handler<AsyncResult<Void>> handler) {
|
|
||||||
write(data).onComplete(handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void end(Handler<AsyncResult<Void>> handler) {
|
|
||||||
MonoUtils.emitCompleteFuture(sink).onComplete(handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.WriteStream<T> setWriteQueueMaxSize(int maxSize) {
|
|
||||||
if (maxSize <= maxBackpressureQueueSize) {
|
|
||||||
this.writeQueueMaxSize = maxSize;
|
|
||||||
} else {
|
|
||||||
logger.error("Failed to set writeQueueMaxSize to " + maxSize + ", because it's bigger than the max backpressure queue size " + maxBackpressureQueueSize);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean writeQueueFull() {
|
|
||||||
return writeQueueFull;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.WriteStream<T> drainHandler(@Nullable Handler<Void> handler) {
|
|
||||||
this.drainHandler = handler;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class FluxReadStream<T> implements io.vertx.core.streams.ReadStream<T> {
|
|
||||||
|
|
||||||
private final Flux<T> flux;
|
|
||||||
private final Scheduler scheduler;
|
|
||||||
private Handler<Throwable> exceptionHandler = e -> {};
|
|
||||||
|
|
||||||
public FluxReadStream(Flux<T> flux, Scheduler scheduler) {
|
|
||||||
this.flux = flux;
|
|
||||||
this.scheduler = scheduler;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Flux<T> readAsFlux() {
|
|
||||||
return flux;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ReactiveReactorReadStream<T> readAsStream() {
|
|
||||||
return new ReactiveReactorReadStream<>(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public FluxReadStream<T> exceptionHandler(Handler<Throwable> handler) {
|
|
||||||
exceptionHandler = handler;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
// Read stream section
|
|
||||||
//
|
|
||||||
|
|
||||||
private Handler<Void> readEndHandler = v -> {};
|
|
||||||
|
|
||||||
private Subscription readCoreSubscription;
|
|
||||||
|
|
||||||
private final AtomicBoolean fetchMode = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
@SuppressWarnings("DuplicatedCode")
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> handler(@io.vertx.codegen.annotations.Nullable Handler<T> handler) {
|
|
||||||
flux.subscribeOn(scheduler).subscribe(new CoreSubscriber<T>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onSubscribe(@NotNull Subscription s) {
|
|
||||||
readCoreSubscription = s;
|
|
||||||
if (!fetchMode.get()) {
|
|
||||||
readCoreSubscription.request(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onNext(T t) {
|
|
||||||
if (handler != null) {
|
|
||||||
handler.handle(t);
|
|
||||||
}
|
|
||||||
if (!fetchMode.get()) {
|
|
||||||
readCoreSubscription.request(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onError(Throwable t) {
|
|
||||||
exceptionHandler.handle(t);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onComplete() {
|
|
||||||
if (readEndHandler != null) {
|
|
||||||
readEndHandler.handle(null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> pause() {
|
|
||||||
fetchMode.set(true);
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> resume() {
|
|
||||||
if (fetchMode.compareAndSet(true, false)) {
|
|
||||||
readCoreSubscription.request(1);
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> fetch(long amount) {
|
|
||||||
if (fetchMode.get()) {
|
|
||||||
if (amount > 0) {
|
|
||||||
readCoreSubscription.request(amount);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> endHandler(@io.vertx.codegen.annotations.Nullable Handler<Void> endHandler) {
|
|
||||||
this.readEndHandler = endHandler;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class ReactiveSinkWriteStream<T> implements WriteStream<T> {
|
|
||||||
|
|
||||||
private final WriteStream<T> ws;
|
|
||||||
|
|
||||||
public ReactiveSinkWriteStream(SinkRWStream<T> ws) {
|
|
||||||
this.ws = WriteStream.newInstance(ws);
|
|
||||||
}
|
|
||||||
|
|
||||||
public io.vertx.core.streams.WriteStream<T> getDelegate() {
|
|
||||||
//noinspection unchecked
|
|
||||||
return ws.getDelegate();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public WriteStream<T> exceptionHandler(Handler<Throwable> handler) {
|
|
||||||
return ws.exceptionHandler(handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(T data, Handler<AsyncResult<Void>> handler) {
|
|
||||||
ws.write(data, handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void write(T data) {
|
|
||||||
ws.write(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Completable rxWrite(T data) {
|
|
||||||
return ws.rxWrite(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void end(Handler<AsyncResult<Void>> handler) {
|
|
||||||
ws.end(handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void end() {
|
|
||||||
ws.end();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Completable rxEnd() {
|
|
||||||
return ws.rxEnd();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void end(T data, Handler<AsyncResult<Void>> handler) {
|
|
||||||
ws.end(data, handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void end(T data) {
|
|
||||||
ws.end(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Completable rxEnd(T data) {
|
|
||||||
return ws.rxEnd(data);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public WriteStream<T> setWriteQueueMaxSize(int maxSize) {
|
|
||||||
return ws.setWriteQueueMaxSize(maxSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean writeQueueFull() {
|
|
||||||
return ws.writeQueueFull();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public WriteStream<T> drainHandler(Handler<Void> handler) {
|
|
||||||
return ws.drainHandler(handler);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class ReactiveReactorReadStream<T> implements ReadStream<T> {
|
|
||||||
|
|
||||||
private final ReadStream<T> rs;
|
|
||||||
|
|
||||||
public ReactiveReactorReadStream(SinkRWStream<T> rws) {
|
|
||||||
this.rs = ReadStream.newInstance(rws);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ReactiveReactorReadStream(FluxReadStream<T> rs) {
|
|
||||||
this.rs = ReadStream.newInstance(rs);
|
|
||||||
}
|
|
||||||
|
|
||||||
public ReactiveReactorReadStream(Flux<T> s, Scheduler scheduler) {
|
|
||||||
this.rs = ReadStream.newInstance(new FluxReadStream<>(s, scheduler));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public io.vertx.core.streams.ReadStream<T> getDelegate() {
|
|
||||||
//noinspection unchecked
|
|
||||||
return rs.getDelegate();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
|
|
||||||
return rs.exceptionHandler(handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ReadStream<T> handler(Handler<T> handler) {
|
|
||||||
return rs.handler(handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ReadStream<T> pause() {
|
|
||||||
return rs.pause();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ReadStream<T> resume() {
|
|
||||||
return rs.resume();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ReadStream<T> fetch(long amount) {
|
|
||||||
return rs.fetch(amount);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ReadStream<T> endHandler(Handler<Void> endHandler) {
|
|
||||||
return rs.endHandler(endHandler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Pipe<T> pipe() {
|
|
||||||
return rs.pipe();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void pipeTo(WriteStream<T> dst, Handler<AsyncResult<Void>> handler) {
|
|
||||||
rs.pipeTo(dst, handler);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void pipeTo(WriteStream<T> dst) {
|
|
||||||
rs.pipeTo(dst);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Completable rxPipeTo(WriteStream<T> dst) {
|
|
||||||
return rs.rxPipeTo(dst);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Observable<T> toObservable() {
|
|
||||||
return rs.toObservable();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Flowable<T> toFlowable() {
|
|
||||||
return rs.toFlowable();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user