This commit is contained in:
Andrea Cavalli 2021-01-13 22:05:34 +01:00
parent ae0d2e1e68
commit 9b6812982f
4 changed files with 29 additions and 33 deletions

View File

@ -63,7 +63,7 @@ public class EventBusFlux {
subscriptionReady.unregister(subscriptionReadyUnregistered -> {
if (subscriptionReadyUnregistered.succeeded()) {
var subscription = flux.subscribe(item -> {
var request = eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, msg2 -> {
eventBus.request(subscriptionAddress + ".signal", SignalMessage.<T>onNext(item), signalDeliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to send onNext signal", msg2.cause());
}
@ -83,15 +83,11 @@ public class EventBusFlux {
});
cancel.handler(msg3 -> {
if (!subscription.isDisposed()) {
subscription.dispose();
}
msg3.reply(EMPTY, deliveryOptions);
});
dispose.handler(msg2 -> {
if (!subscription.isDisposed()) {
subscription.dispose();
}
cancel.unregister(v -> {
if (v.failed()) {
logger.error("Failed to unregister cancel", v.cause());
@ -181,20 +177,28 @@ public class EventBusFlux {
msg2.reply(EMPTY);
});
signalConsumer.completionHandler(h -> {
if (h.failed()) {
emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause()));
} else {
if (h.succeeded()) {
eventBus.<Long>request(fluxAddress + ".subscriptionReady", EMPTY, deliveryOptions, msg2 -> {
if (msg2.failed()) {
logger.error("Failed to tell that the subscription is ready");
}
});
} else {
emitter.error(new IllegalStateException("Signal consumer registration failed", msg.cause()));
}
});
emitter.onDispose(() -> eventBus.send(subscriptionAddress + ".dispose", EMPTY, deliveryOptions));
emitter.onDispose(() -> eventBus.request(subscriptionAddress + ".dispose", EMPTY, deliveryOptions, msg2 -> {
if (msg.failed()) {
logger.error("Failed to tell that the subscription is disposed");
}
}));
emitter.onCancel(() -> eventBus.send(subscriptionAddress + ".cancel", EMPTY, deliveryOptions));
emitter.onCancel(() -> eventBus.request(subscriptionAddress + ".cancel", EMPTY, deliveryOptions, msg2 -> {
if (msg.failed()) {
logger.error("Failed to tell that the subscription is cancelled");
}
}));
} else {
emitter.error(new IllegalStateException("Subscription failed", msg.cause()));
}

View File

@ -45,7 +45,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}
throw new IllegalStateException("TDLib client is destroyed");
}
}).publishOn(Schedulers.boundedElastic()).single().subscribeOn(tdScheduler));
}).publishOn(Schedulers.boundedElastic()).single());
} else {
return td.asMono().flatMap(td -> Mono.<TdResult<T>>create(sink -> {
if (td != null) {
@ -60,7 +60,7 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
}
sink.error(new IllegalStateException("TDLib client is destroyed"));
}
})).single().subscribeOn(tdScheduler);
})).single();
}
}
@ -89,8 +89,8 @@ public class AsyncTdDirectImpl implements AsyncTdDirect {
closedFromTd.asMono().take(Duration.ofMillis(10)).switchIfEmpty(Mono.fromRunnable(() -> client.send(new Close(),
result -> logger.trace("Close result: {}", result),
ex -> logger.trace("Error when disposing td client", ex)
))).subscribeOn(tdScheduler).subscribe();
))).subscribe();
});
});
}).subscribeOn(tdScheduler);
}
}

View File

@ -39,8 +39,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements AsyncTdMiddle {
@ -49,7 +47,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
public static final boolean OUTPUT_REQUESTS = false;
public static final byte[] EMPTY = new byte[0];
private final Scheduler tdMiddleScheduler = Schedulers.single();
private final Many<Boolean> tdClosed = Sinks.many().replay().latestOrDefault(false);
private final DeliveryOptions deliveryOptions;
private final DeliveryOptions deliveryOptionsWithTimeout;
@ -149,7 +146,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
if (msg.succeeded()) {
this.listen()
.timeout(Duration.ofSeconds(30))
.subscribeOn(tdMiddleScheduler)
.subscribe(v -> {}, future::fail, future::complete);
} else {
future.fail(msg.cause());
@ -244,7 +240,7 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
tdClosed.tryEmitNext(true);
}
}
})).subscribeOn(tdMiddleScheduler);
}));
}
@Override
@ -300,6 +296,6 @@ public class AsyncTdMiddleEventBusClient extends AbstractVerticle implements Asy
}
}).switchIfEmpty(Mono.fromSupplier(() -> {
return TdResult.failed(new TdApi.Error(500, "Client is closed or response is empty"));
})).subscribeOn(tdMiddleScheduler);
}));
}
}

View File

@ -33,8 +33,6 @@ import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public class AsyncTdMiddleEventBusServer {
@ -54,7 +52,6 @@ public class AsyncTdMiddleEventBusServer {
private boolean local;
protected AsyncTdDirectImpl td;
private final Scheduler tdSrvPoll;
/**
* Value is not important, emits when a request is received
*/
@ -68,7 +65,6 @@ public class AsyncTdMiddleEventBusServer {
public AsyncTdMiddleEventBusServer(TdClusterManager clusterManager) {
this.cluster = clusterManager;
this.tdOptions = new AsyncTdDirectOptions(WAIT_DURATION, 1000);
this.tdSrvPoll = Schedulers.single();
if (cluster.registerDefaultCodec(TdResultList.class, new TdResultListMessageCodec())) {
cluster.registerDefaultCodec(ExecuteObject.class, new TdExecuteObjectMessageCodec());
cluster.registerDefaultCodec(TdResultMessage.class, new TdResultMessageCodec());
@ -99,7 +95,7 @@ public class AsyncTdMiddleEventBusServer {
workingMsg.reply(EMPTY, cluster.newDeliveryOpts().setLocalOnly(local));
});
this.isWorkingConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
})).subscribeOn(this.tdSrvPoll)
}))
.subscribe(v -> {}, ex -> {
logger.info(botAddress + " server deployed and started. succeeded: false");
logger.error(ex.getLocalizedMessage(), ex);
@ -123,7 +119,7 @@ public class AsyncTdMiddleEventBusServer {
sink.error(h.cause());
}
});
}).subscribeOn(tdSrvPoll);
});
}
public void onBeforeStop(Consumer<Promise<Void>> r) {
@ -180,7 +176,7 @@ public class AsyncTdMiddleEventBusServer {
} catch (Exception ex) {
sink.error(ex);
}
}).subscribeOn(this.tdSrvPoll)
})
.subscribe(response -> {}, ex -> {
logger.error("Error when processing a request", ex);
msg.fail(500, ex.getLocalizedMessage());
@ -191,7 +187,7 @@ public class AsyncTdMiddleEventBusServer {
}
});
executeConsumer.completionHandler(MonoUtils.toHandler(registrationSink));
}).subscribeOn(tdSrvPoll);
});
}
private void undeploy(Runnable whenUndeployed) {
@ -227,7 +223,7 @@ public class AsyncTdMiddleEventBusServer {
whenUndeployed.run();
});
}).subscribeOn(this.tdSrvPoll).subscribe(v -> {}, ex -> {
}).subscribe(v -> {}, ex -> {
logger.error("Error when stopping", ex);
}, () -> {});
});
@ -251,7 +247,7 @@ public class AsyncTdMiddleEventBusServer {
System.out.println("<=: end (3)");
}
this.undeploy(() -> {});
}).subscribeOn(tdSrvPoll);
});
var fluxCodec = new TdResultListMessageCodec();
return EventBusFlux.<TdResultList>serve(updatesFlux,
cluster.getEventBus(),
@ -259,6 +255,6 @@ public class AsyncTdMiddleEventBusServer {
cluster.newDeliveryOpts().setLocalOnly(local),
fluxCodec,
Duration.ofSeconds(30)
).subscribeOn(tdSrvPoll);
);
}
}