diff --git a/pom.xml b/pom.xml
index 1370ab6..fa106ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@
it.cavallium
filequeue
- 3.0.0
+ 3.0.2
io.projectreactor
diff --git a/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventDeserializer.java b/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventDeserializer.java
new file mode 100644
index 0000000..700a031
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventDeserializer.java
@@ -0,0 +1,18 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent;
+import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed;
+import java.io.DataInput;
+import java.io.IOException;
+
+public class ClusterBoundResultingEventDeserializer implements Deserializer {
+
+ @Override
+ public ClusterBoundResultingEvent deserialize(int length, DataInput dataInput) throws IOException {
+ var type = dataInput.readByte();
+ return switch (type) {
+ case 0 -> new ResultingEventPublisherClosed();
+ default -> throw new UnsupportedOperationException("Unsupported type: " + type);
+ };
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventSerializer.java b/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventSerializer.java
new file mode 100644
index 0000000..4fa19d3
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/ClusterBoundResultingEventSerializer.java
@@ -0,0 +1,17 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ClusterBoundResultingEventSerializer implements Serializer {
+
+ @Override
+ public void serialize(ClusterBoundResultingEvent data, DataOutput output) throws IOException {
+ if (data instanceof ResultingEvent.ResultingEventPublisherClosed) {
+ output.writeByte(0x0);
+ } else {
+ throw new UnsupportedOperationException("Unsupported event: " + data);
+ }
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
index 432539c..f978d5c 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactiveApiPublisher.java
@@ -5,7 +5,6 @@ import static it.tdlight.reactiveapi.AuthPhase.LOGGED_OUT;
import static it.tdlight.reactiveapi.Event.SERIAL_VERSION;
import static java.util.Objects.requireNonNull;
-import it.cavallium.filequeue.DiskQueueToConsumer;
import it.tdlight.common.Init;
import it.tdlight.common.ReactiveTelegramClient;
import it.tdlight.common.Response;
@@ -37,45 +36,33 @@ import it.tdlight.reactiveapi.ResultingEvent.ClientBoundResultingEvent;
import it.tdlight.reactiveapi.ResultingEvent.ClusterBoundResultingEvent;
import it.tdlight.reactiveapi.ResultingEvent.ResultingEventPublisherClosed;
import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
-import it.tdlight.reactiveapi.rsocket.FileQueueUtils;
import it.tdlight.tdlight.ClientManager;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
-import reactor.core.publisher.BaseSubscriber;
-import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Schedulers;
@@ -98,6 +85,13 @@ public abstract class ReactiveApiPublisher {
private final AtomicReference disposable = new AtomicReference<>();
private final AtomicReference path = new AtomicReference<>();
+ // Debugging variables
+ final LongAdder receivedUpdates = new LongAdder();
+ final LongAdder bufferedUpdates = new LongAdder();
+ final LongAdder processedUpdates = new LongAdder();
+ final LongAdder clientBoundEvents = new LongAdder();
+ final LongAdder sentClientBoundEvents = new LongAdder();
+
private ReactiveApiPublisher(TdlibChannelsSharedHost sharedTdlibServers,
Set resultingEventTransformerSet,
long userId, String lane) {
@@ -113,68 +107,26 @@ public abstract class ReactiveApiPublisher {
throw new RuntimeException("Can't load TDLight", e);
}
this.telegramClient = Flux.create(sink -> {
- var path = this.path.get();
- if (path == null) {
- sink.error(new IllegalStateException("Path not set!"));
- return;
- }
- DiskQueueToConsumer queue;
- try {
- var queuePath = path.resolve(".queue");
- if (Files.notExists(queuePath)) {
- Files.createDirectories(queuePath);
- }
- queue = new DiskQueueToConsumer<>(queuePath.resolve("tdlib-events.tape2"),
- FileQueueUtils.convert(SignalUtils.serializer()),
- FileQueueUtils.convert(SignalUtils.deserializer()),
- signal -> {
- if (sink.requestedFromDownstream() > 0) {
- if (signal != null) {
- sink.next(signal);
- }
- return true;
- } else {
- return false;
- }
- }
- );
- } catch (Throwable ex) {
- LOG.error("Failed to initialize queue {}", userId, ex);
- sink.error(ex);
- return;
- }
- try {
- queue.startQueue();
- } catch (Throwable ex) {
- LOG.error("Failed to initialize queue {}", userId, ex);
- sink.error(ex);
- return;
- }
-
try {
rawTelegramClient.createAndRegisterClient();
} catch (Throwable ex) {
LOG.error("Failed to initialize client {}", userId, ex);
sink.error(ex);
- return;
}
-
- rawTelegramClient.setListener(value -> {
+ rawTelegramClient.setListener(t -> {
if (!sink.isCancelled()) {
- queue.add(value);
- }
- });
-
- sink.onDispose(() -> {
- rawTelegramClient.dispose();
- try {
- queue.close();
- } catch (Exception e) {
- LOG.error("Unexpected error while closing the queue", e);
+ this.receivedUpdates.increment();
+ sink.next(t);
}
});
sink.onCancel(rawTelegramClient::cancel);
- }, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic());
+ sink.onDispose(() -> {
+ rawTelegramClient.dispose();
+ });
+ }, OverflowStrategy.ERROR).doOnNext(next -> bufferedUpdates.increment());
+
+
+ Stats.STATS.add(this);
}
public static ReactiveApiPublisher fromToken(TdlibChannelsSharedHost sharedTdlibServers,
@@ -216,12 +168,20 @@ public abstract class ReactiveApiPublisher {
return transformedFlux;
})
- .publish(256);
+ .publish(512);
publishedResultingEvents
// Obtain only TDLib-bound events
.filter(s -> s instanceof TDLibBoundResultingEvent>)
- .map(s -> ((TDLibBoundResultingEvent>) s))
+ .>map(s -> ((TDLibBoundResultingEvent>) s))
+
+ // Buffer requests to avoid halting the event loop
+ .transform(ReactorUtils.onBackpressureBuffer(path,
+ "tdlib-bound-events",
+ false,
+ new TdlibBoundResultingEventSerializer(),
+ new TdlibBoundResultingEventDeserializer()
+ ))
// Send requests to tdlib
.flatMap(req -> Mono
@@ -262,54 +222,17 @@ public abstract class ReactiveApiPublisher {
.filter(s -> s instanceof ClientBoundResultingEvent)
.cast(ClientBoundResultingEvent.class)
.map(ClientBoundResultingEvent::event)
- .transform(flux -> Flux.create(sink -> {
- try {
- var queuePath = path.resolve(".queue");
- if (Files.notExists(queuePath)) {
- Files.createDirectories(queuePath);
- }
- var queue = new DiskQueueToConsumer<>(queuePath.resolve("client-bound-resulting-events.tape2"),
- FileQueueUtils.convert(new ClientBoundEventSerializer()),
- FileQueueUtils.convert(new ClientBoundEventDeserializer()),
- signal -> {
- if (sink.requestedFromDownstream() > 0) {
- if (signal != null) {
- sink.next(signal);
- }
- return true;
- } else {
- return false;
- }
- }
- );
- sink.onDispose(queue::close);
- flux.subscribeOn(Schedulers.parallel()).subscribe(new CoreSubscriber<>() {
- @Override
- public void onSubscribe(@NotNull Subscription s) {
- sink.onCancel(s::cancel);
- s.request(Long.MAX_VALUE);
- }
- @Override
- public void onNext(ClientBoundEvent clientBoundEvent) {
- if (!sink.isCancelled()) {
- queue.add(clientBoundEvent);
- }
- }
+ // Buffer requests to avoid halting the event loop
+ .doOnNext(clientBoundEvent -> clientBoundEvents.increment())
+ .transform(ReactorUtils.onBackpressureBufferSubscribe(path,
+ "client-bound-resulting-events",
+ false,
+ new ClientBoundEventSerializer(),
+ new ClientBoundEventDeserializer()
+ ))
+ .doOnNext(clientBoundEvent -> sentClientBoundEvents.increment())
- @Override
- public void onError(Throwable throwable) {
- sink.error(throwable);
- }
-
- @Override
- public void onComplete() {
- }
- });
- } catch (IOException ex) {
- sink.error(ex);
- }
- }, OverflowStrategy.ERROR).subscribeOn(Schedulers.boundedElastic()))
.as(ReactorUtils::subscribeOnceUntilUnsubscribe);
sharedTdlibServers.events(lane, messagesToSend);
@@ -319,6 +242,14 @@ public abstract class ReactiveApiPublisher {
.filter(s -> s instanceof ClusterBoundResultingEvent)
.cast(ClusterBoundResultingEvent.class)
+ // Buffer requests to avoid halting the event loop
+ .as(ReactorUtils.onBackpressureBuffer(path,
+ "cluster-bound-events",
+ false,
+ new ClusterBoundResultingEventSerializer(),
+ new ClusterBoundResultingEventDeserializer()
+ ))
+
// Send events to the cluster
.subscribeOn(Schedulers.parallel())
.subscribe(clusterBoundEvent -> {
@@ -382,6 +313,7 @@ public abstract class ReactiveApiPublisher {
// Update the state
var state = this.state.updateAndGet(oldState -> oldState.withSignal(signal));
+ processedUpdates.increment();
if (state.authPhase() == LOGGED_IN) {
ResultingEvent resultingEvent = wrapUpdateSignal(signal);
return List.of(resultingEvent);
diff --git a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java
index e958259..03a3557 100644
--- a/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java
+++ b/src/main/java/it/tdlight/reactiveapi/ReactorUtils.java
@@ -1,10 +1,17 @@
package it.tdlight.reactiveapi;
+import it.cavallium.filequeue.DiskQueueToConsumer;
+import it.tdlight.reactiveapi.rsocket.FileQueueUtils;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.function.LongConsumer;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Subscription;
@@ -15,6 +22,7 @@ import reactor.core.publisher.FluxSink;
import reactor.core.publisher.FluxSink.OverflowStrategy;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
+import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
public class ReactorUtils {
@@ -137,7 +145,8 @@ public class ReactorUtils {
var s = subscriptionAtomicReference.get();
emitter.onRequest(n -> {
if (n > maxBufferSize) {
- emitter.error(new UnsupportedOperationException("Requests count is bigger than max buffer size! " + n + " > " + maxBufferSize));
+ emitter.error(new UnsupportedOperationException(
+ "Requests count is bigger than max buffer size! " + n + " > " + maxBufferSize));
} else {
s.request(n);
}
@@ -150,6 +159,114 @@ public class ReactorUtils {
});
}
+ public static Function, Flux> onBackpressureBufferSubscribe(Path path,
+ String name,
+ boolean persistent,
+ Serializer serializer,
+ Deserializer deserializer) {
+ return flux -> {
+ AtomicReference> ref = new AtomicReference<>();
+ DiskQueueToConsumer queue;
+ try {
+ var queuePath = path.resolve(".tdlib-queue");
+ if (Files.notExists(queuePath)) {
+ Files.createDirectories(queuePath);
+ }
+ queue = new DiskQueueToConsumer<>(queuePath.resolve(name + ".tape2"),
+ !persistent,
+ FileQueueUtils.convert(serializer),
+ FileQueueUtils.convert(deserializer),
+ signal -> {
+ var sink = ref.get();
+ if (sink != null && sink.requestedFromDownstream() > 0) {
+ if (signal != null) {
+ sink.next(signal);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+ );
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ var disposable = flux
+ .subscribeOn(Schedulers.parallel())
+ .publishOn(Schedulers.boundedElastic())
+ .subscribe(queue::add);
+ queue.startQueue();
+ return Flux.create(sink -> {
+ sink.onDispose(() -> {
+ disposable.dispose();
+ queue.close();
+ });
+ ref.set(sink);
+ sink.onCancel(() -> ref.set(null));
+ });
+ };
+ }
+
+ public static Function, Flux> onBackpressureBuffer(Path path,
+ String name,
+ boolean persistent,
+ Serializer serializer,
+ Deserializer deserializer) {
+ return flux -> Flux.create(sink -> {
+ try {
+ var queuePath = path.resolve(".tdlib-queue");
+ if (Files.notExists(queuePath)) {
+ Files.createDirectories(queuePath);
+ }
+ var queue = new DiskQueueToConsumer<>(queuePath.resolve(name + ".tape2"),
+ !persistent,
+ FileQueueUtils.convert(serializer),
+ FileQueueUtils.convert(deserializer),
+ signal -> {
+ if (sink.requestedFromDownstream() > 0 && !sink.isCancelled()) {
+ if (signal != null) {
+ sink.next(signal);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+ );
+ sink.onDispose(queue::close);
+ flux
+ .subscribeOn(Schedulers.parallel())
+ .publishOn(Schedulers.boundedElastic())
+ .subscribe(new CoreSubscriber() {
+ @Override
+ public void onSubscribe(@NotNull Subscription s) {
+ sink.onCancel(s::cancel);
+ s.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(T element) {
+ if (!sink.isCancelled()) {
+ queue.add(element);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ sink.error(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ }
+ });
+ queue.startQueue();
+ } catch (IOException ex) {
+ sink.error(ex);
+ }
+ }).subscribeOn(Schedulers.boundedElastic());
+ }
+
private static class WaitingSink implements FluxSink {
@Override
diff --git a/src/main/java/it/tdlight/reactiveapi/Stats.java b/src/main/java/it/tdlight/reactiveapi/Stats.java
new file mode 100644
index 0000000..c3d52b9
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/Stats.java
@@ -0,0 +1,141 @@
+package it.tdlight.reactiveapi;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class Stats extends Thread {
+
+ private static final Logger LOG = LogManager.getLogger(Stats.class);
+ public static final List STATS = new CopyOnWriteArrayList<>();
+ public static final long SLEEP_INTERVAL = Duration.ofSeconds(10).toMillis();
+
+ static {
+ var stats = new Stats();
+ stats.setName("Stats");
+ stats.setDaemon(true);
+ stats.start();
+ }
+
+ public static void init() {
+
+ }
+
+ @Override
+ public void run() {
+ try {
+ var prev = System.currentTimeMillis();
+ var prevClients = 0;
+ var prevReceivedUpdates = new LongArrayList();
+ var prevBufferedUpdates = new LongArrayList();
+ var prevProcessedUpdates = new LongArrayList();
+ var prevClientBoundEvents = new LongArrayList();
+ var prevSentClientBoundEvents = new LongArrayList();
+ while (!Thread.interrupted()) {
+ //noinspection BusyWait
+ Thread.sleep(SLEEP_INTERVAL);
+ var now = System.currentTimeMillis();
+ var timeDiffSeconds = (now - prev) / 1000d;
+ StringBuilder out = new StringBuilder();
+ out.append("Statistics. Time delta: %03.2fs%n".formatted(timeDiffSeconds));
+ var currentClients = STATS.size();
+ var clientIds = new LongArrayList();
+ var receivedUpdates = new LongArrayList();
+ var bufferedUpdates = new LongArrayList();
+ var processedUpdates = new LongArrayList();
+ var clientBoundEvents = new LongArrayList();
+ var sentClientBoundEvents = new LongArrayList();
+ for (ReactiveApiPublisher stat : STATS) {
+ clientIds.add(stat.userId);
+ receivedUpdates.add(stat.receivedUpdates.longValue());
+ bufferedUpdates.add(stat.bufferedUpdates.longValue());
+ processedUpdates.add(stat.processedUpdates.longValue());
+ clientBoundEvents.add(stat.clientBoundEvents.longValue());
+ sentClientBoundEvents.add(stat.sentClientBoundEvents.longValue());
+ }
+ while (currentClients > prevClients) {
+ prevClients++;
+ prevReceivedUpdates.add(0);
+ prevBufferedUpdates.add(0);
+ prevProcessedUpdates.add(0);
+ prevClientBoundEvents.add(0);
+ prevSentClientBoundEvents.add(0);
+ }
+ double receivedUpdatesRateSum = 0;
+ long ramBufferedSum = 0;
+ long diskBufferedSum = 0;
+ double bufferedUpdatesRateSum = 0;
+ double processedUpdatesRateSum = 0;
+ double clientBoundEventsRateSum = 0;
+ double sentClientBoundEventsRateSum = 0;
+ for (int i = 0; i <= currentClients; i++) {
+ double receivedUpdatesRate;
+ long ramBuffered;
+ long diskBuffered;
+ double bufferedUpdatesRate;
+ double processedUpdatesRate;
+ double clientBoundEventsRate;
+ double sentClientBoundEventsRate;
+ if (i != currentClients) {
+ receivedUpdatesRate = (receivedUpdates.getLong(i) - prevReceivedUpdates.getLong(i)) / timeDiffSeconds;
+ diskBuffered = bufferedUpdates.getLong(i) - processedUpdates.getLong(i);
+ ramBuffered = receivedUpdates.getLong(i) - bufferedUpdates.getLong(i);
+ bufferedUpdatesRate = (bufferedUpdates.getLong(i) - prevBufferedUpdates.getLong(i)) / timeDiffSeconds;
+ processedUpdatesRate = (processedUpdates.getLong(i) - prevProcessedUpdates.getLong(i)) / timeDiffSeconds;
+ clientBoundEventsRate = (clientBoundEvents.getLong(i) - prevClientBoundEvents.getLong(i)) / timeDiffSeconds;
+ sentClientBoundEventsRate =
+ (sentClientBoundEvents.getLong(i) - prevSentClientBoundEvents.getLong(i)) / timeDiffSeconds;
+ receivedUpdatesRateSum += receivedUpdatesRate;
+ diskBufferedSum += diskBuffered;
+ ramBufferedSum += ramBuffered;
+ bufferedUpdatesRateSum += bufferedUpdatesRate;
+ processedUpdatesRateSum += processedUpdatesRate;
+ clientBoundEventsRateSum += clientBoundEventsRate;
+ sentClientBoundEventsRateSum += sentClientBoundEventsRate;
+ out.append(String.format("%d:\t", clientIds.getLong(i)));
+ } else {
+ receivedUpdatesRate = receivedUpdatesRateSum;
+ diskBuffered = diskBufferedSum;
+ ramBuffered = ramBufferedSum;
+ bufferedUpdatesRate = bufferedUpdatesRateSum;
+ processedUpdatesRate = processedUpdatesRateSum;
+ clientBoundEventsRate = clientBoundEventsRateSum;
+ sentClientBoundEventsRate = sentClientBoundEventsRateSum;
+ out.append("Total:\t");
+ }
+ out.append(String.format(
+ "\tUpdates:\t[received %03.2fHz\tbuffered: %03.2fHz (RAM: %d HDD: %d)\tprocessed: %03.2fHz]\tClient bound events: %03.2fHz\tProcessed events: %03.2fHz\t%n",
+ receivedUpdatesRate,
+ bufferedUpdatesRate,
+ ramBuffered,
+ diskBuffered,
+ processedUpdatesRate,
+ clientBoundEventsRate,
+ sentClientBoundEventsRate
+ ));
+ }
+ out.append(String.format("%n"));
+
+ for (int i = 0; i < currentClients; i++) {
+ prevReceivedUpdates = receivedUpdates;
+ prevBufferedUpdates = bufferedUpdates;
+ prevProcessedUpdates = processedUpdates;
+ prevClientBoundEvents = clientBoundEvents;
+ prevSentClientBoundEvents = sentClientBoundEvents;
+ }
+ LOG.debug(out.toString());
+
+ prev = now;
+ }
+ } catch (InterruptedException ex) {
+
+ }
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventDeserializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventDeserializer.java
new file mode 100644
index 0000000..802b2be
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventDeserializer.java
@@ -0,0 +1,16 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.jni.TdApi;
+import it.tdlight.jni.TdApi.Function;
+import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
+import java.io.DataInput;
+import java.io.IOException;
+
+public class TdlibBoundResultingEventDeserializer implements Deserializer> {
+
+ @Override
+ public TDLibBoundResultingEvent> deserialize(int length, DataInput dataInput) throws IOException {
+ Function> action = (Function>) TdApi.Deserializer.deserialize(dataInput);
+ return new TDLibBoundResultingEvent<>(action, dataInput.readBoolean());
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventSerializer.java b/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventSerializer.java
new file mode 100644
index 0000000..326885a
--- /dev/null
+++ b/src/main/java/it/tdlight/reactiveapi/TdlibBoundResultingEventSerializer.java
@@ -0,0 +1,14 @@
+package it.tdlight.reactiveapi;
+
+import it.tdlight.reactiveapi.ResultingEvent.TDLibBoundResultingEvent;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class TdlibBoundResultingEventSerializer implements Serializer> {
+
+ @Override
+ public void serialize(TDLibBoundResultingEvent> data, DataOutput output) throws IOException {
+ data.action().serialize(output);
+ output.writeBoolean(data.ignoreFailure());
+ }
+}
diff --git a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java
index 7b5386f..d63ce01 100644
--- a/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java
+++ b/src/main/java/it/tdlight/reactiveapi/TdlibChannelsSharedHost.java
@@ -6,7 +6,10 @@ import it.tdlight.reactiveapi.Event.ClientBoundEvent;
import it.tdlight.reactiveapi.Event.OnRequest;
import it.tdlight.reactiveapi.Event.OnResponse;
import java.io.Closeable;
+import java.nio.file.Path;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
diff --git a/src/test/java/it/tdlight/reactiveapi/test/InfiniteQueueBench.java b/src/test/java/it/tdlight/reactiveapi/test/InfiniteQueueBench.java
index ef471b0..9555c59 100644
--- a/src/test/java/it/tdlight/reactiveapi/test/InfiniteQueueBench.java
+++ b/src/test/java/it/tdlight/reactiveapi/test/InfiniteQueueBench.java
@@ -23,7 +23,7 @@ public class InfiniteQueueBench {
AtomicInteger status = new AtomicInteger();
tmpFile.toFile().deleteOnExit();
Files.delete(tmpFile);
- try (var queue = new it.cavallium.filequeue.DiskQueueToConsumer(tmpFile, new Serializer() {
+ try (var queue = new it.cavallium.filequeue.DiskQueueToConsumer(tmpFile, true, new Serializer() {
@Override
public byte[] serialize(String data) throws IOException {
return data.getBytes(StandardCharsets.US_ASCII);