From 20e5ebcae412844596af8a8bd86b6caf95e57a48 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 6 Mar 2021 13:35:11 +0100 Subject: [PATCH] Bugfixes --- .../it/tdlight/tdlibsession/EventBusFlux.java | 4 +- .../client/AsyncTdMiddleEventBusClient.java | 21 ++++++++- .../it/tdlight/utils/BatchSubscriber.java | 4 +- .../it/tdlight/utils/BinlogAsyncFile.java | 45 ++++++++++--------- .../java/it/tdlight/utils/BinlogUtils.java | 4 +- 5 files changed, 51 insertions(+), 27 deletions(-) diff --git a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java index 72aac17..9c51bf4 100644 --- a/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java +++ b/src/main/java/it/tdlight/tdlibsession/EventBusFlux.java @@ -304,10 +304,10 @@ public class EventBusFlux { if (pingReplyException.failureCode() == -1) { pingSink.error(new ConnectException( "Can't send a ping to flux \"" + fluxAddress + "\" because the connection was lost")); } else { - pingSink.error(new ConnectException("Ping failed:" + pingReplyException.toString())); + pingSink.error(new ConnectException("Ping failed: " + pingReplyException.toString())); } } else { - pingSink.error(new IllegalStateException("Ping failed", pingError)); + pingSink.error(new IllegalStateException("Ping failed: " + pingError.getMessage())); } } }))) diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index ee413f1..1c2ad48 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -6,7 +6,9 @@ import io.vertx.reactivex.core.Vertx; import io.vertx.reactivex.core.buffer.Buffer; import io.vertx.reactivex.core.eventbus.MessageConsumer; import it.tdlight.jni.TdApi; +import it.tdlight.jni.TdApi.AuthorizationStateClosed; import it.tdlight.jni.TdApi.Function; +import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.tdlibsession.td.ResponseError; import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResult; @@ -52,6 +54,10 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { private final Empty crash = Sinks.one(); // This will only result in a successful completion, never completes in other ways private final Empty pingFail = Sinks.one(); + // This will only result in a successful completion, never completes in other ways. + // It will be called when UpdateAuthorizationStateClosing is intercepted. + // If it's completed stop checking if the ping works or not + private final Empty authStateClosing = Sinks.one(); private int botId; private String botAddress; @@ -174,7 +180,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .doOnNext(s -> logger.trace("PING")) .then() .onErrorResume(ex -> { - logger.warn("Ping failed", ex); + logger.warn("Ping failed: {}", ex.getMessage()); return Mono.empty(); }) .doOnNext(s -> logger.debug("END PING")) @@ -245,9 +251,20 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { ex.setStackTrace(new StackTraceElement[0]); throw ex; }).onErrorResume(ex -> MonoUtils.emitError(crash, ex))) + .takeUntilOther(Mono + .firstWithSignal(crash.asMono(), authStateClosing.asMono()) + .onErrorResume(e -> Mono.empty()) + ) ) .doOnTerminate(() -> logger.trace("TakeUntilOther has been trigghered, the receive() flux will end")) ) + .takeUntil(a -> a.succeeded() && a.value().stream().anyMatch(item -> { + if (item.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + return ((UpdateAuthorizationState) item).authorizationState.getConstructor() + == AuthorizationStateClosed.CONSTRUCTOR; + } + return false; + })) .flatMapSequential(updates -> { if (updates.succeeded()) { return Flux.fromIterable(updates.value()); @@ -272,6 +289,8 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { case TdApi.UpdateAuthorizationState.CONSTRUCTOR: var updateAuthorizationState = (TdApi.UpdateAuthorizationState) update; switch (updateAuthorizationState.authorizationState.getConstructor()) { + case TdApi.AuthorizationStateClosing.CONSTRUCTOR: + authStateClosing.tryEmitEmpty(); case TdApi.AuthorizationStateClosed.CONSTRUCTOR: return Mono.fromRunnable(() -> logger.info("Received AuthorizationStateClosed from tdlib")) .then(cluster.getEventBus().rxRequest(this.botAddress + ".read-binlog", EMPTY).as(MonoUtils::toMono)) diff --git a/src/main/java/it/tdlight/utils/BatchSubscriber.java b/src/main/java/it/tdlight/utils/BatchSubscriber.java index 0a1f7ae..a51a744 100644 --- a/src/main/java/it/tdlight/utils/BatchSubscriber.java +++ b/src/main/java/it/tdlight/utils/BatchSubscriber.java @@ -142,7 +142,9 @@ public abstract class BatchSubscriber implements CoreSubscriber { ? batchSize - buffer.size() : batchSize; log.trace("+ request [{}] -> request {} values", buffer.size(), required); - subscription.request(required); + if (required > 0) { + subscription.request(required); + } if (!buffer.isEmpty()) scheduleFlush(); } diff --git a/src/main/java/it/tdlight/utils/BinlogAsyncFile.java b/src/main/java/it/tdlight/utils/BinlogAsyncFile.java index 0bd247b..e56cb78 100644 --- a/src/main/java/it/tdlight/utils/BinlogAsyncFile.java +++ b/src/main/java/it/tdlight/utils/BinlogAsyncFile.java @@ -1,5 +1,6 @@ package it.tdlight.utils; +import io.vertx.core.file.OpenOptions; import io.vertx.reactivex.core.buffer.Buffer; import io.vertx.reactivex.core.file.AsyncFile; import io.vertx.reactivex.core.file.FileProps; @@ -14,45 +15,49 @@ public class BinlogAsyncFile { private final FileSystem filesystem; private final String path; - private final AsyncFile file; + private final OpenOptions openOptions; - public BinlogAsyncFile(FileSystem fileSystem, String path, AsyncFile file) { + public BinlogAsyncFile(FileSystem fileSystem, String path) { this.filesystem = fileSystem; this.path = path; - this.file = file; + this.openOptions = new OpenOptions().setWrite(true).setRead(true).setCreate(false).setDsync(true); + } + + private Mono openRW() { + return filesystem.rxOpen(path, openOptions).as(MonoUtils::toMono); } public Mono readFully() { - return filesystem - .rxProps(path) - .map(props -> (int) props.size()) - .as(MonoUtils::toMono) - .flatMap(size -> { - var buf = Buffer.buffer(size); - logger.debug("Reading binlog from disk. Size: " + BinlogUtils.humanReadableByteCountBin(size)); - return file.rxRead(buf, 0, 0, size).as(MonoUtils::toMono).thenReturn(buf); - }); + return openRW() + .flatMap(asyncFile -> filesystem + .rxProps(path) + .map(props -> (int) props.size()) + .as(MonoUtils::toMono) + .flatMap(size -> { + var buf = Buffer.buffer(size); + logger.debug("Reading binlog from disk. Size: " + BinlogUtils.humanReadableByteCountBin(size)); + return asyncFile.rxRead(buf, 0, 0, size).as(MonoUtils::toMono).thenReturn(buf); + }) + ); } public Mono readFullyBytes() { return this.readFully().map(Buffer::getBytes); } - public AsyncFile getFile() { - return file; - } - public Mono overwrite(Buffer newData) { - return getSize() + return openRW().flatMap(asyncFile -> this + .getSize() .doOnNext(size -> logger.debug("Preparing to overwrite binlog. Initial size: " + BinlogUtils.humanReadableByteCountBin(size))) - .then(file.rxWrite(newData, 0) - .andThen(file.rxFlush()) + .then(asyncFile.rxWrite(newData, 0) + .andThen(asyncFile.rxFlush()) .andThen(filesystem.rxTruncate(path, newData.length())) .as(MonoUtils::toMono) ) .then(getSize()) .doOnNext(size -> logger.debug("Overwritten binlog. Final size: " + BinlogUtils.humanReadableByteCountBin(size))) - .then(); + .then() + ); } public Mono overwrite(byte[] newData) { diff --git a/src/main/java/it/tdlight/utils/BinlogUtils.java b/src/main/java/it/tdlight/utils/BinlogUtils.java index c77234f..d9536c7 100644 --- a/src/main/java/it/tdlight/utils/BinlogUtils.java +++ b/src/main/java/it/tdlight/utils/BinlogUtils.java @@ -29,7 +29,6 @@ public class BinlogUtils { public static Mono retrieveBinlog(FileSystem vertxFilesystem, Path binlogPath) { var path = binlogPath.toString(); - var openOptions = new OpenOptions().setWrite(true).setRead(true).setCreate(false).setDsync(true); return vertxFilesystem // Create file if not exist to avoid errors .rxExists(path).filter(exists -> exists).as(MonoUtils::toMono) @@ -38,8 +37,7 @@ public class BinlogUtils { .thenReturn(true) ) // Open file - .flatMap(x -> vertxFilesystem.rxOpen(path, openOptions).as(MonoUtils::toMono)) - .map(file -> new BinlogAsyncFile(vertxFilesystem, path, file)) + .map(x -> new BinlogAsyncFile(vertxFilesystem, path)) .single() .publishOn(Schedulers.boundedElastic()); }