Simplify scheduling

This commit is contained in:
Andrea Cavalli 2021-01-13 20:30:01 +01:00
parent a3a2893fb8
commit 89335f9526
5 changed files with 79 additions and 67 deletions

View File

@ -34,6 +34,7 @@ public class EventBusFlux {
MessageCodec<T, T> itemsCodec,
Duration connectionTimeout) {
var signalsCodec = new SignalMessageCodec<T>(itemsCodec);
EventBusFlux.registerFluxCodec(eventBus, itemsCodec);
var deliveryOptions = new DeliveryOptions(baseDeliveryOptions)
.setSendTimeout(connectionTimeout.toMillis());
var signalDeliveryOptions = new DeliveryOptions(deliveryOptions)
@ -52,70 +53,82 @@ public class EventBusFlux {
long subscriptionId = 0;
var subscriptionAddress = fluxAddress + "." + subscriptionId;
MessageConsumer<byte[]> subscriptionReady = eventBus.consumer(fluxAddress + ".subscriptionReady");
MessageConsumer<byte[]> dispose = eventBus.consumer(subscriptionAddress + ".dispose");
MessageConsumer<byte[]> cancel = eventBus.consumer(subscriptionAddress + ".cancel");
var subscription = flux.subscribe(item -> {
var request = eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
});
}, error -> {
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onError(error), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
});
}, () -> {
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onComplete(), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
});
});
cancel.handler(msg3 -> {
if (!subscription.isDisposed()) {
subscription.dispose();
}
msg3.reply(EMPTY, deliveryOptions);
});
dispose.handler(msg2 -> {
if (!subscription.isDisposed()) {
subscription.dispose();
}
cancel.unregister(v -> {
if (v.failed()) {
logger.error("Failed to unregister cancel", v.cause());
}
dispose.unregister(v2 -> {
if (v.failed()) {
logger.error("Failed to unregister dispose", v2.cause());
subscriptionReady.<Long>handler(subscriptionReadyMsg -> {
var subscription = flux.subscribe(item -> {
var request = eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
subscribe.unregister(v3 -> {
if (v2.failed()) {
logger.error("Failed to unregister subscribe", v3.cause());
});
}, error -> {
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onError(error), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
});
}, () -> {
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onComplete(), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
});
});
cancel.handler(msg3 -> {
if (!subscription.isDisposed()) {
subscription.dispose();
}
msg3.reply(EMPTY, deliveryOptions);
});
dispose.handler(msg2 -> {
if (!subscription.isDisposed()) {
subscription.dispose();
}
cancel.unregister(v -> {
if (v.failed()) {
logger.error("Failed to unregister cancel", v.cause());
}
dispose.unregister(v2 -> {
if (v.failed()) {
logger.error("Failed to unregister dispose", v2.cause());
}
msg2.reply(EMPTY);
subscribe.unregister(v3 -> {
if (v2.failed()) {
logger.error("Failed to unregister subscribe", v3.cause());
}
msg2.reply(EMPTY);
});
});
});
});
cancel.completionHandler(h -> {
if (h.succeeded()) {
dispose.completionHandler(h2 -> {
if (h2.succeeded()) {
subscriptionReadyMsg.reply((Long) subscriptionId);
} else {
logger.error("Failed to register dispose", h.cause());
subscriptionReadyMsg.fail(500, "Failed to register dispose");
}
});
} else {
logger.error("Failed to register cancel", h.cause());
subscriptionReadyMsg.fail(500, "Failed to register cancel");
}
});
});
cancel.completionHandler(h -> {
if (h.succeeded()) {
dispose.completionHandler(h2 -> {
if (h2.succeeded()) {
msg.reply((Long) subscriptionId);
} else {
logger.error("Failed to register dispose", h.cause());
msg.fail(500, "Failed to register dispose");
}
});
subscriptionReady.completionHandler(srh -> {
if (srh.succeeded()) {
msg.reply((Long) subscriptionId);
} else {
logger.error("Failed to register cancel", h.cause());
msg.fail(500, "Failed to register cancel");
logger.error("Failed to register \"subscription ready\"", srh.cause());
msg.fail(500, "Failed to register \"subscription ready\"");
}
});
@ -136,6 +149,7 @@ public class EventBusFlux {
DeliveryOptions baseDeliveryOptions,
MessageCodec<T, T> itemsCodec,
Duration connectionTimeout) {
EventBusFlux.registerFluxCodec(eventBus, itemsCodec);
return Flux.<T>create(emitter -> {
var deliveryOptions = new DeliveryOptions(baseDeliveryOptions)
.setSendTimeout(connectionTimeout.toMillis());
@ -163,6 +177,12 @@ public class EventBusFlux {
signalConsumer.completionHandler(h -> {
if (h.failed()) {
emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause()));
} else {
eventBus.<Long>request(fluxAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to tell that the subscription is ready");
}
});
}
});

View File

@ -23,8 +23,6 @@ import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class TDLibRemoteClient implements AutoCloseable {
@ -36,7 +34,6 @@ public class TDLibRemoteClient implements AutoCloseable {
private final int port;
private final Set<String> membersAddresses;
private final Many<TdClusterManager> clusterManager = Sinks.many().replay().latest();
private final Scheduler deploymentScheduler = Schedulers.single();
public TDLibRemoteClient(SecurityInfo securityInfo, String masterHostname, String netInterface, int port, Set<String> membersAddresses) {
this.securityInfo = securityInfo;
@ -263,7 +260,7 @@ public class TDLibRemoteClient implements AutoCloseable {
});
verticle.start(botAddress, botAddress, false).doOnError(error -> {
logger.error("Can't deploy bot \"" + botAddress + "\"", error);
}).subscribeOn(deploymentScheduler).subscribe(v -> {}, err -> {
}).subscribe(v -> {}, err -> {
deploymentHandler.handle(Future.failedFuture(err));
}, () -> {
deploymentHandler.handle(Future.succeededFuture());

View File

@ -17,7 +17,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.One;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class AsyncTdDirectImpl implements AsyncTdDirect {
@ -25,7 +24,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
private static final Logger logger = LoggerFactory.getLogger(AsyncTdDirect.class);
private final One<TelegramClient> td = Sinks.one();
private final Scheduler tdScheduler = Schedulers.newSingle("TdMain");
private final String botAlias;
@ -45,7 +43,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}
throw new IllegalStateException("TDLib client is destroyed");
}
}).publishOn(Schedulers.boundedElastic()).single().subscribeOn(tdScheduler));
}).publishOn(Schedulers.boundedElastic()).single());
} else {
return td.asMono().flatMap(td -> Mono.<TdResult<T>>create(sink -> {
if (td != null) {
@ -60,7 +58,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}
sink.error(new IllegalStateException("TDLib client is destroyed"));
}
})).single().subscribeOn(tdScheduler);
})).single();
}
}
@ -91,6 +89,6 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
ex -> logger.trace("Error when disposing td client", ex)
))).subscribe();
});
}).subscribeOn(tdScheduler);
});
}
}

View File

@ -149,7 +149,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
if (msg.succeeded()) {
this.listen()
.timeout(Duration.ofSeconds(30))
.subscribeOn(tdMiddleScheduler)
.subscribe(v -> {}, future::fail, future::complete);
} else {
future.fail(msg.cause());
@ -208,7 +207,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
@Override
public Flux<TdApi.Object> receive() {
var fluxCodec = new TdResultListMessageCodec();
EventBusFlux.registerFluxCodec(cluster.getEventBus(), fluxCodec);
return Mono.from(tdClosed.asFlux()).single().filter(tdClosed -> !tdClosed).flatMapMany(_closed -> EventBusFlux
.<TdResultList>connect(cluster.getEventBus(),
botAddress + ".updates",
@ -245,7 +243,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
tdClosed.tryEmitNext(true);
}
}
})).subscribeOn(tdMiddleScheduler);
}));
}
@Override
@ -301,6 +299,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
}).switchIfEmpty(Mono.fromSupplier(() -> {
return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty"));
})).subscribeOn(tdMiddleScheduler);
}));
}
}

View File

@ -253,7 +253,6 @@ public class AsyncTdMiddleEventBusServer {
this.undeploy(() -> {});
});
var fluxCodec = new TdResultListMessageCodec();
EventBusFlux.registerFluxCodec(cluster.getEventBus(), fluxCodec);
return EventBusFlux.<TdResultList>serve(updatesFlux,
cluster.getEventBus(),
botAddress + ".updates",