diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 4693e13..240edd2 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -340,7 +340,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { } })) .limitRate(Math.max(1, tdOptions.getEventsSize())) - .transform(normal -> new BufferTimeOutPublisher<>(normal, Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100))) + .transform(normal -> new BufferTimeOutPublisher<>(normal, Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100), false)) //.bufferTimeout(Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100)) //.map(List::of) .limitRate(Math.max(1, tdOptions.getEventsSize())) diff --git a/src/main/java/it/tdlight/utils/BufferTimeOutPublisher.java b/src/main/java/it/tdlight/utils/BufferTimeOutPublisher.java index 160c6cf..5650fe6 100644 --- a/src/main/java/it/tdlight/utils/BufferTimeOutPublisher.java +++ b/src/main/java/it/tdlight/utils/BufferTimeOutPublisher.java @@ -16,6 +16,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import reactor.core.CoreSubscriber; @SuppressWarnings("ReactiveStreamsPublisherImplementation") public class BufferTimeOutPublisher implements Publisher> { @@ -25,23 +26,26 @@ public class BufferTimeOutPublisher implements Publisher> { private final Publisher source; private final int size; private final long duration; + private final boolean discardOnError; - public BufferTimeOutPublisher(Publisher source, int size, Duration duration) { + public BufferTimeOutPublisher(Publisher source, int size, Duration duration, boolean discardOnError) { this.source = source; this.size = size; this.duration = duration.toMillis(); + this.discardOnError = discardOnError; } @Override public void subscribe(Subscriber> subscriber) { - subscriber.onSubscribe(new BufferTimeOutSubscription(source, subscriber, size, duration)); + subscriber.onSubscribe(new BufferTimeOutSubscription(source, subscriber, size, duration, discardOnError)); } - protected static class BufferTimeOutSubscription implements Subscription, Subscriber { + protected static class BufferTimeOutSubscription implements Subscription, CoreSubscriber { private final Subscriber> subscriber; private final int size; private final long duration; + private final boolean discardOnError; private Subscription subscription; private final ReentrantLock lock = new ReentrantLock(); @@ -59,10 +63,12 @@ public class BufferTimeOutPublisher implements Publisher> { public BufferTimeOutSubscription(Publisher source, Subscriber> subscriber, int size, - long duration) { + long duration, + boolean discardOnError) { this.subscriber = subscriber; this.size = size; this.duration = duration; + this.discardOnError = discardOnError; this.buffer = new ArrayList<>(size); source.subscribe(this); } @@ -144,8 +150,19 @@ public class BufferTimeOutPublisher implements Publisher> { @Override public void onError(Throwable t) { - scheduledFuture.cancel(false); - subscriber.onError(t); + if (discardOnError) { + scheduledFuture.cancel(false); + subscriber.onError(t); + } else { + lock.lock(); + try { + checkSend(); + scheduledFuture.cancel(false); + subscriber.onError(t); + } finally { + lock.unlock(); + } + } } @Override