diff --git a/src/main/java/it/cavallium/dbengine/database/LLRange.java b/src/main/java/it/cavallium/dbengine/database/LLRange.java index f34da15..66aba39 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLRange.java +++ b/src/main/java/it/cavallium/dbengine/database/LLRange.java @@ -16,7 +16,7 @@ import org.jetbrains.annotations.Nullable; */ public class LLRange extends SimpleResource { - private static final LLRange RANGE_ALL = new LLRange((Buffer) null, (Buffer) null, (Buffer) null); + private static final LLRange RANGE_ALL = new LLRange( null, null, (Buffer) null, false); @Nullable private final Buffer min; @Nullable @@ -24,16 +24,16 @@ public class LLRange extends SimpleResource { @Nullable private final Buffer single; - private LLRange(Send min, Send max, Send single) { - super(); + private LLRange(Send min, Send max, Send single, boolean closeable) { + super(closeable); assert single == null || (min == null && max == null); this.min = min != null ? min.receive().makeReadOnly() : null; this.max = max != null ? max.receive().makeReadOnly() : null; this.single = single != null ? single.receive().makeReadOnly() : null; } - private LLRange(Buffer min, Buffer max, Buffer single) { - super(); + private LLRange(Buffer min, Buffer max, Buffer single, boolean closeable) { + super(closeable); assert single == null || (min == null && max == null); this.min = min != null ? min.makeReadOnly() : null; this.max = max != null ? max.makeReadOnly() : null; @@ -41,31 +41,31 @@ public class LLRange extends SimpleResource { } public static LLRange all() { - return RANGE_ALL.copy(); + return RANGE_ALL; } public static LLRange from(Send min) { - return new LLRange(min, null, null); + return new LLRange(min, null, null, true); } public static LLRange to(Send max) { - return new LLRange(null, max, null); + return new LLRange(null, max, null, true); } public static LLRange single(Send single) { - return new LLRange(null, null, single); + return new LLRange(null, null, single, true); } public static LLRange singleUnsafe(Buffer single) { - return new LLRange(null, null, single); + return new LLRange(null, null, single, true); } public static LLRange of(Send min, Send max) { - return new LLRange(min, max, null); + return new LLRange(min, max, null, true); } public static LLRange ofUnsafe(Buffer min, Buffer max) { - return new LLRange(min, max, null); + return new LLRange(min, max, null, true); } public boolean isAll() { @@ -224,7 +224,8 @@ public class LLRange extends SimpleResource { // todo: use a read-only copy return new LLRange(min != null ? min.copy().send() : null, max != null ? max.copy().send() : null, - single != null ? single.copy().send(): null + single != null ? single.copy().send() : null, + true ); } } diff --git a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java index 940af71..00747f8 100644 --- a/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java +++ b/src/main/java/it/cavallium/dbengine/lucene/HugePqPriorityQueue.java @@ -26,6 +26,7 @@ import org.rocksdb.RocksDBException; import org.rocksdb.WriteBatch; import org.rocksdb.WriteOptions; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; public class HugePqPriorityQueue extends SimpleResource implements PriorityQueue, Reversable>, ReversableResourceIterable { @@ -339,7 +340,7 @@ public class HugePqPriorityQueue extends SimpleResource if (rocksIterWithReadOpts != null) { rocksIterWithReadOpts.close(); } - }).concatMapIterable(item -> item); + }).subscribeOn(Schedulers.boundedElastic()).concatMapIterable(item -> item); } @Override