diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index deedc89..ae501b5 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -23,16 +23,16 @@ import it.tdlight.tdlibsession.td.middle.TdOptionalList; import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; import it.tdlight.utils.MonoUtils; import java.time.Duration; -import java.util.ArrayList; import java.util.Collections; -import java.util.Queue; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; -import reactor.util.concurrent.Queues; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class AsyncTdMiddleEventBusServer extends AbstractVerticle { @@ -49,7 +49,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { protected final ReplayProcessor tdClosed = ReplayProcessor.cacheLastOrDefault(false); protected AsyncTdDirectImpl td; - protected final Queue>> queue = Queues.>>unbounded().get(); + private Flux>>> updates; + private Scheduler getUpdatesScheduler; @SuppressWarnings({"unchecked", "rawtypes"}) public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) { @@ -80,6 +81,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { throw new IllegalArgumentException("local is not set!"); } this.local = local; + this.getUpdatesScheduler = Schedulers.newSingle("GetUpdates_" + botAddress); this.td = new AsyncTdDirectImpl(botAlias); cluster.getEventBus().consumer(botAddress + ".ping", (Message msg) -> { @@ -140,54 +142,50 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .from(tdClosed) .single() .filter(tdClosedVal -> !tdClosedVal) - .map(_v -> { - ArrayList>> updatesBatch = new ArrayList<>(); - while (!queue.isEmpty() && updatesBatch.size() < 1000) { - var item = queue.poll(); - if (item == null) break; - updatesBatch.add(item); - } - return updatesBatch; - }) - .flatMap(receivedList -> { - return Flux.fromIterable(receivedList).flatMap(result -> { - if (result.succeeded()) { - var received = result.result(); - if (OUTPUT_REQUESTS) { - System.out.println("<=: " + received - .toString() - .replace("\n", " ") - .replace("\t", "") - .replace(" ", "") - .replace(" = ", "=")); - } - return Mono.create(sink -> { - if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { - var authState = (UpdateAuthorizationState) received.result(); - if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { - tdClosed.onNext(true); - vertx.undeploy(deploymentID(), undeployed -> { - if (undeployed.failed()) { - logger.error("Error when undeploying td verticle", undeployed.cause()); - } - sink.success(); - }); - } else { + .flatMap(closed -> updates.take(1) + .take(Duration.ofSeconds(2)) // If 2 seconds pass without the batch, return empty to reply before the timeout + .singleOrEmpty() + ) + .flatMapMany(it -> Flux.fromIterable(it)) + .flatMap(result -> { + if (result.succeeded()) { + var received = result.result(); + if (OUTPUT_REQUESTS) { + System.out.println("<=: " + received + .toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=")); + } + return Mono.create(sink -> { + if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + var authState = (UpdateAuthorizationState) received.result(); + if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { + tdClosed.onNext(true); + vertx.undeploy(deploymentID(), undeployed -> { + if (undeployed.failed()) { + logger.error("Error when undeploying td verticle", undeployed.cause()); + } sink.success(); - } + }); } else { sink.success(); } - }).then(Mono.>create(sink -> { - sink.success(received); - })); - } else { - logger.error("Received an error update", result.cause()); - return Mono.empty(); - } - }).collectList().map(list -> new TdOptionalList(true, list)); - }) + } else { + sink.success(); + } + }).then(Mono.>create(sink -> { + sink.success(received); + })); + } else { + logger.error("Received an error update", result.cause()); + return Mono.empty(); + } + }).collectList().map(list -> new TdOptionalList(true, list)) + .defaultIfEmpty(new TdOptionalList(false, Collections.emptyList())) + .subscribeOn(getUpdatesScheduler) .subscribe(v -> { msg.reply(v); }, ex -> { @@ -225,6 +223,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { sink.error(ex); } }) + .subscribeOn(Schedulers.single()) .subscribe(response -> {}, ex -> { logger.error("Error when processing a request", ex); msg.fail(500, ex.getLocalizedMessage()); @@ -240,12 +239,12 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private Mono pipe() { return Mono.fromCallable(() -> { - td + this.updates = td .getUpdates(WAIT_DURATION, 1000) - .bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100)) - .subscribe(nextItems -> { - queue.addAll(nextItems); - }); + .onBackpressureBuffer() + .bufferTimeout(1000, Duration.ofMillis(300)) + .filter(asyncResults -> !asyncResults.isEmpty()) + .share(); return (Void) null; }); }