Fix sigsegv
This commit is contained in:
parent
1aed618ca5
commit
63469c0f89
@ -5,6 +5,7 @@ import it.cavallium.dbengine.database.LLDictionary;
|
|||||||
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
import it.cavallium.dbengine.database.LLDictionaryResultType;
|
||||||
import it.cavallium.dbengine.database.LLRange;
|
import it.cavallium.dbengine.database.LLRange;
|
||||||
import it.cavallium.dbengine.database.LLSnapshot;
|
import it.cavallium.dbengine.database.LLSnapshot;
|
||||||
|
import it.cavallium.dbengine.database.disk.LLLocalDictionary;
|
||||||
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
@ -214,7 +215,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
|
|||||||
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
|
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
|
||||||
byte[] keySuffixData = serializeSuffix(keySuffix);
|
byte[] keySuffixData = serializeSuffix(keySuffix);
|
||||||
Flux<byte[]> keyFlux;
|
Flux<byte[]> keyFlux;
|
||||||
if (this.subStageGetter.needsDebuggingKeyFlux()) {
|
if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) {
|
||||||
keyFlux = this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData));
|
keyFlux = this.dictionary.getRangeKeys(resolveSnapshot(snapshot), toExtRange(keySuffixData));
|
||||||
} else {
|
} else {
|
||||||
keyFlux = Flux.empty();
|
keyFlux = Flux.empty();
|
||||||
@ -229,7 +230,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
|
||||||
if (this.subStageGetter.needsDebuggingKeyFlux()) {
|
if (LLLocalDictionary.DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED && this.subStageGetter.needsDebuggingKeyFlux()) {
|
||||||
return dictionary
|
return dictionary
|
||||||
.getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength)
|
.getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength)
|
||||||
.flatMapSequential(rangeKeys -> {
|
.flatMapSequential(rangeKeys -> {
|
||||||
|
@ -48,8 +48,9 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
|
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
|
||||||
static final int MULTI_GET_WINDOW = 500;
|
static final int MULTI_GET_WINDOW = 500;
|
||||||
static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true);
|
static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions().setLowPri(true);
|
||||||
static final boolean PREFER_ALWAYS_SEEK_TO_FIRST = true;
|
static final boolean PREFER_SEEK_TO_FIRST = false;
|
||||||
static final boolean ALWAYS_VERIFY_CHECKSUMS = true;
|
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false;
|
||||||
|
public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true;
|
||||||
|
|
||||||
private static final int STRIPES = 512;
|
private static final int STRIPES = 512;
|
||||||
private static final byte[] FIRST_KEY = new byte[]{};
|
private static final byte[] FIRST_KEY = new byte[]{};
|
||||||
@ -169,7 +170,7 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
return Mono
|
return Mono
|
||||||
.fromCallable(() -> {
|
.fromCallable(() -> {
|
||||||
var readOpts = resolveSnapshot(snapshot);
|
var readOpts = resolveSnapshot(snapshot);
|
||||||
readOpts.setVerifyChecksums(ALWAYS_VERIFY_CHECKSUMS);
|
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
||||||
readOpts.setFillCache(false);
|
readOpts.setFillCache(false);
|
||||||
if (range.hasMin()) {
|
if (range.hasMin()) {
|
||||||
readOpts.setIterateLowerBound(new Slice(range.getMin()));
|
readOpts.setIterateLowerBound(new Slice(range.getMin()));
|
||||||
@ -700,7 +701,7 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
return Mono
|
return Mono
|
||||||
.<Void>fromCallable(() -> {
|
.<Void>fromCallable(() -> {
|
||||||
var readOpts = getReadOptions(null);
|
var readOpts = getReadOptions(null);
|
||||||
readOpts.setVerifyChecksums(ALWAYS_VERIFY_CHECKSUMS);
|
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
||||||
|
|
||||||
// readOpts.setIgnoreRangeDeletions(true);
|
// readOpts.setIgnoreRangeDeletions(true);
|
||||||
readOpts.setFillCache(false);
|
readOpts.setFillCache(false);
|
||||||
@ -754,7 +755,7 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
.fromCallable(() -> {
|
.fromCallable(() -> {
|
||||||
var readOpts = resolveSnapshot(snapshot);
|
var readOpts = resolveSnapshot(snapshot);
|
||||||
readOpts.setFillCache(false);
|
readOpts.setFillCache(false);
|
||||||
readOpts.setVerifyChecksums(ALWAYS_VERIFY_CHECKSUMS);
|
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
||||||
if (range.hasMin()) {
|
if (range.hasMin()) {
|
||||||
readOpts.setIterateLowerBound(new Slice(range.getMin()));
|
readOpts.setIterateLowerBound(new Slice(range.getMin()));
|
||||||
}
|
}
|
||||||
@ -842,7 +843,7 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
rocksdbSnapshot.setFillCache(false);
|
rocksdbSnapshot.setFillCache(false);
|
||||||
rocksdbSnapshot.setVerifyChecksums(ALWAYS_VERIFY_CHECKSUMS);
|
rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
||||||
rocksdbSnapshot.setIgnoreRangeDeletions(false);
|
rocksdbSnapshot.setIgnoreRangeDeletions(false);
|
||||||
long count = 0;
|
long count = 0;
|
||||||
try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) {
|
try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) {
|
||||||
@ -860,7 +861,7 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
private long exactSizeAll(@Nullable LLSnapshot snapshot) {
|
private long exactSizeAll(@Nullable LLSnapshot snapshot) {
|
||||||
var readOpts = resolveSnapshot(snapshot);
|
var readOpts = resolveSnapshot(snapshot);
|
||||||
readOpts.setFillCache(false);
|
readOpts.setFillCache(false);
|
||||||
readOpts.setVerifyChecksums(ALWAYS_VERIFY_CHECKSUMS);
|
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
|
||||||
|
|
||||||
long count = 0;
|
long count = 0;
|
||||||
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
|
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
|
||||||
@ -885,7 +886,7 @@ public class LLLocalDictionary implements LLDictionary {
|
|||||||
readOpts.setIterateUpperBound(new Slice(range.getMax()));
|
readOpts.setIterateUpperBound(new Slice(range.getMax()));
|
||||||
}
|
}
|
||||||
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
|
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
|
||||||
if (!LLLocalDictionary.PREFER_ALWAYS_SEEK_TO_FIRST && range.hasMin()) {
|
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
|
||||||
iter.seek(range.getMin());
|
iter.seek(range.getMin());
|
||||||
} else {
|
} else {
|
||||||
iter.seekToFirst();
|
iter.seekToFirst();
|
||||||
|
@ -4,11 +4,14 @@ import it.cavallium.dbengine.database.LLRange;
|
|||||||
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
|
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
import org.rocksdb.ColumnFamilyHandle;
|
import org.rocksdb.ColumnFamilyHandle;
|
||||||
import org.rocksdb.ReadOptions;
|
import org.rocksdb.ReadOptions;
|
||||||
import org.rocksdb.RocksDB;
|
import org.rocksdb.RocksDB;
|
||||||
|
import org.rocksdb.RocksMutableObject;
|
||||||
import org.rocksdb.Slice;
|
import org.rocksdb.Slice;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.util.function.Tuples;
|
||||||
|
|
||||||
public abstract class LLLocalLuceneGroupedReactiveIterator<T> {
|
public abstract class LLLocalLuceneGroupedReactiveIterator<T> {
|
||||||
|
|
||||||
@ -43,50 +46,56 @@ public abstract class LLLocalLuceneGroupedReactiveIterator<T> {
|
|||||||
public Flux<List<T>> flux() {
|
public Flux<List<T>> flux() {
|
||||||
return Flux
|
return Flux
|
||||||
.generate(() -> {
|
.generate(() -> {
|
||||||
synchronized (this) {
|
var readOptions = new ReadOptions(this.readOptions);
|
||||||
var readOptions = new ReadOptions(this.readOptions);
|
readOptions.setFillCache(range.hasMin() && range.hasMax());
|
||||||
readOptions.setFillCache(range.hasMin() && range.hasMax());
|
Slice sliceMin;
|
||||||
if (range.hasMin()) {
|
Slice sliceMax;
|
||||||
readOptions.setIterateLowerBound(new Slice(range.getMin()));
|
if (range.hasMin()) {
|
||||||
}
|
sliceMin = new Slice(range.getMin());
|
||||||
if (range.hasMax()) {
|
readOptions.setIterateLowerBound(sliceMin);
|
||||||
readOptions.setIterateUpperBound(new Slice(range.getMax()));
|
} else {
|
||||||
}
|
sliceMin = null;
|
||||||
var rocksIterator = db.newIterator(cfh, readOptions);
|
|
||||||
if (!LLLocalDictionary.PREFER_ALWAYS_SEEK_TO_FIRST && range.hasMin()) {
|
|
||||||
rocksIterator.seek(range.getMin());
|
|
||||||
} else {
|
|
||||||
rocksIterator.seekToFirst();
|
|
||||||
}
|
|
||||||
return rocksIterator;
|
|
||||||
}
|
}
|
||||||
}, (rocksIterator, sink) -> {
|
if (range.hasMax()) {
|
||||||
synchronized (this) {
|
sliceMax = new Slice(range.getMax());
|
||||||
ObjectArrayList<T> values = new ObjectArrayList<>();
|
readOptions.setIterateUpperBound(sliceMax);
|
||||||
byte[] firstGroupKey = null;
|
} else {
|
||||||
|
sliceMax = null;
|
||||||
|
}
|
||||||
|
var rocksIterator = db.newIterator(cfh, readOptions);
|
||||||
|
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
|
||||||
|
rocksIterator.seek(range.getMin());
|
||||||
|
} else {
|
||||||
|
rocksIterator.seekToFirst();
|
||||||
|
}
|
||||||
|
return Tuples.of(rocksIterator, Optional.ofNullable(sliceMin), Optional.ofNullable(sliceMax));
|
||||||
|
}, (tuple, sink) -> {
|
||||||
|
var rocksIterator = tuple.getT1();
|
||||||
|
ObjectArrayList<T> values = new ObjectArrayList<>();
|
||||||
|
byte[] firstGroupKey = null;
|
||||||
|
|
||||||
while (rocksIterator.isValid()) {
|
while (rocksIterator.isValid()) {
|
||||||
byte[] key = rocksIterator.key();
|
byte[] key = rocksIterator.key();
|
||||||
if (firstGroupKey == null) {
|
if (firstGroupKey == null) {
|
||||||
firstGroupKey = key;
|
firstGroupKey = key;
|
||||||
} else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
|
} else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
byte[] value = readValues ? rocksIterator.value() : EMPTY;
|
|
||||||
rocksIterator.next();
|
|
||||||
values.add(getEntry(key, value));
|
|
||||||
}
|
}
|
||||||
if (!values.isEmpty()) {
|
byte[] value = readValues ? rocksIterator.value() : EMPTY;
|
||||||
sink.next(values);
|
rocksIterator.next();
|
||||||
} else {
|
values.add(getEntry(key, value));
|
||||||
sink.complete();
|
|
||||||
}
|
|
||||||
return rocksIterator;
|
|
||||||
}
|
}
|
||||||
}, rocksIterator1 -> {
|
if (!values.isEmpty()) {
|
||||||
synchronized (this) {
|
sink.next(values);
|
||||||
rocksIterator1.close();
|
} else {
|
||||||
|
sink.complete();
|
||||||
}
|
}
|
||||||
|
return tuple;
|
||||||
|
}, tuple -> {
|
||||||
|
var rocksIterator = tuple.getT1();
|
||||||
|
rocksIterator.close();
|
||||||
|
tuple.getT2().ifPresent(RocksMutableObject::close);
|
||||||
|
tuple.getT3().ifPresent(RocksMutableObject::close);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,11 +2,14 @@ package it.cavallium.dbengine.database.disk;
|
|||||||
|
|
||||||
import it.cavallium.dbengine.database.LLRange;
|
import it.cavallium.dbengine.database.LLRange;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Optional;
|
||||||
import org.rocksdb.ColumnFamilyHandle;
|
import org.rocksdb.ColumnFamilyHandle;
|
||||||
import org.rocksdb.ReadOptions;
|
import org.rocksdb.ReadOptions;
|
||||||
import org.rocksdb.RocksDB;
|
import org.rocksdb.RocksDB;
|
||||||
|
import org.rocksdb.RocksMutableObject;
|
||||||
import org.rocksdb.Slice;
|
import org.rocksdb.Slice;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.util.function.Tuples;
|
||||||
|
|
||||||
public class LLLocalLuceneKeyPrefixesReactiveIterator {
|
public class LLLocalLuceneKeyPrefixesReactiveIterator {
|
||||||
|
|
||||||
@ -38,50 +41,54 @@ public class LLLocalLuceneKeyPrefixesReactiveIterator {
|
|||||||
public Flux<byte[]> flux() {
|
public Flux<byte[]> flux() {
|
||||||
return Flux
|
return Flux
|
||||||
.generate(() -> {
|
.generate(() -> {
|
||||||
synchronized (this) {
|
var readOptions = new ReadOptions(this.readOptions);
|
||||||
System.out.println(Thread.currentThread().getName());
|
readOptions.setFillCache(range.hasMin() && range.hasMax());
|
||||||
var readOptions = new ReadOptions(this.readOptions);
|
Slice sliceMin;
|
||||||
readOptions.setFillCache(range.hasMin() && range.hasMax());
|
Slice sliceMax;
|
||||||
if (range.hasMin()) {
|
if (range.hasMin()) {
|
||||||
readOptions.setIterateLowerBound(new Slice(range.getMin()));
|
sliceMin = new Slice(range.getMin());
|
||||||
}
|
readOptions.setIterateLowerBound(sliceMin);
|
||||||
if (range.hasMax()) {
|
} else {
|
||||||
readOptions.setIterateUpperBound(new Slice(range.getMax()));
|
sliceMin = null;
|
||||||
}
|
|
||||||
var rocksIterator = db.newIterator(cfh, readOptions);
|
|
||||||
if (!LLLocalDictionary.PREFER_ALWAYS_SEEK_TO_FIRST && range.hasMin()) {
|
|
||||||
rocksIterator.seek(range.getMin());
|
|
||||||
} else {
|
|
||||||
rocksIterator.seekToFirst();
|
|
||||||
}
|
|
||||||
return rocksIterator;
|
|
||||||
}
|
}
|
||||||
}, (rocksIterator, sink) -> {
|
if (range.hasMax()) {
|
||||||
synchronized (this) {
|
sliceMax = new Slice(range.getMax());
|
||||||
System.out.println(Thread.currentThread().getName());
|
readOptions.setIterateUpperBound(sliceMax);
|
||||||
byte[] firstGroupKey = null;
|
} else {
|
||||||
|
sliceMax = null;
|
||||||
|
}
|
||||||
|
var rocksIterator = db.newIterator(cfh, readOptions);
|
||||||
|
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
|
||||||
|
rocksIterator.seek(range.getMin());
|
||||||
|
} else {
|
||||||
|
rocksIterator.seekToFirst();
|
||||||
|
}
|
||||||
|
return Tuples.of(rocksIterator, Optional.ofNullable(sliceMin), Optional.ofNullable(sliceMax));
|
||||||
|
}, (tuple, sink) -> {
|
||||||
|
var rocksIterator = tuple.getT1();
|
||||||
|
byte[] firstGroupKey = null;
|
||||||
|
|
||||||
while (rocksIterator.isValid()) {
|
while (rocksIterator.isValid()) {
|
||||||
byte[] key = rocksIterator.key();
|
byte[] key = rocksIterator.key();
|
||||||
if (firstGroupKey == null) {
|
if (firstGroupKey == null) {
|
||||||
firstGroupKey = key;
|
firstGroupKey = key;
|
||||||
} else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
|
} else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
rocksIterator.next();
|
|
||||||
}
|
}
|
||||||
if (firstGroupKey != null) {
|
rocksIterator.next();
|
||||||
var groupKeyPrefix = Arrays.copyOf(firstGroupKey, prefixLength);
|
|
||||||
sink.next(groupKeyPrefix);
|
|
||||||
} else {
|
|
||||||
sink.complete();
|
|
||||||
}
|
|
||||||
return rocksIterator;
|
|
||||||
}
|
}
|
||||||
}, rocksIterator1 -> {
|
if (firstGroupKey != null) {
|
||||||
synchronized (this) {
|
var groupKeyPrefix = Arrays.copyOf(firstGroupKey, prefixLength);
|
||||||
rocksIterator1.close();
|
sink.next(groupKeyPrefix);
|
||||||
|
} else {
|
||||||
|
sink.complete();
|
||||||
}
|
}
|
||||||
|
return tuple;
|
||||||
|
}, tuple -> {
|
||||||
|
var rocksIterator = tuple.getT1();
|
||||||
|
rocksIterator.close();
|
||||||
|
tuple.getT2().ifPresent(RocksMutableObject::close);
|
||||||
|
tuple.getT3().ifPresent(RocksMutableObject::close);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,14 @@
|
|||||||
package it.cavallium.dbengine.database.disk;
|
package it.cavallium.dbengine.database.disk;
|
||||||
|
|
||||||
import it.cavallium.dbengine.database.LLRange;
|
import it.cavallium.dbengine.database.LLRange;
|
||||||
|
import java.util.Optional;
|
||||||
import org.rocksdb.ColumnFamilyHandle;
|
import org.rocksdb.ColumnFamilyHandle;
|
||||||
import org.rocksdb.ReadOptions;
|
import org.rocksdb.ReadOptions;
|
||||||
import org.rocksdb.RocksDB;
|
import org.rocksdb.RocksDB;
|
||||||
|
import org.rocksdb.RocksMutableObject;
|
||||||
import org.rocksdb.Slice;
|
import org.rocksdb.Slice;
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
import reactor.util.function.Tuples;
|
||||||
|
|
||||||
public abstract class LLLocalLuceneReactiveIterator<T> {
|
public abstract class LLLocalLuceneReactiveIterator<T> {
|
||||||
|
|
||||||
@ -33,39 +36,45 @@ public abstract class LLLocalLuceneReactiveIterator<T> {
|
|||||||
public Flux<T> flux() {
|
public Flux<T> flux() {
|
||||||
return Flux
|
return Flux
|
||||||
.generate(() -> {
|
.generate(() -> {
|
||||||
synchronized (this) {
|
var readOptions = new ReadOptions(this.readOptions);
|
||||||
var readOptions = new ReadOptions(this.readOptions);
|
readOptions.setFillCache(range.hasMin() && range.hasMax());
|
||||||
readOptions.setFillCache(range.hasMin() && range.hasMax());
|
Slice sliceMin;
|
||||||
if (range.hasMin()) {
|
Slice sliceMax;
|
||||||
readOptions.setIterateLowerBound(new Slice(range.getMin()));
|
if (range.hasMin()) {
|
||||||
}
|
sliceMin = new Slice(range.getMin());
|
||||||
if (range.hasMax()) {
|
readOptions.setIterateLowerBound(sliceMin);
|
||||||
readOptions.setIterateUpperBound(new Slice(range.getMax()));
|
} else {
|
||||||
}
|
sliceMin = null;
|
||||||
var rocksIterator = db.newIterator(cfh, readOptions);
|
|
||||||
if (!LLLocalDictionary.PREFER_ALWAYS_SEEK_TO_FIRST && range.hasMin()) {
|
|
||||||
rocksIterator.seek(range.getMin());
|
|
||||||
} else {
|
|
||||||
rocksIterator.seekToFirst();
|
|
||||||
}
|
|
||||||
return rocksIterator;
|
|
||||||
}
|
}
|
||||||
}, (rocksIterator, sink) -> {
|
if (range.hasMax()) {
|
||||||
synchronized (this) {
|
sliceMax = new Slice(range.getMax());
|
||||||
if (rocksIterator.isValid()) {
|
readOptions.setIterateUpperBound(sliceMax);
|
||||||
byte[] key = rocksIterator.key();
|
} else {
|
||||||
byte[] value = readValues ? rocksIterator.value() : EMPTY;
|
sliceMax = null;
|
||||||
rocksIterator.next();
|
|
||||||
sink.next(getEntry(key, value));
|
|
||||||
} else {
|
|
||||||
sink.complete();
|
|
||||||
}
|
|
||||||
return rocksIterator;
|
|
||||||
}
|
}
|
||||||
}, rocksIterator1 -> {
|
var rocksIterator = db.newIterator(cfh, readOptions);
|
||||||
synchronized (this) {
|
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
|
||||||
rocksIterator1.close();
|
rocksIterator.seek(range.getMin());
|
||||||
|
} else {
|
||||||
|
rocksIterator.seekToFirst();
|
||||||
}
|
}
|
||||||
|
return Tuples.of(rocksIterator, Optional.ofNullable(sliceMin), Optional.ofNullable(sliceMax));
|
||||||
|
}, (tuple, sink) -> {
|
||||||
|
var rocksIterator = tuple.getT1();
|
||||||
|
if (rocksIterator.isValid()) {
|
||||||
|
byte[] key = rocksIterator.key();
|
||||||
|
byte[] value = readValues ? rocksIterator.value() : EMPTY;
|
||||||
|
rocksIterator.next();
|
||||||
|
sink.next(getEntry(key, value));
|
||||||
|
} else {
|
||||||
|
sink.complete();
|
||||||
|
}
|
||||||
|
return tuple;
|
||||||
|
}, tuple -> {
|
||||||
|
var rocksIterator = tuple.getT1();
|
||||||
|
rocksIterator.close();
|
||||||
|
tuple.getT2().ifPresent(RocksMutableObject::close);
|
||||||
|
tuple.getT3().ifPresent(RocksMutableObject::close);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user