From b8adbf452ef8e2d5ab99aed224a105853c948a94 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Mon, 20 Sep 2021 16:22:39 +0200 Subject: [PATCH] Add resources flux --- .../cavallium/dbengine/database/LLUtils.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 5194c1e..83fe3f9 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -417,6 +417,24 @@ public class LLUtils { .doOnDiscard(Send.class, Send::close); } + /** + * cleanup resource + * @param cleanupOnSuccess if true the resource will be cleaned up if the function is successful + */ + public static > Flux usingSendResources(Mono> resourceSupplier, + Function> resourceClosure, + boolean cleanupOnSuccess) { + return Flux.usingWhen(resourceSupplier.map(Send::receive), resourceClosure, r -> { + if (cleanupOnSuccess) { + return Mono.fromRunnable(r::close); + } else { + return Mono.empty(); + } + }, (r, ex) -> Mono.fromRunnable(r::close), r -> Mono.fromRunnable(r::close)) + .doOnDiscard(Resource.class, Resource::close) + .doOnDiscard(Send.class, Send::close); + } + public static record DirectBuffer(@NotNull Send buffer, @NotNull ByteBuffer byteBuffer) {} @NotNull