This commit is contained in:
Andrea Cavalli 2021-03-06 13:35:11 +01:00
parent 3e542d9ec4
commit 20e5ebcae4
5 changed files with 51 additions and 27 deletions

View File

@ -304,10 +304,10 @@ public class EventBusFlux {
if (pingReplyException.failureCode() == -1) {
pingSink.error(new ConnectException( "Can't send a ping to flux \"" + fluxAddress + "\" because the connection was lost"));
} else {
pingSink.error(new ConnectException("Ping failed:" + pingReplyException.toString()));
pingSink.error(new ConnectException("Ping failed: " + pingReplyException.toString()));
}
} else {
pingSink.error(new IllegalStateException("Ping failed", pingError));
pingSink.error(new IllegalStateException("Ping failed: " + pingError.getMessage()));
}
}
})))

View File

@ -6,7 +6,9 @@ import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.AuthorizationStateClosed;
import it.tdlight.jni.TdApi.Function;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.td.ResponseError;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResult;
@ -52,6 +54,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
private final Empty<Void> crash = Sinks.one();
// This will only result in a successful completion, never completes in other ways
private final Empty<Void> pingFail = Sinks.one();
// This will only result in a successful completion, never completes in other ways.
// It will be called when UpdateAuthorizationStateClosing is intercepted.
// If it's completed stop checking if the ping works or not
private final Empty<Void> authStateClosing = Sinks.one();
private int botId;
private String botAddress;
@ -174,7 +180,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.doOnNext(s -> logger.trace("PING"))
.then()
.onErrorResume(ex -> {
logger.warn("Ping failed", ex);
logger.warn("Ping failed: {}", ex.getMessage());
return Mono.empty();
})
.doOnNext(s -> logger.debug("END PING"))
@ -245,9 +251,20 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
ex.setStackTrace(new StackTraceElement[0]);
throw ex;
}).onErrorResume(ex -> MonoUtils.emitError(crash, ex)))
.takeUntilOther(Mono
.firstWithSignal(crash.asMono(), authStateClosing.asMono())
.onErrorResume(e -> Mono.empty())
)
)
.doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end"))
)
.takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> {
if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
return ((UpdateAuthorizationState) item).authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR;
}
return false;
}))
.flatMapSequential(updates -> {
if (updates.succeeded()) {
return Flux.fromIterable(updates.value());
@ -272,6 +289,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
case TdApi.UpdateAuthorizationState.CONSTRUCTOR:
var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update;
switch (updateAuthorizationState.authorizationState.getConstructor()) {
case TdApi.AuthorizationStateClosing.CONSTRUCTOR:
authStateClosing.tryEmitEmpty();
case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib"))
.then(cluster.getEventBus().<EndSessionMessage>rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono))

View File

@ -142,7 +142,9 @@ public abstract class BatchSubscriber<T> implements CoreSubscriber<T> {
? batchSize - buffer.size()
: batchSize;
log.trace("+ request [{}] -> request {} values", buffer.size(), required);
subscription.request(required);
if (required > 0) {
subscription.request(required);
}
if (!buffer.isEmpty()) scheduleFlush();
}

View File

@ -1,5 +1,6 @@
package it.tdlight.utils;
import io.vertx.core.file.OpenOptions;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.file.AsyncFile;
import io.vertx.reactivex.core.file.FileProps;
@ -14,45 +15,49 @@ public class BinlogAsyncFile {
private final FileSystem filesystem;
private final String path;
private final AsyncFile file;
private final OpenOptions openOptions;
public BinlogAsyncFile(FileSystem fileSystem, String path, AsyncFile file) {
public BinlogAsyncFile(FileSystem fileSystem, String path) {
this.filesystem = fileSystem;
this.path = path;
this.file = file;
this.openOptions = new OpenOptions().setWrite(true).setRead(true).setCreate(false).setDsync(true);
}
private Mono<AsyncFile> openRW() {
return filesystem.rxOpen(path, openOptions).as(MonoUtils::toMono);
}
public Mono<Buffer> readFully() {
return filesystem
.rxProps(path)
.map(props -> (int) props.size())
.as(MonoUtils::toMono)
.flatMap(size -> {
var buf = Buffer.buffer(size);
logger.debug("Reading binlog from disk. Size: " + BinlogUtils.humanReadableByteCountBin(size));
return file.rxRead(buf, 0, 0, size).as(MonoUtils::toMono).thenReturn(buf);
});
return openRW()
.flatMap(asyncFile -> filesystem
.rxProps(path)
.map(props -> (int) props.size())
.as(MonoUtils::toMono)
.flatMap(size -> {
var buf = Buffer.buffer(size);
logger.debug("Reading binlog from disk. Size: " + BinlogUtils.humanReadableByteCountBin(size));
return asyncFile.rxRead(buf, 0, 0, size).as(MonoUtils::toMono).thenReturn(buf);
})
);
}
public Mono<byte[]> readFullyBytes() {
return this.readFully().map(Buffer::getBytes);
}
public AsyncFile getFile() {
return file;
}
public Mono<Void> overwrite(Buffer newData) {
return getSize()
return openRW().flatMap(asyncFile -> this
.getSize()
.doOnNext(size -> logger.debug("Preparing to overwrite binlog. Initial size: " + BinlogUtils.humanReadableByteCountBin(size)))
.then(file.rxWrite(newData, 0)
.andThen(file.rxFlush())
.then(asyncFile.rxWrite(newData, 0)
.andThen(asyncFile.rxFlush())
.andThen(filesystem.rxTruncate(path, newData.length()))
.as(MonoUtils::toMono)
)
.then(getSize())
.doOnNext(size -> logger.debug("Overwritten binlog. Final size: " + BinlogUtils.humanReadableByteCountBin(size)))
.then();
.then()
);
}
public Mono<Void> overwrite(byte[] newData) {

View File

@ -29,7 +29,6 @@ public class BinlogUtils {
public static Mono<BinlogAsyncFile> retrieveBinlog(FileSystem vertxFilesystem, Path binlogPath) {
var path = binlogPath.toString();
var openOptions = new OpenOptions().setWrite(true).setRead(true).setCreate(false).setDsync(true);
return vertxFilesystem
// Create file if not exist to avoid errors
.rxExists(path).filter(exists -> exists).as(MonoUtils::toMono)
@ -38,8 +37,7 @@ public class BinlogUtils {
.thenReturn(true)
)
// Open file
.flatMap(x -> vertxFilesystem.rxOpen(path, openOptions).as(MonoUtils::toMono))
.map(file -> new BinlogAsyncFile(vertxFilesystem, path, file))
.map(x -> new BinlogAsyncFile(vertxFilesystem, path))
.single()
.publishOn(Schedulers.boundedElastic());
}