From a760bf6e14d21b4c124ebcc7cc3febcec67895e9 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 1 Jul 2021 20:08:57 +0200 Subject: [PATCH] Code cleanup --- .../it/tdlight/tdlibsession/EventBusFlux.java | 347 ---------- .../client/AsyncTdMiddleEventBusClient.java | 4 +- src/main/java/it/tdlight/utils/MonoUtils.java | 630 +----------------- 3 files changed, 3 insertions(+), 978 deletions(-) delete mode 100644 src/main/java/it/tdlight/tdlibsession/EventBusFlux.java diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java deleted file mode 100644 index 9c51bf4..0000000 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ /dev/null @@ -1,347 +0,0 @@ -package it.tdlight.tdlibsession; - -import io.vertx.core.eventbus.DeliveryOptions; -import io.vertx.core.eventbus.ReplyException; -import io.vertx.reactivex.core.eventbus.EventBus; -import io.vertx.reactivex.core.eventbus.Message; -import io.vertx.core.eventbus.MessageCodec; -import io.vertx.reactivex.core.eventbus.MessageConsumer; -import it.tdlight.utils.MonoUtils; -import java.net.ConnectException; -import java.time.Duration; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import org.jetbrains.annotations.Nullable; -import org.warp.commonutils.log.Logger; -import org.warp.commonutils.log.LoggerFactory; -import reactor.core.Disposable; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.One; -import reactor.core.scheduler.Schedulers; -import reactor.util.function.Tuple2; -import reactor.util.function.Tuples; - -public class EventBusFlux { - private static final Logger logger = LoggerFactory.getLogger(EventBusFlux.class); - - private static final byte[] EMPTY = new byte[0]; - - public static void registerFluxCodec(EventBus eventBus, MessageCodec itemsCodec) { - var signalsCodec = new SignalMessageCodec(itemsCodec); - try { - eventBus.registerCodec(signalsCodec); - } catch (IllegalStateException ex) { - if (!ex.getMessage().contains("Already a codec registered with name")) { - throw ex; - } - } - } - - /** - * If the flux is fast and you are on a network, please do this: - * - *
flux
-  .bufferTimeout(Duration.ofMillis(100))
-  .windowTimeout(1, Duration.ofSeconds(5))
-  .flatMap(w -> w.defaultIfEmpty(Collections.emptyList()))
- * - * @return tuple. T1 = flux served, T2 = error that caused cancelling of the subscription - */ - public static Tuple2, Mono> serve(Flux flux, - EventBus eventBus, - String fluxAddress, - DeliveryOptions baseDeliveryOptions, - MessageCodec itemsCodec, - Duration connectionTimeout) { - var signalsCodec = new SignalMessageCodec(itemsCodec); - EventBusFlux.registerFluxCodec(eventBus, itemsCodec); - var deliveryOptions = new DeliveryOptions(baseDeliveryOptions) - .setSendTimeout(connectionTimeout.toMillis()); - var signalDeliveryOptions = new DeliveryOptions(deliveryOptions) - .setCodecName(signalsCodec.name()); - AtomicInteger subscriptionsCount = new AtomicInteger(); - One fatalErrorSink = Sinks.one(); - var servedMono = Mono.create(sink -> { - MessageConsumer subscribe = eventBus.consumer(fluxAddress + ".subscribe"); - - subscribe.handler(msg -> { - subscribe.unregister(subscribeUnregistered -> { - if (subscribeUnregistered.succeeded()) { - if (subscriptionsCount.incrementAndGet() > 1) { - subscriptionsCount.decrementAndGet(); - logger.error("Another client tried to connect to the same flux. Rejecting the request."); - msg.fail(500, "This flux is already in use!"); - return; - } - long subscriptionId = 0; - var subscriptionAddress = fluxAddress + "." + subscriptionId; - - MessageConsumer subscriptionReady = eventBus.consumer(subscriptionAddress + ".subscriptionReady"); - MessageConsumer dispose = eventBus.consumer(subscriptionAddress + ".dispose"); - MessageConsumer ping = eventBus.consumer(subscriptionAddress + ".ping"); - MessageConsumer cancel = eventBus.consumer(subscriptionAddress + ".cancel"); - - subscriptionReady.handler(subscriptionReadyMsg -> { - subscriptionReady.unregister(subscriptionReadyUnregistered -> { - if (subscriptionReadyUnregistered.succeeded()) { - AtomicReference atomicSubscription = new AtomicReference<>(null); - var subscription = flux - .onErrorResume(error -> Mono - .>create(errorSink -> { - var responseHandler = MonoUtils.toHandler(errorSink); - eventBus.request(subscriptionAddress + ".signal", SignalMessage.onError(error), signalDeliveryOptions, responseHandler); - }) - .then(Mono.empty()) - ) - .flatMapSequential(item -> Mono.>create(itemSink -> { - var responseHandler = MonoUtils.toHandler(itemSink); - eventBus.request(subscriptionAddress + ".signal", SignalMessage.onNext(item), signalDeliveryOptions, responseHandler); - })) - .subscribeOn(Schedulers.parallel()) - .subscribe(response -> {}, error -> { - if (error instanceof ReplyException) { - var errorMessageCode = ((ReplyException) error).failureCode(); - // -1 == NO_HANDLERS - if (errorMessageCode == -1) { - logger.error("Can't send a signal of flux \"" + fluxAddress + "\" because the connection was lost"); - } else { - logger.error("Error when sending a signal of flux \"" + fluxAddress + "\": {}", error.toString()); - } - } else { - logger.error("Error when sending a signal of flux \"" + fluxAddress + "\"", error); - } - fatalErrorSink.tryEmitValue(error); - disposeFlux(atomicSubscription.get(), - fatalErrorSink, - subscriptionReady, - ping, - cancel, - dispose, - fluxAddress, - () -> { - logger.warn("Forcefully disposed \"" + fluxAddress + "\" caused by the previous error"); - } - ); - }, () -> { - eventBus.request(subscriptionAddress + ".signal", SignalMessage.onComplete(), signalDeliveryOptions, msg2 -> { - logger.info("Completed flux \"" + fluxAddress + "\""); - if (msg2.failed()) { - logger.error("Failed to send onComplete signal", msg2.cause()); - fatalErrorSink.tryEmitValue(msg2.cause()); - } - }); - }); - atomicSubscription.set(subscription); - - ping.handler(msg2 -> { - logger.trace("Client is still alive"); - msg2.reply(EMPTY, deliveryOptions); - }); - - cancel.handler(msg2 -> { - logger.trace("Cancelling flux \"" + fluxAddress + "\""); - subscription.dispose(); - logger.debug("Cancelled flux \"" + fluxAddress + "\""); - msg2.reply(EMPTY, deliveryOptions); - }); - - dispose.handler(msg2 -> { - disposeFlux(subscription, - fatalErrorSink, - subscriptionReady, - ping, - cancel, - dispose, - fluxAddress, - () -> msg2.reply(EMPTY) - ); - }); - - ping.completionHandler(h0 -> { - if (h0.succeeded()) { - cancel.completionHandler(h1 -> { - if (h1.succeeded()) { - dispose.completionHandler(h2 -> { - if (h2.succeeded()) { - subscriptionReadyMsg.reply((Long) subscriptionId); - } else { - logger.error("Failed to register dispose", h1.cause()); - subscriptionReadyMsg.fail(500, "Failed to register dispose"); - } - }); - } else { - logger.error("Failed to register cancel", h1.cause()); - subscriptionReadyMsg.fail(500, "Failed to register cancel"); - } - }); - } else { - logger.error("Failed to register ping", h0.cause()); - subscriptionReadyMsg.fail(500, "Failed to register ping"); - } - }); - } else { - logger.error("Failed to unregister \"subscription ready\""); - } - }); - }); - - subscriptionReady.completionHandler(srh -> { - if (srh.succeeded()) { - msg.reply((Long) subscriptionId); - } else { - logger.error("Failed to register \"subscription ready\"", srh.cause()); - msg.fail(500, "Failed to register \"subscription ready\""); - } - }); - } else { - logger.error("Failed to unregister subscribe consumer"); - } - }); - }); - - subscribe.completionHandler(h -> { - if (h.failed()) { - sink.error(h.cause()); - } else { - sink.success(); - } - }); - }).publishOn(Schedulers.parallel()).share(); - - return Tuples.of(servedMono, fatalErrorSink.asMono()); - } - - private static void disposeFlux(@Nullable Disposable subscription, - One fatalErrorSink, - MessageConsumer subscriptionReady, - MessageConsumer ping, - MessageConsumer cancel, - MessageConsumer dispose, - String fluxAddress, - Runnable after) { - logger.trace("Disposing flux \"" + fluxAddress + "\""); - fatalErrorSink.tryEmitEmpty(); - if (subscription != null) { - subscription.dispose(); - } - subscriptionReady.unregister(v0 -> { - if (v0.failed()) { - logger.error("Failed to unregister subscriptionReady", v0.cause()); - } - ping.unregister(v1 -> { - if (v1.failed()) { - logger.error("Failed to unregister ping", v1.cause()); - } - cancel.unregister(v2 -> { - if (v2.failed()) { - logger.error("Failed to unregister cancel", v2.cause()); - } - dispose.unregister(v3 -> { - if (v3.failed()) { - logger.error("Failed to unregister dispose", v3.cause()); - } - logger.debug("Disposed flux \"" + fluxAddress + "\""); - after.run(); - }); - }); - }); - }); - } - - public static Flux connect(EventBus eventBus, - String fluxAddress, - DeliveryOptions baseDeliveryOptions, - MessageCodec itemsCodec, - Duration connectionTimeout) { - EventBusFlux.registerFluxCodec(eventBus, itemsCodec); - return Flux.create(emitter -> { - var deliveryOptions = new DeliveryOptions(baseDeliveryOptions) - .setSendTimeout(connectionTimeout.toMillis()); - eventBus.request(fluxAddress + ".subscribe", EMPTY, deliveryOptions, msg -> { - if (msg.succeeded()) { - long subscriptionId = msg.result().body(); - var subscriptionAddress = fluxAddress + "." + subscriptionId; - - var signalConsumer = eventBus.>consumer(subscriptionAddress + ".signal"); - signalConsumer.handler(msg2 -> { - var signal = msg2.body(); - switch (signal.getSignalType()) { - case ITEM: - emitter.next(signal.getItem()); - break; - case ERROR: - emitter.error(new Exception(signal.getErrorMessage())); - break; - case COMPLETE: - emitter.complete(); - break; - } - msg2.reply(EMPTY); - }); - signalConsumer.completionHandler(h -> { - if (h.succeeded()) { - eventBus.request(subscriptionAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> { - if (msg2.failed()) { - logger.error("Failed to tell that the subscription is ready"); - } - }); - } else { - emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause())); - } - }); - - var pingSubscription = Flux.interval(Duration.ofSeconds(10)).flatMapSequential(n -> Mono.create(pingSink -> - eventBus.request(subscriptionAddress + ".ping", EMPTY, deliveryOptions, pingMsg -> { - if (pingMsg.succeeded()) { - pingSink.success(pingMsg.result().body()); - } else { - var pingError = pingMsg.cause(); - if (pingError instanceof ReplyException) { - var pingReplyException = (ReplyException) pingError; - // -1 = NO_HANDLERS - if (pingReplyException.failureCode() == -1) { - pingSink.error(new ConnectException( "Can't send a ping to flux \"" + fluxAddress + "\" because the connection was lost")); - } else { - pingSink.error(new ConnectException("Ping failed: " + pingReplyException.toString())); - } - } else { - pingSink.error(new IllegalStateException("Ping failed: " + pingError.getMessage())); - } - } - }))) - .publishOn(Schedulers.boundedElastic()) - .onBackpressureBuffer() - .subscribeOn(Schedulers.parallel()) - .subscribe(v -> {}, emitter::error); - - emitter.onDispose(() -> { - if (!pingSubscription.isDisposed()) { - pingSubscription.dispose(); - } - eventBus.request(subscriptionAddress + ".dispose", EMPTY, deliveryOptions, msg2 -> { - if (msg.failed()) { - logger.error("Failed to tell that the subscription is disposed"); - } - }); - }); - - emitter.onCancel(() -> { - if (!pingSubscription.isDisposed()) { - pingSubscription.dispose(); - } - eventBus.request(subscriptionAddress + ".cancel", EMPTY, deliveryOptions, msg2 -> { - if (msg.failed()) { - logger.error("Failed to tell that the subscription is cancelled"); - } - }); - }); - } else { - emitter.error(new IllegalStateException("Subscription failed", msg.cause())); - } - }); - }); - } - -} diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index c1853f7..56dab26 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -171,7 +171,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .rxRequest(botAddress + ".ping", EMPTY, pingDeliveryOptions) .as(MonoUtils::toMono); }) - .flatMap(msg -> Mono.fromCallable(() -> msg.body()).subscribeOn(Schedulers.boundedElastic())) + .flatMap(msg -> Mono.fromCallable(msg::body).subscribeOn(Schedulers.boundedElastic())) .repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true)) .takeUntilOther(Mono.firstWithSignal(this.updatesStreamEnd.asMono().doOnTerminate(() -> { logger.trace("About to kill pinger because updates stream ended"); @@ -295,7 +295,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { case TdApi.AuthorizationStateClosed.CONSTRUCTOR: return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) - .flatMap(latestBinlogMsg -> Mono.fromCallable(() -> latestBinlogMsg.body()).subscribeOn(Schedulers.parallel())) + .flatMap(latestBinlogMsg -> Mono.fromCallable(latestBinlogMsg::body).subscribeOn(Schedulers.parallel())) .doOnNext(latestBinlog -> logger.info("Received binlog from server. Size: " + BinlogUtils.humanReadableByteCountBin(latestBinlog.binlog().length()))) .flatMap(latestBinlog -> this.saveBinlog(latestBinlog.binlog())) .doOnSuccess(s -> logger.info("Overwritten binlog from server")) diff --git a/src/main/java/it/tdlight/utils/MonoUtils.java b/src/main/java/it/tdlight/utils/MonoUtils.java index 3140f30..da6a6ab 100644 --- a/src/main/java/it/tdlight/utils/MonoUtils.java +++ b/src/main/java/it/tdlight/utils/MonoUtils.java @@ -62,54 +62,6 @@ public class MonoUtils { }); } - public static Handler> toHandler(SynchronousSink sink) { - return event -> { - if (event.succeeded()) { - if (event.result() == null) { - sink.complete(); - } else { - sink.next(Objects.requireNonNull(event.result())); - } - } else { - sink.error(event.cause()); - } - }; - } - - public static Handler> toHandler(MonoSink sink) { - return event -> { - if (event.succeeded()) { - if (event.result() == null) { - sink.success(); - } else { - sink.success(Objects.requireNonNull(event.result())); - } - } else { - sink.error(event.cause()); - } - }; - } - - public static SynchronousSink toSink(Context context, Promise promise) { - return PromiseSink.of(context, promise); - } - - public static Mono executeAsFuture(Consumer>> action) { - return Mono.fromFuture(() -> { - return CompletableFutureUtils.getCompletableFuture(() -> { - var resultFuture = new CompletableFuture(); - action.accept(handler -> { - if (handler.failed()) { - resultFuture.completeExceptionally(handler.cause()); - } else { - resultFuture.complete(handler.result()); - } - }); - return resultFuture; - }); - }); - } - public static Mono fromBlockingMaybe(Callable callable) { return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic()); } @@ -125,30 +77,6 @@ public class MonoUtils { return fromBlockingMaybe(callable).single(); } - public static CoreSubscriber toSubscriber(Promise promise) { - return new CoreSubscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } - - @Override - public void onNext(T t) { - promise.complete(t); - } - - @Override - public void onError(Throwable t) { - promise.fail(t); - } - - @Override - public void onComplete() { - promise.tryComplete(); - } - }; - } - public static void orElseThrowFuture(TdResult value, SynchronousSink> sink) { if (value.succeeded()) { sink.next(CompletableFuture.completedFuture(value.result())); @@ -203,42 +131,8 @@ public class MonoUtils { })).retry(); } - public static Mono fromFuture(CompletableFuture future) { - return Mono.create(sink -> { - future.whenComplete((result, error) -> { - if (error != null) { - sink.error(error); - } else if (result != null) { - sink.success(result); - } else { - sink.success(); - } - }); - }); - } - - public static Mono fromFuture(Supplier> future) { - return Mono.create(sink -> { - CompletableFutureUtils.getCompletableFuture(future).whenComplete((result, error) -> { - if (error != null) { - sink.error(error.getCause()); - } else if (result != null) { - sink.success(result); - } else { - sink.success(); - } - }); - }); - } - - public static CompletableFuture toFuture(Mono mono) { - var cf = new CompletableFuture(); - mono.subscribe(cf::complete, cf::completeExceptionally, () -> cf.complete(null)); - return cf; - } - public static Mono toMono(Future future) { - return Mono.create(sink -> future.onComplete(result -> { + return Mono.create(sink -> future.onComplete(result -> { if (result.succeeded()) { sink.success(result.result()); } else { @@ -325,65 +219,11 @@ public class MonoUtils { return fromEmitResultFuture(sink.tryEmitEmpty()); } - public static Mono> unicastBackpressureSinkStream(Scheduler scheduler) { - Many sink = Sinks.many().unicast().onBackpressureBuffer(); - return asStream(sink, scheduler, null, null, 1); - } - - /** - * Create a sink that can be written from a writeStream - */ - public static Mono> unicastBackpressureStream(Scheduler scheduler, int maxBackpressureQueueSize) { - Queue boundedQueue = Queues.get(maxBackpressureQueueSize).get(); - var queueSize = Flux - .interval(Duration.ZERO, Duration.ofMillis(500)) - .map(n -> boundedQueue.size()); - Empty termination = Sinks.empty(); - Many sink = Sinks.many().unicast().onBackpressureBuffer(boundedQueue, termination::tryEmitEmpty); - return asStream(sink, scheduler, queueSize, termination, maxBackpressureQueueSize); - } - - public static Mono> unicastBackpressureErrorStream(Scheduler scheduler) { - Many sink = Sinks.many().unicast().onBackpressureError(); - return asStream(sink, scheduler, null, null, 1); - } - - public static Mono> asStream(Many sink, - Scheduler scheduler, - @Nullable Flux backpressureSize, - @Nullable Empty termination, - int maxBackpressureQueueSize) { - return SinkRWStream.create(sink, scheduler, backpressureSize, termination, maxBackpressureQueueSize); - } - - private static Future toVertxFuture(Mono toTransform) { - var promise = Promise.promise(); - toTransform.subscribeOn(Schedulers.parallel()).subscribe(next -> {}, promise::fail, promise::complete); - return promise.future(); - } - @SuppressWarnings({"unchecked", "rawtypes"}) public static Mono castVoid(Mono mono) { return (Mono) mono; } - /** - * This method fails to guarantee that the consumer gets registered on all clusters before returning. - * Use fromConsumerAdvanced if you want better stability. - */ - @Deprecated - public static Flux fromConsumerUnsafe(MessageConsumer messageConsumer) { - return Flux.>create(sink -> { - messageConsumer.endHandler(e -> sink.complete()); - sink.onDispose(messageConsumer::unregister); - }) - //.startWith(MonoUtils.castVoid(messageConsumer.rxCompletionHandler().as(MonoUtils::toMono))) - .flatMapSequential(msg -> Mono - .fromCallable(msg::body) - .subscribeOn(Schedulers.parallel()) - ); - } - public static Flux fromMessageConsumer(Mono onRegistered, MessageConsumer messageConsumer) { return fromReplyableMessageConsumer(onRegistered, messageConsumer).map(Message::body); } @@ -447,472 +287,4 @@ public class MonoUtils { .defaultIfEmpty(false); } - public static class SinkRWStream implements io.vertx.core.streams.WriteStream, io.vertx.core.streams.ReadStream { - - private final Many sink; - private final Scheduler scheduler; - private final Flux backpressureSize; - private final Empty termination; - private Handler exceptionHandler = e -> {}; - private Handler drainHandler = h -> {}; - private final int maxBackpressureQueueSize; - private volatile int writeQueueMaxSize; - private volatile boolean writeQueueFull = false; - - private SinkRWStream(Many sink, - Scheduler scheduler, - @Nullable Flux backpressureSize, - @Nullable Empty termination, - int maxBackpressureQueueSize) { - this.maxBackpressureQueueSize = maxBackpressureQueueSize; - this.writeQueueMaxSize = this.maxBackpressureQueueSize; - this.backpressureSize = backpressureSize; - this.termination = termination; - this.sink = sink; - this.scheduler = scheduler; - } - - public Mono> initialize() { - return Mono.fromCallable(() -> { - if (backpressureSize != null) { - AtomicBoolean drained = new AtomicBoolean(true); - var drainSubscription = backpressureSize - .subscribeOn(Schedulers.boundedElastic()) - .subscribe(size -> { - writeQueueFull = size >= this.writeQueueMaxSize; - - boolean newDrained = size <= this.writeQueueMaxSize / 2; - boolean oldDrained = drained.getAndSet(newDrained); - if (newDrained && !oldDrained) { - drainHandler.handle(null); - } - }, ex -> { - exceptionHandler.handle(ex); - }, () -> { - if (!drained.get()) { - drainHandler.handle(null); - } - }); - if (termination != null) { - termination - .asMono() - .doOnTerminate(drainSubscription::dispose) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe(); - } - } - - return this; - }).subscribeOn(Schedulers.boundedElastic()); - } - - public static Mono> create(Many sink, - Scheduler scheduler, - @Nullable Flux backpressureSize, - @Nullable Empty termination, - int maxBackpressureQueueSize) { - return new SinkRWStream(sink, scheduler, backpressureSize, termination, maxBackpressureQueueSize).initialize(); - } - - public Flux readAsFlux() { - return sink.asFlux(); - } - - public ReactiveReactorReadStream readAsStream() { - return new ReactiveReactorReadStream<>(this); - } - - public Many writeAsSink() { - return sink; - } - - public ReactiveSinkWriteStream writeAsStream() { - return new ReactiveSinkWriteStream<>(this); - } - - @Override - public SinkRWStream exceptionHandler(Handler handler) { - exceptionHandler = handler; - return this; - } - - // - // Read stream section - // - - private Handler readEndHandler = v -> {}; - - private Subscription readCoreSubscription; - - private final AtomicBoolean fetchMode = new AtomicBoolean(false); - - @Override - public io.vertx.core.streams.ReadStream handler(@io.vertx.codegen.annotations.Nullable Handler handler) { - sink.asFlux().subscribeOn(scheduler).subscribe(new CoreSubscriber() { - - @Override - public void onSubscribe(@NotNull Subscription s) { - readCoreSubscription = s; - if (!fetchMode.get()) { - readCoreSubscription.request(1); - } - } - - @Override - public void onNext(T t) { - handler.handle(t); - if (!fetchMode.get()) { - readCoreSubscription.request(1); - } - } - - @Override - public void onError(Throwable t) { - exceptionHandler.handle(t); - } - - @Override - public void onComplete() { - readEndHandler.handle(null); - } - }); - return this; - } - - @Override - public io.vertx.core.streams.ReadStream pause() { - fetchMode.set(true); - return this; - } - - @Override - public io.vertx.core.streams.ReadStream resume() { - if (fetchMode.compareAndSet(true, false)) { - readCoreSubscription.request(1); - } - return this; - } - - @Override - public io.vertx.core.streams.ReadStream fetch(long amount) { - if (fetchMode.get()) { - if (amount > 0) { - readCoreSubscription.request(amount); - } - } - return this; - } - - @Override - public io.vertx.core.streams.ReadStream endHandler(@io.vertx.codegen.annotations.Nullable Handler endHandler) { - this.readEndHandler = endHandler; - return this; - } - - // - // Write stream section - // - - @Override - public Future write(T data) { - return MonoUtils.emitNextFuture(sink, data); - } - - @Override - public void write(T data, Handler> handler) { - write(data).onComplete(handler); - } - - @Override - public void end(Handler> handler) { - MonoUtils.emitCompleteFuture(sink).onComplete(handler); - } - - @Override - public io.vertx.core.streams.WriteStream setWriteQueueMaxSize(int maxSize) { - if (maxSize <= maxBackpressureQueueSize) { - this.writeQueueMaxSize = maxSize; - } else { - logger.error("Failed to set writeQueueMaxSize to " + maxSize + ", because it's bigger than the max backpressure queue size " + maxBackpressureQueueSize); - } - return this; - } - - @Override - public boolean writeQueueFull() { - return writeQueueFull; - } - - @Override - public io.vertx.core.streams.WriteStream drainHandler(@Nullable Handler handler) { - this.drainHandler = handler; - return this; - } - } - - public static class FluxReadStream implements io.vertx.core.streams.ReadStream { - - private final Flux flux; - private final Scheduler scheduler; - private Handler exceptionHandler = e -> {}; - - public FluxReadStream(Flux flux, Scheduler scheduler) { - this.flux = flux; - this.scheduler = scheduler; - } - - public Flux readAsFlux() { - return flux; - } - - public ReactiveReactorReadStream readAsStream() { - return new ReactiveReactorReadStream<>(this); - } - - @Override - public FluxReadStream exceptionHandler(Handler handler) { - exceptionHandler = handler; - return this; - } - - // - // Read stream section - // - - private Handler readEndHandler = v -> {}; - - private Subscription readCoreSubscription; - - private final AtomicBoolean fetchMode = new AtomicBoolean(false); - - @SuppressWarnings("DuplicatedCode") - @Override - public io.vertx.core.streams.ReadStream handler(@io.vertx.codegen.annotations.Nullable Handler handler) { - flux.subscribeOn(scheduler).subscribe(new CoreSubscriber() { - - @Override - public void onSubscribe(@NotNull Subscription s) { - readCoreSubscription = s; - if (!fetchMode.get()) { - readCoreSubscription.request(1); - } - } - - @Override - public void onNext(T t) { - if (handler != null) { - handler.handle(t); - } - if (!fetchMode.get()) { - readCoreSubscription.request(1); - } - } - - @Override - public void onError(Throwable t) { - exceptionHandler.handle(t); - } - - @Override - public void onComplete() { - if (readEndHandler != null) { - readEndHandler.handle(null); - } - } - }); - return this; - } - - @Override - public io.vertx.core.streams.ReadStream pause() { - fetchMode.set(true); - return this; - } - - @Override - public io.vertx.core.streams.ReadStream resume() { - if (fetchMode.compareAndSet(true, false)) { - readCoreSubscription.request(1); - } - return this; - } - - @Override - public io.vertx.core.streams.ReadStream fetch(long amount) { - if (fetchMode.get()) { - if (amount > 0) { - readCoreSubscription.request(amount); - } - } - return this; - } - - @Override - public io.vertx.core.streams.ReadStream endHandler(@io.vertx.codegen.annotations.Nullable Handler endHandler) { - this.readEndHandler = endHandler; - return this; - } - } - - public static class ReactiveSinkWriteStream implements WriteStream { - - private final WriteStream ws; - - public ReactiveSinkWriteStream(SinkRWStream ws) { - this.ws = WriteStream.newInstance(ws); - } - - public io.vertx.core.streams.WriteStream getDelegate() { - //noinspection unchecked - return ws.getDelegate(); - } - - @Override - public WriteStream exceptionHandler(Handler handler) { - return ws.exceptionHandler(handler); - } - - @Override - public void write(T data, Handler> handler) { - ws.write(data, handler); - } - - @Override - public void write(T data) { - ws.write(data); - } - - @Override - public Completable rxWrite(T data) { - return ws.rxWrite(data); - } - - @Override - public void end(Handler> handler) { - ws.end(handler); - } - - @Override - public void end() { - ws.end(); - } - - @Override - public Completable rxEnd() { - return ws.rxEnd(); - } - - @Override - public void end(T data, Handler> handler) { - ws.end(data, handler); - } - - @Override - public void end(T data) { - ws.end(data); - } - - @Override - public Completable rxEnd(T data) { - return ws.rxEnd(data); - } - - @Override - public WriteStream setWriteQueueMaxSize(int maxSize) { - return ws.setWriteQueueMaxSize(maxSize); - } - - @Override - public boolean writeQueueFull() { - return ws.writeQueueFull(); - } - - @Override - public WriteStream drainHandler(Handler handler) { - return ws.drainHandler(handler); - } - } - - public static class ReactiveReactorReadStream implements ReadStream { - - private final ReadStream rs; - - public ReactiveReactorReadStream(SinkRWStream rws) { - this.rs = ReadStream.newInstance(rws); - } - - public ReactiveReactorReadStream(FluxReadStream rs) { - this.rs = ReadStream.newInstance(rs); - } - - public ReactiveReactorReadStream(Flux s, Scheduler scheduler) { - this.rs = ReadStream.newInstance(new FluxReadStream<>(s, scheduler)); - } - - @Override - public io.vertx.core.streams.ReadStream getDelegate() { - //noinspection unchecked - return rs.getDelegate(); - } - - @Override - public ReadStream exceptionHandler(Handler handler) { - return rs.exceptionHandler(handler); - } - - @Override - public ReadStream handler(Handler handler) { - return rs.handler(handler); - } - - @Override - public ReadStream pause() { - return rs.pause(); - } - - @Override - public ReadStream resume() { - return rs.resume(); - } - - @Override - public ReadStream fetch(long amount) { - return rs.fetch(amount); - } - - @Override - public ReadStream endHandler(Handler endHandler) { - return rs.endHandler(endHandler); - } - - @Override - public Pipe pipe() { - return rs.pipe(); - } - - @Override - public void pipeTo(WriteStream dst, Handler> handler) { - rs.pipeTo(dst, handler); - } - - @Override - public void pipeTo(WriteStream dst) { - rs.pipeTo(dst); - } - - @Override - public Completable rxPipeTo(WriteStream dst) { - return rs.rxPipeTo(dst); - } - - @Override - public Observable toObservable() { - return rs.toObservable(); - } - - @Override - public Flowable toFlowable() { - return rs.toFlowable(); - } - } }