Update AsyncTdMiddleEventBusServer.java

This commit is contained in:
Andrea Cavalli 2020-10-22 04:55:58 +02:00
parent c2207aa44b
commit bf97f94db3

View File

@ -25,6 +25,7 @@ import it.tdlight.utils.MonoUtils;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -140,15 +141,17 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.from(tdClosed) .from(tdClosed)
.single() .single()
.filter(tdClosedVal -> !tdClosedVal) .filter(tdClosedVal -> !tdClosedVal)
.map(_v -> { .flatMap(_v -> Mono.<List<AsyncResult<TdResult<Update>>>>create(sink -> {
sink.onRequest((l) -> {
ArrayList<AsyncResult<TdResult<Update>>> updatesBatch = new ArrayList<>(); ArrayList<AsyncResult<TdResult<Update>>> updatesBatch = new ArrayList<>();
while (!queue.isEmpty() && updatesBatch.size() < 1000) { while (!queue.isEmpty() && updatesBatch.size() < 1000) {
var item = queue.poll(); var item = queue.poll();
if (item == null) break; if (item == null) break;
updatesBatch.add(item); updatesBatch.add(item);
} }
return updatesBatch; sink.success(updatesBatch);
}) });
}))
.flatMap(receivedList -> { .flatMap(receivedList -> {
return Flux.fromIterable(receivedList).flatMap(result -> { return Flux.fromIterable(receivedList).flatMap(result -> {
if (result.succeeded()) { if (result.succeeded()) {