Redesigned subscribers

This commit is contained in:
Andrea Cavalli 2020-10-19 17:25:10 +02:00
parent d57c5ef8ae
commit 85bac8670d
2 changed files with 13 additions and 30 deletions

View File

@ -28,10 +28,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
private final AtomicReference<TelegramClient> td = new AtomicReference<>();
private final Scheduler tdScheduler = Schedulers.newSingle("TdMain");
private final Scheduler tdPollScheduler = Schedulers.newSingle("TdPoll");
private final Scheduler tdUpdatesScheduler = Schedulers.newSingle("TdUpdate");
private final Scheduler tdResponsesScheduler = Schedulers.newSingle("TdResponse");
private final Scheduler tdResponsesOutputScheduler = Schedulers.boundedElastic();
private final EmitterProcessor<AsyncResult<TdResult<Update>>> updatesProcessor = EmitterProcessor.create();
private Flux<AsyncResult<TdResult<Update>>> updatesProcessor;
private final String botAlias;
public AsyncTdDirectImpl(String botAlias) {
@ -41,7 +41,10 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
@Override
public <T extends TdApi.Object> Mono<TdResult<T>> execute(Function request, boolean synchronous) {
if (synchronous) {
return Mono.just(TdResult.of(this.td.get().execute(request)));
return Mono
.fromCallable(() -> TdResult.<T>of(this.td.get().execute(request)))
.subscribeOn(tdResponsesScheduler)
.publishOn(Schedulers.single());
} else {
return Mono.<TdResult<T>>create(sink -> {
try {
@ -51,21 +54,13 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
} catch (Throwable t) {
sink.error(t);
}
}).subscribeOn(tdResponsesScheduler);
}).subscribeOn(tdResponsesScheduler).publishOn(tdResponsesOutputScheduler);
}
}
@Override
public Flux<AsyncResult<TdResult<Update>>> getUpdates(Duration receiveDuration, int eventsSize) {
return Flux.from(updatesProcessor.subscribeOn(tdUpdatesScheduler));
}
public Scheduler getTdUpdatesScheduler() {
return tdUpdatesScheduler;
}
public Scheduler getTdResponsesScheduler() {
return tdResponsesScheduler;
return updatesProcessor;
}
@Override
@ -93,25 +88,14 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}).subscribeOn(tdPollScheduler).publish();
// Complete initialization when receiving first update
updatesConnectableFlux.subscribeOn(tdPollScheduler).take(1).single().subscribe(next -> {
sink.success(true);
}, error -> {
sink.error(error);
}, () -> {
sink.success(true);
});
updatesConnectableFlux.take(1).single()
.doOnSuccess(_v -> sink.success(true)).doOnError(sink::error).subscribe();
// Pass updates to UpdatesProcessor
updatesConnectableFlux.subscribeOn(tdPollScheduler).subscribe(next -> {
updatesProcessor.onNext(next);
}, error -> {
updatesProcessor.onError(error);
}, () -> {
updatesProcessor.onComplete();
});
updatesProcessor = updatesConnectableFlux.publish().refCount();
updatesConnectableFlux.connect();
}).single().then().subscribeOn(tdScheduler);
}).single().then().subscribeOn(tdScheduler).publishOn(tdResponsesOutputScheduler);
}
@Override
@ -119,6 +103,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
return Mono.fromCallable(() -> {
// do nothing
return (Void) null;
}).single().subscribeOn(tdScheduler);
}).single().subscribeOn(tdScheduler).publishOn(tdResponsesOutputScheduler);
}
}

View File

@ -140,7 +140,6 @@ public class AsyncTdMiddleEventBusServer extends AbstractVerticle {
.from(tdClosed)
.single()
.filter(tdClosedVal -> !tdClosedVal)
.subscribeOn(td.getTdUpdatesScheduler())
.map(_v -> {
ArrayList<AsyncResult<TdResult<Update>>> updatesBatch = new ArrayList<>();
while (!queue.isEmpty() && updatesBatch.size() < 1000) {