diff --git a/pom.xml b/pom.xml
index 1a1ce75..9e508c3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,7 +102,7 @@
it.tdlight
tdlight-java
- 3.169.67
+ 3.169.68
it.cavallium
diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java
index 8e174c2..1c7b0af 100644
--- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java
+++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java
@@ -26,14 +26,16 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
-import reactor.util.concurrent.Queues;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
@@ -41,6 +43,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private static final byte[] EMPTY = new byte[0];
// todo: restore duration to 2 seconds instead of 10 millis, when the bug of tdlight double queue wait is fixed
public static final Duration WAIT_DURATION = Duration.ofSeconds(1);// Duration.ofMillis(10);
+ // If you enable this the poll will wait up to 100 additional milliseconds between each poll, if the server is remote
+ private static final boolean ENABLE_MINIMUM_POLL_WAIT_INTERVAL = false;
private final TdClusterManager cluster;
@@ -50,11 +54,13 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
protected final ReplayProcessor tdClosed = ReplayProcessor.cacheLastOrDefault(false);
protected AsyncTdDirectImpl td;
- protected final Queue>> queue = Queues.>>unbounded().get();
+ protected final LinkedBlockingQueue>> queue = new LinkedBlockingQueue<>();
+ private final Scheduler tdSrvPoll;
@SuppressWarnings({"unchecked", "rawtypes"})
public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) {
this.cluster = clusterManager;
+ this.tdSrvPoll = Schedulers.newSingle("TdSrvPoll");
if (cluster.registerDefaultCodec(TdOptionalList.class, new TdOptListMessageCodec())) {
cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec());
cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec());
@@ -137,21 +143,54 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private Mono listen() {
return Mono.create(registrationSink -> {
cluster.getEventBus().consumer(botAddress + ".getNextUpdatesBlock", (Message msg) -> {
- Mono
- .from(tdClosed)
- .single()
- .filter(tdClosedVal -> !tdClosedVal)
- .flatMap(_v -> Mono.>>>create(sink -> {
- sink.onRequest((l) -> {
- ArrayList>> updatesBatch = new ArrayList<>();
- while (!queue.isEmpty() && updatesBatch.size() < 1000) {
- var item = queue.poll();
- if (item == null) break;
- updatesBatch.add(item);
- }
- sink.success(updatesBatch);
- });
- }))
+ // Run only if tdlib is not closed
+ Mono.from(tdClosed).single().filter(tdClosedVal -> !tdClosedVal)
+ // Get a list of updates
+ .flatMap(_v -> Mono
+ .>>>fromSupplier(() -> {
+ // When a request is asked, read up to 1000 available updates in the queue
+ long requestTime = System.currentTimeMillis();
+ ArrayList>> updatesBatch = new ArrayList<>();
+ try {
+ // Block until an update is found or 5 seconds passed
+ var item = queue.poll(5, TimeUnit.SECONDS);
+ if (item != null) {
+ updatesBatch.add(item);
+ queue.drainTo(updatesBatch, local ? 999 : 998);
+
+ if (ENABLE_MINIMUM_POLL_WAIT_INTERVAL) {
+ if (!local) {
+ var item2 = queue.poll(100, TimeUnit.MILLISECONDS);
+ if (item2 != null) {
+ updatesBatch.add(item2);
+ queue.drainTo(updatesBatch, Math.max(0, 1000 - updatesBatch.size()));
+ }
+ }
+ }
+ }
+ } catch (InterruptedException ex) {
+ // polling cancelled, expected sometimes
+ }
+ // Return the updates found, can be an empty list
+ return updatesBatch;
+ })
+ // Subscribe on td server poll scheduler
+ .subscribeOn(tdSrvPoll)
+ // Filter out empty updates lists
+ .filter(updates -> !updates.isEmpty())
+ .repeatWhen(s -> s
+ // Take until an update is received
+ .takeWhile(n -> n == 0)
+ // Take until tdClosed is true
+ .takeUntilOther(tdClosed.filter(closed -> closed).take(1).single())
+ )
+ // Take only one update list
+ .take(1)
+ // If 5 seconds pass, return a list with 0 updates
+ .timeout(Duration.ofSeconds(5), Mono.just(List.of()))
+ // Return 1 list or 0 lists
+ .singleOrEmpty()
+ )
.flatMap(receivedList -> {
return Flux.fromIterable(receivedList).flatMap(result -> {
if (result.succeeded()) {
@@ -191,6 +230,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
}).collectList().map(list -> new TdOptionalList(true, list));
})
.defaultIfEmpty(new TdOptionalList(false, Collections.emptyList()))
+ .subscribeOn(tdSrvPoll)
.subscribe(v -> {
msg.reply(v);
}, ex -> {