Fix iterations

This commit is contained in:
Andrea Cavalli 2021-03-14 14:16:07 +01:00
parent 3d5f987ffd
commit 05e8c87015
3 changed files with 113 additions and 91 deletions

View File

@ -43,44 +43,51 @@ public abstract class LLLocalLuceneGroupedReactiveIterator<T> {
public Flux<List<T>> flux() {
return Flux
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax());
if (range.hasMin()) {
readOptions.setIterateLowerBound(new Slice(range.getMin()));
}
if (range.hasMax()) {
readOptions.setIterateUpperBound(new Slice(range.getMax()));
}
readOptions.setPrefixSameAsStart(true);
var rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
return rocksIterator;
}, (rocksIterator, sink) -> {
ObjectArrayList<T> values = new ObjectArrayList<>();
byte[] firstGroupKey = null;
while (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
if (firstGroupKey == null) {
firstGroupKey = key;
} else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
break;
synchronized (this) {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax());
if (range.hasMin()) {
readOptions.setIterateLowerBound(new Slice(range.getMin()));
}
byte[] value = readValues ? rocksIterator.value() : EMPTY;
rocksIterator.next();
values.add(getEntry(key, value));
if (range.hasMax()) {
readOptions.setIterateUpperBound(new Slice(range.getMax()));
}
var rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
return rocksIterator;
}
if (!values.isEmpty()) {
sink.next(values);
} else {
sink.complete();
}, (rocksIterator, sink) -> {
synchronized (this) {
ObjectArrayList<T> values = new ObjectArrayList<>();
byte[] firstGroupKey = null;
while (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
if (firstGroupKey == null) {
firstGroupKey = key;
} else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
break;
}
byte[] value = readValues ? rocksIterator.value() : EMPTY;
rocksIterator.next();
values.add(getEntry(key, value));
}
if (!values.isEmpty()) {
sink.next(values);
} else {
sink.complete();
}
return rocksIterator;
}
return rocksIterator;
}, rocksIterator1 -> rocksIterator1.close());
}, rocksIterator1 -> {
synchronized (this) {
rocksIterator1.close();
}
});
}
public abstract T getEntry(byte[] key, byte[] value);

View File

@ -38,41 +38,48 @@ public class LLLocalLuceneKeyPrefixesReactiveIterator {
public Flux<byte[]> flux() {
return Flux
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax());
if (range.hasMin()) {
readOptions.setIterateLowerBound(new Slice(range.getMin()));
}
if (range.hasMax()) {
readOptions.setIterateUpperBound(new Slice(range.getMax()));
}
readOptions.setPrefixSameAsStart(true);
var rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
return rocksIterator;
}, (rocksIterator, sink) -> {
byte[] firstGroupKey = null;
while (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
if (firstGroupKey == null) {
firstGroupKey = key;
} else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
break;
synchronized (this) {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax());
if (range.hasMin()) {
readOptions.setIterateLowerBound(new Slice(range.getMin()));
}
rocksIterator.next();
if (range.hasMax()) {
readOptions.setIterateUpperBound(new Slice(range.getMax()));
}
var rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
return rocksIterator;
}
if (firstGroupKey != null) {
var groupKeyPrefix = Arrays.copyOf(firstGroupKey, prefixLength);
sink.next(groupKeyPrefix);
} else {
sink.complete();
}, (rocksIterator, sink) -> {
synchronized (this) {
byte[] firstGroupKey = null;
while (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
if (firstGroupKey == null) {
firstGroupKey = key;
} else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
break;
}
rocksIterator.next();
}
if (firstGroupKey != null) {
var groupKeyPrefix = Arrays.copyOf(firstGroupKey, prefixLength);
sink.next(groupKeyPrefix);
} else {
sink.complete();
}
return rocksIterator;
}
return rocksIterator;
}, rocksIterator1 -> rocksIterator1.close());
}, rocksIterator1 -> {
synchronized (this) {
rocksIterator1.close();
}
});
}
}

View File

@ -33,32 +33,40 @@ public abstract class LLLocalLuceneReactiveIterator<T> {
public Flux<T> flux() {
return Flux
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax());
if (range.hasMin()) {
readOptions.setIterateLowerBound(new Slice(range.getMin()));
synchronized (this) {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax());
if (range.hasMin()) {
readOptions.setIterateLowerBound(new Slice(range.getMin()));
}
if (range.hasMax()) {
readOptions.setIterateUpperBound(new Slice(range.getMax()));
}
var rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
return rocksIterator;
}
if (range.hasMax()) {
readOptions.setIterateUpperBound(new Slice(range.getMax()));
}
var rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
return rocksIterator;
}, (rocksIterator, sink) -> {
if (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
byte[] value = readValues ? rocksIterator.value() : EMPTY;
rocksIterator.next();
sink.next(getEntry(key, value));
} else {
sink.complete();
synchronized (this) {
if (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
byte[] value = readValues ? rocksIterator.value() : EMPTY;
rocksIterator.next();
sink.next(getEntry(key, value));
} else {
sink.complete();
}
return rocksIterator;
}
return rocksIterator;
}, rocksIterator1 -> rocksIterator1.close());
}, rocksIterator1 -> {
synchronized (this) {
rocksIterator1.close();
}
});
}
public abstract T getEntry(byte[] key, byte[] value);