From abf9f28484df6a3f51aea70ab40499799ae95e62 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 10 Oct 2022 20:30:32 +0200 Subject: [PATCH] Improve queues --- pom.xml | 2 +- ...lusterBoundResultingEventDeserializer.java | 18 ++ .../ClusterBoundResultingEventSerializer.java | 17 ++ .../reactiveapi/ReactiveApiPublisher.java | 160 +++++------------- .../it/tdlight/reactiveapi/ReactorUtils.java | 119 ++++++++++++- .../java/it/tdlight/reactiveapi/Stats.java | 141 +++++++++++++++ .../TdlibBoundResultingEventDeserializer.java | 16 ++ .../TdlibBoundResultingEventSerializer.java | 14 ++ .../reactiveapi/TdlibChannelsSharedHost.java | 3 + .../reactiveapi/test/InfiniteQueueBench.java | 2 +- 10 files changed, 375 insertions(+), 117 deletions(-) create mode 100644 src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventDeserializer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventSerializer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/Stats.java create mode 100644 src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventDeserializer.java create mode 100644 src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventSerializer.java diff --git a/pom.xml b/pom.xml index 1370ab6..fa106ec 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,7 @@ it.cavallium filequeue - 3.0.0 + 3.0.2 io.projectreactor diff --git a/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventDeserializer.java b/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventDeserializer.java new file mode 100644 index 0000000..700a031 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventDeserializer.java @@ -0,0 +1,18 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent; +import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed; +import java.io.DataInput; +import java.io.IOException; + +public class ClusterBoundResultingEventDeserializer implements Deserializer { + + @Override + public ClusterBoundResultingEvent deserialize(int length, DataInput dataInput) throws IOException { + var type = dataInput.readByte(); + return switch (type) { + case 0 -> new ResultingEventPublisherClosed(); + default -> throw new UnsupportedOperationException("Unsupported type: " + type); + }; + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventSerializer.java b/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventSerializer.java new file mode 100644 index 0000000..4fa19d3 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventSerializer.java @@ -0,0 +1,17 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent; +import java.io.DataOutput; +import java.io.IOException; + +public class ClusterBoundResultingEventSerializer implements Serializer { + + @Override + public void serialize(ClusterBoundResultingEvent data, DataOutput output) throws IOException { + if (data instanceof ResultingEvent.ResultingEventPublisherClosed) { + output.writeByte(0x0); + } else { + throw new UnsupportedOperationException("Unsupported event: " + data); + } + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java index 432539c..f978d5c 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java @@ -5,7 +5,6 @@ import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT; import static it.tdlight.reactiveapi.Event.SERIAL_VERSION; import static java.util.Objects.requireNonNull; -import it.cavallium.filequeue.DiskQueueToConsumer; import it.tdlight.common.Init; import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.common.Response; @@ -37,45 +36,33 @@ import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent; import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent; import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed; import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; -import it.tdlight.reactiveapi.rsocket.FileQueueUtils; import it.tdlight.tdlight.ClientManager; import java.io.ByteArrayOutputStream; import java.io.DataOutput; import java.io.DataOutputStream; -import java.io.File; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.Objects; import java.util.Set; import java.util.StringJoiner; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Consumer; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import reactor.core.CoreSubscriber; import reactor.core.Disposable; -import reactor.core.publisher.BaseSubscriber; -import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks.EmitFailureHandler; import reactor.core.publisher.Sinks.Many; import reactor.core.scheduler.Schedulers; @@ -98,6 +85,13 @@ public abstract class ReactiveApiPublisher { private final AtomicReference disposable = new AtomicReference<>(); private final AtomicReference path = new AtomicReference<>(); + // Debugging variables + final LongAdder receivedUpdates = new LongAdder(); + final LongAdder bufferedUpdates = new LongAdder(); + final LongAdder processedUpdates = new LongAdder(); + final LongAdder clientBoundEvents = new LongAdder(); + final LongAdder sentClientBoundEvents = new LongAdder(); + private ReactiveApiPublisher(TdlibChannelsSharedHost sharedTdlibServers, Set resultingEventTransformerSet, long userId, String lane) { @@ -113,68 +107,26 @@ public abstract class ReactiveApiPublisher { throw new RuntimeException("Can't load TDLight", e); } this.telegramClient = Flux.create(sink -> { - var path = this.path.get(); - if (path == null) { - sink.error(new IllegalStateException("Path not set!")); - return; - } - DiskQueueToConsumer queue; - try { - var queuePath = path.resolve(".queue"); - if (Files.notExists(queuePath)) { - Files.createDirectories(queuePath); - } - queue = new DiskQueueToConsumer<>(queuePath.resolve("tdlib-events.tape2"), - FileQueueUtils.convert(SignalUtils.serializer()), - FileQueueUtils.convert(SignalUtils.deserializer()), - signal -> { - if (sink.requestedFromDownstream() > 0) { - if (signal != null) { - sink.next(signal); - } - return true; - } else { - return false; - } - } - ); - } catch (Throwable ex) { - LOG.error("Failed to initialize queue {}", userId, ex); - sink.error(ex); - return; - } - try { - queue.startQueue(); - } catch (Throwable ex) { - LOG.error("Failed to initialize queue {}", userId, ex); - sink.error(ex); - return; - } - try { rawTelegramClient.createAndRegisterClient(); } catch (Throwable ex) { LOG.error("Failed to initialize client {}", userId, ex); sink.error(ex); - return; } - - rawTelegramClient.setListener(value -> { + rawTelegramClient.setListener(t -> { if (!sink.isCancelled()) { - queue.add(value); - } - }); - - sink.onDispose(() -> { - rawTelegramClient.dispose(); - try { - queue.close(); - } catch (Exception e) { - LOG.error("Unexpected error while closing the queue", e); + this.receivedUpdates.increment(); + sink.next(t); } }); sink.onCancel(rawTelegramClient::cancel); - }, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic()); + sink.onDispose(() -> { + rawTelegramClient.dispose(); + }); + }, OverflowStrategy.ERROR).doOnNext(next -> bufferedUpdates.increment()); + + + Stats.STATS.add(this); } public static ReactiveApiPublisher fromToken(TdlibChannelsSharedHost sharedTdlibServers, @@ -216,12 +168,20 @@ public abstract class ReactiveApiPublisher { return transformedFlux; }) - .publish(256); + .publish(512); publishedResultingEvents // Obtain only TDLib-bound events .filter(s -> s instanceof TDLibBoundResultingEvent) - .map(s -> ((TDLibBoundResultingEvent) s)) + .>map(s -> ((TDLibBoundResultingEvent) s)) + + // Buffer requests to avoid halting the event loop + .transform(ReactorUtils.onBackpressureBuffer(path, + "tdlib-bound-events", + false, + new TdlibBoundResultingEventSerializer(), + new TdlibBoundResultingEventDeserializer() + )) // Send requests to tdlib .flatMap(req -> Mono @@ -262,54 +222,17 @@ public abstract class ReactiveApiPublisher { .filter(s -> s instanceof ClientBoundResultingEvent) .cast(ClientBoundResultingEvent.class) .map(ClientBoundResultingEvent::event) - .transform(flux -> Flux.create(sink -> { - try { - var queuePath = path.resolve(".queue"); - if (Files.notExists(queuePath)) { - Files.createDirectories(queuePath); - } - var queue = new DiskQueueToConsumer<>(queuePath.resolve("client-bound-resulting-events.tape2"), - FileQueueUtils.convert(new ClientBoundEventSerializer()), - FileQueueUtils.convert(new ClientBoundEventDeserializer()), - signal -> { - if (sink.requestedFromDownstream() > 0) { - if (signal != null) { - sink.next(signal); - } - return true; - } else { - return false; - } - } - ); - sink.onDispose(queue::close); - flux.subscribeOn(Schedulers.parallel()).subscribe(new CoreSubscriber<>() { - @Override - public void onSubscribe(@NotNull Subscription s) { - sink.onCancel(s::cancel); - s.request(Long.MAX_VALUE); - } - @Override - public void onNext(ClientBoundEvent clientBoundEvent) { - if (!sink.isCancelled()) { - queue.add(clientBoundEvent); - } - } + // Buffer requests to avoid halting the event loop + .doOnNext(clientBoundEvent -> clientBoundEvents.increment()) + .transform(ReactorUtils.onBackpressureBufferSubscribe(path, + "client-bound-resulting-events", + false, + new ClientBoundEventSerializer(), + new ClientBoundEventDeserializer() + )) + .doOnNext(clientBoundEvent -> sentClientBoundEvents.increment()) - @Override - public void onError(Throwable throwable) { - sink.error(throwable); - } - - @Override - public void onComplete() { - } - }); - } catch (IOException ex) { - sink.error(ex); - } - }, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic())) .as(ReactorUtils::subscribeOnceUntilUnsubscribe); sharedTdlibServers.events(lane, messagesToSend); @@ -319,6 +242,14 @@ public abstract class ReactiveApiPublisher { .filter(s -> s instanceof ClusterBoundResultingEvent) .cast(ClusterBoundResultingEvent.class) + // Buffer requests to avoid halting the event loop + .as(ReactorUtils.onBackpressureBuffer(path, + "cluster-bound-events", + false, + new ClusterBoundResultingEventSerializer(), + new ClusterBoundResultingEventDeserializer() + )) + // Send events to the cluster .subscribeOn(Schedulers.parallel()) .subscribe(clusterBoundEvent -> { @@ -382,6 +313,7 @@ public abstract class ReactiveApiPublisher { // Update the state var state = this.state.updateAndGet(oldState -> oldState.withSignal(signal)); + processedUpdates.increment(); if (state.authPhase() == LOGGED_IN) { ResultingEvent resultingEvent = wrapUpdateSignal(signal); return List.of(resultingEvent); diff --git a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java index e958259..03a3557 100644 --- a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java +++ b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java @@ -1,10 +1,17 @@ package it.tdlight.reactiveapi; +import it.cavallium.filequeue.DiskQueueToConsumer; +import it.tdlight.reactiveapi.rsocket.FileQueueUtils; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayDeque; import java.util.Deque; import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.LongConsumer; import org.jetbrains.annotations.NotNull; import org.reactivestreams.Subscription; @@ -15,6 +22,7 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.FluxSink.OverflowStrategy; import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; +import reactor.core.scheduler.Schedulers; import reactor.util.context.Context; public class ReactorUtils { @@ -137,7 +145,8 @@ public class ReactorUtils { var s = subscriptionAtomicReference.get(); emitter.onRequest(n -> { if (n > maxBufferSize) { - emitter.error(new UnsupportedOperationException("Requests count is bigger than max buffer size! " + n + " > " + maxBufferSize)); + emitter.error(new UnsupportedOperationException( + "Requests count is bigger than max buffer size! " + n + " > " + maxBufferSize)); } else { s.request(n); } @@ -150,6 +159,114 @@ public class ReactorUtils { }); } + public static Function, Flux> onBackpressureBufferSubscribe(Path path, + String name, + boolean persistent, + Serializer serializer, + Deserializer deserializer) { + return flux -> { + AtomicReference> ref = new AtomicReference<>(); + DiskQueueToConsumer queue; + try { + var queuePath = path.resolve(".tdlib-queue"); + if (Files.notExists(queuePath)) { + Files.createDirectories(queuePath); + } + queue = new DiskQueueToConsumer<>(queuePath.resolve(name + ".tape2"), + !persistent, + FileQueueUtils.convert(serializer), + FileQueueUtils.convert(deserializer), + signal -> { + var sink = ref.get(); + if (sink != null && sink.requestedFromDownstream() > 0) { + if (signal != null) { + sink.next(signal); + } + return true; + } else { + return false; + } + } + ); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + var disposable = flux + .subscribeOn(Schedulers.parallel()) + .publishOn(Schedulers.boundedElastic()) + .subscribe(queue::add); + queue.startQueue(); + return Flux.create(sink -> { + sink.onDispose(() -> { + disposable.dispose(); + queue.close(); + }); + ref.set(sink); + sink.onCancel(() -> ref.set(null)); + }); + }; + } + + public static Function, Flux> onBackpressureBuffer(Path path, + String name, + boolean persistent, + Serializer serializer, + Deserializer deserializer) { + return flux -> Flux.create(sink -> { + try { + var queuePath = path.resolve(".tdlib-queue"); + if (Files.notExists(queuePath)) { + Files.createDirectories(queuePath); + } + var queue = new DiskQueueToConsumer<>(queuePath.resolve(name + ".tape2"), + !persistent, + FileQueueUtils.convert(serializer), + FileQueueUtils.convert(deserializer), + signal -> { + if (sink.requestedFromDownstream() > 0 && !sink.isCancelled()) { + if (signal != null) { + sink.next(signal); + } + return true; + } else { + return false; + } + } + ); + sink.onDispose(queue::close); + flux + .subscribeOn(Schedulers.parallel()) + .publishOn(Schedulers.boundedElastic()) + .subscribe(new CoreSubscriber() { + @Override + public void onSubscribe(@NotNull Subscription s) { + sink.onCancel(s::cancel); + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T element) { + if (!sink.isCancelled()) { + queue.add(element); + } + } + + @Override + public void onError(Throwable throwable) { + sink.error(throwable); + } + + @Override + public void onComplete() { + } + }); + queue.startQueue(); + } catch (IOException ex) { + sink.error(ex); + } + }).subscribeOn(Schedulers.boundedElastic()); + } + private static class WaitingSink implements FluxSink { @Override diff --git a/src/main/java/it/tdlight/reactiveapi/Stats.java b/src/main/java/it/tdlight/reactiveapi/Stats.java new file mode 100644 index 0000000..c3d52b9 --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/Stats.java @@ -0,0 +1,141 @@ +package it.tdlight.reactiveapi; + +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class Stats extends Thread { + + private static final Logger LOG = LogManager.getLogger(Stats.class); + public static final List STATS = new CopyOnWriteArrayList<>(); + public static final long SLEEP_INTERVAL = Duration.ofSeconds(10).toMillis(); + + static { + var stats = new Stats(); + stats.setName("Stats"); + stats.setDaemon(true); + stats.start(); + } + + public static void init() { + + } + + @Override + public void run() { + try { + var prev = System.currentTimeMillis(); + var prevClients = 0; + var prevReceivedUpdates = new LongArrayList(); + var prevBufferedUpdates = new LongArrayList(); + var prevProcessedUpdates = new LongArrayList(); + var prevClientBoundEvents = new LongArrayList(); + var prevSentClientBoundEvents = new LongArrayList(); + while (!Thread.interrupted()) { + //noinspection BusyWait + Thread.sleep(SLEEP_INTERVAL); + var now = System.currentTimeMillis(); + var timeDiffSeconds = (now - prev) / 1000d; + StringBuilder out = new StringBuilder(); + out.append("Statistics. Time delta: %03.2fs%n".formatted(timeDiffSeconds)); + var currentClients = STATS.size(); + var clientIds = new LongArrayList(); + var receivedUpdates = new LongArrayList(); + var bufferedUpdates = new LongArrayList(); + var processedUpdates = new LongArrayList(); + var clientBoundEvents = new LongArrayList(); + var sentClientBoundEvents = new LongArrayList(); + for (ReactiveApiPublisher stat : STATS) { + clientIds.add(stat.userId); + receivedUpdates.add(stat.receivedUpdates.longValue()); + bufferedUpdates.add(stat.bufferedUpdates.longValue()); + processedUpdates.add(stat.processedUpdates.longValue()); + clientBoundEvents.add(stat.clientBoundEvents.longValue()); + sentClientBoundEvents.add(stat.sentClientBoundEvents.longValue()); + } + while (currentClients > prevClients) { + prevClients++; + prevReceivedUpdates.add(0); + prevBufferedUpdates.add(0); + prevProcessedUpdates.add(0); + prevClientBoundEvents.add(0); + prevSentClientBoundEvents.add(0); + } + double receivedUpdatesRateSum = 0; + long ramBufferedSum = 0; + long diskBufferedSum = 0; + double bufferedUpdatesRateSum = 0; + double processedUpdatesRateSum = 0; + double clientBoundEventsRateSum = 0; + double sentClientBoundEventsRateSum = 0; + for (int i = 0; i <= currentClients; i++) { + double receivedUpdatesRate; + long ramBuffered; + long diskBuffered; + double bufferedUpdatesRate; + double processedUpdatesRate; + double clientBoundEventsRate; + double sentClientBoundEventsRate; + if (i != currentClients) { + receivedUpdatesRate = (receivedUpdates.getLong(i) - prevReceivedUpdates.getLong(i)) / timeDiffSeconds; + diskBuffered = bufferedUpdates.getLong(i) - processedUpdates.getLong(i); + ramBuffered = receivedUpdates.getLong(i) - bufferedUpdates.getLong(i); + bufferedUpdatesRate = (bufferedUpdates.getLong(i) - prevBufferedUpdates.getLong(i)) / timeDiffSeconds; + processedUpdatesRate = (processedUpdates.getLong(i) - prevProcessedUpdates.getLong(i)) / timeDiffSeconds; + clientBoundEventsRate = (clientBoundEvents.getLong(i) - prevClientBoundEvents.getLong(i)) / timeDiffSeconds; + sentClientBoundEventsRate = + (sentClientBoundEvents.getLong(i) - prevSentClientBoundEvents.getLong(i)) / timeDiffSeconds; + receivedUpdatesRateSum += receivedUpdatesRate; + diskBufferedSum += diskBuffered; + ramBufferedSum += ramBuffered; + bufferedUpdatesRateSum += bufferedUpdatesRate; + processedUpdatesRateSum += processedUpdatesRate; + clientBoundEventsRateSum += clientBoundEventsRate; + sentClientBoundEventsRateSum += sentClientBoundEventsRate; + out.append(String.format("%d:\t", clientIds.getLong(i))); + } else { + receivedUpdatesRate = receivedUpdatesRateSum; + diskBuffered = diskBufferedSum; + ramBuffered = ramBufferedSum; + bufferedUpdatesRate = bufferedUpdatesRateSum; + processedUpdatesRate = processedUpdatesRateSum; + clientBoundEventsRate = clientBoundEventsRateSum; + sentClientBoundEventsRate = sentClientBoundEventsRateSum; + out.append("Total:\t"); + } + out.append(String.format( + "\tUpdates:\t[received %03.2fHz\tbuffered: %03.2fHz (RAM: %d HDD: %d)\tprocessed: %03.2fHz]\tClient bound events: %03.2fHz\tProcessed events: %03.2fHz\t%n", + receivedUpdatesRate, + bufferedUpdatesRate, + ramBuffered, + diskBuffered, + processedUpdatesRate, + clientBoundEventsRate, + sentClientBoundEventsRate + )); + } + out.append(String.format("%n")); + + for (int i = 0; i < currentClients; i++) { + prevReceivedUpdates = receivedUpdates; + prevBufferedUpdates = bufferedUpdates; + prevProcessedUpdates = processedUpdates; + prevClientBoundEvents = clientBoundEvents; + prevSentClientBoundEvents = sentClientBoundEvents; + } + LOG.debug(out.toString()); + + prev = now; + } + } catch (InterruptedException ex) { + + } + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventDeserializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventDeserializer.java new file mode 100644 index 0000000..802b2be --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventDeserializer.java @@ -0,0 +1,16 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.Function; +import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; +import java.io.DataInput; +import java.io.IOException; + +public class TdlibBoundResultingEventDeserializer implements Deserializer> { + + @Override + public TDLibBoundResultingEvent deserialize(int length, DataInput dataInput) throws IOException { + Function action = (Function) TdApi.Deserializer.deserialize(dataInput); + return new TDLibBoundResultingEvent<>(action, dataInput.readBoolean()); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventSerializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventSerializer.java new file mode 100644 index 0000000..326885a --- /dev/null +++ b/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventSerializer.java @@ -0,0 +1,14 @@ +package it.tdlight.reactiveapi; + +import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent; +import java.io.DataOutput; +import java.io.IOException; + +public class TdlibBoundResultingEventSerializer implements Serializer> { + + @Override + public void serialize(TDLibBoundResultingEvent data, DataOutput output) throws IOException { + data.action().serialize(output); + output.writeBoolean(data.ignoreFailure()); + } +} diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java index 7b5386f..d63ce01 100644 --- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java +++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java @@ -6,7 +6,10 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent; import it.tdlight.reactiveapi.Event.OnRequest; import it.tdlight.reactiveapi.Event.OnResponse; import java.io.Closeable; +import java.nio.file.Path; import java.time.Duration; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; diff --git a/src/test/java/it/tdlight/reactiveapi/test/InfiniteQueueBench.java b/src/test/java/it/tdlight/reactiveapi/test/InfiniteQueueBench.java index ef471b0..9555c59 100644 --- a/src/test/java/it/tdlight/reactiveapi/test/InfiniteQueueBench.java +++ b/src/test/java/it/tdlight/reactiveapi/test/InfiniteQueueBench.java @@ -23,7 +23,7 @@ public class InfiniteQueueBench { AtomicInteger status = new AtomicInteger(); tmpFile.toFile().deleteOnExit(); Files.delete(tmpFile); - try (var queue = new it.cavallium.filequeue.DiskQueueToConsumer(tmpFile, new Serializer() { + try (var queue = new it.cavallium.filequeue.DiskQueueToConsumer(tmpFile, true, new Serializer() { @Override public byte[] serialize(String data) throws IOException { return data.getBytes(StandardCharsets.US_ASCII);