diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java index b4e9df5..af7db42 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java @@ -43,44 +43,51 @@ public abstract class LLLocalLuceneGroupedReactiveIterator { public Flux> flux() { return Flux .generate(() -> { - var readOptions = new ReadOptions(this.readOptions); - readOptions.setFillCache(range.hasMin() && range.hasMax()); - if (range.hasMin()) { - readOptions.setIterateLowerBound(new Slice(range.getMin())); - } - if (range.hasMax()) { - readOptions.setIterateUpperBound(new Slice(range.getMax())); - } - readOptions.setPrefixSameAsStart(true); - var rocksIterator = db.newIterator(cfh, readOptions); - if (range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - return rocksIterator; - }, (rocksIterator, sink) -> { - ObjectArrayList values = new ObjectArrayList<>(); - byte[] firstGroupKey = null; - - while (rocksIterator.isValid()) { - byte[] key = rocksIterator.key(); - if (firstGroupKey == null) { - firstGroupKey = key; - } else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { - break; + synchronized (this) { + var readOptions = new ReadOptions(this.readOptions); + readOptions.setFillCache(range.hasMin() && range.hasMax()); + if (range.hasMin()) { + readOptions.setIterateLowerBound(new Slice(range.getMin())); } - byte[] value = readValues ? rocksIterator.value() : EMPTY; - rocksIterator.next(); - values.add(getEntry(key, value)); + if (range.hasMax()) { + readOptions.setIterateUpperBound(new Slice(range.getMax())); + } + var rocksIterator = db.newIterator(cfh, readOptions); + if (range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + return rocksIterator; } - if (!values.isEmpty()) { - sink.next(values); - } else { - sink.complete(); + }, (rocksIterator, sink) -> { + synchronized (this) { + ObjectArrayList values = new ObjectArrayList<>(); + byte[] firstGroupKey = null; + + while (rocksIterator.isValid()) { + byte[] key = rocksIterator.key(); + if (firstGroupKey == null) { + firstGroupKey = key; + } else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { + break; + } + byte[] value = readValues ? rocksIterator.value() : EMPTY; + rocksIterator.next(); + values.add(getEntry(key, value)); + } + if (!values.isEmpty()) { + sink.next(values); + } else { + sink.complete(); + } + return rocksIterator; } - return rocksIterator; - }, rocksIterator1 -> rocksIterator1.close()); + }, rocksIterator1 -> { + synchronized (this) { + rocksIterator1.close(); + } + }); } public abstract T getEntry(byte[] key, byte[] value); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java index 07961a8..7e591cf 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneKeyPrefixesReactiveIterator.java @@ -38,41 +38,48 @@ public class LLLocalLuceneKeyPrefixesReactiveIterator { public Flux flux() { return Flux .generate(() -> { - var readOptions = new ReadOptions(this.readOptions); - readOptions.setFillCache(range.hasMin() && range.hasMax()); - if (range.hasMin()) { - readOptions.setIterateLowerBound(new Slice(range.getMin())); - } - if (range.hasMax()) { - readOptions.setIterateUpperBound(new Slice(range.getMax())); - } - readOptions.setPrefixSameAsStart(true); - var rocksIterator = db.newIterator(cfh, readOptions); - if (range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - return rocksIterator; - }, (rocksIterator, sink) -> { - byte[] firstGroupKey = null; - - while (rocksIterator.isValid()) { - byte[] key = rocksIterator.key(); - if (firstGroupKey == null) { - firstGroupKey = key; - } else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { - break; + synchronized (this) { + var readOptions = new ReadOptions(this.readOptions); + readOptions.setFillCache(range.hasMin() && range.hasMax()); + if (range.hasMin()) { + readOptions.setIterateLowerBound(new Slice(range.getMin())); } - rocksIterator.next(); + if (range.hasMax()) { + readOptions.setIterateUpperBound(new Slice(range.getMax())); + } + var rocksIterator = db.newIterator(cfh, readOptions); + if (range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + return rocksIterator; } - if (firstGroupKey != null) { - var groupKeyPrefix = Arrays.copyOf(firstGroupKey, prefixLength); - sink.next(groupKeyPrefix); - } else { - sink.complete(); + }, (rocksIterator, sink) -> { + synchronized (this) { + byte[] firstGroupKey = null; + + while (rocksIterator.isValid()) { + byte[] key = rocksIterator.key(); + if (firstGroupKey == null) { + firstGroupKey = key; + } else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) { + break; + } + rocksIterator.next(); + } + if (firstGroupKey != null) { + var groupKeyPrefix = Arrays.copyOf(firstGroupKey, prefixLength); + sink.next(groupKeyPrefix); + } else { + sink.complete(); + } + return rocksIterator; } - return rocksIterator; - }, rocksIterator1 -> rocksIterator1.close()); + }, rocksIterator1 -> { + synchronized (this) { + rocksIterator1.close(); + } + }); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java index e024913..6944299 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java @@ -33,32 +33,40 @@ public abstract class LLLocalLuceneReactiveIterator { public Flux flux() { return Flux .generate(() -> { - var readOptions = new ReadOptions(this.readOptions); - readOptions.setFillCache(range.hasMin() && range.hasMax()); - if (range.hasMin()) { - readOptions.setIterateLowerBound(new Slice(range.getMin())); + synchronized (this) { + var readOptions = new ReadOptions(this.readOptions); + readOptions.setFillCache(range.hasMin() && range.hasMax()); + if (range.hasMin()) { + readOptions.setIterateLowerBound(new Slice(range.getMin())); + } + if (range.hasMax()) { + readOptions.setIterateUpperBound(new Slice(range.getMax())); + } + var rocksIterator = db.newIterator(cfh, readOptions); + if (range.hasMin()) { + rocksIterator.seek(range.getMin()); + } else { + rocksIterator.seekToFirst(); + } + return rocksIterator; } - if (range.hasMax()) { - readOptions.setIterateUpperBound(new Slice(range.getMax())); - } - var rocksIterator = db.newIterator(cfh, readOptions); - if (range.hasMin()) { - rocksIterator.seek(range.getMin()); - } else { - rocksIterator.seekToFirst(); - } - return rocksIterator; }, (rocksIterator, sink) -> { - if (rocksIterator.isValid()) { - byte[] key = rocksIterator.key(); - byte[] value = readValues ? rocksIterator.value() : EMPTY; - rocksIterator.next(); - sink.next(getEntry(key, value)); - } else { - sink.complete(); + synchronized (this) { + if (rocksIterator.isValid()) { + byte[] key = rocksIterator.key(); + byte[] value = readValues ? rocksIterator.value() : EMPTY; + rocksIterator.next(); + sink.next(getEntry(key, value)); + } else { + sink.complete(); + } + return rocksIterator; } - return rocksIterator; - }, rocksIterator1 -> rocksIterator1.close()); + }, rocksIterator1 -> { + synchronized (this) { + rocksIterator1.close(); + } + }); } public abstract T getEntry(byte[] key, byte[] value);