Revert "Use backpressure buffer instead of a queue"

This reverts commit 878c43ed9e.
This commit is contained in:
Andrea Cavalli 2020-10-22 04:45:28 +02:00
parent 878c43ed9e
commit c2207aa44b
1 changed files with 52 additions and 51 deletions

View File

@ -23,16 +23,16 @@ import it.tdlight.tdlibsession.td.middle.TdOptionalList;
import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec;
import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils;
import java.time.Duration; import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor; import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Scheduler; import reactor.util.concurrent.Queues;
import reactor.core.scheduler.Schedulers;
public class AsyncTdMiddleEventBusServer extends AbstractVerticle { public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
@ -49,8 +49,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
protected final ReplayProcessor<Boolean> tdClosed = ReplayProcessor.cacheLastOrDefault(false); protected final ReplayProcessor<Boolean> tdClosed = ReplayProcessor.cacheLastOrDefault(false);
protected AsyncTdDirectImpl td; protected AsyncTdDirectImpl td;
private Flux<List<AsyncResult<TdResult<Update>>>> updates; protected final Queue<AsyncResult<TdResult<Update>>> queue = Queues.<AsyncResult<TdResult<Update>>>unbounded().get();
private Scheduler getUpdatesScheduler;
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) { public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) {
@ -81,7 +80,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
throw new IllegalArgumentException("local is not set!"); throw new IllegalArgumentException("local is not set!");
} }
this.local = local; this.local = local;
this.getUpdatesScheduler = Schedulers.newSingle("GetUpdates_" + botAddress);
this.td = new AsyncTdDirectImpl(botAlias); this.td = new AsyncTdDirectImpl(botAlias);
cluster.getEventBus().consumer(botAddress + ".ping", (Message<byte[]> msg) -> { cluster.getEventBus().consumer(botAddress + ".ping", (Message<byte[]> msg) -> {
@ -142,50 +140,54 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.from(tdClosed) .from(tdClosed)
.single() .single()
.filter(tdClosedVal -> !tdClosedVal) .filter(tdClosedVal -> !tdClosedVal)
.flatMap(closed -> updates.take(1) .map(_v -> {
.take(Duration.ofSeconds(2)) // If 2 seconds pass without the batch, return empty to reply before the timeout ArrayList<AsyncResult<TdResult<Update>>> updatesBatch = new ArrayList<>();
.singleOrEmpty() while (!queue.isEmpty() && updatesBatch.size() < 1000) {
) var item = queue.poll();
.flatMapMany(it -> Flux.fromIterable(it)) if (item == null) break;
.flatMap(result -> { updatesBatch.add(item);
if (result.succeeded()) { }
var received = result.result(); return updatesBatch;
if (OUTPUT_REQUESTS) { })
System.out.println("<=: " + received .flatMap(receivedList -> {
.toString() return Flux.fromIterable(receivedList).flatMap(result -> {
.replace("\n", " ") if (result.succeeded()) {
.replace("\t", "") var received = result.result();
.replace(" ", "") if (OUTPUT_REQUESTS) {
.replace(" = ", "=")); System.out.println("<=: " + received
} .toString()
return Mono.create(sink -> { .replace("\n", " ")
if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { .replace("\t", "")
var authState = (UpdateAuthorizationState) received.result(); .replace(" ", "")
if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) { .replace(" = ", "="));
tdClosed.onNext(true); }
vertx.undeploy(deploymentID(), undeployed -> { return Mono.create(sink -> {
if (undeployed.failed()) { if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
logger.error("Error when undeploying td verticle", undeployed.cause()); var authState = (UpdateAuthorizationState) received.result();
} if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
tdClosed.onNext(true);
vertx.undeploy(deploymentID(), undeployed -> {
if (undeployed.failed()) {
logger.error("Error when undeploying td verticle", undeployed.cause());
}
sink.success();
});
} else {
sink.success(); sink.success();
}); }
} else { } else {
sink.success(); sink.success();
} }
} else { }).then(Mono.<TdResult<Update>>create(sink -> {
sink.success(); sink.success(received);
} }));
}).then(Mono.<TdResult<Update>>create(sink -> { } else {
sink.success(received); logger.error("Received an error update", result.cause());
})); return Mono.empty();
} else { }
logger.error("Received an error update", result.cause()); }).collectList().map(list -> new TdOptionalList(true, list));
return Mono.empty(); })
}
}).collectList().map(list -> new TdOptionalList(true, list))
.defaultIfEmpty(new TdOptionalList(false, Collections.emptyList())) .defaultIfEmpty(new TdOptionalList(false, Collections.emptyList()))
.subscribeOn(getUpdatesScheduler)
.subscribe(v -> { .subscribe(v -> {
msg.reply(v); msg.reply(v);
}, ex -> { }, ex -> {
@ -223,7 +225,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
sink.error(ex); sink.error(ex);
} }
}) })
.subscribeOn(Schedulers.single())
.subscribe(response -> {}, ex -> { .subscribe(response -> {}, ex -> {
logger.error("Error when processing a request", ex); logger.error("Error when processing a request", ex);
msg.fail(500, ex.getLocalizedMessage()); msg.fail(500, ex.getLocalizedMessage());
@ -239,12 +240,12 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private Mono<Void> pipe() { private Mono<Void> pipe() {
return Mono.fromCallable(() -> { return Mono.fromCallable(() -> {
this.updates = td td
.getUpdates(WAIT_DURATION, 1000) .getUpdates(WAIT_DURATION, 1000)
.onBackpressureBuffer() .bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100))
.bufferTimeout(1000, Duration.ofMillis(300)) .subscribe(nextItems -> {
.filter(asyncResults -> !asyncResults.isEmpty()) queue.addAll(nextItems);
.share(); });
return (Void) null; return (Void) null;
}); });
} }