Limit rate

This commit is contained in:
Andrea Cavalli 2021-02-15 02:29:49 +01:00
parent 0ef86b084a
commit 869447f5e6

View File

@ -25,7 +25,6 @@ import it.tdlight.tdlibsession.td.middle.TdResultList;
import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec; import it.tdlight.tdlibsession.td.middle.TdResultListMessageCodec;
import it.tdlight.tdlibsession.td.middle.TdResultMessage; import it.tdlight.tdlibsession.td.middle.TdResultMessage;
import it.tdlight.utils.BinlogUtils; import it.tdlight.utils.BinlogUtils;
import it.tdlight.utils.BufferTimeOutPublisher;
import it.tdlight.utils.MonoUtils; import it.tdlight.utils.MonoUtils;
import java.net.ConnectException; import java.net.ConnectException;
import java.time.Duration; import java.time.Duration;
@ -64,7 +63,7 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
private final One<Flux<Void>> pipeFlux = Sinks.one(); private final One<Flux<Void>> pipeFlux = Sinks.one();
public AsyncTdMiddleEventBusServer() { public AsyncTdMiddleEventBusServer() {
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 50); this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 100);
this.clientFactory = new TelegramClientFactory(); this.clientFactory = new TelegramClientFactory();
} }
@ -340,9 +339,11 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
return update; return update;
} }
})) }))
.transform(normal -> new BufferTimeOutPublisher<>(normal, Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100))) .limitRate(Math.max(1, tdOptions.getEventsSize()))
//.bufferTimeout(Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100)) //.transform(normal -> new BufferTimeOutPublisher<>(normal, Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100)))
.bufferTimeout(Math.max(1, tdOptions.getEventsSize()), local ? Duration.ofMillis(1) : Duration.ofMillis(100))
//.map(List::of) //.map(List::of)
.limitRate(Math.max(1, tdOptions.getEventsSize()))
.map(TdResultList::new); .map(TdResultList::new);
var fluxCodec = new TdResultListMessageCodec(); var fluxCodec = new TdResultListMessageCodec();