From b2a41727a6e7aaace8d9339ad8377b825878a5a0 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 22 Oct 2020 22:01:05 +0200 Subject: [PATCH] Remove the busy wait between client and server with lots of resurces wasted for empty replies --- pom.xml | 2 +- .../server/AsyncTdMiddleEventBusServer.java | 76 ++++++++++++++----- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index 1a1ce75..9e508c3 100644 --- a/pom.xml +++ b/pom.xml @@ -102,7 +102,7 @@ it.tdlight tdlight-java - 3.169.67 + 3.169.68 it.cavallium diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 8e174c2..1c7b0af 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -26,14 +26,16 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; -import reactor.util.concurrent.Queues; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; public class AsyncTdMiddleEventBusServer extends AbstractVerticle { @@ -41,6 +43,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private static final byte[] EMPTY = new byte[0]; // todo: restore duration to 2 seconds instead of 10 millis, when the bug of tdlight double queue wait is fixed public static final Duration WAIT_DURATION = Duration.ofSeconds(1);// Duration.ofMillis(10); + // If you enable this the poll will wait up to 100 additional milliseconds between each poll, if the server is remote + private static final boolean ENABLE_MINIMUM_POLL_WAIT_INTERVAL = false; private final TdClusterManager cluster; @@ -50,11 +54,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { protected final ReplayProcessor tdClosed = ReplayProcessor.cacheLastOrDefault(false); protected AsyncTdDirectImpl td; - protected final Queue>> queue = Queues.>>unbounded().get(); + protected final LinkedBlockingQueue>> queue = new LinkedBlockingQueue<>(); + private final Scheduler tdSrvPoll; @SuppressWarnings({"unchecked", "rawtypes"}) public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) { this.cluster = clusterManager; + this.tdSrvPoll = Schedulers.newSingle("TdSrvPoll"); if (cluster.registerDefaultCodec(TdOptionalList.class, new TdOptListMessageCodec())) { cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec()); cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec()); @@ -137,21 +143,54 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { private Mono listen() { return Mono.create(registrationSink -> { cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message msg) -> { - Mono - .from(tdClosed) - .single() - .filter(tdClosedVal -> !tdClosedVal) - .flatMap(_v -> Mono.>>>create(sink -> { - sink.onRequest((l) -> { - ArrayList>> updatesBatch = new ArrayList<>(); - while (!queue.isEmpty() && updatesBatch.size() < 1000) { - var item = queue.poll(); - if (item == null) break; - updatesBatch.add(item); - } - sink.success(updatesBatch); - }); - })) + // Run only if tdlib is not closed + Mono.from(tdClosed).single().filter(tdClosedVal -> !tdClosedVal) + // Get a list of updates + .flatMap(_v -> Mono + .>>>fromSupplier(() -> { + // When a request is asked, read up to 1000 available updates in the queue + long requestTime = System.currentTimeMillis(); + ArrayList>> updatesBatch = new ArrayList<>(); + try { + // Block until an update is found or 5 seconds passed + var item = queue.poll(5, TimeUnit.SECONDS); + if (item != null) { + updatesBatch.add(item); + queue.drainTo(updatesBatch, local ? 999 : 998); + + if (ENABLE_MINIMUM_POLL_WAIT_INTERVAL) { + if (!local) { + var item2 = queue.poll(100, TimeUnit.MILLISECONDS); + if (item2 != null) { + updatesBatch.add(item2); + queue.drainTo(updatesBatch, Math.max(0, 1000 - updatesBatch.size())); + } + } + } + } + } catch (InterruptedException ex) { + // polling cancelled, expected sometimes + } + // Return the updates found, can be an empty list + return updatesBatch; + }) + // Subscribe on td server poll scheduler + .subscribeOn(tdSrvPoll) + // Filter out empty updates lists + .filter(updates -> !updates.isEmpty()) + .repeatWhen(s -> s + // Take until an update is received + .takeWhile(n -> n == 0) + // Take until tdClosed is true + .takeUntilOther(tdClosed.filter(closed -> closed).take(1).single()) + ) + // Take only one update list + .take(1) + // If 5 seconds pass, return a list with 0 updates + .timeout(Duration.ofSeconds(5), Mono.just(List.of())) + // Return 1 list or 0 lists + .singleOrEmpty() + ) .flatMap(receivedList -> { return Flux.fromIterable(receivedList).flatMap(result -> { if (result.succeeded()) { @@ -191,6 +230,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { }).collectList().map(list -> new TdOptionalList(true, list)); }) .defaultIfEmpty(new TdOptionalList(false, Collections.emptyList())) + .subscribeOn(tdSrvPoll) .subscribe(v -> { msg.reply(v); }, ex -> {