Adapt to the future ids format

This commit is contained in:
Andrea Cavalli 2021-03-10 12:35:56 +01:00
parent 20e5ebcae4
commit 0f38d6573e
6 changed files with 17 additions and 16 deletions

View File

@ -108,12 +108,12 @@
<dependency> <dependency>
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId> <artifactId>reactor-core</artifactId>
<version>3.4.2</version> <version>3.4.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>io.projectreactor</groupId> <groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId> <artifactId>reactor-tools</artifactId>
<version>3.4.2</version> <version>3.4.3</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.akaita.java</groupId> <groupId>com.akaita.java</groupId>

View File

@ -217,15 +217,15 @@ public class TDLibRemoteClient implements AutoCloseable {
.then(completion); .then(completion);
} }
public static Path getSessionDirectory(int botId) { public static Path getSessionDirectory(long botId) {
return Paths.get(".sessions-cache").resolve("id" + botId); return Paths.get(".sessions-cache").resolve("id" + botId);
} }
public static Path getMediaDirectory(int botId) { public static Path getMediaDirectory(long botId) {
return Paths.get(".cache").resolve("media").resolve("id" + botId); return Paths.get(".cache").resolve("media").resolve("id" + botId);
} }
public static Path getSessionBinlogDirectory(int botId) { public static Path getSessionBinlogDirectory(long botId) {
return getSessionDirectory(botId).resolve("td.binlog"); return getSessionDirectory(botId).resolve("td.binlog");
} }

View File

@ -7,13 +7,13 @@ import java.util.StringJoiner;
public final class StartSessionMessage { public final class StartSessionMessage {
private final int id; private final long id;
private final String alias; private final String alias;
private final Buffer binlog; private final Buffer binlog;
private final long binlogDate; private final long binlogDate;
private final JsonObject implementationDetails; private final JsonObject implementationDetails;
public StartSessionMessage(int id, String alias, Buffer binlog, long binlogDate, JsonObject implementationDetails) { public StartSessionMessage(long id, String alias, Buffer binlog, long binlogDate, JsonObject implementationDetails) {
this.id = id; this.id = id;
this.alias = alias; this.alias = alias;
this.binlog = binlog; this.binlog = binlog;
@ -21,7 +21,7 @@ public final class StartSessionMessage {
this.implementationDetails = implementationDetails; this.implementationDetails = implementationDetails;
} }
public int id() { public long id() {
return id; return id;
} }

View File

@ -18,7 +18,7 @@ public class StartSessionMessageCodec implements MessageCodec<StartSessionMessag
@Override @Override
public void encodeToWire(Buffer buffer, StartSessionMessage t) { public void encodeToWire(Buffer buffer, StartSessionMessage t) {
BufferUtils.encode(buffer, os -> { BufferUtils.encode(buffer, os -> {
os.writeInt(t.id()); os.writeLong(t.id());
UTFUtils.writeUTF(os, t.alias()); UTFUtils.writeUTF(os, t.alias());
BufferUtils.writeBuf(os, t.binlog()); BufferUtils.writeBuf(os, t.binlog());
os.writeLong(t.binlogDate()); os.writeLong(t.binlogDate());
@ -28,7 +28,7 @@ public class StartSessionMessageCodec implements MessageCodec<StartSessionMessag
@Override @Override
public StartSessionMessage decodeFromWire(int pos, Buffer buffer) { public StartSessionMessage decodeFromWire(int pos, Buffer buffer) {
return BufferUtils.decode(pos, buffer, is -> new StartSessionMessage(is.readInt(), return BufferUtils.decode(pos, buffer, is -> new StartSessionMessage(is.readLong(),
UTFUtils.readUTF(is), UTFUtils.readUTF(is),
BufferUtils.rxReadBuf(is), BufferUtils.rxReadBuf(is),
is.readLong(), is.readLong(),

View File

@ -59,7 +59,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
// If it's completed stop checking if the ping works or not // If it's completed stop checking if the ping works or not
private final Empty<Void> authStateClosing = Sinks.one(); private final Empty<Void> authStateClosing = Sinks.one();
private int botId; private long botId;
private String botAddress; private String botAddress;
private String botAlias; private String botAlias;
private boolean local; private boolean local;
@ -83,7 +83,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
} }
public static Mono<AsyncTdMiddle> getAndDeployInstance(TdClusterManager clusterManager, public static Mono<AsyncTdMiddle> getAndDeployInstance(TdClusterManager clusterManager,
int botId, long botId,
String botAlias, String botAlias,
boolean local, boolean local,
JsonObject implementationDetails, JsonObject implementationDetails,
@ -110,7 +110,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
* *
* @return optional result * @return optional result
*/ */
public static Mono<BinlogAsyncFile> retrieveBinlog(Vertx vertx, Path binlogsArchiveDirectory, int botId) { public static Mono<BinlogAsyncFile> retrieveBinlog(Vertx vertx, Path binlogsArchiveDirectory, long botId) {
return BinlogUtils.retrieveBinlog(vertx.fileSystem(), binlogsArchiveDirectory.resolve(botId + ".binlog")); return BinlogUtils.retrieveBinlog(vertx.fileSystem(), binlogsArchiveDirectory.resolve(botId + ".binlog"));
} }
@ -118,7 +118,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
return this.binlog.asMono().flatMap(binlog -> BinlogUtils.saveBinlog(binlog, data)); return this.binlog.asMono().flatMap(binlog -> BinlogUtils.saveBinlog(binlog, data));
} }
public Mono<Void> start(int botId, public Mono<Void> start(long botId,
String botAlias, String botAlias,
boolean local, boolean local,
JsonObject implementationDetails, JsonObject implementationDetails,

View File

@ -25,6 +25,7 @@ import it.tdlight.tdlibsession.td.middle.TdResultList;
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdResultMessage; import it.tdlight.tdlibsession.td.middle.TdResultMessage;
import it.tdlight.utils.BinlogUtils; import it.tdlight.utils.BinlogUtils;
import it.tdlight.utils.BufferTimeOutPublisher;
import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils;
import java.net.ConnectException; import java.net.ConnectException;
import java.time.Duration; import java.time.Duration;
@ -341,8 +342,8 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
} }
})) }))
.limitRate(Math.max(1, tdOptions.getEventsSize())) .limitRate(Math.max(1, tdOptions.getEventsSize()))
//.transform(normal -> new BufferTimeOutPublisher<>(normal, Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100))) .transform(normal -> new BufferTimeOutPublisher<>(normal, Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100)))
.bufferTimeout(Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100)) //.bufferTimeout(Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100))
//.map(List::of) //.map(List::of)
.limitRate(Math.max(1, tdOptions.getEventsSize())) .limitRate(Math.max(1, tdOptions.getEventsSize()))
.map(TdResultList::new); .map(TdResultList::new);