Fix registration sink

This commit is contained in:
Andrea Cavalli 2021-01-13 17:40:57 +01:00
parent d3f813d8cb
commit 841045322f

View File

@ -95,22 +95,25 @@ public class EventBusFlux {
if (h.succeeded()) { if (h.succeeded()) {
dispose.completionHandler(h2 -> { dispose.completionHandler(h2 -> {
if (h2.succeeded()) { if (h2.succeeded()) {
sink.success(); msg.reply((Long) subscriptionId);
} else { } else {
sink.error(h.cause()); logger.error("Failed to register dispose", h.cause());
msg.fail(500, "Failed to register dispose");
} }
}); });
} else { } else {
sink.error(h.cause()); logger.error("Failed to register cancel", h.cause());
msg.fail(500, "Failed to register cancel");
} }
}); });
msg.reply((Long) subscriptionId);
}); });
subscribe.completionHandler(h -> { subscribe.completionHandler(h -> {
if (h.failed()) { if (h.failed()) {
sink.error(h.cause()); sink.error(h.cause());
} else {
sink.success();
} }
}); });
}); });