Fix initialization

This commit is contained in:
Andrea Cavalli 2021-02-25 11:21:03 +01:00
parent 909a5a2e6d
commit 6a52cab8de
12 changed files with 67 additions and 40 deletions

19
pom.xml
View File

@ -201,25 +201,6 @@
<artifactId>maven-install-plugin</artifactId>
<version>3.0.0-M1</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.1.2</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>

View File

@ -7,7 +7,9 @@ import reactor.core.publisher.Mono;
public interface ReactorTelegramClient {
Flux<Object> initialize();
Mono<Void> initialize();
Flux<Object> receive();
Mono<Object> send(TdApi.Function query);

View File

@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td;
import it.tdlight.common.ReactiveTelegramClient;
import it.tdlight.jni.TdApi;
import it.tdlight.utils.MonoUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -13,19 +14,27 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient {
this.reactiveTelegramClient = reactiveTelegramClient;
}
@SuppressWarnings("Convert2MethodRef")
public Mono<Void> initialize() {
return MonoUtils
.fromBlockingEmpty(() -> reactiveTelegramClient.createAndRegisterClient());
}
@Override
public Flux<TdApi.Object> initialize() {
return Flux.from(reactiveTelegramClient).concatMap(item -> {
if (item.isUpdate()) {
return Mono.just(item.getUpdate());
} else if (item.isHandleException()) {
return Mono.error(item.getHandleException());
} else if (item.isUpdateException()) {
return Mono.error(item.getUpdateException());
} else {
return Mono.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type"));
}
});
public Flux<TdApi.Object> receive() {
return Flux
.from(reactiveTelegramClient)
.concatMap(item -> {
if (item.isUpdate()) {
return Mono.just(item.getUpdate());
} else if (item.isHandleException()) {
return Mono.error(item.getHandleException());
} else if (item.isUpdateException()) {
return Mono.error(item.getUpdateException());
} else {
return Mono.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type"));
}
});
}
/**

View File

@ -8,6 +8,8 @@ import reactor.core.publisher.Mono;
public interface AsyncTdDirect {
Mono<Void> initialize();
/**
* Receives incoming updates and request responses from TDLib.
* Can be called only once.

View File

@ -71,11 +71,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}
@Override
public Flux<TdApi.Object> receive(AsyncTdDirectOptions options) {
// If closed it will be either true or false
final One<Boolean> closedFromTd = Sinks.one();
public Mono<Void> initialize() {
return telegramClientFactory
.create(implementationDetails)
.flatMap(reactorTelegramClient -> reactorTelegramClient.initialize().thenReturn(reactorTelegramClient))
.doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1)))
.flatMap(client -> {
if (td.tryEmitValue(client).isFailure()) {
@ -83,7 +82,16 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}
return Mono.just(client);
})
.flatMapMany(ReactorTelegramClient::initialize)
.then();
}
@Override
public Flux<TdApi.Object> receive(AsyncTdDirectOptions options) {
// If closed it will be either true or false
final One<Boolean> closedFromTd = Sinks.one();
return td
.asMono()
.flatMapMany(ReactorTelegramClient::receive)
.doOnNext(update -> {
// Close the emitter if receive closed state
if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR

View File

@ -68,7 +68,12 @@ public class TestClient implements ReactorTelegramClient {
}
@Override
public Flux<TdApi.Object> initialize() {
public Mono<Void> initialize() {
return Mono.empty();
}
@Override
public Flux<TdApi.Object> receive() {
return Flux.fromIterable(features).flatMap(featureName -> {
switch (featureName) {
case "status-update":

View File

@ -144,7 +144,8 @@ public class AsyncTdEasy {
.flatMap(_v -> {
this.settings.tryEmitNext(settings);
return Mono.empty();
});
})
.then(td.initialize());
}
/**

View File

@ -7,6 +7,8 @@ import reactor.core.publisher.Mono;
public interface AsyncTdMiddle {
Mono<Void> initialize();
/**
* Receives incoming updates from TDLib.
*

View File

@ -144,6 +144,7 @@ public class TdClusterManager {
cfg.setProperty("hazelcast.wait.seconds.before.join", "0");
cfg.setProperty("hazelcast.tcp.join.port.try.count", "5");
cfg.setProperty("hazelcast.socket.bind.any", "false");
cfg.setProperty("hazelcast.health.monitoring.level", "OFF");
cfg.setClusterName("tdlib-session-container");
mgr = new HazelcastClusterManager(cfg);
vertxOptions.setClusterManager(mgr);

View File

@ -64,10 +64,15 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000);
}
private Mono<AsyncTdMiddleEventBusClient> initialize() {
private Mono<AsyncTdMiddleEventBusClient> initializeEb() {
return Mono.just(this);
}
@Override
public Mono<Void> initialize() {
return Mono.empty();
}
public static Mono<AsyncTdMiddle> getAndDeployInstance(TdClusterManager clusterManager,
int botId,
String botAlias,
@ -75,7 +80,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
JsonObject implementationDetails,
Path binlogsArchiveDirectory) {
return new AsyncTdMiddleEventBusClient(clusterManager)
.initialize()
.initializeEb()
.flatMap(instance -> retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId)
.flatMap(binlog -> binlog
.getLastModifiedTime()

View File

@ -83,6 +83,12 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
return Completable.complete();
}
@Override
public Mono<Void> initialize() {
return td
.initialize();
}
@Override
public Flux<TdApi.Object> receive() {
return td

View File

@ -68,6 +68,11 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle {
.thenReturn(this);
}
@Override
public Mono<Void> initialize() {
return cli.asMono().single().flatMap(AsyncTdMiddle::initialize);
}
@Override
public Flux<TdApi.Object> receive() {
return cli.asMono().single().flatMapMany(AsyncTdMiddle::receive);