From 630b293e8ee03c16ddf6ba8650159d4d00a30bc5 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 21 Oct 2021 00:21:43 +0200 Subject: [PATCH] Add missing generics --- pom.xml | 2 +- .../td/ReactorTelegramClient.java | 4 +- .../tdlibsession/td/ResponseError.java | 12 +- .../td/WrappedReactorTelegramClient.java | 4 +- .../tdlibsession/td/direct/AsyncTdDirect.java | 2 +- .../td/direct/AsyncTdDirectImpl.java | 2 +- .../tdlibsession/td/direct/TestClient.java | 6 +- .../tdlibsession/td/easy/AsyncTdEasy.java | 6 +- .../tdlibsession/td/middle/AsyncTdMiddle.java | 2 +- .../tdlibsession/td/middle/ExecuteObject.java | 15 +- .../LazyTdExecuteObjectMessageCodec.java | 12 +- .../middle/TdExecuteObjectMessageCodec.java | 12 +- .../client/AsyncTdMiddleEventBusClient.java | 5 +- .../td/middle/direct/AsyncTdMiddleDirect.java | 2 +- .../td/middle/direct/AsyncTdMiddleLocal.java | 2 +- .../server/AsyncTdMiddleEventBusServer.java | 8 +- .../it/tdlight/utils/BatchSubscriber.java | 157 ------------------ 17 files changed, 51 insertions(+), 202 deletions(-) delete mode 100644 src/main/java/it/tdlight/utils/BatchSubscriber.java diff --git a/pom.xml b/pom.xml index 9e4a93e..4e672e7 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ it.tdlight tdlight-java - 2.7.8.16 + 2.7.8.23 diff --git a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java index 6726c7d..a4f1612 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java @@ -12,7 +12,7 @@ public interface ReactorTelegramClient { Flux receive(); - Mono send(TdApi.Function query, Duration timeout); + Mono send(TdApi.Function query, Duration timeout); - TdApi.Object execute(TdApi.Function query); + TdApi.Object execute(TdApi.Function query); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/ResponseError.java b/src/main/java/it/tdlight/tdlibsession/td/ResponseError.java index 461d850..8f73d16 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/ResponseError.java +++ b/src/main/java/it/tdlight/tdlibsession/td/ResponseError.java @@ -18,7 +18,7 @@ public class ResponseError extends IOException { @NotNull private final String message; - private ResponseError(@NotNull Function function, @NotNull String botName, @NotNull TdApi.Error tdError, @Nullable Throwable cause) { + private ResponseError(@NotNull Function function, @NotNull String botName, @NotNull TdApi.Error tdError, @Nullable Throwable cause) { super("Bot '" + botName + "' failed the request '" + functionToInlineString(function) + "': " + tdError.code + " " + tdError.message, cause); this.botName = botName; this.tag = functionToInlineString(function); @@ -34,7 +34,7 @@ public class ResponseError extends IOException { this.message = tdError.message; } - private ResponseError(@NotNull Function function, @NotNull String botName, @Nullable Throwable cause) { + private ResponseError(@NotNull Function function, @NotNull String botName, @Nullable Throwable cause) { super("Bot '" + botName + "' failed the request '" + functionToInlineString(function) + "': " + (cause == null ? null : cause.getMessage()), cause); this.botName = botName; this.tag = functionToInlineString(function); @@ -50,7 +50,7 @@ public class ResponseError extends IOException { this.message = (cause == null ? "" : (cause.getMessage() == null ? "" : cause.getMessage())); } - public static ResponseError newResponseError(@NotNull Function function, + public static ResponseError newResponseError(@NotNull Function function, @NotNull String botName, @NotNull TdApi.Error tdError, @Nullable Throwable cause) { @@ -71,7 +71,7 @@ public class ResponseError extends IOException { return new ResponseError(tag, botName, tdError, cause); } - public static ResponseError newResponseError(@NotNull Function function, + public static ResponseError newResponseError(@NotNull Function function, @NotNull String botName, @Nullable Throwable cause) { return new ResponseError(function, botName, cause); @@ -84,7 +84,7 @@ public class ResponseError extends IOException { } @Nullable - public static T get(@NotNull Function function, @NotNull String botName, CompletableFuture action) throws ResponseError { + public static T get(@NotNull Function function, @NotNull String botName, CompletableFuture action) throws ResponseError { try { return action.get(); } catch (InterruptedException e) { @@ -131,7 +131,7 @@ public class ResponseError extends IOException { return message; } - private static String functionToInlineString(Function function) { + private static String functionToInlineString(Function function) { return function .toString() .replace("\n", " ") diff --git a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java index 7c0204f..f1ef877 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java @@ -74,7 +74,7 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient { * @throws NullPointerException if query is null. */ @Override - public Mono send(TdApi.Function query, Duration timeout) { + public Mono send(TdApi.Function query, Duration timeout) { return Mono.from(reactiveTelegramClient.send(query, timeout)).single(); } @@ -86,7 +86,7 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient { * @throws NullPointerException if query is null. */ @Override - public TdApi.Object execute(TdApi.Function query) { + public TdApi.Object execute(TdApi.Function query) { return reactiveTelegramClient.execute(query); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java index aaa095e..7d12ab5 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java @@ -27,6 +27,6 @@ public interface AsyncTdDirect { * @param synchronous Execute synchronously. * @return The request response or {@link it.tdlight.jni.TdApi.Error}. */ - Mono> execute(Function request, Duration timeout, boolean synchronous); + Mono> execute(Function request, Duration timeout, boolean synchronous); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 67cc8b2..eb28e5f 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -41,7 +41,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } @Override - public Mono> execute(Function request, Duration timeout, boolean synchronous) { + public Mono> execute(Function request, Duration timeout, boolean synchronous) { if (synchronous) { return MonoUtils.fromBlockingSingle(() -> { var td = this.td.get(); diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java index c65f143..d836283 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java @@ -113,7 +113,7 @@ public class TestClient implements ReactorTelegramClient { } @Override - public Mono send(Function query, Duration timeout) { + public Mono send(Function query, Duration timeout) { return Mono.fromCallable(() -> { TdApi.Object result = executeCommon(query); if (result != null) { @@ -124,7 +124,7 @@ public class TestClient implements ReactorTelegramClient { } @Override - public TdApi.Object execute(Function query) { + public TdApi.Object execute(Function query) { TdApi.Object result = executeCommon(query); if (result != null) { return result; @@ -133,7 +133,7 @@ public class TestClient implements ReactorTelegramClient { } @Nullable - public TdApi.Object executeCommon(Function query) { + public TdApi.Object executeCommon(Function query) { switch (query.getConstructor()) { case SetLogVerbosityLevel.CONSTRUCTOR: case SetLogTagVerbosityLevel.CONSTRUCTOR: diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 2424bf0..905ad88 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -186,11 +186,11 @@ public class AsyncTdEasy { * @param timeout Timeout duration. * @return The response or {@link TdApi.Error}. */ - public Mono> send(TdApi.Function request, Duration timeout) { + public Mono> send(TdApi.Function request, Duration timeout) { return td.execute(request, timeout, false); } - private Mono> sendDirectly(Function obj, boolean synchronous) { + private Mono> sendDirectly(Function obj, boolean synchronous) { return td.execute(obj, AsyncTdEasy.DEFAULT_TIMEOUT, synchronous); } @@ -315,7 +315,7 @@ public class AsyncTdEasy { * @param timeout Timeout. * @return The request response. */ - public Mono> execute(TdApi.Function request, Duration timeout) { + public Mono> execute(TdApi.Function request, Duration timeout) { return td.execute(request, timeout, true); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java index 0b5231d..0ff2cc7 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java @@ -24,5 +24,5 @@ public interface AsyncTdMiddle { * @param timeout Timeout. * @param executeSync Execute the function synchronously. */ - Mono> execute(TdApi.Function request, Duration timeout, boolean executeSync); + Mono> execute(TdApi.Function request, Duration timeout, boolean executeSync); } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java b/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java index 89e71bb..140db39 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/ExecuteObject.java @@ -7,17 +7,17 @@ import java.time.Duration; import java.util.Objects; import java.util.StringJoiner; -public class ExecuteObject { +public class ExecuteObject { - private static final TdExecuteObjectMessageCodec realCodec = new TdExecuteObjectMessageCodec(); + private static final TdExecuteObjectMessageCodec realCodec = new TdExecuteObjectMessageCodec<>(); private boolean executeDirectly; - private TdApi.Function request; + private TdApi.Function request; private Duration timeout; private int pos; private Buffer buffer; - public ExecuteObject(boolean executeDirectly, Function request, Duration timeout) { + public ExecuteObject(boolean executeDirectly, Function request, Duration timeout) { if (request == null) throw new NullPointerException(); this.executeDirectly = executeDirectly; @@ -32,7 +32,8 @@ public class ExecuteObject { private void tryDecode() { if (request == null) { - var data = realCodec.decodeFromWire(pos, buffer); + @SuppressWarnings("unchecked") + ExecuteObject data = (ExecuteObject) realCodec.decodeFromWire(pos, buffer); this.executeDirectly = data.executeDirectly; this.request = data.request; this.buffer = null; @@ -45,7 +46,7 @@ public class ExecuteObject { return executeDirectly; } - public TdApi.Function getRequest() { + public TdApi.Function getRequest() { tryDecode(); return request; } @@ -65,7 +66,7 @@ public class ExecuteObject { return false; } - ExecuteObject that = (ExecuteObject) o; + ExecuteObject that = (ExecuteObject) o; if (executeDirectly != that.executeDirectly) { return false; diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdExecuteObjectMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdExecuteObjectMessageCodec.java index cfcad7c..78d8a6a 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdExecuteObjectMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/LazyTdExecuteObjectMessageCodec.java @@ -2,10 +2,12 @@ package it.tdlight.tdlibsession.td.middle; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; +import it.tdlight.jni.TdApi; -public class LazyTdExecuteObjectMessageCodec implements MessageCodec { +public class LazyTdExecuteObjectMessageCodec + implements MessageCodec, ExecuteObject> { - private static final TdExecuteObjectMessageCodec realCodec = new TdExecuteObjectMessageCodec(); + private static final TdExecuteObjectMessageCodec realCodec = new TdExecuteObjectMessageCodec<>(); public LazyTdExecuteObjectMessageCodec() { super(); @@ -17,12 +19,12 @@ public class LazyTdExecuteObjectMessageCodec implements MessageCodec decodeFromWire(int pos, Buffer buffer) { + return new ExecuteObject<>(pos, buffer); } @Override - public ExecuteObject transform(ExecuteObject t) { + public ExecuteObject transform(ExecuteObject t) { // If a message is sent *locally* across the event bus. // This sends message just as is return t; diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java index e27145d..6227594 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdExecuteObjectMessageCodec.java @@ -7,7 +7,8 @@ import it.tdlight.jni.TdApi.Function; import it.tdlight.utils.BufferUtils; import java.time.Duration; -public class TdExecuteObjectMessageCodec implements MessageCodec { +public class TdExecuteObjectMessageCodec + implements MessageCodec, ExecuteObject> { public TdExecuteObjectMessageCodec() { super(); @@ -22,17 +23,18 @@ public class TdExecuteObjectMessageCodec implements MessageCodec new ExecuteObject( + public ExecuteObject decodeFromWire(int pos, Buffer buffer) { + return BufferUtils.decode(pos, buffer, is -> new ExecuteObject( is.readBoolean(), - (Function) TdApi.Deserializer.deserialize(is), + (Function) TdApi.Deserializer.deserialize(is), Duration.ofMillis(is.readLong()) )); } @Override - public ExecuteObject transform(ExecuteObject t) { + public ExecuteObject transform(ExecuteObject t) { // If a message is sent *locally* across the event bus. // This sends message just as is return t; diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 10c1dd6..da44f11 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -343,8 +343,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { } @Override - public Mono> execute(Function request, Duration timeout, boolean executeSync) { - var req = new ExecuteObject(executeSync, request, timeout); + public Mono> execute(Function request, Duration timeout, + boolean executeSync) { + var req = new ExecuteObject<>(executeSync, request, timeout); var deliveryOptions = new DeliveryOptions(this.deliveryOptions) // Timeout + 5s (5 seconds extra are used to wait the graceful server-side timeout response) .setSendTimeout(timeout.toMillis() + 5000); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index 1ecb684..15c6a99 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -102,7 +102,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd } @Override - public Mono> execute(Function requestFunction, + public Mono> execute(Function requestFunction, Duration timeout, boolean executeDirectly) { return td diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java index ad159bd..5a9ca2d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java @@ -90,7 +90,7 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle { } @Override - public Mono> execute(Function request, Duration timeout, boolean executeDirectly) { + public Mono> execute(Function request, Duration timeout, boolean executeDirectly) { var startError = this.startError.get(); if (startError != null) { return Mono.error(startError); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 04ff8c5..9b1ad81 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -56,7 +56,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { // Variables configured at startup private final AtomicReference td = new AtomicReference<>(); - private final AtomicReference> executeConsumer = new AtomicReference<>(); + private final AtomicReference>> executeConsumer = new AtomicReference<>(); private final AtomicReference> readBinlogConsumer = new AtomicReference<>(); private final AtomicReference> readyToReceiveConsumer = new AtomicReference<>(); private final AtomicReference> pingConsumer = new AtomicReference<>(); @@ -135,10 +135,10 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { return Mono.create(registrationSink -> { logger.trace("Preparing listeners"); - MessageConsumer executeConsumer = vertx.eventBus().consumer(botAddress + ".execute"); + MessageConsumer> executeConsumer = vertx.eventBus().consumer(botAddress + ".execute"); this.executeConsumer.set(executeConsumer); Flux - .>create(sink -> { + .>>create(sink -> { executeConsumer.handler(sink::next); executeConsumer.endHandler(h -> sink.complete()); }) @@ -254,7 +254,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { /** * Override some requests */ - private Function overrideRequest(Function request, int botId) { + private Function overrideRequest(Function request, int botId) { if (request.getConstructor() == SetTdlibParameters.CONSTRUCTOR) { // Fix session directory locations var setTdlibParamsObj = (SetTdlibParameters) request; diff --git a/src/main/java/it/tdlight/utils/BatchSubscriber.java b/src/main/java/it/tdlight/utils/BatchSubscriber.java deleted file mode 100644 index 4ce9224..0000000 --- a/src/main/java/it/tdlight/utils/BatchSubscriber.java +++ /dev/null @@ -1,157 +0,0 @@ -package it.tdlight.utils; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; -import static reactor.core.Exceptions.addSuppressed; -import static reactor.core.publisher.Operators.cancelledSubscription; -import static reactor.core.publisher.Operators.onErrorDropped; -import static reactor.core.publisher.Operators.onOperatorError; -import static reactor.core.publisher.Operators.setOnce; -import static reactor.core.publisher.Operators.terminate; -import static reactor.core.scheduler.Schedulers.parallel; - -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import org.jetbrains.annotations.NotNull; -import org.reactivestreams.Subscription; -import org.warp.commonutils.log.Logger; -import org.warp.commonutils.log.LoggerFactory; -import reactor.core.CoreSubscriber; -import reactor.core.Disposable; -import reactor.core.scheduler.Scheduler; - -public abstract class BatchSubscriber implements CoreSubscriber { - - private static final Logger log = LoggerFactory.getLogger(BatchSubscriber.class); - - private final Scheduler scheduler; - private final int batchSize; - private final Duration timeout; - - private final BlockingQueue buffer = new LinkedBlockingQueue<>(); - private final AtomicInteger requests = new AtomicInteger(0); - - private final AtomicReference flushTimer = new AtomicReference<>(); - private final Runnable flushTask = () -> { - log.trace("timeout [{}] -> flush", buffer.size()); - flush(); - }; - - private volatile Subscription subscription; - private static AtomicReferenceFieldUpdater S = newUpdater(BatchSubscriber.class, Subscription.class, "subscription"); - - public BatchSubscriber(int batchSize, Duration timeout) { - this.batchSize = batchSize; - this.timeout = timeout; - this.scheduler = parallel(); - } - - @Override - public void onSubscribe(@NotNull Subscription s) { - setOnce(S, this, s); - } - - @Override - public void onNext(T record) { - try { - buffer.add(record); - if (requests.get() > 0) { - if (buffer.size() >= batchSize) { - log.trace("+ value [{}] -> flush", buffer.size()); - flush(); - } - else { - log.trace("+ value [{}] -> flush in {}ms", buffer.size(), timeout.toMillis()); - scheduleFlush(); - } - } - else { - log.trace("+ value [{}] -> buffer", buffer.size()); - } - } - catch (Throwable t) { - onError(onOperatorError(subscription, t, record, currentContext())); - } - } - - @Override - public void onError(Throwable t) { - if (S.getAndSet(this, cancelledSubscription()) != cancelledSubscription()) { - try { - suspendFlush(); - } - catch (Throwable e) { - t = addSuppressed(e, t); - } - } - - onErrorDropped(t, currentContext()); - } - - @Override - public void onComplete() { - if (S.getAndSet(this, cancelledSubscription()) != cancelledSubscription()) { - try { - suspendFlush(); - } - catch (Throwable e) { } - } - } - - // Implement what to do with a batch (either full or partial due to timeout). - // Could be publish to another subscriber. - public abstract void flush(List batch); - - private void flush() { - suspendFlush(); - - List batch = new ArrayList<>(batchSize); - buffer.drainTo(batch, batchSize); - flush(batch); - - requests.decrementAndGet(); - log.trace("- request [{}]", requests.get()); - } - - private void scheduleFlush() { - flushTimer.updateAndGet(current -> { - if (current != null) current.dispose(); - return scheduler.schedule(flushTask, timeout.toMillis(), MILLISECONDS); - }); - } - - private void suspendFlush() { - flushTimer.updateAndGet(current -> { - if (current != null) current.dispose(); - return null; - }); - } - - public void request() { - if (requests.get() == 0 && buffer.size() >= batchSize) { - log.trace(". request [{}] -> flush", buffer.size()); - flush(); - } - else { - int required = requests.incrementAndGet() == 1 - ? batchSize - buffer.size() - : batchSize; - log.trace("+ request [{}] -> request {} values", buffer.size(), required); - if (required > 0) { - subscription.request(required); - } - - if (!buffer.isEmpty()) scheduleFlush(); - } - } - - public void cancel() { - terminate(S, this); - } -} \ No newline at end of file