Bugfix
This commit is contained in:
parent
1b2fc7e4c1
commit
20862694d2
@ -4,6 +4,7 @@ import com.akaita.java.rxjava2debug.RxJava2Debug;
|
|||||||
import io.vertx.core.DeploymentOptions;
|
import io.vertx.core.DeploymentOptions;
|
||||||
import io.vertx.core.json.JsonObject;
|
import io.vertx.core.json.JsonObject;
|
||||||
import io.vertx.core.net.JksOptions;
|
import io.vertx.core.net.JksOptions;
|
||||||
|
import io.vertx.reactivex.core.eventbus.Message;
|
||||||
import io.vertx.reactivex.core.eventbus.MessageConsumer;
|
import io.vertx.reactivex.core.eventbus.MessageConsumer;
|
||||||
import it.tdlight.common.Init;
|
import it.tdlight.common.Init;
|
||||||
import it.tdlight.common.utils.CantLoadLibrary;
|
import it.tdlight.common.utils.CantLoadLibrary;
|
||||||
@ -22,11 +23,13 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.publisher.Sinks;
|
import reactor.core.publisher.Sinks;
|
||||||
import reactor.core.publisher.Sinks.One;
|
import reactor.core.publisher.Sinks.One;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
import reactor.tools.agent.ReactorDebugAgent;
|
import reactor.tools.agent.ReactorDebugAgent;
|
||||||
|
import reactor.util.function.Tuple2;
|
||||||
|
|
||||||
public class TDLibRemoteClient implements AutoCloseable {
|
public class TDLibRemoteClient implements AutoCloseable {
|
||||||
|
|
||||||
@ -130,7 +133,7 @@ public class TDLibRemoteClient implements AutoCloseable {
|
|||||||
.setPassword(securityInfo.getTrustStorePassword());
|
.setPassword(securityInfo.getTrustStorePassword());
|
||||||
|
|
||||||
return MonoUtils
|
return MonoUtils
|
||||||
.fromBlockingMaybe(() -> {
|
.fromBlockingEmpty(() -> {
|
||||||
// Set verbosity level here, before creating the bots
|
// Set verbosity level here, before creating the bots
|
||||||
if (Files.notExists(Paths.get("logs"))) {
|
if (Files.notExists(Paths.get("logs"))) {
|
||||||
try {
|
try {
|
||||||
@ -141,7 +144,6 @@ public class TDLibRemoteClient implements AutoCloseable {
|
|||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"TDLib remote client is being hosted on" + netInterface + ":" + port + ". Master: " + masterHostname);
|
"TDLib remote client is being hosted on" + netInterface + ":" + port + ". Master: " + masterHostname);
|
||||||
return null;
|
|
||||||
})
|
})
|
||||||
.then(TdClusterManager.ofNodes(keyStoreOptions,
|
.then(TdClusterManager.ofNodes(keyStoreOptions,
|
||||||
trustStoreOptions,
|
trustStoreOptions,
|
||||||
@ -161,9 +163,20 @@ public class TDLibRemoteClient implements AutoCloseable {
|
|||||||
.single()
|
.single()
|
||||||
.flatMap(clusterManager -> {
|
.flatMap(clusterManager -> {
|
||||||
MessageConsumer<StartSessionMessage> startBotConsumer = clusterManager.getEventBus().consumer("bots.start-bot");
|
MessageConsumer<StartSessionMessage> startBotConsumer = clusterManager.getEventBus().consumer("bots.start-bot");
|
||||||
startBotConsumer.handler(msg -> {
|
return MonoUtils
|
||||||
|
.fromReplyableResolvedMessageConsumer(startBotConsumer)
|
||||||
|
.flatMap(tuple -> this.listenForStartBotsCommand(clusterManager, tuple.getT1(), tuple.getT2()));
|
||||||
|
})
|
||||||
|
.then();
|
||||||
|
}
|
||||||
|
|
||||||
StartSessionMessage req = msg.body();
|
private Mono<Void> listenForStartBotsCommand(TdClusterManager clusterManager,
|
||||||
|
Mono<Void> completion,
|
||||||
|
Flux<Tuple2<Message<?>, StartSessionMessage>> messages) {
|
||||||
|
return MonoUtils
|
||||||
|
.fromBlockingEmpty(() -> messages
|
||||||
|
.flatMapSequential(msg -> {
|
||||||
|
StartSessionMessage req = msg.getT2();
|
||||||
DeploymentOptions deploymentOptions = clusterManager
|
DeploymentOptions deploymentOptions = clusterManager
|
||||||
.newDeploymentOpts()
|
.newDeploymentOpts()
|
||||||
.setConfig(new JsonObject()
|
.setConfig(new JsonObject()
|
||||||
@ -178,29 +191,24 @@ public class TDLibRemoteClient implements AutoCloseable {
|
|||||||
var mediaPath = getMediaDirectory(req.id());
|
var mediaPath = getMediaDirectory(req.id());
|
||||||
var blPath = getSessionBinlogDirectory(req.id());
|
var blPath = getSessionBinlogDirectory(req.id());
|
||||||
|
|
||||||
Schedulers.boundedElastic().schedule(() -> {
|
return BinlogUtils
|
||||||
BinlogUtils
|
|
||||||
.chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate())
|
.chooseBinlog(clusterManager.getVertx().fileSystem(), blPath, req.binlog(), req.binlogDate())
|
||||||
.then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath, mediaPath))
|
.then(BinlogUtils.cleanSessionPath(clusterManager.getVertx().fileSystem(), blPath, sessPath, mediaPath))
|
||||||
.then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono))
|
.then(clusterManager.getVertx().rxDeployVerticle(verticle, deploymentOptions).as(MonoUtils::toMono))
|
||||||
.then(MonoUtils.fromBlockingMaybe(() -> {
|
.then(MonoUtils.fromBlockingEmpty(() -> msg.getT1().reply(new byte[0])))
|
||||||
msg.reply(new byte[0]);
|
.onErrorResume(ex -> {
|
||||||
return null;
|
msg.getT1().fail(500, "Failed to deploy bot verticle: " + ex.getMessage());
|
||||||
}))
|
logger.error("Failed to deploy bot verticle", ex);
|
||||||
.publishOn(Schedulers.single())
|
return Mono.empty();
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.publishOn(Schedulers.parallel())
|
||||||
.subscribe(
|
.subscribe(
|
||||||
v -> {},
|
v -> {},
|
||||||
ex -> {
|
ex -> logger.error("Bots starter activity crashed. From now on, no new bots can be started anymore", ex)
|
||||||
logger.error("Failed to deploy bot verticle", ex);
|
)
|
||||||
msg.fail(500, "Failed to deploy bot verticle: " + ex.getMessage());
|
)
|
||||||
},
|
.then(completion);
|
||||||
() -> {}
|
|
||||||
);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
return startBotConsumer.rxCompletionHandler().as(MonoUtils::toMono);
|
|
||||||
})
|
|
||||||
.then();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Path getSessionDirectory(int botId) {
|
public static Path getSessionDirectory(int botId) {
|
||||||
|
@ -100,6 +100,7 @@ public class AsyncTdEasy {
|
|||||||
logger.warn("Updates stream has closed while"
|
logger.warn("Updates stream has closed while"
|
||||||
+ " the current authorization state is"
|
+ " the current authorization state is"
|
||||||
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
||||||
|
this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED);
|
||||||
this.authState.tryEmitNext(new AuthorizationStateClosed());
|
this.authState.tryEmitNext(new AuthorizationStateClosed());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -110,6 +111,7 @@ public class AsyncTdEasy {
|
|||||||
logger.warn("Updates stream has terminated with an error while"
|
logger.warn("Updates stream has terminated with an error while"
|
||||||
+ " the current authorization state is"
|
+ " the current authorization state is"
|
||||||
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
+ " still {}. Setting authorization state as closed!", authState.getClass().getSimpleName());
|
||||||
|
this.fatalError.tryEmitValue(FatalErrorType.CONNECTION_KILLED);
|
||||||
this.authState.tryEmitNext(new AuthorizationStateClosed());
|
this.authState.tryEmitNext(new AuthorizationStateClosed());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -152,10 +152,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
Mono
|
Mono
|
||||||
.defer(() -> {
|
.defer(() -> {
|
||||||
logger.trace("Requesting ping...");
|
logger.trace("Requesting ping...");
|
||||||
return cluster.getEventBus().<byte[]>rxRequest(botAddress + ".ping",
|
return cluster.getEventBus()
|
||||||
EMPTY,
|
.<byte[]>rxRequest(botAddress + ".ping", EMPTY, deliveryOptionsWithTimeout).as(MonoUtils::toMono);
|
||||||
deliveryOptionsWithTimeout
|
|
||||||
).as(MonoUtils::toMono);
|
|
||||||
})
|
})
|
||||||
.flatMap(msg -> Mono.fromCallable(() -> msg.body()).subscribeOn(Schedulers.boundedElastic()))
|
.flatMap(msg -> Mono.fromCallable(() -> msg.body()).subscribeOn(Schedulers.boundedElastic()))
|
||||||
.repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true))
|
.repeatWhen(l -> l.delayElements(Duration.ofSeconds(10)).takeWhile(x -> true))
|
||||||
@ -164,12 +162,13 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
}), this.crash.asMono().onErrorResume(ex -> Mono.empty()).doOnTerminate(() -> {
|
}), this.crash.asMono().onErrorResume(ex -> Mono.empty()).doOnTerminate(() -> {
|
||||||
logger.trace("About to kill pinger because it has seen a crash signal");
|
logger.trace("About to kill pinger because it has seen a crash signal");
|
||||||
})))
|
})))
|
||||||
.doOnNext(s -> logger.warn("REPEATING PING"))
|
.doOnNext(s -> logger.trace("PING"))
|
||||||
.map(x -> 1)
|
|
||||||
.defaultIfEmpty(0)
|
|
||||||
.doOnNext(s -> logger.warn("PING"))
|
|
||||||
.then()
|
.then()
|
||||||
.doOnNext(s -> logger.warn("END PING"))
|
.onErrorResume(ex -> {
|
||||||
|
logger.trace("Ping failed", ex);
|
||||||
|
return Mono.empty();
|
||||||
|
})
|
||||||
|
.doOnNext(s -> logger.debug("END PING"))
|
||||||
.then(MonoUtils.emitEmpty(this.pingFail))
|
.then(MonoUtils.emitEmpty(this.pingFail))
|
||||||
.subscribeOn(Schedulers.single())
|
.subscribeOn(Schedulers.single())
|
||||||
.subscribe();
|
.subscribe();
|
||||||
@ -209,7 +208,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
|
|||||||
.then(updates.asMono().publishOn(Schedulers.single()))
|
.then(updates.asMono().publishOn(Schedulers.single()))
|
||||||
.timeout(Duration.ofSeconds(5))
|
.timeout(Duration.ofSeconds(5))
|
||||||
.publishOn(Schedulers.single())
|
.publishOn(Schedulers.single())
|
||||||
.flatMap(MonoUtils::fromConsumerAdvanced)
|
.flatMap(MonoUtils::fromMessageConsumer)
|
||||||
.flatMapMany(registration -> Mono
|
.flatMapMany(registration -> Mono
|
||||||
.fromRunnable(() -> logger.trace("Registering updates flux"))
|
.fromRunnable(() -> logger.trace("Registering updates flux"))
|
||||||
.then(registration.getT1())
|
.then(registration.getT1())
|
||||||
|
@ -17,24 +17,23 @@ import it.tdlight.jni.TdApi.Update;
|
|||||||
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
|
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
|
||||||
import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient;
|
import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient;
|
||||||
import it.tdlight.tdlibsession.td.TdError;
|
import it.tdlight.tdlibsession.td.TdError;
|
||||||
import it.tdlight.tdlibsession.td.middle.TdResultMessage;
|
|
||||||
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
|
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
|
||||||
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions;
|
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions;
|
||||||
import it.tdlight.tdlibsession.td.direct.TelegramClientFactory;
|
import it.tdlight.tdlibsession.td.direct.TelegramClientFactory;
|
||||||
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
|
import it.tdlight.tdlibsession.td.middle.ExecuteObject;
|
||||||
import it.tdlight.tdlibsession.td.middle.TdResultList;
|
import it.tdlight.tdlibsession.td.middle.TdResultList;
|
||||||
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
|
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
|
||||||
|
import it.tdlight.tdlibsession.td.middle.TdResultMessage;
|
||||||
import it.tdlight.utils.BinlogUtils;
|
import it.tdlight.utils.BinlogUtils;
|
||||||
import it.tdlight.utils.MonoUtils;
|
import it.tdlight.utils.MonoUtils;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.Collections;
|
import java.util.List;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
import reactor.core.publisher.Mono;
|
import reactor.core.publisher.Mono;
|
||||||
import reactor.core.publisher.Sinks;
|
import reactor.core.publisher.Sinks;
|
||||||
import reactor.core.publisher.Sinks.Empty;
|
|
||||||
import reactor.core.publisher.Sinks.One;
|
import reactor.core.publisher.Sinks.One;
|
||||||
import reactor.core.scheduler.Schedulers;
|
import reactor.core.scheduler.Schedulers;
|
||||||
import reactor.util.function.Tuples;
|
import reactor.util.function.Tuples;
|
||||||
@ -63,7 +62,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
|||||||
private final One<MessageConsumer<byte[]>> readyToReceiveConsumer = Sinks.one();
|
private final One<MessageConsumer<byte[]>> readyToReceiveConsumer = Sinks.one();
|
||||||
private final One<MessageConsumer<byte[]>> pingConsumer = Sinks.one();
|
private final One<MessageConsumer<byte[]>> pingConsumer = Sinks.one();
|
||||||
private final One<Flux<Void>> pipeFlux = Sinks.one();
|
private final One<Flux<Void>> pipeFlux = Sinks.one();
|
||||||
private final Empty<Void> terminatePingOverPipeFlux = Sinks.empty();
|
|
||||||
|
|
||||||
public AsyncTdMiddleEventBusServer() {
|
public AsyncTdMiddleEventBusServer() {
|
||||||
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 50);
|
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 50);
|
||||||
@ -346,12 +344,9 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
|
|||||||
return update;
|
return update;
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
.bufferTimeout(tdOptions.getEventsSize(), local ? Duration.ofMillis(1) : Duration.ofMillis(100))
|
//.transform(normal -> new BufferTimeOutPublisher<>(normal, Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100)))
|
||||||
.doFinally(signalType -> terminatePingOverPipeFlux.tryEmitEmpty())
|
//.bufferTimeout(Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100))
|
||||||
.mergeWith(Flux
|
.map(List::of)
|
||||||
.interval(Duration.ofSeconds(5))
|
|
||||||
.map(l -> Collections.<TdApi.Object>emptyList())
|
|
||||||
.takeUntilOther(terminatePingOverPipeFlux.asMono()))
|
|
||||||
.map(TdResultList::new);
|
.map(TdResultList::new);
|
||||||
|
|
||||||
var fluxCodec = new TdResultListMessageCodec();
|
var fluxCodec = new TdResultListMessageCodec();
|
||||||
|
160
src/main/java/it/tdlight/utils/BufferTimeOutPublisher.java
Normal file
160
src/main/java/it/tdlight/utils/BufferTimeOutPublisher.java
Normal file
@ -0,0 +1,160 @@
|
|||||||
|
package it.tdlight.utils;
|
||||||
|
|
||||||
|
/** Based on:
|
||||||
|
* https://gist.github.com/glandais-sparklane/e38834aa9df0c56f23e2d8d2e6899c78
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import org.reactivestreams.Publisher;
|
||||||
|
import org.reactivestreams.Subscriber;
|
||||||
|
import org.reactivestreams.Subscription;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
@SuppressWarnings("ReactiveStreamsPublisherImplementation")
|
||||||
|
public class BufferTimeOutPublisher<T> implements Publisher<List<T>> {
|
||||||
|
|
||||||
|
private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
|
||||||
|
private final Publisher<T> source;
|
||||||
|
private final int size;
|
||||||
|
private final long duration;
|
||||||
|
|
||||||
|
public BufferTimeOutPublisher(Publisher<T> source, int size, Duration duration) {
|
||||||
|
this.source = source;
|
||||||
|
this.size = size;
|
||||||
|
this.duration = duration.toMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void subscribe(Subscriber<? super List<T>> subscriber) {
|
||||||
|
subscriber.onSubscribe(new BufferTimeOutSubscription<T>(source, subscriber, size, duration));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static class BufferTimeOutSubscription<T> implements Subscription, Subscriber<T> {
|
||||||
|
|
||||||
|
private final Subscriber<? super List<T>> subscriber;
|
||||||
|
private final int size;
|
||||||
|
private final long duration;
|
||||||
|
private Subscription subscription;
|
||||||
|
|
||||||
|
private final ReentrantLock lock = new ReentrantLock();
|
||||||
|
|
||||||
|
private List<T> buffer;
|
||||||
|
private ScheduledFuture<?> scheduledFuture;
|
||||||
|
|
||||||
|
private long downstreamRequests = 0;
|
||||||
|
private long downstreamTransmit = 0;
|
||||||
|
|
||||||
|
private long upstreamRequests = 0;
|
||||||
|
private long upstreamTransmit = 0;
|
||||||
|
private boolean upstreamCompleted = false;
|
||||||
|
|
||||||
|
public BufferTimeOutSubscription(Publisher<T> source,
|
||||||
|
Subscriber<? super List<T>> subscriber,
|
||||||
|
int size,
|
||||||
|
long duration) {
|
||||||
|
this.subscriber = subscriber;
|
||||||
|
this.size = size;
|
||||||
|
this.duration = duration;
|
||||||
|
this.buffer = new ArrayList<>(size);
|
||||||
|
source.subscribe(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
// downstream
|
||||||
|
@Override
|
||||||
|
public void request(long n) {
|
||||||
|
lock.lock();
|
||||||
|
downstreamRequests = downstreamRequests + n;
|
||||||
|
|
||||||
|
checkSend();
|
||||||
|
|
||||||
|
long downstreamMax = (downstreamRequests - downstreamTransmit) * size;
|
||||||
|
long upstreamRequested = upstreamRequests - upstreamTransmit;
|
||||||
|
long toRequest = downstreamMax - upstreamRequested;
|
||||||
|
|
||||||
|
if (toRequest > 0) {
|
||||||
|
subscription.request(toRequest);
|
||||||
|
upstreamRequests = upstreamRequests + toRequest;
|
||||||
|
}
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cancel() {
|
||||||
|
subscription.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
// upstream
|
||||||
|
@Override
|
||||||
|
public void onSubscribe(Subscription s) {
|
||||||
|
this.subscription = s;
|
||||||
|
scheduledFuture = EXECUTOR.scheduleAtFixedRate(this::timeout, 0, this.duration, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void timeout() {
|
||||||
|
checkSend();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkSend() {
|
||||||
|
lock.lock();
|
||||||
|
if (!this.buffer.isEmpty() && downstreamRequests > downstreamTransmit) {
|
||||||
|
List<T> output = prepareOutput();
|
||||||
|
subscriber.onNext(output);
|
||||||
|
downstreamTransmit++;
|
||||||
|
if (!this.buffer.isEmpty()) {
|
||||||
|
checkSend();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (upstreamCompleted && downstreamRequests > downstreamTransmit) {
|
||||||
|
scheduledFuture.cancel(false);
|
||||||
|
subscriber.onComplete();
|
||||||
|
}
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<T> prepareOutput() {
|
||||||
|
if (this.buffer.size() > size) {
|
||||||
|
List<T> output = new ArrayList<>(this.buffer.subList(0, size));
|
||||||
|
this.buffer = new ArrayList<>(this.buffer.subList(size, this.buffer.size()));
|
||||||
|
return output;
|
||||||
|
} else {
|
||||||
|
List<T> output = this.buffer;
|
||||||
|
this.buffer = new ArrayList<>(size);
|
||||||
|
return output;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(T t) {
|
||||||
|
lock.lock();
|
||||||
|
this.buffer.add(t);
|
||||||
|
upstreamTransmit++;
|
||||||
|
if (this.buffer.size() == size) {
|
||||||
|
checkSend();
|
||||||
|
}
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
scheduledFuture.cancel(false);
|
||||||
|
subscriber.onError(t);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
lock.lock();
|
||||||
|
upstreamCompleted = true;
|
||||||
|
checkSend();
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
5
src/main/java/it/tdlight/utils/EmptyCallable.java
Normal file
5
src/main/java/it/tdlight/utils/EmptyCallable.java
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
package it.tdlight.utils;
|
||||||
|
|
||||||
|
public interface EmptyCallable {
|
||||||
|
void call() throws Exception;
|
||||||
|
}
|
@ -106,6 +106,13 @@ public class MonoUtils {
|
|||||||
return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic());
|
return Mono.fromCallable(callable).subscribeOn(Schedulers.boundedElastic());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Mono<Void> fromBlockingEmpty(EmptyCallable callable) {
|
||||||
|
return Mono.<Void>fromCallable(() -> {
|
||||||
|
callable.call();
|
||||||
|
return null;
|
||||||
|
}).subscribeOn(Schedulers.boundedElastic());
|
||||||
|
}
|
||||||
|
|
||||||
public static <T> Mono<T> fromBlockingSingle(Callable<T> callable) {
|
public static <T> Mono<T> fromBlockingSingle(Callable<T> callable) {
|
||||||
return fromBlockingMaybe(callable).single();
|
return fromBlockingMaybe(callable).single();
|
||||||
}
|
}
|
||||||
@ -350,7 +357,7 @@ public class MonoUtils {
|
|||||||
* Use fromConsumerAdvanced if you want better stability.
|
* Use fromConsumerAdvanced if you want better stability.
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static <T> Flux<T> fromConsumer(MessageConsumer<T> messageConsumer) {
|
public static <T> Flux<T> fromConsumerUnsafe(MessageConsumer<T> messageConsumer) {
|
||||||
return Flux.<Message<T>>create(sink -> {
|
return Flux.<Message<T>>create(sink -> {
|
||||||
messageConsumer.endHandler(e -> sink.complete());
|
messageConsumer.endHandler(e -> sink.complete());
|
||||||
sink.onDispose(messageConsumer::unregister);
|
sink.onDispose(messageConsumer::unregister);
|
||||||
@ -360,8 +367,24 @@ public class MonoUtils {
|
|||||||
.subscribeOn(Schedulers.boundedElastic());
|
.subscribeOn(Schedulers.boundedElastic());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> Mono<Tuple2<Mono<Void>, Flux<T>>> fromConsumerAdvanced(MessageConsumer<T> messageConsumer) {
|
public static <T> Mono<Tuple2<Mono<Void>, Flux<T>>> fromMessageConsumer(MessageConsumer<T> messageConsumer) {
|
||||||
return Mono.<Tuple2<Mono<Void>, Flux<T>>>fromCallable(() -> {
|
return fromReplyableMessageConsumer(messageConsumer)
|
||||||
|
.map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono
|
||||||
|
.fromCallable(msg::body)
|
||||||
|
.subscribeOn(Schedulers.boundedElastic())))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> Mono<Tuple2<Mono<Void>, Flux<Tuple2<Message<?>, T>>>> fromReplyableResolvedMessageConsumer(MessageConsumer<T> messageConsumer) {
|
||||||
|
return fromReplyableMessageConsumer(messageConsumer)
|
||||||
|
.map(tuple -> tuple.mapT2(msgs -> msgs.flatMapSequential(msg -> Mono
|
||||||
|
.fromCallable(() -> Tuples.<Message<?>, T>of(msg, msg.body()))
|
||||||
|
.subscribeOn(Schedulers.boundedElastic())))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> Mono<Tuple2<Mono<Void>, Flux<Message<T>>>> fromReplyableMessageConsumer(MessageConsumer<T> messageConsumer) {
|
||||||
|
return Mono.<Tuple2<Mono<Void>, Flux<Message<T>>>>fromCallable(() -> {
|
||||||
Many<Message<T>> messages = Sinks.many().unicast().onBackpressureError();
|
Many<Message<T>> messages = Sinks.many().unicast().onBackpressureError();
|
||||||
Empty<Void> registrationRequested = Sinks.empty();
|
Empty<Void> registrationRequested = Sinks.empty();
|
||||||
Empty<Void> registrationCompletion = Sinks.empty();
|
Empty<Void> registrationCompletion = Sinks.empty();
|
||||||
@ -369,15 +392,11 @@ public class MonoUtils {
|
|||||||
messages.tryEmitComplete();
|
messages.tryEmitComplete();
|
||||||
registrationCompletion.tryEmitEmpty();
|
registrationCompletion.tryEmitEmpty();
|
||||||
});
|
});
|
||||||
messageConsumer.<T>handler(messages::tryEmitNext);
|
messageConsumer.<Message<T>>handler(messages::tryEmitNext);
|
||||||
|
|
||||||
Flux<T> dataFlux = Flux
|
Flux<Message<T>> dataFlux = Flux
|
||||||
.<T>concatDelayError(
|
.<Message<T>>concatDelayError(
|
||||||
messages.asFlux()
|
messages.asFlux(),
|
||||||
.<T>flatMap(msg -> Mono
|
|
||||||
.fromCallable(msg::body)
|
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
|
||||||
),
|
|
||||||
messageConsumer.rxUnregister().as(MonoUtils::toMono)
|
messageConsumer.rxUnregister().as(MonoUtils::toMono)
|
||||||
)
|
)
|
||||||
.doOnSubscribe(s -> registrationRequested.tryEmitEmpty());
|
.doOnSubscribe(s -> registrationRequested.tryEmitEmpty());
|
||||||
|
Loading…
x
Reference in New Issue
Block a user