diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index ae501b5..deedc89 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -23,16 +23,16 @@ import it.tdlight.tdlibsession.td.middle.TdOptionalList; import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; import it.tdlight.utils.MonoUtils; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; -import java.util.List; +import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; +import reactor.util.concurrent.Queues; public class AsyncTdMiddleEventBusServer extends AbstractVerticle { @@ -49,8 +49,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { protected final ReplayProcessor tdClosed = ReplayProcessor.cacheLastOrDefault(false); protected AsyncTdDirectImpl td; - private Flux>>> updates; - private Scheduler getUpdatesScheduler; + protected final Queue>> queue = Queues.>>unbounded().get(); @SuppressWarnings({"unchecked", "rawtypes"}) public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) { @@ -81,7 +80,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { throw new IllegalArgumentException("local is not set!"); } this.local = local; - this.getUpdatesScheduler = Schedulers.newSingle("GetUpdates_" + botAddress); this.td = new AsyncTdDirectImpl(botAlias); cluster.getEventBus().consumer(botAddress + ".ping", (Message msg) -> { @@ -142,50 +140,54 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { .from(tdClosed) .single() .filter(tdClosedVal -> !tdClosedVal) - .flatMap(closed -> updates.take(1) - .take(Duration.ofSeconds(2)) // If 2 seconds pass without the batch, return empty to reply before the timeout - .singleOrEmpty() - ) - .flatMapMany(it -> Flux.fromIterable(it)) - .flatMap(result -> { - if (result.succeeded()) { - var received = result.result(); - if (OUTPUT_REQUESTS) { - System.out.println("<=: " + received - .toString() - .replace("\n", " ") - .replace("\t", "") - .replace(" ", "") - .replace(" = ", "=")); - } - return Mono.create(sink -> { - if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { - var authState = (UpdateAuthorizationState) received.result(); - if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { - tdClosed.onNext(true); - vertx.undeploy(deploymentID(), undeployed -> { - if (undeployed.failed()) { - logger.error("Error when undeploying td verticle", undeployed.cause()); - } + .map(_v -> { + ArrayList>> updatesBatch = new ArrayList<>(); + while (!queue.isEmpty() && updatesBatch.size() < 1000) { + var item = queue.poll(); + if (item == null) break; + updatesBatch.add(item); + } + return updatesBatch; + }) + .flatMap(receivedList -> { + return Flux.fromIterable(receivedList).flatMap(result -> { + if (result.succeeded()) { + var received = result.result(); + if (OUTPUT_REQUESTS) { + System.out.println("<=: " + received + .toString() + .replace("\n", " ") + .replace("\t", "") + .replace(" ", "") + .replace(" = ", "=")); + } + return Mono.create(sink -> { + if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + var authState = (UpdateAuthorizationState) received.result(); + if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { + tdClosed.onNext(true); + vertx.undeploy(deploymentID(), undeployed -> { + if (undeployed.failed()) { + logger.error("Error when undeploying td verticle", undeployed.cause()); + } + sink.success(); + }); + } else { sink.success(); - }); + } } else { sink.success(); } - } else { - sink.success(); - } - }).then(Mono.>create(sink -> { - sink.success(received); - })); - } else { - logger.error("Received an error update", result.cause()); - return Mono.empty(); - } - }).collectList().map(list -> new TdOptionalList(true, list)) - + }).then(Mono.>create(sink -> { + sink.success(received); + })); + } else { + logger.error("Received an error update", result.cause()); + return Mono.empty(); + } + }).collectList().map(list -> new TdOptionalList(true, list)); + }) .defaultIfEmpty(new TdOptionalList(false, Collections.emptyList())) - .subscribeOn(getUpdatesScheduler) .subscribe(v -> { msg.reply(v); }, ex -> { @@ -223,7 +225,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { sink.error(ex); } }) - .subscribeOn(Schedulers.single()) .subscribe(response -> {}, ex -> { logger.error("Error when processing a request", ex); msg.fail(500, ex.getLocalizedMessage()); @@ -239,12 +240,12 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private Mono pipe() { return Mono.fromCallable(() -> { - this.updates = td + td .getUpdates(WAIT_DURATION, 1000) - .onBackpressureBuffer() - .bufferTimeout(1000, Duration.ofMillis(300)) - .filter(asyncResults -> !asyncResults.isEmpty()) - .share(); + .bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100)) + .subscribe(nextItems -> { + queue.addAll(nextItems); + }); return (Void) null; }); }