Add missing generics
This commit is contained in:
parent
2f40958971
commit
630b293e8e
2
pom.xml
2
pom.xml
@ -66,7 +66,7 @@
|
||||
<dependency>
|
||||
<groupId>it.tdlight</groupId>
|
||||
<artifactId>tdlight-java</artifactId>
|
||||
<version>2.7.8.16</version>
|
||||
<version>2.7.8.23</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
@ -12,7 +12,7 @@ public interface ReactorTelegramClient {
|
||||
|
||||
Flux<Object> receive();
|
||||
|
||||
Mono<TdApi.Object> send(TdApi.Function query, Duration timeout);
|
||||
<T extends TdApi.Object> Mono<TdApi.Object> send(TdApi.Function<T> query, Duration timeout);
|
||||
|
||||
TdApi.Object execute(TdApi.Function query);
|
||||
<T extends TdApi.Object> TdApi.Object execute(TdApi.Function<T> query);
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ public class ResponseError extends IOException {
|
||||
@NotNull
|
||||
private final String message;
|
||||
|
||||
private ResponseError(@NotNull Function function, @NotNull String botName, @NotNull TdApi.Error tdError, @Nullable Throwable cause) {
|
||||
private ResponseError(@NotNull Function<?> function, @NotNull String botName, @NotNull TdApi.Error tdError, @Nullable Throwable cause) {
|
||||
super("Bot '" + botName + "' failed the request '" + functionToInlineString(function) + "': " + tdError.code + " " + tdError.message, cause);
|
||||
this.botName = botName;
|
||||
this.tag = functionToInlineString(function);
|
||||
@ -34,7 +34,7 @@ public class ResponseError extends IOException {
|
||||
this.message = tdError.message;
|
||||
}
|
||||
|
||||
private ResponseError(@NotNull Function function, @NotNull String botName, @Nullable Throwable cause) {
|
||||
private ResponseError(@NotNull Function<?> function, @NotNull String botName, @Nullable Throwable cause) {
|
||||
super("Bot '" + botName + "' failed the request '" + functionToInlineString(function) + "': " + (cause == null ? null : cause.getMessage()), cause);
|
||||
this.botName = botName;
|
||||
this.tag = functionToInlineString(function);
|
||||
@ -50,7 +50,7 @@ public class ResponseError extends IOException {
|
||||
this.message = (cause == null ? "" : (cause.getMessage() == null ? "" : cause.getMessage()));
|
||||
}
|
||||
|
||||
public static ResponseError newResponseError(@NotNull Function function,
|
||||
public static ResponseError newResponseError(@NotNull Function<?> function,
|
||||
@NotNull String botName,
|
||||
@NotNull TdApi.Error tdError,
|
||||
@Nullable Throwable cause) {
|
||||
@ -71,7 +71,7 @@ public class ResponseError extends IOException {
|
||||
return new ResponseError(tag, botName, tdError, cause);
|
||||
}
|
||||
|
||||
public static ResponseError newResponseError(@NotNull Function function,
|
||||
public static ResponseError newResponseError(@NotNull Function<?> function,
|
||||
@NotNull String botName,
|
||||
@Nullable Throwable cause) {
|
||||
return new ResponseError(function, botName, cause);
|
||||
@ -84,7 +84,7 @@ public class ResponseError extends IOException {
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public static <T> T get(@NotNull Function function, @NotNull String botName, CompletableFuture<T> action) throws ResponseError {
|
||||
public static <T> T get(@NotNull Function<?> function, @NotNull String botName, CompletableFuture<T> action) throws ResponseError {
|
||||
try {
|
||||
return action.get();
|
||||
} catch (InterruptedException e) {
|
||||
@ -131,7 +131,7 @@ public class ResponseError extends IOException {
|
||||
return message;
|
||||
}
|
||||
|
||||
private static String functionToInlineString(Function function) {
|
||||
private static String functionToInlineString(Function<?> function) {
|
||||
return function
|
||||
.toString()
|
||||
.replace("\n", " ")
|
||||
|
@ -74,7 +74,7 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient {
|
||||
* @throws NullPointerException if query is null.
|
||||
*/
|
||||
@Override
|
||||
public Mono<TdApi.Object> send(TdApi.Function query, Duration timeout) {
|
||||
public <T extends TdApi.Object> Mono<TdApi.Object> send(TdApi.Function<T> query, Duration timeout) {
|
||||
return Mono.from(reactiveTelegramClient.send(query, timeout)).single();
|
||||
}
|
||||
|
||||
@ -86,7 +86,7 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient {
|
||||
* @throws NullPointerException if query is null.
|
||||
*/
|
||||
@Override
|
||||
public TdApi.Object execute(TdApi.Function query) {
|
||||
public <T extends TdApi.Object> TdApi.Object execute(TdApi.Function<T> query) {
|
||||
return reactiveTelegramClient.execute(query);
|
||||
}
|
||||
}
|
||||
|
@ -27,6 +27,6 @@ public interface AsyncTdDirect {
|
||||
* @param synchronous Execute synchronously.
|
||||
* @return The request response or {@link it.tdlight.jni.TdApi.Error}.
|
||||
*/
|
||||
<T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, Duration timeout, boolean synchronous);
|
||||
<T extends TdApi.Object> Mono<TdResult<T>> execute(Function<T> request, Duration timeout, boolean synchronous);
|
||||
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, Duration timeout, boolean synchronous) {
|
||||
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function<T> request, Duration timeout, boolean synchronous) {
|
||||
if (synchronous) {
|
||||
return MonoUtils.fromBlockingSingle(() -> {
|
||||
var td = this.td.get();
|
||||
|
@ -113,7 +113,7 @@ public class TestClient implements ReactorTelegramClient {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<TdApi.Object> send(Function query, Duration timeout) {
|
||||
public <T extends TdApi.Object> Mono<TdApi.Object> send(Function<T> query, Duration timeout) {
|
||||
return Mono.fromCallable(() -> {
|
||||
TdApi.Object result = executeCommon(query);
|
||||
if (result != null) {
|
||||
@ -124,7 +124,7 @@ public class TestClient implements ReactorTelegramClient {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TdApi.Object execute(Function query) {
|
||||
public <T extends TdApi.Object> TdApi.Object execute(Function<T> query) {
|
||||
TdApi.Object result = executeCommon(query);
|
||||
if (result != null) {
|
||||
return result;
|
||||
@ -133,7 +133,7 @@ public class TestClient implements ReactorTelegramClient {
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public TdApi.Object executeCommon(Function query) {
|
||||
public <T extends TdApi.Object> TdApi.Object executeCommon(Function<T> query) {
|
||||
switch (query.getConstructor()) {
|
||||
case SetLogVerbosityLevel.CONSTRUCTOR:
|
||||
case SetLogTagVerbosityLevel.CONSTRUCTOR:
|
||||
|
@ -186,11 +186,11 @@ public class AsyncTdEasy {
|
||||
* @param timeout Timeout duration.
|
||||
* @return The response or {@link TdApi.Error}.
|
||||
*/
|
||||
public <T extends Object> Mono<TdResult<T>> send(TdApi.Function request, Duration timeout) {
|
||||
public <T extends Object> Mono<TdResult<T>> send(TdApi.Function<T> request, Duration timeout) {
|
||||
return td.execute(request, timeout, false);
|
||||
}
|
||||
|
||||
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(Function obj, boolean synchronous) {
|
||||
private <T extends TdApi.Object> Mono<TdResult<T>> sendDirectly(Function<T> obj, boolean synchronous) {
|
||||
return td.execute(obj, AsyncTdEasy.DEFAULT_TIMEOUT, synchronous);
|
||||
}
|
||||
|
||||
@ -315,7 +315,7 @@ public class AsyncTdEasy {
|
||||
* @param timeout Timeout.
|
||||
* @return The request response.
|
||||
*/
|
||||
public <T extends Object> Mono<TdResult<T>> execute(TdApi.Function request, Duration timeout) {
|
||||
public <T extends Object> Mono<TdResult<T>> execute(TdApi.Function<T> request, Duration timeout) {
|
||||
return td.execute(request, timeout, true);
|
||||
}
|
||||
|
||||
|
@ -24,5 +24,5 @@ public interface AsyncTdMiddle {
|
||||
* @param timeout Timeout.
|
||||
* @param executeSync Execute the function synchronously.
|
||||
*/
|
||||
<T extends TdApi.Object> Mono<TdResult<T>> execute(TdApi.Function request, Duration timeout, boolean executeSync);
|
||||
<T extends TdApi.Object> Mono<TdResult<T>> execute(TdApi.Function<T> request, Duration timeout, boolean executeSync);
|
||||
}
|
||||
|
@ -7,17 +7,17 @@ import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.StringJoiner;
|
||||
|
||||
public class ExecuteObject {
|
||||
public class ExecuteObject<T extends TdApi.Object> {
|
||||
|
||||
private static final TdExecuteObjectMessageCodec realCodec = new TdExecuteObjectMessageCodec();
|
||||
private static final TdExecuteObjectMessageCodec<?> realCodec = new TdExecuteObjectMessageCodec<>();
|
||||
|
||||
private boolean executeDirectly;
|
||||
private TdApi.Function request;
|
||||
private TdApi.Function<T> request;
|
||||
private Duration timeout;
|
||||
private int pos;
|
||||
private Buffer buffer;
|
||||
|
||||
public ExecuteObject(boolean executeDirectly, Function request, Duration timeout) {
|
||||
public ExecuteObject(boolean executeDirectly, Function<T> request, Duration timeout) {
|
||||
if (request == null) throw new NullPointerException();
|
||||
|
||||
this.executeDirectly = executeDirectly;
|
||||
@ -32,7 +32,8 @@ public class ExecuteObject {
|
||||
|
||||
private void tryDecode() {
|
||||
if (request == null) {
|
||||
var data = realCodec.decodeFromWire(pos, buffer);
|
||||
@SuppressWarnings("unchecked")
|
||||
ExecuteObject<T> data = (ExecuteObject<T>) realCodec.decodeFromWire(pos, buffer);
|
||||
this.executeDirectly = data.executeDirectly;
|
||||
this.request = data.request;
|
||||
this.buffer = null;
|
||||
@ -45,7 +46,7 @@ public class ExecuteObject {
|
||||
return executeDirectly;
|
||||
}
|
||||
|
||||
public TdApi.Function getRequest() {
|
||||
public TdApi.Function<T> getRequest() {
|
||||
tryDecode();
|
||||
return request;
|
||||
}
|
||||
@ -65,7 +66,7 @@ public class ExecuteObject {
|
||||
return false;
|
||||
}
|
||||
|
||||
ExecuteObject that = (ExecuteObject) o;
|
||||
ExecuteObject<?> that = (ExecuteObject<?>) o;
|
||||
|
||||
if (executeDirectly != that.executeDirectly) {
|
||||
return false;
|
||||
|
@ -2,10 +2,12 @@ package it.tdlight.tdlibsession.td.middle;
|
||||
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.eventbus.MessageCodec;
|
||||
import it.tdlight.jni.TdApi;
|
||||
|
||||
public class LazyTdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject, ExecuteObject> {
|
||||
public class LazyTdExecuteObjectMessageCodec<T extends TdApi.Object>
|
||||
implements MessageCodec<ExecuteObject<T>, ExecuteObject<T>> {
|
||||
|
||||
private static final TdExecuteObjectMessageCodec realCodec = new TdExecuteObjectMessageCodec();
|
||||
private static final TdExecuteObjectMessageCodec<?> realCodec = new TdExecuteObjectMessageCodec<>();
|
||||
|
||||
public LazyTdExecuteObjectMessageCodec() {
|
||||
super();
|
||||
@ -17,12 +19,12 @@ public class LazyTdExecuteObjectMessageCodec implements MessageCodec<ExecuteObje
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteObject decodeFromWire(int pos, Buffer buffer) {
|
||||
return new ExecuteObject(pos, buffer);
|
||||
public ExecuteObject<T> decodeFromWire(int pos, Buffer buffer) {
|
||||
return new ExecuteObject<>(pos, buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteObject transform(ExecuteObject t) {
|
||||
public ExecuteObject<T> transform(ExecuteObject t) {
|
||||
// If a message is sent *locally* across the event bus.
|
||||
// This sends message just as is
|
||||
return t;
|
||||
|
@ -7,7 +7,8 @@ import it.tdlight.jni.TdApi.Function;
|
||||
import it.tdlight.utils.BufferUtils;
|
||||
import java.time.Duration;
|
||||
|
||||
public class TdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject, ExecuteObject> {
|
||||
public class TdExecuteObjectMessageCodec<T extends TdApi.Object>
|
||||
implements MessageCodec<ExecuteObject<T>, ExecuteObject<T>> {
|
||||
|
||||
public TdExecuteObjectMessageCodec() {
|
||||
super();
|
||||
@ -22,17 +23,18 @@ public class TdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject,
|
||||
});
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public ExecuteObject decodeFromWire(int pos, Buffer buffer) {
|
||||
return BufferUtils.decode(pos, buffer, is -> new ExecuteObject(
|
||||
public ExecuteObject<T> decodeFromWire(int pos, Buffer buffer) {
|
||||
return BufferUtils.decode(pos, buffer, is -> new ExecuteObject<T>(
|
||||
is.readBoolean(),
|
||||
(Function) TdApi.Deserializer.deserialize(is),
|
||||
(Function<T>) TdApi.Deserializer.deserialize(is),
|
||||
Duration.ofMillis(is.readLong())
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecuteObject transform(ExecuteObject t) {
|
||||
public ExecuteObject<T> transform(ExecuteObject<T> t) {
|
||||
// If a message is sent *locally* across the event bus.
|
||||
// This sends message just as is
|
||||
return t;
|
||||
|
@ -343,8 +343,9 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, Duration timeout, boolean executeSync) {
|
||||
var req = new ExecuteObject(executeSync, request, timeout);
|
||||
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function<T> request, Duration timeout,
|
||||
boolean executeSync) {
|
||||
var req = new ExecuteObject<>(executeSync, request, timeout);
|
||||
var deliveryOptions = new DeliveryOptions(this.deliveryOptions)
|
||||
// Timeout + 5s (5 seconds extra are used to wait the graceful server-side timeout response)
|
||||
.setSendTimeout(timeout.toMillis() + 5000);
|
||||
|
@ -102,7 +102,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends Object> Mono<TdResult<T>> execute(Function requestFunction,
|
||||
public <T extends Object> Mono<TdResult<T>> execute(Function<T> requestFunction,
|
||||
Duration timeout,
|
||||
boolean executeDirectly) {
|
||||
return td
|
||||
|
@ -90,7 +90,7 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends Object> Mono<TdResult<T>> execute(Function request, Duration timeout, boolean executeDirectly) {
|
||||
public <T extends Object> Mono<TdResult<T>> execute(Function<T> request, Duration timeout, boolean executeDirectly) {
|
||||
var startError = this.startError.get();
|
||||
if (startError != null) {
|
||||
return Mono.error(startError);
|
||||
|
@ -56,7 +56,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
||||
|
||||
// Variables configured at startup
|
||||
private final AtomicReference<AsyncTdDirectImpl> td = new AtomicReference<>();
|
||||
private final AtomicReference<MessageConsumer<ExecuteObject>> executeConsumer = new AtomicReference<>();
|
||||
private final AtomicReference<MessageConsumer<ExecuteObject<?>>> executeConsumer = new AtomicReference<>();
|
||||
private final AtomicReference<MessageConsumer<byte[]>> readBinlogConsumer = new AtomicReference<>();
|
||||
private final AtomicReference<MessageConsumer<byte[]>> readyToReceiveConsumer = new AtomicReference<>();
|
||||
private final AtomicReference<MessageConsumer<byte[]>> pingConsumer = new AtomicReference<>();
|
||||
@ -135,10 +135,10 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
||||
return Mono.<Void>create(registrationSink -> {
|
||||
logger.trace("Preparing listeners");
|
||||
|
||||
MessageConsumer<ExecuteObject> executeConsumer = vertx.eventBus().consumer(botAddress + ".execute");
|
||||
MessageConsumer<ExecuteObject<?>> executeConsumer = vertx.eventBus().consumer(botAddress + ".execute");
|
||||
this.executeConsumer.set(executeConsumer);
|
||||
Flux
|
||||
.<Message<ExecuteObject>>create(sink -> {
|
||||
.<Message<ExecuteObject<?>>>create(sink -> {
|
||||
executeConsumer.handler(sink::next);
|
||||
executeConsumer.endHandler(h -> sink.complete());
|
||||
})
|
||||
@ -254,7 +254,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
||||
/**
|
||||
* Override some requests
|
||||
*/
|
||||
private Function overrideRequest(Function request, int botId) {
|
||||
private <T extends TdApi.Object> Function<T> overrideRequest(Function<T> request, int botId) {
|
||||
if (request.getConstructor() == SetTdlibParameters.CONSTRUCTOR) {
|
||||
// Fix session directory locations
|
||||
var setTdlibParamsObj = (SetTdlibParameters) request;
|
||||
|
@ -1,157 +0,0 @@
|
||||
package it.tdlight.utils;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
|
||||
import static reactor.core.Exceptions.addSuppressed;
|
||||
import static reactor.core.publisher.Operators.cancelledSubscription;
|
||||
import static reactor.core.publisher.Operators.onErrorDropped;
|
||||
import static reactor.core.publisher.Operators.onOperatorError;
|
||||
import static reactor.core.publisher.Operators.setOnce;
|
||||
import static reactor.core.publisher.Operators.terminate;
|
||||
import static reactor.core.scheduler.Schedulers.parallel;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.reactivestreams.Subscription;
|
||||
import org.warp.commonutils.log.Logger;
|
||||
import org.warp.commonutils.log.LoggerFactory;
|
||||
import reactor.core.CoreSubscriber;
|
||||
import reactor.core.Disposable;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
|
||||
public abstract class BatchSubscriber<T> implements CoreSubscriber<T> {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(BatchSubscriber.class);
|
||||
|
||||
private final Scheduler scheduler;
|
||||
private final int batchSize;
|
||||
private final Duration timeout;
|
||||
|
||||
private final BlockingQueue<T> buffer = new LinkedBlockingQueue<>();
|
||||
private final AtomicInteger requests = new AtomicInteger(0);
|
||||
|
||||
private final AtomicReference<Disposable> flushTimer = new AtomicReference<>();
|
||||
private final Runnable flushTask = () -> {
|
||||
log.trace("timeout [{}] -> flush", buffer.size());
|
||||
flush();
|
||||
};
|
||||
|
||||
private volatile Subscription subscription;
|
||||
private static AtomicReferenceFieldUpdater<BatchSubscriber, Subscription> S = newUpdater(BatchSubscriber.class, Subscription.class, "subscription");
|
||||
|
||||
public BatchSubscriber(int batchSize, Duration timeout) {
|
||||
this.batchSize = batchSize;
|
||||
this.timeout = timeout;
|
||||
this.scheduler = parallel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onSubscribe(@NotNull Subscription s) {
|
||||
setOnce(S, this, s);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(T record) {
|
||||
try {
|
||||
buffer.add(record);
|
||||
if (requests.get() > 0) {
|
||||
if (buffer.size() >= batchSize) {
|
||||
log.trace("+ value [{}] -> flush", buffer.size());
|
||||
flush();
|
||||
}
|
||||
else {
|
||||
log.trace("+ value [{}] -> flush in {}ms", buffer.size(), timeout.toMillis());
|
||||
scheduleFlush();
|
||||
}
|
||||
}
|
||||
else {
|
||||
log.trace("+ value [{}] -> buffer", buffer.size());
|
||||
}
|
||||
}
|
||||
catch (Throwable t) {
|
||||
onError(onOperatorError(subscription, t, record, currentContext()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
if (S.getAndSet(this, cancelledSubscription()) != cancelledSubscription()) {
|
||||
try {
|
||||
suspendFlush();
|
||||
}
|
||||
catch (Throwable e) {
|
||||
t = addSuppressed(e, t);
|
||||
}
|
||||
}
|
||||
|
||||
onErrorDropped(t, currentContext());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
if (S.getAndSet(this, cancelledSubscription()) != cancelledSubscription()) {
|
||||
try {
|
||||
suspendFlush();
|
||||
}
|
||||
catch (Throwable e) { }
|
||||
}
|
||||
}
|
||||
|
||||
// Implement what to do with a batch (either full or partial due to timeout).
|
||||
// Could be publish to another subscriber.
|
||||
public abstract void flush(List<T> batch);
|
||||
|
||||
private void flush() {
|
||||
suspendFlush();
|
||||
|
||||
List<T> batch = new ArrayList<>(batchSize);
|
||||
buffer.drainTo(batch, batchSize);
|
||||
flush(batch);
|
||||
|
||||
requests.decrementAndGet();
|
||||
log.trace("- request [{}]", requests.get());
|
||||
}
|
||||
|
||||
private void scheduleFlush() {
|
||||
flushTimer.updateAndGet(current -> {
|
||||
if (current != null) current.dispose();
|
||||
return scheduler.schedule(flushTask, timeout.toMillis(), MILLISECONDS);
|
||||
});
|
||||
}
|
||||
|
||||
private void suspendFlush() {
|
||||
flushTimer.updateAndGet(current -> {
|
||||
if (current != null) current.dispose();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
public void request() {
|
||||
if (requests.get() == 0 && buffer.size() >= batchSize) {
|
||||
log.trace(". request [{}] -> flush", buffer.size());
|
||||
flush();
|
||||
}
|
||||
else {
|
||||
int required = requests.incrementAndGet() == 1
|
||||
? batchSize - buffer.size()
|
||||
: batchSize;
|
||||
log.trace("+ request [{}] -> request {} values", buffer.size(), required);
|
||||
if (required > 0) {
|
||||
subscription.request(required);
|
||||
}
|
||||
|
||||
if (!buffer.isEmpty()) scheduleFlush();
|
||||
}
|
||||
}
|
||||
|
||||
public void cancel() {
|
||||
terminate(S, this);
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user