Improve queues

This commit is contained in:
Andrea Cavalli 2022-10-10 20:30:32 +02:00
parent 708fcbd1e4
commit abf9f28484
10 changed files with 375 additions and 117 deletions

View File

@ -77,7 +77,7 @@
<dependency>
<groupId>it.cavallium</groupId>
<artifactId>filequeue</artifactId>
<version>3.0.0</version>
<version>3.0.2</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>

View File

@ -0,0 +1,18 @@
package it.tdlight.reactiveapi;
import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent;
import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed;
import java.io.DataInput;
import java.io.IOException;
public class ClusterBoundResultingEventDeserializer implements Deserializer<ClusterBoundResultingEvent> {
@Override
public ClusterBoundResultingEvent deserialize(int length, DataInput dataInput) throws IOException {
var type = dataInput.readByte();
return switch (type) {
case 0 -> new ResultingEventPublisherClosed();
default -> throw new UnsupportedOperationException("Unsupported type: " + type);
};
}
}

View File

@ -0,0 +1,17 @@
package it.tdlight.reactiveapi;
import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent;
import java.io.DataOutput;
import java.io.IOException;
public class ClusterBoundResultingEventSerializer implements Serializer<ClusterBoundResultingEvent> {
@Override
public void serialize(ClusterBoundResultingEvent data, DataOutput output) throws IOException {
if (data instanceof ResultingEvent.ResultingEventPublisherClosed) {
output.writeByte(0x0);
} else {
throw new UnsupportedOperationException("Unsupported event: " + data);
}
}
}

View File

@ -5,7 +5,6 @@ import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT;
import static it.tdlight.reactiveapi.Event.SERIAL_VERSION;
import static java.util.Objects.requireNonNull;
import it.cavallium.filequeue.DiskQueueToConsumer;
import it.tdlight.common.Init;
import it.tdlight.common.ReactiveTelegramClient;
import it.tdlight.common.Response;
@ -37,45 +36,33 @@ import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent;
import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed;
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
import it.tdlight.reactiveapi.rsocket.FileQueueUtils;
import it.tdlight.tdlight.ClientManager;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Schedulers;
@ -98,6 +85,13 @@ public abstract class ReactiveApiPublisher {
private final AtomicReference<Disposable> disposable = new AtomicReference<>();
private final AtomicReference<Path> path = new AtomicReference<>();
// Debugging variables
final LongAdder receivedUpdates = new LongAdder();
final LongAdder bufferedUpdates = new LongAdder();
final LongAdder processedUpdates = new LongAdder();
final LongAdder clientBoundEvents = new LongAdder();
final LongAdder sentClientBoundEvents = new LongAdder();
private ReactiveApiPublisher(TdlibChannelsSharedHost sharedTdlibServers,
Set<ResultingEventTransformer> resultingEventTransformerSet,
long userId, String lane) {
@ -113,68 +107,26 @@ public abstract class ReactiveApiPublisher {
throw new RuntimeException("Can't load TDLight", e);
}
this.telegramClient = Flux.<Signal>create(sink -> {
var path = this.path.get();
if (path == null) {
sink.error(new IllegalStateException("Path not set!"));
return;
}
DiskQueueToConsumer<Signal> queue;
try {
var queuePath = path.resolve(".queue");
if (Files.notExists(queuePath)) {
Files.createDirectories(queuePath);
}
queue = new DiskQueueToConsumer<>(queuePath.resolve("tdlib-events.tape2"),
FileQueueUtils.convert(SignalUtils.serializer()),
FileQueueUtils.convert(SignalUtils.deserializer()),
signal -> {
if (sink.requestedFromDownstream() > 0) {
if (signal != null) {
sink.next(signal);
}
return true;
} else {
return false;
}
}
);
} catch (Throwable ex) {
LOG.error("Failed to initialize queue {}", userId, ex);
sink.error(ex);
return;
}
try {
queue.startQueue();
} catch (Throwable ex) {
LOG.error("Failed to initialize queue {}", userId, ex);
sink.error(ex);
return;
}
try {
rawTelegramClient.createAndRegisterClient();
} catch (Throwable ex) {
LOG.error("Failed to initialize client {}", userId, ex);
sink.error(ex);
return;
}
rawTelegramClient.setListener(value -> {
rawTelegramClient.setListener(t -> {
if (!sink.isCancelled()) {
queue.add(value);
}
});
sink.onDispose(() -> {
rawTelegramClient.dispose();
try {
queue.close();
} catch (Exception e) {
LOG.error("Unexpected error while closing the queue", e);
this.receivedUpdates.increment();
sink.next(t);
}
});
sink.onCancel(rawTelegramClient::cancel);
}, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic());
sink.onDispose(() -> {
rawTelegramClient.dispose();
});
}, OverflowStrategy.ERROR).doOnNext(next -> bufferedUpdates.increment());
Stats.STATS.add(this);
}
public static ReactiveApiPublisher fromToken(TdlibChannelsSharedHost sharedTdlibServers,
@ -216,12 +168,20 @@ public abstract class ReactiveApiPublisher {
return transformedFlux;
})
.publish(256);
.publish(512);
publishedResultingEvents
// Obtain only TDLib-bound events
.filter(s -> s instanceof TDLibBoundResultingEvent<?>)
.map(s -> ((TDLibBoundResultingEvent<?>) s))
.<TDLibBoundResultingEvent<?>>map(s -> ((TDLibBoundResultingEvent<?>) s))
// Buffer requests to avoid halting the event loop
.transform(ReactorUtils.onBackpressureBuffer(path,
"tdlib-bound-events",
false,
new TdlibBoundResultingEventSerializer(),
new TdlibBoundResultingEventDeserializer()
))
// Send requests to tdlib
.flatMap(req -> Mono
@ -262,54 +222,17 @@ public abstract class ReactiveApiPublisher {
.filter(s -> s instanceof ClientBoundResultingEvent)
.cast(ClientBoundResultingEvent.class)
.map(ClientBoundResultingEvent::event)
.transform(flux -> Flux.<ClientBoundEvent>create(sink -> {
try {
var queuePath = path.resolve(".queue");
if (Files.notExists(queuePath)) {
Files.createDirectories(queuePath);
}
var queue = new DiskQueueToConsumer<>(queuePath.resolve("client-bound-resulting-events.tape2"),
FileQueueUtils.convert(new ClientBoundEventSerializer()),
FileQueueUtils.convert(new ClientBoundEventDeserializer()),
signal -> {
if (sink.requestedFromDownstream() > 0) {
if (signal != null) {
sink.next(signal);
}
return true;
} else {
return false;
}
}
);
sink.onDispose(queue::close);
flux.subscribeOn(Schedulers.parallel()).subscribe(new CoreSubscriber<>() {
@Override
public void onSubscribe(@NotNull Subscription s) {
sink.onCancel(s::cancel);
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(ClientBoundEvent clientBoundEvent) {
if (!sink.isCancelled()) {
queue.add(clientBoundEvent);
}
}
// Buffer requests to avoid halting the event loop
.doOnNext(clientBoundEvent -> clientBoundEvents.increment())
.transform(ReactorUtils.onBackpressureBufferSubscribe(path,
"client-bound-resulting-events",
false,
new ClientBoundEventSerializer(),
new ClientBoundEventDeserializer()
))
.doOnNext(clientBoundEvent -> sentClientBoundEvents.increment())
@Override
public void onError(Throwable throwable) {
sink.error(throwable);
}
@Override
public void onComplete() {
}
});
} catch (IOException ex) {
sink.error(ex);
}
}, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic()))
.as(ReactorUtils::subscribeOnceUntilUnsubscribe);
sharedTdlibServers.events(lane, messagesToSend);
@ -319,6 +242,14 @@ public abstract class ReactiveApiPublisher {
.filter(s -> s instanceof ClusterBoundResultingEvent)
.cast(ClusterBoundResultingEvent.class)
// Buffer requests to avoid halting the event loop
.as(ReactorUtils.onBackpressureBuffer(path,
"cluster-bound-events",
false,
new ClusterBoundResultingEventSerializer(),
new ClusterBoundResultingEventDeserializer()
))
// Send events to the cluster
.subscribeOn(Schedulers.parallel())
.subscribe(clusterBoundEvent -> {
@ -382,6 +313,7 @@ public abstract class ReactiveApiPublisher {
// Update the state
var state = this.state.updateAndGet(oldState -> oldState.withSignal(signal));
processedUpdates.increment();
if (state.authPhase() == LOGGED_IN) {
ResultingEvent resultingEvent = wrapUpdateSignal(signal);
return List.of(resultingEvent);

View File

@ -1,10 +1,17 @@
package it.tdlight.reactiveapi;
import it.cavallium.filequeue.DiskQueueToConsumer;
import it.tdlight.reactiveapi.rsocket.FileQueueUtils;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongConsumer;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Subscription;
@ -15,6 +22,7 @@ import reactor.core.publisher.FluxSink;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
public class ReactorUtils {
@ -137,7 +145,8 @@ public class ReactorUtils {
var s = subscriptionAtomicReference.get();
emitter.onRequest(n -> {
if (n > maxBufferSize) {
emitter.error(new UnsupportedOperationException("Requests count is bigger than max buffer size! " + n + " > " + maxBufferSize));
emitter.error(new UnsupportedOperationException(
"Requests count is bigger than max buffer size! " + n + " > " + maxBufferSize));
} else {
s.request(n);
}
@ -150,6 +159,114 @@ public class ReactorUtils {
});
}
public static <T> Function<Flux<T>, Flux<T>> onBackpressureBufferSubscribe(Path path,
String name,
boolean persistent,
Serializer<T> serializer,
Deserializer<T> deserializer) {
return flux -> {
AtomicReference<FluxSink<T>> ref = new AtomicReference<>();
DiskQueueToConsumer<T> queue;
try {
var queuePath = path.resolve(".tdlib-queue");
if (Files.notExists(queuePath)) {
Files.createDirectories(queuePath);
}
queue = new DiskQueueToConsumer<>(queuePath.resolve(name + ".tape2"),
!persistent,
FileQueueUtils.convert(serializer),
FileQueueUtils.convert(deserializer),
signal -> {
var sink = ref.get();
if (sink != null && sink.requestedFromDownstream() > 0) {
if (signal != null) {
sink.next(signal);
}
return true;
} else {
return false;
}
}
);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
var disposable = flux
.subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.boundedElastic())
.subscribe(queue::add);
queue.startQueue();
return Flux.<T>create(sink -> {
sink.onDispose(() -> {
disposable.dispose();
queue.close();
});
ref.set(sink);
sink.onCancel(() -> ref.set(null));
});
};
}
public static <T> Function<Flux<T>, Flux<T>> onBackpressureBuffer(Path path,
String name,
boolean persistent,
Serializer<T> serializer,
Deserializer<T> deserializer) {
return flux -> Flux.<T>create(sink -> {
try {
var queuePath = path.resolve(".tdlib-queue");
if (Files.notExists(queuePath)) {
Files.createDirectories(queuePath);
}
var queue = new DiskQueueToConsumer<>(queuePath.resolve(name + ".tape2"),
!persistent,
FileQueueUtils.convert(serializer),
FileQueueUtils.convert(deserializer),
signal -> {
if (sink.requestedFromDownstream() > 0 && !sink.isCancelled()) {
if (signal != null) {
sink.next(signal);
}
return true;
} else {
return false;
}
}
);
sink.onDispose(queue::close);
flux
.subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.boundedElastic())
.subscribe(new CoreSubscriber<T>() {
@Override
public void onSubscribe(@NotNull Subscription s) {
sink.onCancel(s::cancel);
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(T element) {
if (!sink.isCancelled()) {
queue.add(element);
}
}
@Override
public void onError(Throwable throwable) {
sink.error(throwable);
}
@Override
public void onComplete() {
}
});
queue.startQueue();
} catch (IOException ex) {
sink.error(ex);
}
}).subscribeOn(Schedulers.boundedElastic());
}
private static class WaitingSink<T> implements FluxSink<T> {
@Override

View File

@ -0,0 +1,141 @@
package it.tdlight.reactiveapi;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class Stats extends Thread {
private static final Logger LOG = LogManager.getLogger(Stats.class);
public static final List<ReactiveApiPublisher> STATS = new CopyOnWriteArrayList<>();
public static final long SLEEP_INTERVAL = Duration.ofSeconds(10).toMillis();
static {
var stats = new Stats();
stats.setName("Stats");
stats.setDaemon(true);
stats.start();
}
public static void init() {
}
@Override
public void run() {
try {
var prev = System.currentTimeMillis();
var prevClients = 0;
var prevReceivedUpdates = new LongArrayList();
var prevBufferedUpdates = new LongArrayList();
var prevProcessedUpdates = new LongArrayList();
var prevClientBoundEvents = new LongArrayList();
var prevSentClientBoundEvents = new LongArrayList();
while (!Thread.interrupted()) {
//noinspection BusyWait
Thread.sleep(SLEEP_INTERVAL);
var now = System.currentTimeMillis();
var timeDiffSeconds = (now - prev) / 1000d;
StringBuilder out = new StringBuilder();
out.append("Statistics. Time delta: %03.2fs%n".formatted(timeDiffSeconds));
var currentClients = STATS.size();
var clientIds = new LongArrayList();
var receivedUpdates = new LongArrayList();
var bufferedUpdates = new LongArrayList();
var processedUpdates = new LongArrayList();
var clientBoundEvents = new LongArrayList();
var sentClientBoundEvents = new LongArrayList();
for (ReactiveApiPublisher stat : STATS) {
clientIds.add(stat.userId);
receivedUpdates.add(stat.receivedUpdates.longValue());
bufferedUpdates.add(stat.bufferedUpdates.longValue());
processedUpdates.add(stat.processedUpdates.longValue());
clientBoundEvents.add(stat.clientBoundEvents.longValue());
sentClientBoundEvents.add(stat.sentClientBoundEvents.longValue());
}
while (currentClients > prevClients) {
prevClients++;
prevReceivedUpdates.add(0);
prevBufferedUpdates.add(0);
prevProcessedUpdates.add(0);
prevClientBoundEvents.add(0);
prevSentClientBoundEvents.add(0);
}
double receivedUpdatesRateSum = 0;
long ramBufferedSum = 0;
long diskBufferedSum = 0;
double bufferedUpdatesRateSum = 0;
double processedUpdatesRateSum = 0;
double clientBoundEventsRateSum = 0;
double sentClientBoundEventsRateSum = 0;
for (int i = 0; i <= currentClients; i++) {
double receivedUpdatesRate;
long ramBuffered;
long diskBuffered;
double bufferedUpdatesRate;
double processedUpdatesRate;
double clientBoundEventsRate;
double sentClientBoundEventsRate;
if (i != currentClients) {
receivedUpdatesRate = (receivedUpdates.getLong(i) - prevReceivedUpdates.getLong(i)) / timeDiffSeconds;
diskBuffered = bufferedUpdates.getLong(i) - processedUpdates.getLong(i);
ramBuffered = receivedUpdates.getLong(i) - bufferedUpdates.getLong(i);
bufferedUpdatesRate = (bufferedUpdates.getLong(i) - prevBufferedUpdates.getLong(i)) / timeDiffSeconds;
processedUpdatesRate = (processedUpdates.getLong(i) - prevProcessedUpdates.getLong(i)) / timeDiffSeconds;
clientBoundEventsRate = (clientBoundEvents.getLong(i) - prevClientBoundEvents.getLong(i)) / timeDiffSeconds;
sentClientBoundEventsRate =
(sentClientBoundEvents.getLong(i) - prevSentClientBoundEvents.getLong(i)) / timeDiffSeconds;
receivedUpdatesRateSum += receivedUpdatesRate;
diskBufferedSum += diskBuffered;
ramBufferedSum += ramBuffered;
bufferedUpdatesRateSum += bufferedUpdatesRate;
processedUpdatesRateSum += processedUpdatesRate;
clientBoundEventsRateSum += clientBoundEventsRate;
sentClientBoundEventsRateSum += sentClientBoundEventsRate;
out.append(String.format("%d:\t", clientIds.getLong(i)));
} else {
receivedUpdatesRate = receivedUpdatesRateSum;
diskBuffered = diskBufferedSum;
ramBuffered = ramBufferedSum;
bufferedUpdatesRate = bufferedUpdatesRateSum;
processedUpdatesRate = processedUpdatesRateSum;
clientBoundEventsRate = clientBoundEventsRateSum;
sentClientBoundEventsRate = sentClientBoundEventsRateSum;
out.append("Total:\t");
}
out.append(String.format(
"\tUpdates:\t[received %03.2fHz\tbuffered: %03.2fHz (RAM: %d HDD: %d)\tprocessed: %03.2fHz]\tClient bound events: %03.2fHz\tProcessed events: %03.2fHz\t%n",
receivedUpdatesRate,
bufferedUpdatesRate,
ramBuffered,
diskBuffered,
processedUpdatesRate,
clientBoundEventsRate,
sentClientBoundEventsRate
));
}
out.append(String.format("%n"));
for (int i = 0; i < currentClients; i++) {
prevReceivedUpdates = receivedUpdates;
prevBufferedUpdates = bufferedUpdates;
prevProcessedUpdates = processedUpdates;
prevClientBoundEvents = clientBoundEvents;
prevSentClientBoundEvents = sentClientBoundEvents;
}
LOG.debug(out.toString());
prev = now;
}
} catch (InterruptedException ex) {
}
}
}

View File

@ -0,0 +1,16 @@
package it.tdlight.reactiveapi;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
import java.io.DataInput;
import java.io.IOException;
public class TdlibBoundResultingEventDeserializer implements Deserializer<TDLibBoundResultingEvent<?>> {
@Override
public TDLibBoundResultingEvent<?> deserialize(int length, DataInput dataInput) throws IOException {
Function<?> action = (Function<?>) TdApi.Deserializer.deserialize(dataInput);
return new TDLibBoundResultingEvent<>(action, dataInput.readBoolean());
}
}

View File

@ -0,0 +1,14 @@
package it.tdlight.reactiveapi;
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
import java.io.DataOutput;
import java.io.IOException;
public class TdlibBoundResultingEventSerializer implements Serializer<TDLibBoundResultingEvent<?>> {
@Override
public void serialize(TDLibBoundResultingEvent<?> data, DataOutput output) throws IOException {
data.action().serialize(output);
output.writeBoolean(data.ignoreFailure());
}
}

View File

@ -6,7 +6,10 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnRequest;
import it.tdlight.reactiveapi.Event.OnResponse;
import java.io.Closeable;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

View File

@ -23,7 +23,7 @@ public class InfiniteQueueBench {
AtomicInteger status = new AtomicInteger();
tmpFile.toFile().deleteOnExit();
Files.delete(tmpFile);
try (var queue = new it.cavallium.filequeue.DiskQueueToConsumer<String>(tmpFile, new Serializer<String>() {
try (var queue = new it.cavallium.filequeue.DiskQueueToConsumer<String>(tmpFile, true, new Serializer<String>() {
@Override
public byte[] serialize(String data) throws IOException {
return data.getBytes(StandardCharsets.US_ASCII);