diff --git a/pom.xml b/pom.xml index fa106ec..95f1273 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,13 @@ it.cavallium filequeue - 3.0.2 + 3.1.2 + + + org.ow2.asm + asm + + io.projectreactor @@ -134,7 +140,7 @@ org.ow2.asm asm - 9.3 + 9.4 io.projectreactor diff --git a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java index b58aca7..0ed852b 100644 --- a/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java +++ b/src/main/java/it/tdlight/reactiveapi/AtomixReactiveApi.java @@ -143,7 +143,14 @@ public class AtomixReactiveApi implements ReactiveApi { return loadSessions.then(Mono.fromRunnable(() -> { if (sharedTdlibServers != null) { requestsSub = sharedTdlibServers.requests() - .doOnNext(req -> localSessions.get(req.data().userId()).handleRequest(req.data())) + .doOnNext(req -> { + var publisher = localSessions.get(req.data().userId()); + if (publisher != null) { + publisher.handleRequest(req.data()); + } else { + LOG.error("Dropped request because no session is found: {}", req); + } + }) .subscribeOn(Schedulers.parallel()) .subscribe(n -> {}, ex -> LOG.error("Requests channel broke unexpectedly", ex)); } diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index f978d5c..3d4b322 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -3,8 +3,10 @@ package it.tdlight.reactiveapi; import static it.tdlight.reactiveapi.AuthPhase.LOGGED_IN; import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT; import static it.tdlight.reactiveapi.Event.SERIAL_VERSION; +import static it.tdlight.reactiveapi.rsocket.FileQueueUtils.convert; import static java.util.Objects.requireNonNull; +import it.cavallium.filequeue.QueueConsumer; import it.tdlight.common.Init; import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.common.Response; @@ -43,6 +45,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; @@ -65,6 +68,7 @@ import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Many; +import reactor.core.scheduler.Scheduler.Worker; import reactor.core.scheduler.Schedulers; public abstract class ReactiveApiPublisher { @@ -112,6 +116,7 @@ public abstract class ReactiveApiPublisher { } catch (Throwable ex) { LOG.error("Failed to initialize client {}", userId, ex); sink.error(ex); + return; } rawTelegramClient.setListener(t -> { if (!sink.isCancelled()) { @@ -120,10 +125,8 @@ public abstract class ReactiveApiPublisher { } }); sink.onCancel(rawTelegramClient::cancel); - sink.onDispose(() -> { - rawTelegramClient.dispose(); - }); - }, OverflowStrategy.ERROR).doOnNext(next -> bufferedUpdates.increment()); + sink.onDispose(rawTelegramClient::dispose); + }, OverflowStrategy.BUFFER).doOnNext(next -> bufferedUpdates.increment()); Stats.STATS.add(this); @@ -176,12 +179,7 @@ public abstract class ReactiveApiPublisher { .>map(s -> ((TDLibBoundResultingEvent) s)) // Buffer requests to avoid halting the event loop - .transform(ReactorUtils.onBackpressureBuffer(path, - "tdlib-bound-events", - false, - new TdlibBoundResultingEventSerializer(), - new TdlibBoundResultingEventDeserializer() - )) + .onBackpressureBuffer() // Send requests to tdlib .flatMap(req -> Mono @@ -225,7 +223,7 @@ public abstract class ReactiveApiPublisher { // Buffer requests to avoid halting the event loop .doOnNext(clientBoundEvent -> clientBoundEvents.increment()) - .transform(ReactorUtils.onBackpressureBufferSubscribe(path, + .transform(ReactorUtils.onBackpressureBufferSubscribe(Paths.get(""), "client-bound-resulting-events", false, new ClientBoundEventSerializer(), @@ -243,12 +241,7 @@ public abstract class ReactiveApiPublisher { .cast(ClusterBoundResultingEvent.class) // Buffer requests to avoid halting the event loop - .as(ReactorUtils.onBackpressureBuffer(path, - "cluster-bound-events", - false, - new ClusterBoundResultingEventSerializer(), - new ClusterBoundResultingEventDeserializer() - )) + .onBackpressureBuffer() // Send events to the cluster .subscribeOn(Schedulers.parallel()) diff --git a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java index 03a3557..77bb5bf 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java @@ -1,6 +1,7 @@ package it.tdlight.reactiveapi; -import it.cavallium.filequeue.DiskQueueToConsumer; +import it.cavallium.filequeue.IQueueToConsumer; +import it.cavallium.filequeue.LMDBQueueToConsumer; import it.tdlight.reactiveapi.rsocket.FileQueueUtils; import java.io.IOException; import java.io.UncheckedIOException; @@ -166,37 +167,30 @@ public class ReactorUtils { Deserializer deserializer) { return flux -> { AtomicReference> ref = new AtomicReference<>(); - DiskQueueToConsumer queue; - try { - var queuePath = path.resolve(".tdlib-queue"); - if (Files.notExists(queuePath)) { - Files.createDirectories(queuePath); - } - queue = new DiskQueueToConsumer<>(queuePath.resolve(name + ".tape2"), - !persistent, - FileQueueUtils.convert(serializer), - FileQueueUtils.convert(deserializer), - signal -> { - var sink = ref.get(); - if (sink != null && sink.requestedFromDownstream() > 0) { - if (signal != null) { - sink.next(signal); - } - return true; - } else { - return false; + var queuePath = path.resolve(".tdlib-queue"); + IQueueToConsumer queue = new LMDBQueueToConsumer<>(queuePath, + name, + !persistent, + FileQueueUtils.convert(serializer), + FileQueueUtils.convert(deserializer), + signal -> { + var sink = ref.get(); + if (sink != null && !sink.isCancelled() && sink.requestedFromDownstream() > 0) { + if (signal != null) { + sink.next(signal); } + return true; + } else { + return false; } - ); - } catch (IOException ex) { - throw new UncheckedIOException(ex); - } + } + ); var disposable = flux .subscribeOn(Schedulers.parallel()) .publishOn(Schedulers.boundedElastic()) .subscribe(queue::add); queue.startQueue(); - return Flux.create(sink -> { + return Flux.create(sink -> { sink.onDispose(() -> { disposable.dispose(); queue.close(); @@ -213,58 +207,52 @@ public class ReactorUtils { Serializer serializer, Deserializer deserializer) { return flux -> Flux.create(sink -> { - try { - var queuePath = path.resolve(".tdlib-queue"); - if (Files.notExists(queuePath)) { - Files.createDirectories(queuePath); - } - var queue = new DiskQueueToConsumer<>(queuePath.resolve(name + ".tape2"), - !persistent, - FileQueueUtils.convert(serializer), - FileQueueUtils.convert(deserializer), - signal -> { - if (sink.requestedFromDownstream() > 0 && !sink.isCancelled()) { - if (signal != null) { - sink.next(signal); - } - return true; - } else { - return false; + var queuePath = path.resolve(".tdlib-queue"); + var queue = new LMDBQueueToConsumer<>(queuePath, + name, + !persistent, + FileQueueUtils.convert(serializer), + FileQueueUtils.convert(deserializer), + signal -> { + if (sink.requestedFromDownstream() > 0 && !sink.isCancelled()) { + if (signal != null) { + sink.next(signal); + } + return true; + } else { + return false; + } + } + ); + sink.onDispose(queue::close); + flux + .subscribeOn(Schedulers.parallel()) + .publishOn(Schedulers.boundedElastic()) + .subscribe(new CoreSubscriber<>() { + @Override + public void onSubscribe(@NotNull Subscription s) { + sink.onCancel(s::cancel); + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T element) { + if (!sink.isCancelled()) { + queue.add(element); } } - ); - sink.onDispose(queue::close); - flux - .subscribeOn(Schedulers.parallel()) - .publishOn(Schedulers.boundedElastic()) - .subscribe(new CoreSubscriber() { - @Override - public void onSubscribe(@NotNull Subscription s) { - sink.onCancel(s::cancel); - s.request(Long.MAX_VALUE); - } - @Override - public void onNext(T element) { - if (!sink.isCancelled()) { - queue.add(element); - } - } + @Override + public void onError(Throwable throwable) { + sink.error(throwable); + } - @Override - public void onError(Throwable throwable) { - sink.error(throwable); - } - - @Override - public void onComplete() { - } - }); - queue.startQueue(); - } catch (IOException ex) { - sink.error(ex); - } - }).subscribeOn(Schedulers.boundedElastic()); + @Override + public void onComplete() { + } + }); + queue.startQueue(); + }, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic()); } private static class WaitingSink implements FluxSink {