Reduce overhead

This commit is contained in:
Andrea Cavalli 2021-01-24 04:21:47 +01:00
parent df116331d7
commit 1f6f4ec62f
7 changed files with 83 additions and 58 deletions

View File

@ -13,7 +13,7 @@ public interface AsyncTdDirect {
* Can be called only once.
*
*/
Flux<TdResult<TdApi.Object>> receive(AsyncTdDirectOptions options);
Flux<TdApi.Object> receive(AsyncTdDirectOptions options);
/**
* Sends request to TDLib.

View File

@ -62,12 +62,12 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}
@Override
public Flux<TdResult<TdApi.Object>> receive(AsyncTdDirectOptions options) {
public Flux<TdApi.Object> receive(AsyncTdDirectOptions options) {
// If closed it will be either true or false
final One<Boolean> closedFromTd = Sinks.one();
return Flux.<TdResult<TdApi.Object>>create(emitter -> {
return Flux.<TdApi.Object>create(emitter -> {
var client = ClientManager.create((Object object) -> {
emitter.next(TdResult.of(object));
emitter.next(object);
// Close the emitter if receive closed state
if (object.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR
&& ((UpdateAuthorizationState) object).authorizationState.getConstructor()

View File

@ -1,22 +1,37 @@
package it.tdlight.tdlibsession.td.middle;
import it.tdlight.jni.TdApi;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.jni.TdApi.Error;
import java.util.List;
import java.util.Objects;
import java.util.StringJoiner;
public class TdResultList {
private final List<TdResult<TdApi.Object>> values;
private final List<TdApi.Object> values;
private final Error error;
public TdResultList(List<TdResult<TdApi.Object>> values) {
public TdResultList(List<TdApi.Object> values) {
this.values = values;
this.error = null;
}
public List<TdResult<TdApi.Object>> getValues() {
public TdResultList(TdApi.Error error) {
this.values = null;
this.error = error;
}
public List<TdApi.Object> value() {
return values;
}
public TdApi.Error error() {
return error;
}
public boolean succeeded() {
return error == null && values != null;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -28,16 +43,24 @@ public class TdResultList {
TdResultList that = (TdResultList) o;
return Objects.equals(values, that.values);
if (!Objects.equals(values, that.values)) {
return false;
}
return Objects.equals(error, that.error);
}
@Override
public int hashCode() {
return values != null ? values.hashCode() : 0;
int result = values != null ? values.hashCode() : 0;
result = 31 * result + (error != null ? error.hashCode() : 0);
return result;
}
@Override
public String toString() {
return new StringJoiner(", ", TdResultList.class.getSimpleName() + "[", "]").add("values=" + values).toString();
return new StringJoiner(", ", TdResultList.class.getSimpleName() + "[", "]")
.add("values=" + values)
.add("error=" + error)
.toString();
}
}

View File

@ -4,14 +4,13 @@ import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Error;
import it.tdlight.tdlibsession.td.TdResult;
import it.tdlight.utils.VertxBufferInputStream;
import it.tdlight.utils.VertxBufferOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import org.warp.commonutils.stream.SafeDataInputStream;
import org.warp.commonutils.stream.SafeDataOutputStream;
public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdResultList> {
@ -22,17 +21,17 @@ public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdRe
@Override
public void encodeToWire(Buffer buffer, TdResultList ts) {
try (var bos = new VertxBufferOutputStream(buffer)) {
try (var dos = new SafeDataOutputStream(bos)) {
var t = ts.getValues();
dos.writeInt(t.size());
for (TdResult<TdApi.Object> t1 : t) {
if (t1.succeeded()) {
dos.writeBoolean(true);
t1.result().serialize(dos.asDataOutputStream());
} else {
dos.writeBoolean(false);
t1.cause().serialize(dos.asDataOutputStream());
try (var dos = new DataOutputStream(bos)) {
if (ts.succeeded()) {
dos.writeBoolean(true);
var t = ts.value();
dos.writeInt(t.size());
for (TdApi.Object t1 : t) {
t1.serialize(dos);
}
} else {
dos.writeBoolean(false);
ts.error().serialize(dos);
}
}
} catch (IOException ex) {
@ -44,16 +43,16 @@ public class TdResultListMessageCodec implements MessageCodec<TdResultList, TdRe
public TdResultList decodeFromWire(int pos, Buffer buffer) {
try (var fis = new VertxBufferInputStream(buffer, pos)) {
try (var dis = new SafeDataInputStream(fis)) {
var size = dis.readInt();
ArrayList<TdResult<TdApi.Object>> list = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
if (dis.readBoolean()) {
list.add(TdResult.succeeded((TdApi.Object) TdApi.Deserializer.deserialize(dis)));
} else {
list.add(TdResult.failed((Error) TdApi.Deserializer.deserialize(dis)));
if (dis.readBoolean()) {
var size = dis.readInt();
ArrayList<TdApi.Object> list = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
list.add((TdApi.Object) TdApi.Deserializer.deserialize(dis));
}
return new TdResultList(list);
} else {
return new TdResultList((Error) TdApi.Deserializer.deserialize(dis));
}
return new TdResultList(list);
}
} catch (IOException | UnsupportedOperationException ex) {
ex.printStackTrace();

View File

@ -144,8 +144,14 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
.timeout(Duration.ofSeconds(20), Mono.fromCallable(() -> {
throw new IllegalStateException("Server did not respond to 4 pings after 20 seconds (5 seconds per ping)");
}))
.flatMap(updates -> Flux.fromIterable(((TdResultList) updates.body()).getValues()))
.flatMap(update -> Mono.fromCallable(update::orElseThrow))
.flatMap(updates -> {
var result = (TdResultList) updates.body();
if (result.succeeded()) {
return Flux.fromIterable(result.value());
} else {
return Mono.fromCallable(() -> TdResult.failed(result.error()).orElseThrow());
}
})
.flatMap(this::interceptUpdate)
.doOnError(crash::tryEmitError)
.doOnTerminate(updatesStreamEnd::tryEmitEmpty);

View File

@ -78,14 +78,7 @@ public class AsyncTdMiddleDirect extends AbstractVerticle implements AsyncTdMidd
.receive(new AsyncTdDirectOptions(WAIT_DURATION, 100))
.takeUntilOther(closeRequest.asMono())
.doOnError(ex -> logger.info("TdMiddle verticle error", ex))
.doOnTerminate(() -> logger.debug("TdMiddle verticle stopped"))
.doOnNext(result -> {
if (result.failed()) {
logger.error("Received an errored update: {}", result.cause());
}
})
.filter(TdResult::succeeded)
.map(TdResult::result);
.doOnTerminate(() -> logger.debug("TdMiddle verticle stopped"));
}
@Override

View File

@ -14,6 +14,7 @@ import it.tdlight.jni.TdApi.SetTdlibParameters;
import it.tdlight.jni.TdApi.Update;
import it.tdlight.jni.TdApi.UpdateAuthorizationState;
import it.tdlight.tdlibsession.remoteclient.TDLibRemoteClient;
import it.tdlight.tdlibsession.td.TdError;
import it.tdlight.tdlibsession.td.TdResultMessage;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectImpl;
import it.tdlight.tdlibsession.td.direct.AsyncTdDirectOptions;
@ -270,26 +271,29 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
Flux<TdResultList> updatesFlux = td
.receive(tdOptions)
.flatMap(item -> Mono.defer(() -> {
if (item.succeeded()) {
var tdObject = item.result();
if (tdObject instanceof Update) {
var tdUpdate = (Update) tdObject;
if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
var tdUpdateAuthorizationState = (UpdateAuthorizationState) tdUpdate;
if (tdUpdateAuthorizationState.authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR) {
logger.debug("Undeploying after receiving AuthorizationStateClosed");
return rxStop().as(MonoUtils::toMono).thenReturn(item);
}
if (item instanceof Update) {
var tdUpdate = (Update) item;
if (tdUpdate.getConstructor() == UpdateAuthorizationState.CONSTRUCTOR) {
var tdUpdateAuthorizationState = (UpdateAuthorizationState) tdUpdate;
if (tdUpdateAuthorizationState.authorizationState.getConstructor()
== AuthorizationStateClosed.CONSTRUCTOR) {
logger.debug("Undeploying after receiving AuthorizationStateClosed");
return rxStop().as(MonoUtils::toMono).thenReturn(item);
}
} else if (tdObject instanceof Error) {
// An error in updates means that a fatal error occurred
logger.debug("Undeploying after receiving a fatal error");
return rxStop().as(MonoUtils::toMono).thenReturn(item);
}
return Mono.just(item);
} else if (item instanceof Error) {
// An error in updates means that a fatal error occurred
logger.debug("Undeploying after receiving a fatal error");
return rxStop().as(MonoUtils::toMono).thenReturn(item);
}
return Mono.just(item);
}))
.flatMap(item -> Mono.fromCallable(() -> {
if (item.getConstructor() == TdApi.Error.CONSTRUCTOR) {
var error = (Error) item;
throw new TdError(error.code, error.message);
} else {
return Mono.just(item);
return item;
}
}))
.bufferTimeout(tdOptions.getEventsSize(), local ? Duration.ofMillis(1) : Duration.ofMillis(100))