Use backpressure buffer instead of a queue
This commit is contained in:
parent
4f79c4ad61
commit
878c43ed9e
|
@ -23,16 +23,16 @@ import it.tdlight.tdlibsession.td.middle.TdOptionalList;
|
||||||
import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec;
|
import it.tdlight.tdlibsession.td.middle.TdResultMessageCodec;
|
||||||
import it.tdlight.utils.MonoUtils;
|
import it.tdlight.utils.MonoUtils;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Queue;
|
import java.util.List;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.publisher.ReplayProcessor;
|
import reactor.core.publisher.ReplayProcessor;
|
||||||
import reactor.util.concurrent.Queues;
|
import reactor.core.scheduler.Scheduler;
|
||||||
|
import reactor.core.scheduler.Schedulers;
|
||||||
|
|
||||||
public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
||||||
|
|
||||||
|
@ -49,7 +49,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
||||||
|
|
||||||
protected final ReplayProcessor<Boolean> tdClosed = ReplayProcessor.cacheLastOrDefault(false);
|
protected final ReplayProcessor<Boolean> tdClosed = ReplayProcessor.cacheLastOrDefault(false);
|
||||||
protected AsyncTdDirectImpl td;
|
protected AsyncTdDirectImpl td;
|
||||||
protected final Queue<AsyncResult<TdResult<Update>>> queue = Queues.<AsyncResult<TdResult<Update>>>unbounded().get();
|
private Flux<List<AsyncResult<TdResult<Update>>>> updates;
|
||||||
|
private Scheduler getUpdatesScheduler;
|
||||||
|
|
||||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||||
public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) {
|
public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) {
|
||||||
|
@ -80,6 +81,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
||||||
throw new IllegalArgumentException("local is not set!");
|
throw new IllegalArgumentException("local is not set!");
|
||||||
}
|
}
|
||||||
this.local = local;
|
this.local = local;
|
||||||
|
this.getUpdatesScheduler = Schedulers.newSingle("GetUpdates_" + botAddress);
|
||||||
this.td = new AsyncTdDirectImpl(botAlias);
|
this.td = new AsyncTdDirectImpl(botAlias);
|
||||||
|
|
||||||
cluster.getEventBus().consumer(botAddress + ".ping", (Message<byte[]> msg) -> {
|
cluster.getEventBus().consumer(botAddress + ".ping", (Message<byte[]> msg) -> {
|
||||||
|
@ -140,54 +142,50 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
||||||
.from(tdClosed)
|
.from(tdClosed)
|
||||||
.single()
|
.single()
|
||||||
.filter(tdClosedVal -> !tdClosedVal)
|
.filter(tdClosedVal -> !tdClosedVal)
|
||||||
.map(_v -> {
|
.flatMap(closed -> updates.take(1)
|
||||||
ArrayList<AsyncResult<TdResult<Update>>> updatesBatch = new ArrayList<>();
|
.take(Duration.ofSeconds(2)) // If 2 seconds pass without the batch, return empty to reply before the timeout
|
||||||
while (!queue.isEmpty() && updatesBatch.size() < 1000) {
|
.singleOrEmpty()
|
||||||
var item = queue.poll();
|
)
|
||||||
if (item == null) break;
|
.flatMapMany(it -> Flux.fromIterable(it))
|
||||||
updatesBatch.add(item);
|
.flatMap(result -> {
|
||||||
}
|
if (result.succeeded()) {
|
||||||
return updatesBatch;
|
var received = result.result();
|
||||||
})
|
if (OUTPUT_REQUESTS) {
|
||||||
.flatMap(receivedList -> {
|
System.out.println("<=: " + received
|
||||||
return Flux.fromIterable(receivedList).flatMap(result -> {
|
.toString()
|
||||||
if (result.succeeded()) {
|
.replace("\n", " ")
|
||||||
var received = result.result();
|
.replace("\t", "")
|
||||||
if (OUTPUT_REQUESTS) {
|
.replace(" ", "")
|
||||||
System.out.println("<=: " + received
|
.replace(" = ", "="));
|
||||||
.toString()
|
}
|
||||||
.replace("\n", " ")
|
return Mono.create(sink -> {
|
||||||
.replace("\t", "")
|
if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
|
||||||
.replace(" ", "")
|
var authState = (UpdateAuthorizationState) received.result();
|
||||||
.replace(" = ", "="));
|
if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
|
||||||
}
|
tdClosed.onNext(true);
|
||||||
return Mono.create(sink -> {
|
vertx.undeploy(deploymentID(), undeployed -> {
|
||||||
if (received.succeeded() && received.result().getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
|
if (undeployed.failed()) {
|
||||||
var authState = (UpdateAuthorizationState) received.result();
|
logger.error("Error when undeploying td verticle", undeployed.cause());
|
||||||
if (authState.authorizationState.getConstructor() == AuthorizationStateClosed.CONSTRUCTOR) {
|
}
|
||||||
tdClosed.onNext(true);
|
|
||||||
vertx.undeploy(deploymentID(), undeployed -> {
|
|
||||||
if (undeployed.failed()) {
|
|
||||||
logger.error("Error when undeploying td verticle", undeployed.cause());
|
|
||||||
}
|
|
||||||
sink.success();
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
sink.success();
|
sink.success();
|
||||||
}
|
});
|
||||||
} else {
|
} else {
|
||||||
sink.success();
|
sink.success();
|
||||||
}
|
}
|
||||||
}).then(Mono.<TdResult<Update>>create(sink -> {
|
} else {
|
||||||
sink.success(received);
|
sink.success();
|
||||||
}));
|
}
|
||||||
} else {
|
}).then(Mono.<TdResult<Update>>create(sink -> {
|
||||||
logger.error("Received an error update", result.cause());
|
sink.success(received);
|
||||||
return Mono.empty();
|
}));
|
||||||
}
|
} else {
|
||||||
}).collectList().map(list -> new TdOptionalList(true, list));
|
logger.error("Received an error update", result.cause());
|
||||||
})
|
return Mono.empty();
|
||||||
|
}
|
||||||
|
}).collectList().map(list -> new TdOptionalList(true, list))
|
||||||
|
|
||||||
.defaultIfEmpty(new TdOptionalList(false, Collections.emptyList()))
|
.defaultIfEmpty(new TdOptionalList(false, Collections.emptyList()))
|
||||||
|
.subscribeOn(getUpdatesScheduler)
|
||||||
.subscribe(v -> {
|
.subscribe(v -> {
|
||||||
msg.reply(v);
|
msg.reply(v);
|
||||||
}, ex -> {
|
}, ex -> {
|
||||||
|
@ -225,6 +223,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
||||||
sink.error(ex);
|
sink.error(ex);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
.subscribeOn(Schedulers.single())
|
||||||
.subscribe(response -> {}, ex -> {
|
.subscribe(response -> {}, ex -> {
|
||||||
logger.error("Error when processing a request", ex);
|
logger.error("Error when processing a request", ex);
|
||||||
msg.fail(500, ex.getLocalizedMessage());
|
msg.fail(500, ex.getLocalizedMessage());
|
||||||
|
@ -240,12 +239,12 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
||||||
|
|
||||||
private Mono<Void> pipe() {
|
private Mono<Void> pipe() {
|
||||||
return Mono.fromCallable(() -> {
|
return Mono.fromCallable(() -> {
|
||||||
td
|
this.updates = td
|
||||||
.getUpdates(WAIT_DURATION, 1000)
|
.getUpdates(WAIT_DURATION, 1000)
|
||||||
.bufferTimeout(1000, local ? Duration.ofMillis(1) : Duration.ofMillis(100))
|
.onBackpressureBuffer()
|
||||||
.subscribe(nextItems -> {
|
.bufferTimeout(1000, Duration.ofMillis(300))
|
||||||
queue.addAll(nextItems);
|
.filter(asyncResults -> !asyncResults.isEmpty())
|
||||||
});
|
.share();
|
||||||
return (Void) null;
|
return (Void) null;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user