Pass timeout

This commit is contained in:
Andrea Cavalli 2021-10-02 23:33:00 +02:00
parent c30ac9bec6
commit f444afd465
4 changed files with 23 additions and 7 deletions

View File

@ -3,6 +3,7 @@ package it.tdlight.tdlibsession.td.middle;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Function;
import java.time.Duration;
import java.util.Objects; import java.util.Objects;
import java.util.StringJoiner; import java.util.StringJoiner;
@ -12,13 +13,16 @@ public class ExecuteObject {
private boolean executeDirectly; private boolean executeDirectly;
private TdApi.Function request; private TdApi.Function request;
private Duration timeout;
private int pos; private int pos;
private Buffer buffer; private Buffer buffer;
public ExecuteObject(boolean executeDirectly, Function request) { public ExecuteObject(boolean executeDirectly, Function request, Duration timeout) {
if (request == null) throw new NullPointerException();
this.executeDirectly = executeDirectly; this.executeDirectly = executeDirectly;
this.request = request; this.request = request;
if (request == null) throw new NullPointerException(); this.timeout = timeout;
} }
public ExecuteObject(int pos, Buffer buffer) { public ExecuteObject(int pos, Buffer buffer) {
@ -32,6 +36,7 @@ public class ExecuteObject {
this.executeDirectly = data.executeDirectly; this.executeDirectly = data.executeDirectly;
this.request = data.request; this.request = data.request;
this.buffer = null; this.buffer = null;
this.timeout = data.timeout;
} }
} }
@ -45,6 +50,11 @@ public class ExecuteObject {
return request; return request;
} }
public Duration getTimeout() {
tryDecode();
return timeout;
}
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
tryDecode(); tryDecode();
@ -76,6 +86,7 @@ public class ExecuteObject {
return new StringJoiner(", ", ExecuteObject.class.getSimpleName() + "[", "]") return new StringJoiner(", ", ExecuteObject.class.getSimpleName() + "[", "]")
.add("executeDirectly=" + executeDirectly) .add("executeDirectly=" + executeDirectly)
.add("request=" + request) .add("request=" + request)
.add("timeout=" + timeout)
.toString(); .toString();
} }
} }

View File

@ -5,6 +5,7 @@ import io.vertx.core.eventbus.MessageCodec;
import it.tdlight.jni.TdApi; import it.tdlight.jni.TdApi;
import it.tdlight.jni.TdApi.Function; import it.tdlight.jni.TdApi.Function;
import it.tdlight.utils.BufferUtils; import it.tdlight.utils.BufferUtils;
import java.time.Duration;
public class TdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject, ExecuteObject> { public class TdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject, ExecuteObject> {
@ -17,14 +18,17 @@ public class TdExecuteObjectMessageCodec implements MessageCodec<ExecuteObject,
BufferUtils.encode(buffer, os -> { BufferUtils.encode(buffer, os -> {
os.writeBoolean(t.isExecuteDirectly()); os.writeBoolean(t.isExecuteDirectly());
t.getRequest().serialize(os); t.getRequest().serialize(os);
os.writeLong(t.getTimeout().toMillis());
}); });
} }
@Override @Override
public ExecuteObject decodeFromWire(int pos, Buffer buffer) { public ExecuteObject decodeFromWire(int pos, Buffer buffer) {
return BufferUtils.decode(pos, buffer, is -> { return BufferUtils.decode(pos, buffer, is -> new ExecuteObject(
return new ExecuteObject(is.readBoolean(), (Function) TdApi.Deserializer.deserialize(is)); is.readBoolean(),
}); (Function) TdApi.Deserializer.deserialize(is),
Duration.ofMillis(is.readLong())
));
} }
@Override @Override

View File

@ -344,7 +344,7 @@ public class AsyncTdMiddleEventBusClient implements AsyncTdMiddle {
@Override @Override
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, Duration timeout, boolean executeSync) { public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, Duration timeout, boolean executeSync) {
var req = new ExecuteObject(executeSync, request); var req = new ExecuteObject(executeSync, request, timeout);
var deliveryOptions = new DeliveryOptions(this.deliveryOptions).setSendTimeout(timeout.toMillis()); var deliveryOptions = new DeliveryOptions(this.deliveryOptions).setSendTimeout(timeout.toMillis());
var crashMono = crash.asMono() var crashMono = crash.asMono()

View File

@ -145,11 +145,12 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.flatMap(msg -> { .flatMap(msg -> {
var body = msg.body(); var body = msg.body();
var request = overrideRequest(body.getRequest(), botId); var request = overrideRequest(body.getRequest(), botId);
var timeout = body.getTimeout();
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Received execute request {}", request); logger.trace("Received execute request {}", request);
} }
return td return td
.execute(request, Duration.ofSeconds(60 + 30), body.isExecuteDirectly()) .execute(request, timeout, body.isExecuteDirectly())
.single() .single()
.doOnSuccess(s -> logger.trace("Executed successfully. Request was {}", request)) .doOnSuccess(s -> logger.trace("Executed successfully. Request was {}", request))
.onErrorResume(ex -> Mono.fromRunnable(() -> msg.fail(500, ex.getLocalizedMessage()))) .onErrorResume(ex -> Mono.fromRunnable(() -> msg.fail(500, ex.getLocalizedMessage())))