Update BoundedGroupedRocksFluxIterable

This commit is contained in:
Andrea Cavalli 2021-02-26 16:24:03 +01:00
parent 4e2030dab8
commit e854b4252c

View File

@ -19,8 +19,6 @@ public abstract class BoundedGroupedRocksFluxIterable<T> extends BlockingFluxIte
protected RocksIterator rocksIterator;
protected ReadOptions readOptions;
protected byte[] firstGroupKey = null;
protected List<T> currentGroupValues = new ArrayList<>();
public BoundedGroupedRocksFluxIterable(Scheduler scheduler,
RocksDB db,
@ -55,33 +53,35 @@ public abstract class BoundedGroupedRocksFluxIterable<T> extends BlockingFluxIte
@Nullable
@Override
public List<T> onNext() {
byte[] firstGroupKey = null;
List<T> currentGroupValues = new ArrayList<>();
while (rocksIterator.isValid()) {
if (!rocksIterator.isValid()) {
break;
}
byte[] key = rocksIterator.key();
if (firstGroupKey == null) { // Fix first value
firstGroupKey = key;
}
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
rocksIterator.next();
break;
}
List<T> result = null;
if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
currentGroupValues.add(transformEntry(key));
} else {
if (!currentGroupValues.isEmpty()) {
result = currentGroupValues;
List<T> result = null;
if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
currentGroupValues.add(transformEntry(key));
} else {
if (!currentGroupValues.isEmpty()) {
result = currentGroupValues;
}
firstGroupKey = key;
currentGroupValues = new ArrayList<>();
}
if (result != null) {
rocksIterator.next();
return result;
} else {
rocksIterator.next();
}
firstGroupKey = key;
currentGroupValues = new ArrayList<>();
}
if (result != null) {
return result;
}
rocksIterator.next();
}
if (!currentGroupValues.isEmpty()) {
return currentGroupValues;