diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java index c1ac464..e5f8195 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirect.java @@ -13,7 +13,7 @@ public interface AsyncTdDirect { * Can be called only once. * */ - Flux> receive(AsyncTdDirectOptions options); + Flux receive(AsyncTdDirectOptions options); /** * Sends request to TDLib. diff --git a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java index 900ad98..4c69fcb 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java +++ b/src/main/java/it/tdlight/tdlibsession/td/direct/AsyncTdDirectImpl.java @@ -62,12 +62,12 @@ public class AsyncTdDirectImpl implements AsyncTdDirect { } @Override - public Flux> receive(AsyncTdDirectOptions options) { + public Flux receive(AsyncTdDirectOptions options) { // If closed it will be either true or false final One closedFromTd = Sinks.one(); - return Flux.>create(emitter -> { + return Flux.create(emitter -> { var client = ClientManager.create((Object object) -> { - emitter.next(TdResult.of(object)); + emitter.next(object); // Close the emitter if receive closed state if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR && ((UpdateAuthorizationState) object).authorizationState.getConstructor() diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java index 00e6689..8a4abab 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultList.java @@ -1,22 +1,37 @@ package it.tdlight.tdlibsession.td.middle; import it.tdlight.jni.TdApi; -import it.tdlight.tdlibsession.td.TdResult; +import it.tdlight.jni.TdApi.Error; import java.util.List; import java.util.Objects; import java.util.StringJoiner; public class TdResultList { - private final List> values; + private final List values; + private final Error error; - public TdResultList(List> values) { + public TdResultList(List values) { this.values = values; + this.error = null; } - public List> getValues() { + public TdResultList(TdApi.Error error) { + this.values = null; + this.error = error; + } + + public List value() { return values; } + public TdApi.Error error() { + return error; + } + + public boolean succeeded() { + return error == null && values != null; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -28,16 +43,24 @@ public class TdResultList { TdResultList that = (TdResultList) o; - return Objects.equals(values, that.values); + if (!Objects.equals(values, that.values)) { + return false; + } + return Objects.equals(error, that.error); } @Override public int hashCode() { - return values != null ? values.hashCode() : 0; + int result = values != null ? values.hashCode() : 0; + result = 31 * result + (error != null ? error.hashCode() : 0); + return result; } @Override public String toString() { - return new StringJoiner(", ", TdResultList.class.getSimpleName() + "[", "]").add("values=" + values).toString(); + return new StringJoiner(", ", TdResultList.class.getSimpleName() + "[", "]") + .add("values=" + values) + .add("error=" + error) + .toString(); } } diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java index 5ebfce4..75fcb4d 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/TdResultListMessageCodec.java @@ -4,14 +4,13 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.MessageCodec; import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi.Error; -import it.tdlight.tdlibsession.td.TdResult; import it.tdlight.utils.VertxBufferInputStream; import it.tdlight.utils.VertxBufferOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import org.warp.commonutils.stream.SafeDataInputStream; -import org.warp.commonutils.stream.SafeDataOutputStream; public class TdResultListMessageCodec implements MessageCodec { @@ -22,17 +21,17 @@ public class TdResultListMessageCodec implements MessageCodec t1 : t) { - if (t1.succeeded()) { - dos.writeBoolean(true); - t1.result().serialize(dos.asDataOutputStream()); - } else { - dos.writeBoolean(false); - t1.cause().serialize(dos.asDataOutputStream()); + try (var dos = new DataOutputStream(bos)) { + if (ts.succeeded()) { + dos.writeBoolean(true); + var t = ts.value(); + dos.writeInt(t.size()); + for (TdApi.Object t1 : t) { + t1.serialize(dos); } + } else { + dos.writeBoolean(false); + ts.error().serialize(dos); } } } catch (IOException ex) { @@ -44,16 +43,16 @@ public class TdResultListMessageCodec implements MessageCodec> list = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - if (dis.readBoolean()) { - list.add(TdResult.succeeded((TdApi.Object) TdApi.Deserializer.deserialize(dis))); - } else { - list.add(TdResult.failed((Error) TdApi.Deserializer.deserialize(dis))); + if (dis.readBoolean()) { + var size = dis.readInt(); + ArrayList list = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + list.add((TdApi.Object) TdApi.Deserializer.deserialize(dis)); } + return new TdResultList(list); + } else { + return new TdResultList((Error) TdApi.Deserializer.deserialize(dis)); } - return new TdResultList(list); } } catch (IOException | UnsupportedOperationException ex) { ex.printStackTrace(); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java index 9f72cb9..f32641e 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/client/AsyncTdMiddleEventBusClient.java @@ -144,8 +144,14 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle { .timeout(Duration.ofSeconds(20), Mono.fromCallable(() -> { throw new IllegalStateException("Server did not respond to 4 pings after 20 seconds (5 seconds per ping)"); })) - .flatMap(updates -> Flux.fromIterable(((TdResultList) updates.body()).getValues())) - .flatMap(update -> Mono.fromCallable(update::orElseThrow)) + .flatMap(updates -> { + var result = (TdResultList) updates.body(); + if (result.succeeded()) { + return Flux.fromIterable(result.value()); + } else { + return Mono.fromCallable(() -> TdResult.failed(result.error()).orElseThrow()); + } + }) .flatMap(this::interceptUpdate) .doOnError(crash::tryEmitError) .doOnTerminate(updatesStreamEnd::tryEmitEmpty); diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java index d4dd070..3afb7d8 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/direct/AsyncTdMiddleDirect.java @@ -78,14 +78,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd .receive(new AsyncTdDirectOptions(WAIT_DURATION, 100)) .takeUntilOther(closeRequest.asMono()) .doOnError(ex -> logger.info("TdMiddle verticle error", ex)) - .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")) - .doOnNext(result -> { - if (result.failed()) { - logger.error("Received an errored update: {}", result.cause()); - } - }) - .filter(TdResult::succeeded) - .map(TdResult::result); + .doOnTerminate(() -> logger.debug("TdMiddle verticle stopped")); } @Override diff --git a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java index 688f58b..68b9c3e 100644 --- a/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java +++ b/src/main/java/it/tdlight/tdlibsession/td/middle/server/AsyncTdMiddleEventBusServer.java @@ -14,6 +14,7 @@ import it.tdlight.jni.TdApi.SetTdlibParameters; import it.tdlight.jni.TdApi.Update; import it.tdlight.jni.TdApi.UpdateAuthorizationState; import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient; +import it.tdlight.tdlibsession.td.TdError; import it.tdlight.tdlibsession.td.TdResultMessage; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl; import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions; @@ -270,26 +271,29 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle { Flux updatesFlux = td .receive(tdOptions) .flatMap(item -> Mono.defer(() -> { - if (item.succeeded()) { - var tdObject = item.result(); - if (tdObject instanceof Update) { - var tdUpdate = (Update) tdObject; - if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { - var tdUpdateAuthorizationState = (UpdateAuthorizationState) tdUpdate; - if (tdUpdateAuthorizationState.authorizationState.getConstructor() - == AuthorizationStateClosed.CONSTRUCTOR) { - logger.debug("Undeploying after receiving AuthorizationStateClosed"); - return rxStop().as(MonoUtils::toMono).thenReturn(item); - } + if (item instanceof Update) { + var tdUpdate = (Update) item; + if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) { + var tdUpdateAuthorizationState = (UpdateAuthorizationState) tdUpdate; + if (tdUpdateAuthorizationState.authorizationState.getConstructor() + == AuthorizationStateClosed.CONSTRUCTOR) { + logger.debug("Undeploying after receiving AuthorizationStateClosed"); + return rxStop().as(MonoUtils::toMono).thenReturn(item); } - } else if (tdObject instanceof Error) { - // An error in updates means that a fatal error occurred - logger.debug("Undeploying after receiving a fatal error"); - return rxStop().as(MonoUtils::toMono).thenReturn(item); } - return Mono.just(item); + } else if (item instanceof Error) { + // An error in updates means that a fatal error occurred + logger.debug("Undeploying after receiving a fatal error"); + return rxStop().as(MonoUtils::toMono).thenReturn(item); + } + return Mono.just(item); + })) + .flatMap(item -> Mono.fromCallable(() -> { + if (item.getConstructor() == TdApi.Error.CONSTRUCTOR) { + var error = (Error) item; + throw new TdError(error.code, error.message); } else { - return Mono.just(item); + return item; } })) .bufferTimeout(tdOptions.getEventsSize(), local ? Duration.ofMillis(1) : Duration.ofMillis(100))