diff --git a/pom.xml b/pom.xml index 5bd9ae7..82207c6 100644 --- a/pom.xml +++ b/pom.xml @@ -201,25 +201,6 @@ maven-install-plugin 3.0.0-M1 - - org.apache.maven.plugins - maven-dependency-plugin - 3.1.2 - - - package - - copy-dependencies - - - ${project.build.directory}/lib - false - false - true - - - - org.apache.maven.plugins maven-compiler-plugin diff --git a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java index 3921c8c..9284663 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/ReactorTelegramClient.java @@ -7,7 +7,9 @@ import reactor.core.publisher.Mono; public interface ReactorTelegramClient { - Flux initialize(); + Mono initialize(); + + Flux receive(); Mono send(TdApi.Function query); diff --git a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java index 4af92fb..417fb35 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/WrappedReactorTelegramClient.java @@ -2,6 +2,7 @@ package it.tdlight.tdlibsession.td; import it.tdlight.common.ReactiveTelegramClient; import it.tdlight.jni.TdApi; +import it.tdlight.utils.MonoUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,19 +14,27 @@ public class WrappedReactorTelegramClient implements ReactorTelegramClient { this.reactiveTelegramClient = reactiveTelegramClient; } + @SuppressWarnings("Convert2MethodRef") + public Mono initialize() { + return MonoUtils + .fromBlockingEmpty(() -> reactiveTelegramClient.createAndRegisterClient()); + } + @Override - public Flux initialize() { - return Flux.from(reactiveTelegramClient).concatMap(item -> { - if (item.isUpdate()) { - return Mono.just(item.getUpdate()); - } else if (item.isHandleException()) { - return Mono.error(item.getHandleException()); - } else if (item.isUpdateException()) { - return Mono.error(item.getUpdateException()); - } else { - return Mono.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type")); - } - }); + public Flux receive() { + return Flux + .from(reactiveTelegramClient) + .concatMap(item -> { + if (item.isUpdate()) { + return Mono.just(item.getUpdate()); + } else if (item.isHandleException()) { + return Mono.error(item.getHandleException()); + } else if (item.isUpdateException()) { + return Mono.error(item.getUpdateException()); + } else { + return Mono.error(new IllegalStateException("This shouldn't happen. Received unknown ReactiveItem type")); + } + }); } /** diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java index e5f8195..9c74f04 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java @@ -8,6 +8,8 @@ import reactor.core.publisher.Mono; public interface AsyncTdDirect { + Mono initialize(); + /** * Receives incoming updates and request responses from TDLib. * Can be called only once. diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 12d2c49..7ce67dc 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -71,11 +71,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } @Override - public Flux receive(AsyncTdDirectOptions options) { - // If closed it will be either true or false - final One closedFromTd = Sinks.one(); + public Mono initialize() { return telegramClientFactory .create(implementationDetails) + .flatMap(reactorTelegramClient -> reactorTelegramClient.initialize().thenReturn(reactorTelegramClient)) .doOnNext(client -> client.execute(new TdApi.SetLogVerbosityLevel(1))) .flatMap(client -> { if (td.tryEmitValue(client).isFailure()) { @@ -83,7 +82,16 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } return Mono.just(client); }) - .flatMapMany(ReactorTelegramClient::initialize) + .then(); + } + + @Override + public Flux receive(AsyncTdDirectOptions options) { + // If closed it will be either true or false + final One closedFromTd = Sinks.one(); + return td + .asMono() + .flatMapMany(ReactorTelegramClient::receive) .doOnNext(update -> { // Close the emitter if receive closed state if (update.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java index e6bf06e..f59c87d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/TestClient.java @@ -68,7 +68,12 @@ public class TestClient implements ReactorTelegramClient { } @Override - public Flux initialize() { + public Mono initialize() { + return Mono.empty(); + } + + @Override + public Flux receive() { return Flux.fromIterable(features).flatMap(featureName -> { switch (featureName) { case "status-update": diff --git a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java index 0c4e57d..9bf1968 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java +++ b/src/main/java/it/tdlight/tdlibsession/td/easy/AsyncTdEasy.java @@ -144,7 +144,8 @@ public class AsyncTdEasy { .flatMap(_v -> { this.settings.tryEmitNext(settings); return Mono.empty(); - }); + }) + .then(td.initialize()); } /** diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java index b6c6253..2c2c679 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/AsyncTdMiddle.java @@ -7,6 +7,8 @@ import reactor.core.publisher.Mono; public interface AsyncTdMiddle { + Mono initialize(); + /** * Receives incoming updates from TDLib. * diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java index 167c0c4..7342a11 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdClusterManager.java @@ -144,6 +144,7 @@ public class TdClusterManager { cfg.setProperty("hazelcast.wait.seconds.before.join", "0"); cfg.setProperty("hazelcast.tcp.join.port.try.count", "5"); cfg.setProperty("hazelcast.socket.bind.any", "false"); + cfg.setProperty("hazelcast.health.monitoring.level", "OFF"); cfg.setClusterName("tdlib-session-container"); mgr = new HazelcastClusterManager(cfg); vertxOptions.setClusterManager(mgr); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 3c6c7f6..7b17b16 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -64,10 +64,15 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { this.deliveryOptionsWithTimeout = cluster.newDeliveryOpts().setLocalOnly(local).setSendTimeout(30000); } - private Mono initialize() { + private Mono initializeEb() { return Mono.just(this); } + @Override + public Mono initialize() { + return Mono.empty(); + } + public static Mono getAndDeployInstance(TdClusterManager clusterManager, int botId, String botAlias, @@ -75,7 +80,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { JsonObject implementationDetails, Path binlogsArchiveDirectory) { return new AsyncTdMiddleEventBusClient(clusterManager) - .initialize() + .initializeEb() .flatMap(instance -> retrieveBinlog(clusterManager.getVertx(), binlogsArchiveDirectory, botId) .flatMap(binlog -> binlog .getLastModifiedTime() diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index 7d88080..8815eb2 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -83,6 +83,12 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd return Completable.complete(); } + @Override + public Mono initialize() { + return td + .initialize(); + } + @Override public Flux receive() { return td diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java index 1e3321d..9521be7 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleLocal.java @@ -68,6 +68,11 @@ public class AsyncTdMiddleLocal implements AsyncTdMiddle { .thenReturn(this); } + @Override + public Mono initialize() { + return cli.asMono().single().flatMap(AsyncTdMiddle::initialize); + } + @Override public Flux receive() { return cli.asMono().single().flatMapMany(AsyncTdMiddle::receive);