Add resources flux

This commit is contained in:
Andrea Cavalli 2021-09-20 16:22:39 +02:00
parent 3c5edbc06e
commit b8adbf452e

View File

@ -417,6 +417,24 @@ public class LLUtils {
.doOnDiscard(Send.class, Send::close); .doOnDiscard(Send.class, Send::close);
} }
/**
* cleanup resource
* @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful
*/
public static <U, T extends Resource<T>> Flux<U> usingSendResources(Mono<Send<T>> resourceSupplier,
Function<T, Flux<U>> resourceClosure,
boolean cleanupOnSuccess) {
return Flux.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> {
if (cleanupOnSuccess) {
return Mono.fromRunnable(r::close);
} else {
return Mono.empty();
}
}, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close))
.doOnDiscard(Resource.class, Resource::close)
.doOnDiscard(Send.class, Send::close);
}
public static record DirectBuffer(@NotNull Send<Buffer> buffer, @NotNull ByteBuffer byteBuffer) {} public static record DirectBuffer(@NotNull Send<Buffer> buffer, @NotNull ByteBuffer byteBuffer) {}
@NotNull @NotNull