Use LMDB queues

This commit is contained in:
Andrea Cavalli 2022-10-11 00:24:51 +02:00
parent abf9f28484
commit a8510ed336
4 changed files with 87 additions and 93 deletions

10
pom.xml
View File

@ -77,7 +77,13 @@
<dependency> <dependency>
<groupId>it.cavallium</groupId> <groupId>it.cavallium</groupId>
<artifactId>filequeue</artifactId> <artifactId>filequeue</artifactId>
<version>3.0.2</version> <version>3.1.2</version>
<exclusions>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>
@ -134,7 +140,7 @@
<dependency> <dependency>
<groupId>org.ow2.asm</groupId> <groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId> <artifactId>asm</artifactId>
<version>9.3</version> <version>9.4</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>

View File

@ -143,7 +143,14 @@ public class AtomixReactiveApi implements ReactiveApi {
return loadSessions.<Void>then(Mono.fromRunnable(() -> { return loadSessions.<Void>then(Mono.fromRunnable(() -> {
if (sharedTdlibServers != null) { if (sharedTdlibServers != null) {
requestsSub = sharedTdlibServers.requests() requestsSub = sharedTdlibServers.requests()
.doOnNext(req -> localSessions.get(req.data().userId()).handleRequest(req.data())) .doOnNext(req -> {
var publisher = localSessions.get(req.data().userId());
if (publisher != null) {
publisher.handleRequest(req.data());
} else {
LOG.error("Dropped request because no session is found: {}", req);
}
})
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.subscribe(n -> {}, ex -> LOG.error("Requests channel broke unexpectedly", ex)); .subscribe(n -> {}, ex -> LOG.error("Requests channel broke unexpectedly", ex));
} }

View File

@ -3,8 +3,10 @@ package it.tdlight.reactiveapi;
import static it.tdlight.reactiveapi.AuthPhase.LOGGED_IN; import static it.tdlight.reactiveapi.AuthPhase.LOGGED_IN;
import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT; import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT;
import static it.tdlight.reactiveapi.Event.SERIAL_VERSION; import static it.tdlight.reactiveapi.Event.SERIAL_VERSION;
import static it.tdlight.reactiveapi.rsocket.FileQueueUtils.convert;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import it.cavallium.filequeue.QueueConsumer;
import it.tdlight.common.Init; import it.tdlight.common.Init;
import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.common.ReactiveTelegramClient;
import it.tdlight.common.Response; import it.tdlight.common.Response;
@ -43,6 +45,7 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
@ -65,6 +68,7 @@ import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.Many; import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler.Worker;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
public abstract class ReactiveApiPublisher { public abstract class ReactiveApiPublisher {
@ -112,6 +116,7 @@ public abstract class ReactiveApiPublisher {
} catch (Throwable ex) { } catch (Throwable ex) {
LOG.error("Failed to initialize client {}", userId, ex); LOG.error("Failed to initialize client {}", userId, ex);
sink.error(ex); sink.error(ex);
return;
} }
rawTelegramClient.setListener(t -> { rawTelegramClient.setListener(t -> {
if (!sink.isCancelled()) { if (!sink.isCancelled()) {
@ -120,10 +125,8 @@ public abstract class ReactiveApiPublisher {
} }
}); });
sink.onCancel(rawTelegramClient::cancel); sink.onCancel(rawTelegramClient::cancel);
sink.onDispose(() -> { sink.onDispose(rawTelegramClient::dispose);
rawTelegramClient.dispose(); }, OverflowStrategy.BUFFER).doOnNext(next -> bufferedUpdates.increment());
});
}, OverflowStrategy.ERROR).doOnNext(next -> bufferedUpdates.increment());
Stats.STATS.add(this); Stats.STATS.add(this);
@ -176,12 +179,7 @@ public abstract class ReactiveApiPublisher {
.<TDLibBoundResultingEvent<?>>map(s -> ((TDLibBoundResultingEvent<?>) s)) .<TDLibBoundResultingEvent<?>>map(s -> ((TDLibBoundResultingEvent<?>) s))
// Buffer requests to avoid halting the event loop // Buffer requests to avoid halting the event loop
.transform(ReactorUtils.onBackpressureBuffer(path, .onBackpressureBuffer()
"tdlib-bound-events",
false,
new TdlibBoundResultingEventSerializer(),
new TdlibBoundResultingEventDeserializer()
))
// Send requests to tdlib // Send requests to tdlib
.flatMap(req -> Mono .flatMap(req -> Mono
@ -225,7 +223,7 @@ public abstract class ReactiveApiPublisher {
// Buffer requests to avoid halting the event loop // Buffer requests to avoid halting the event loop
.doOnNext(clientBoundEvent -> clientBoundEvents.increment()) .doOnNext(clientBoundEvent -> clientBoundEvents.increment())
.transform(ReactorUtils.onBackpressureBufferSubscribe(path, .transform(ReactorUtils.onBackpressureBufferSubscribe(Paths.get(""),
"client-bound-resulting-events", "client-bound-resulting-events",
false, false,
new ClientBoundEventSerializer(), new ClientBoundEventSerializer(),
@ -243,12 +241,7 @@ public abstract class ReactiveApiPublisher {
.cast(ClusterBoundResultingEvent.class) .cast(ClusterBoundResultingEvent.class)
// Buffer requests to avoid halting the event loop // Buffer requests to avoid halting the event loop
.as(ReactorUtils.onBackpressureBuffer(path, .onBackpressureBuffer()
"cluster-bound-events",
false,
new ClusterBoundResultingEventSerializer(),
new ClusterBoundResultingEventDeserializer()
))
// Send events to the cluster // Send events to the cluster
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())

View File

@ -1,6 +1,7 @@
package it.tdlight.reactiveapi; package it.tdlight.reactiveapi;
import it.cavallium.filequeue.DiskQueueToConsumer; import it.cavallium.filequeue.IQueueToConsumer;
import it.cavallium.filequeue.LMDBQueueToConsumer;
import it.tdlight.reactiveapi.rsocket.FileQueueUtils; import it.tdlight.reactiveapi.rsocket.FileQueueUtils;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
@ -166,37 +167,30 @@ public class ReactorUtils {
Deserializer<T> deserializer) { Deserializer<T> deserializer) {
return flux -> { return flux -> {
AtomicReference<FluxSink<T>> ref = new AtomicReference<>(); AtomicReference<FluxSink<T>> ref = new AtomicReference<>();
DiskQueueToConsumer<T> queue; var queuePath = path.resolve(".tdlib-queue");
try { IQueueToConsumer<T> queue = new LMDBQueueToConsumer<>(queuePath,
var queuePath = path.resolve(".tdlib-queue"); name,
if (Files.notExists(queuePath)) { !persistent,
Files.createDirectories(queuePath); FileQueueUtils.convert(serializer),
} FileQueueUtils.convert(deserializer),
queue = new DiskQueueToConsumer<>(queuePath.resolve(name + ".tape2"), signal -> {
!persistent, var sink = ref.get();
FileQueueUtils.convert(serializer), if (sink != null && !sink.isCancelled() && sink.requestedFromDownstream() > 0) {
FileQueueUtils.convert(deserializer), if (signal != null) {
signal -> { sink.next(signal);
var sink = ref.get();
if (sink != null && sink.requestedFromDownstream() > 0) {
if (signal != null) {
sink.next(signal);
}
return true;
} else {
return false;
} }
return true;
} else {
return false;
} }
); }
} catch (IOException ex) { );
throw new UncheckedIOException(ex);
}
var disposable = flux var disposable = flux
.subscribeOn(Schedulers.parallel()) .subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.boundedElastic()) .publishOn(Schedulers.boundedElastic())
.subscribe(queue::add); .subscribe(queue::add);
queue.startQueue(); queue.startQueue();
return Flux.<T>create(sink -> { return Flux.create(sink -> {
sink.onDispose(() -> { sink.onDispose(() -> {
disposable.dispose(); disposable.dispose();
queue.close(); queue.close();
@ -213,58 +207,52 @@ public class ReactorUtils {
Serializer<T> serializer, Serializer<T> serializer,
Deserializer<T> deserializer) { Deserializer<T> deserializer) {
return flux -> Flux.<T>create(sink -> { return flux -> Flux.<T>create(sink -> {
try { var queuePath = path.resolve(".tdlib-queue");
var queuePath = path.resolve(".tdlib-queue"); var queue = new LMDBQueueToConsumer<>(queuePath,
if (Files.notExists(queuePath)) { name,
Files.createDirectories(queuePath); !persistent,
} FileQueueUtils.convert(serializer),
var queue = new DiskQueueToConsumer<>(queuePath.resolve(name + ".tape2"), FileQueueUtils.convert(deserializer),
!persistent, signal -> {
FileQueueUtils.convert(serializer), if (sink.requestedFromDownstream() > 0 && !sink.isCancelled()) {
FileQueueUtils.convert(deserializer), if (signal != null) {
signal -> { sink.next(signal);
if (sink.requestedFromDownstream() > 0 && !sink.isCancelled()) { }
if (signal != null) { return true;
sink.next(signal); } else {
} return false;
return true; }
} else { }
return false; );
sink.onDispose(queue::close);
flux
.subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.boundedElastic())
.subscribe(new CoreSubscriber<>() {
@Override
public void onSubscribe(@NotNull Subscription s) {
sink.onCancel(s::cancel);
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(T element) {
if (!sink.isCancelled()) {
queue.add(element);
} }
} }
);
sink.onDispose(queue::close);
flux
.subscribeOn(Schedulers.parallel())
.publishOn(Schedulers.boundedElastic())
.subscribe(new CoreSubscriber<T>() {
@Override
public void onSubscribe(@NotNull Subscription s) {
sink.onCancel(s::cancel);
s.request(Long.MAX_VALUE);
}
@Override @Override
public void onNext(T element) { public void onError(Throwable throwable) {
if (!sink.isCancelled()) { sink.error(throwable);
queue.add(element); }
}
}
@Override @Override
public void onError(Throwable throwable) { public void onComplete() {
sink.error(throwable); }
} });
queue.startQueue();
@Override }, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic());
public void onComplete() {
}
});
queue.startQueue();
} catch (IOException ex) {
sink.error(ex);
}
}).subscribeOn(Schedulers.boundedElastic());
} }
private static class WaitingSink<T> implements FluxSink<T> { private static class WaitingSink<T> implements FluxSink<T> {