Refactor iterations

Andrea Cavalli 2021-03-14 03:13:19 +01:00
parent 32d1d76f69
commit 5f3bf768ad
16 changed files with 651 additions and 323 deletions

pom.xml (17 changed lines)

@@ -123,6 +123,18 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.12.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
@@ -173,6 +185,11 @@
<artifactId>reactor-tools</artifactId>
<version>3.4.3</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.3</version>
</dependency>
<dependency>
<groupId>org.novasearch</groupId>
<artifactId>lucene-relevance</artifactId>
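
The test-scoped reactor-test artifact added above provides StepVerifier, which the new Database test at the end of this commit relies on. A minimal self-contained sketch of that API (toy publisher, not taken from this commit):

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class StepVerifierSketch {
	public static void main(String[] args) {
		// Assert the emitted values and the completion signal of a publisher.
		StepVerifier
				.create(Flux.just("a", "b"))
				.expectNext("a", "b")
				.verifyComplete();
	}
}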

it/cavallium/dbengine/database/BoundedGroupedRocksFluxIterable.java

@@ -1,97 +0,0 @@
package it.cavallium.dbengine.database;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
public abstract class BoundedGroupedRocksFluxIterable<T> extends BlockingFluxIterable<List<T>> {
private final RocksDB db;
private final ColumnFamilyHandle cfh;
protected final LLRange range;
private final int prefixLength;
protected RocksIterator rocksIterator;
protected ReadOptions readOptions;
public BoundedGroupedRocksFluxIterable(RocksDB db,
ColumnFamilyHandle cfh,
LLRange range,
int prefixLength) {
super("bounded-grouped-rocksdb");
this.db = db;
this.cfh = cfh;
this.range = range;
this.prefixLength = prefixLength;
}
@Override
public void onStartup() {
readOptions = this.getReadOptions();
rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
}
@Override
public void onTerminate() {
if (rocksIterator != null) {
rocksIterator.close();
}
}
@Nullable
@Override
public List<T> onNext() {
byte[] firstGroupKey = null;
List<T> currentGroupValues = new ArrayList<>();
while (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
if (firstGroupKey == null) { // Fix first value
firstGroupKey = key;
}
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
rocksIterator.next();
break;
} else {
List<T> result = null;
if (Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
currentGroupValues.add(transformEntry(key));
} else {
if (!currentGroupValues.isEmpty()) {
result = currentGroupValues;
}
firstGroupKey = key;
currentGroupValues = new ArrayList<>();
}
if (result != null) {
rocksIterator.next();
return result;
} else {
rocksIterator.next();
}
}
}
if (!currentGroupValues.isEmpty()) {
return currentGroupValues;
}
return null;
}
protected abstract ReadOptions getReadOptions();
protected abstract T transformEntry(byte[] key);
protected byte[] getValue() {
return rocksIterator.value();
}
}

it/cavallium/dbengine/database/BoundedRocksFluxIterable.java

@@ -1,68 +0,0 @@
package it.cavallium.dbengine.database;
import java.util.Arrays;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
public abstract class BoundedRocksFluxIterable<T> extends BlockingFluxIterable<T> {
private final RocksDB db;
private final ColumnFamilyHandle cfh;
protected final LLRange range;
protected RocksIterator rocksIterator;
protected ReadOptions readOptions;
public BoundedRocksFluxIterable(RocksDB db,
ColumnFamilyHandle cfh,
LLRange range) {
super("bounded-rocksdb");
this.db = db;
this.cfh = cfh;
this.range = range;
}
@Override
public void onStartup() {
readOptions = this.getReadOptions();
rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
}
@Override
public void onTerminate() {
if (rocksIterator != null) {
rocksIterator.close();
}
}
@Nullable
@Override
public T onNext() {
if (!rocksIterator.isValid()) {
return null;
}
byte[] key = rocksIterator.key();
if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
return null;
}
var transformedEntry = this.transformEntry(key);
rocksIterator.next();
return transformedEntry;
}
protected abstract ReadOptions getReadOptions();
protected abstract T transformEntry(byte[] key);
protected byte[] getValue() {
return rocksIterator.value();
}
}

it/cavallium/dbengine/database/LLDictionary.java

@@ -19,6 +19,8 @@ public interface LLDictionary extends LLKeyValueDatabaseStructure {
Mono<Boolean> update(byte[] key, Function<Optional<byte[]>, Optional<byte[]>> updater);
Mono<Void> clear();
Mono<byte[]> remove(byte[] key, LLDictionaryResultType resultType);
Flux<Entry<byte[], byte[]>> getMulti(@Nullable LLSnapshot snapshot, Flux<byte[]> keys);

it/cavallium/dbengine/database/collections/DatabaseMapDictionary.java

@@ -91,6 +91,11 @@ public class DatabaseMapDictionary<T, U> extends DatabaseMapDictionaryDeep<T, U,
return dictionary.sizeRange(resolveSnapshot(snapshot), range, fast);
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), range);
}
@Override
public Mono<DatabaseStageEntry<U>> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
return Mono
@@ -187,6 +192,22 @@
.map(entry -> Map.entry(deserializeSuffix(stripPrefix(entry.getKey())), deserialize(entry.getValue())));
}
@Override
public Mono<Void> clear() {
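// Note added for clarity (not in the commit): clear() now dispatches on the shape
// of the range: a full range drops the whole dictionary, a single-key range becomes
// a point delete, and any other range is rewritten as an empty setRange.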
if (range.isAll()) {
return dictionary
.clear();
} else if (range.isSingle()) {
return dictionary
.remove(range.getSingle(), LLDictionaryResultType.VOID)
.then();
} else {
return dictionary
.setRange(range, Flux.empty(), false)
.then();
}
}
//todo: temporary wrapper. convert the whole class to buffers
private U deserialize(byte[] bytes) {
return valueSerializer.deserialize(bytes);

it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java

@@ -2,6 +2,7 @@ package it.cavallium.dbengine.database.collections;
import it.cavallium.dbengine.client.CompositeSnapshot;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
@@ -203,6 +204,11 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
return dictionary.sizeRange(resolveSnapshot(snapshot), range, fast);
}
@Override
public Mono<Boolean> isEmpty(@Nullable CompositeSnapshot snapshot) {
return dictionary.isRangeEmpty(resolveSnapshot(snapshot), range);
}
@SuppressWarnings("ReactiveStreamsUnusedPublisher")
@Override
public Mono<US> at(@Nullable CompositeSnapshot snapshot, T keySuffix) {
@@ -219,37 +225,35 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Flux<Entry<T, US>> getAllStages(@Nullable CompositeSnapshot snapshot) {
return Flux.defer(() -> {
if (this.subStageGetter.needsKeyFlux()) {
return dictionary
.getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength)
.flatMap(rangeKeys -> {
byte[] groupKeyWithExt = rangeKeys.get(0);
byte[] groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt);
byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt);
assert subStageKeysConsistency(groupKeyWithExt.length);
return this.subStageGetter
.subStage(dictionary,
snapshot,
groupKeyWithoutExt,
this.subStageGetter.needsKeyFlux() ? Flux.defer(() -> Flux.fromIterable(rangeKeys)) : Flux.empty()
)
.map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us));
});
} else {
return dictionary
.getOneKey(resolveSnapshot(snapshot), range)
.flatMap(randomKeyWithExt -> {
byte[] keyWithoutExt = removeExtFromFullKey(randomKeyWithExt);
byte[] keySuffix = this.stripPrefix(keyWithoutExt);
assert subStageKeysConsistency(keyWithoutExt.length);
return this.subStageGetter
.subStage(dictionary, snapshot, keyWithoutExt, Mono.just(randomKeyWithExt).flux())
.map(us -> Map.entry(this.deserializeSuffix(keySuffix), us));
});
}
});
if (this.subStageGetter.needsKeyFlux()) {
return dictionary
.getRangeKeysGrouped(resolveSnapshot(snapshot), range, keyPrefix.length + keySuffixLength)
.flatMapSequential(rangeKeys -> {
byte[] groupKeyWithExt = rangeKeys.get(0);
byte[] groupKeyWithoutExt = removeExtFromFullKey(groupKeyWithExt);
byte[] groupSuffix = this.stripPrefix(groupKeyWithoutExt);
assert subStageKeysConsistency(groupKeyWithExt.length);
return this.subStageGetter
.subStage(dictionary,
snapshot,
groupKeyWithoutExt,
this.subStageGetter.needsKeyFlux() ? Flux.defer(() -> Flux.fromIterable(rangeKeys)) : Flux.empty()
)
.map(us -> Map.entry(this.deserializeSuffix(groupSuffix), us));
});
} else {
return dictionary
.getOneKey(resolveSnapshot(snapshot), range)
.flatMap(randomKeyWithExt -> {
byte[] keyWithoutExt = removeExtFromFullKey(randomKeyWithExt);
byte[] keySuffix = this.stripPrefix(keyWithoutExt);
assert subStageKeysConsistency(keyWithoutExt.length);
return this.subStageGetter
.subStage(dictionary, snapshot, keyWithoutExt, Mono.just(randomKeyWithExt).flux())
.map(us -> Map.entry(this.deserializeSuffix(keySuffix), us));
})
.flux();
}
}
private boolean subStageKeysConsistency(int totalKeyLength) {
@@ -267,7 +271,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Flux<Entry<T, U>> setAllValuesAndGetPrevious(Flux<Entry<T, U>> entries) {
return getAllStages(null)
.flatMap(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val)))
.flatMapSequential(stage -> stage.getValue().get(null).map(val -> Map.entry(stage.getKey(), val)))
.concatWith(clear().then(entries
.flatMap(entry -> at(null, entry.getKey()).map(us -> Tuples.of(us, entry.getValue())))
.flatMap(tuple -> tuple.getT1().set(tuple.getT2()))
@@ -276,9 +280,18 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
@Override
public Mono<Void> clear() {
return dictionary
.setRange(range, Flux.empty(), false)
.then();
if (range.isAll()) {
return dictionary
.clear();
} else if (range.isSingle()) {
return dictionary
.remove(range.getSingle(), LLDictionaryResultType.VOID)
.then();
} else {
return dictionary
.setRange(range, Flux.empty(), false)
.then();
}
}
//todo: temporary wrapper. convert the whole class to buffers

it/cavallium/dbengine/database/collections/DatabaseStageMap.java

@@ -54,7 +54,7 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
}
default Flux<Entry<T, U>> getMulti(@Nullable CompositeSnapshot snapshot, Flux<T> keys) {
return keys.flatMap(key -> this.getValue(snapshot, key).map(value -> Map.entry(key, value)));
return keys.flatMapSequential(key -> this.getValue(snapshot, key).map(value -> Map.entry(key, value)));
}
default Mono<Void> putMulti(Flux<Entry<T, U>> entries) {
@@ -66,7 +66,11 @@ public interface DatabaseStageMap<T, U, US extends DatabaseStage<U>> extends Dat
default Flux<Entry<T, U>> getAllValues(@Nullable CompositeSnapshot snapshot) {
return this
.getAllStages(snapshot)
.flatMap(entry -> entry.getValue().get(snapshot).map(value -> Map.entry(entry.getKey(), value)));
.flatMapSequential(entry -> entry
.getValue()
.get(snapshot)
.map(value -> Map.entry(entry.getKey(), value))
);
}
default Mono<Void> setAllValues(Flux<Entry<T, U>> entries) {

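Several defaults in this interface switch from flatMap to flatMapSequential. Both subscribe to the inner publishers eagerly, but flatMapSequential queues results so the output order matches the upstream key order. A self-contained sketch of the difference (toy values and delays, not from this codebase):

import java.time.Duration;
import reactor.core.publisher.Flux;

public class FlatMapOrderingSketch {
	public static void main(String[] args) {
		// Inner publishers complete out of order: 1 finishes first, 3 last.
		Flux
				.just(3, 2, 1)
				.flatMapSequential(i -> Flux.just(i).delayElements(Duration.ofMillis(10L * i)))
				// Prints 3, 2, 1 (upstream order); plain flatMap would print 1, 2, 3 here.
				.doOnNext(System.out::println)
				.blockLast();
	}
}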
it/cavallium/dbengine/database/disk/LLLocalDictionary.java

@@ -1,7 +1,5 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.BoundedGroupedRocksFluxIterable;
import it.cavallium.dbengine.database.BoundedRocksFluxIterable;
import it.cavallium.dbengine.database.LLDictionary;
import it.cavallium.dbengine.database.LLDictionaryResultType;
import it.cavallium.dbengine.database.LLRange;
@@ -554,33 +552,11 @@ public class LLLocalDictionary implements LLDictionary {
}
private Flux<Entry<byte[],byte[]>> getRangeMulti(LLSnapshot snapshot, LLRange range) {
return new BoundedRocksFluxIterable<Entry<byte[], byte[]>>(db, cfh, range) {
@Override
protected ReadOptions getReadOptions() {
return resolveSnapshot(snapshot);
}
@Override
protected Entry<byte[], byte[]> transformEntry(byte[] key) {
return Map.entry(key, this.getValue());
}
}.generateNonblocking(dbScheduler, 128);
return new LLLocalLuceneEntryReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)).subscribeOn(dbScheduler);
}
private Flux<List<Entry<byte[],byte[]>>> getRangeMultiGrouped(LLSnapshot snapshot, LLRange range, int prefixLength) {
return new BoundedGroupedRocksFluxIterable<Entry<byte[], byte[]>>(db, cfh, range, prefixLength) {
@Override
protected ReadOptions getReadOptions() {
return resolveSnapshot(snapshot);
}
@Override
protected Entry<byte[], byte[]> transformEntry(byte[] key) {
return Map.entry(key, this.getValue());
}
}.generateNonblocking(dbScheduler, 128);
return new LLLocalLuceneGroupedEntryReactiveIterator(db, cfh, prefixLength, range, resolveSnapshot(snapshot)).subscribeOn(dbScheduler);
}
@Override
@@ -596,18 +572,12 @@ public class LLLocalDictionary implements LLDictionary {
@Override
public Flux<List<byte[]>> getRangeKeysGrouped(@Nullable LLSnapshot snapshot, LLRange range, int prefixLength) {
return new BoundedGroupedRocksFluxIterable<byte[]>(db, cfh, range, prefixLength) {
@Override
protected ReadOptions getReadOptions() {
return resolveSnapshot(snapshot);
}
@Override
protected byte[] transformEntry(byte[] key) {
return key;
}
}.generateNonblocking(dbScheduler, 128);
return new LLLocalLuceneGroupedKeysReactiveIterator(db,
cfh,
prefixLength,
range,
resolveSnapshot(snapshot)
).subscribeOn(dbScheduler);
}
private Flux<byte[]> getRangeKeysSingle(LLSnapshot snapshot, byte[] key) {
@@ -619,98 +589,132 @@ public class LLLocalDictionary implements LLDictionary {
}
private Flux<byte[]> getRangeKeysMulti(LLSnapshot snapshot, LLRange range) {
return new BoundedRocksFluxIterable<byte[]>(db, cfh, range) {
@Override
protected ReadOptions getReadOptions() {
return resolveSnapshot(snapshot);
}
@Override
protected byte[] transformEntry(byte[] key) {
return key;
}
}.generateNonblocking(dbScheduler, 128);
return new LLLocalLuceneKeysReactiveIterator(db, cfh, range, resolveSnapshot(snapshot)).subscribeOn(dbScheduler);
}
@Override
public Flux<Entry<byte[], byte[]>> setRange(LLRange range,
Flux<Entry<byte[], byte[]>> entries,
boolean getOldValues) {
return Flux.defer(() -> Flux
.usingWhen(
Mono
.fromCallable(() -> new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS)
)
.subscribeOn(dbScheduler),
writeBatch -> Mono
.fromCallable(() -> {
if (range.isSingle()) {
writeBatch.delete(cfh, range.getSingle());
} else if (range.hasMin() && range.hasMax()) {
writeBatch.deleteRange(cfh, range.getMin(), range.getMax());
} else if (range.hasMax()) {
writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax());
} else if (range.hasMin()) {
// Delete from x to end of column
var readOpts = getReadOptions(null);
readOpts.setIterateLowerBound(new Slice(range.getMin()));
try (var it = db.newIterator(cfh, readOpts)) {
it.seekToLast();
if (it.isValid()) {
writeBatch.deleteRange(cfh, range.getMin(), it.key());
// Delete the last key because we are deleting everything from "min" onward, without a max bound
writeBatch.delete(it.key());
}
}
} else {
// Delete all
var readOpts = getReadOptions(null);
try (var it = db.newIterator(cfh, readOpts)) {
it.seekToLast();
if (it.isValid()) {
writeBatch.deleteRange(cfh, FIRST_KEY, it.key());
// Delete the last key because we are deleting everything without a max bound
writeBatch.delete(it.key());
}
}
}
return null;
})
.subscribeOn(dbScheduler)
.thenMany(entries)
.flatMapSequential(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)),
writeBatch -> Mono
.fromCallable(() -> {
try (writeBatch) {
writeBatch.writeToDbAndClose();
}
return null;
})
.subscribeOn(dbScheduler)
)
.subscribeOn(dbScheduler)
.onErrorMap(cause -> new IOException("Failed to write range", cause))
);
}
return Flux.defer(() -> {
	if (range.isAll()) {
		return clear().thenMany(Flux.empty());
	} else {
		return Flux
			.usingWhen(
				Mono
					.fromCallable(() -> new CappedWriteBatch(db,
							CAPPED_WRITE_BATCH_CAP,
							RESERVED_WRITE_BATCH_SIZE,
							MAX_WRITE_BATCH_SIZE,
							BATCH_WRITE_OPTIONS)
					)
					.subscribeOn(dbScheduler),
				writeBatch -> Mono
					.fromCallable(() -> {
						if (range.hasMin() && range.hasMax()) {
							writeBatch.deleteRange(cfh, range.getMin(), range.getMax());
						} else if (range.hasMax()) {
							writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax());
						} else {
							// Delete from x to end of column
							var readOpts = getReadOptions(null);
							try (var it = db.newIterator(cfh, readOpts)) {
								it.seekToLast();
								if (it.isValid()) {
									writeBatch.deleteRange(cfh, range.getMin(), it.key());
									// Delete the last key because we are deleting everything from "min" onward
									writeBatch.delete(it.key());
								}
							}
						}
						return null;
					})
					.subscribeOn(dbScheduler)
					.thenMany(entries)
					.flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)),
				writeBatch -> Mono
					.fromCallable(() -> {
						try (writeBatch) {
							writeBatch.writeToDbAndClose();
						}
						return null;
					})
					.subscribeOn(dbScheduler)
			)
			.subscribeOn(dbScheduler)
			.onErrorMap(cause -> new IOException("Failed to write range", cause));
	}
});
}
private static byte[] incrementLexicographically(byte[] key) {
	boolean remainder = true;
	int prefixLength = key.length;
	final byte ff = (byte) 0xFF;
	for (int i = prefixLength - 1; i >= 0; i--) {
		if (key[i] != ff) {
			key[i]++;
			remainder = false;
			break;
		} else {
			key[i] = 0x00;
			remainder = true;
		}
	}
	if (remainder) {
		Arrays.fill(key, 0, prefixLength, (byte) 0xFF);
		return Arrays.copyOf(key, key.length + 1);
	} else {
		return key;
	}
}
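// Illustration added for clarity (not part of the commit):
//   incrementLexicographically(new byte[] {0x00, (byte) 0xFF}) -> {0x01, 0x00}
//   incrementLexicographically(new byte[] {(byte) 0xFF, (byte) 0xFF}) -> {0xFF, 0xFF, 0x00}
// It produces a key that sorts after the input in unsigned lexicographic order,
// mutating the array in place except in the all-0xFF case, where the result grows
// by one byte. The commented-out compactRange bounds in clear() below are its
// intended caller.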
public Mono<Void> clear() {
return Mono
.<Void>fromCallable(() -> {
try (RocksIterator iter = db.newIterator(cfh); CappedWriteBatch writeBatch = new CappedWriteBatch(db,
var readOpts = getReadOptions(null);
readOpts.setVerifyChecksums(false);
// readOpts.setIgnoreRangeDeletions(true);
readOpts.setFillCache(false);
try (CappedWriteBatch writeBatch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
RESERVED_WRITE_BATCH_SIZE,
MAX_WRITE_BATCH_SIZE,
BATCH_WRITE_OPTIONS
)) {
iter.seekToFirst();
//byte[] firstDeletedKey = null;
//byte[] lastDeletedKey = null;
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
iter.seekToLast();
while (iter.isValid()) {
writeBatch.delete(cfh, iter.key());
iter.next();
if (iter.isValid()) {
writeBatch.deleteRange(cfh, FIRST_KEY, iter.key());
writeBatch.delete(cfh, iter.key());
//firstDeletedKey = FIRST_KEY;
//lastDeletedKey = incrementLexicographically(iter.key());
}
}
writeBatch.writeToDbAndClose();
// Compact range
db.compactRange(cfh);
db.suggestCompactRange(cfh);
//if (firstDeletedKey != null && lastDeletedKey != null) {
//db.compactRange(cfh, firstDeletedKey, lastDeletedKey, new CompactRangeOptions().setChangeLevel(false));
//}
db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), cfh);
db.flushWal(true);
@@ -745,7 +749,7 @@ public class LLLocalDictionary implements LLDictionary {
}
if (fast) {
readOpts.setIgnoreRangeDeletions(true);
readOpts.setPinData(false);
}
try (var iter = db.newIterator(cfh, readOpts)) {
iter.seekToFirst();
@@ -815,7 +819,7 @@ public class LLLocalDictionary implements LLDictionary {
}
private long fastSizeAll(@Nullable LLSnapshot snapshot) {
var rocksdbSnapshot = resolveSnapshot(snapshot).setFillCache(false).setVerifyChecksums(false);
var rocksdbSnapshot = resolveSnapshot(snapshot);
if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {
try {
return db.getLongProperty(cfh, "rocksdb.estimate-num-keys");
@@ -826,8 +830,11 @@ public class LLLocalDictionary implements LLDictionary {
} else {
rocksdbSnapshot.setFillCache(false);
rocksdbSnapshot.setVerifyChecksums(false);
rocksdbSnapshot.setIgnoreRangeDeletions(true);
rocksdbSnapshot.setPinData(false);
rocksdbSnapshot.setIgnoreRangeDeletions(false);
if (snapshot == null) {
rocksdbSnapshot.setTailing(true);
}
long count = 0;
try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) {
iter.seekToFirst();
@@ -845,7 +852,7 @@ public class LLLocalDictionary implements LLDictionary {
var readOpts = resolveSnapshot(snapshot);
readOpts.setFillCache(false);
readOpts.setVerifyChecksums(false);
readOpts.setPinData(false);
long count = 0;
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
iter.seekToFirst();
@@ -869,7 +876,11 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setIterateUpperBound(new Slice(range.getMax()));
}
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
iter.seekToFirst();
if (range.hasMin()) {
iter.seek(range.getMin());
} else {
iter.seekToFirst();
}
if (!iter.isValid()) {
return null;
}

it/cavallium/dbengine/database/disk/LLLocalLuceneEntryReactiveIterator.java

@@ -0,0 +1,23 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange;
import java.util.Map;
import java.util.Map.Entry;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
public class LLLocalLuceneEntryReactiveIterator extends LLLocalLuceneReactiveIterator<Entry<byte[], byte[]>> {
public LLLocalLuceneEntryReactiveIterator(RocksDB db,
ColumnFamilyHandle cfh,
LLRange range,
ReadOptions readOptions) {
super(db, cfh, range, readOptions, true);
}
@Override
public Entry<byte[], byte[]> getEntry(byte[] key, byte[] value) {
return Map.entry(key, value);
}
}

it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedEntryReactiveIterator.java

@@ -0,0 +1,24 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange;
import java.util.Map;
import java.util.Map.Entry;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
public class LLLocalLuceneGroupedEntryReactiveIterator extends LLLocalLuceneGroupedReactiveIterator<Entry<byte[], byte[]>> {
public LLLocalLuceneGroupedEntryReactiveIterator(RocksDB db,
ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,
ReadOptions readOptions) {
super(db, cfh, prefixLength, range, readOptions, true);
}
@Override
public Entry<byte[], byte[]> getEntry(byte[] key, byte[] value) {
return Map.entry(key, value);
}
}

it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedKeysReactiveIterator.java

@@ -0,0 +1,22 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
public class LLLocalLuceneGroupedKeysReactiveIterator extends LLLocalLuceneGroupedReactiveIterator<byte[]> {
public LLLocalLuceneGroupedKeysReactiveIterator(RocksDB db,
ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,
ReadOptions readOptions) {
super(db, cfh, prefixLength, range, readOptions, false);
}
@Override
public byte[] getEntry(byte[] key, byte[] value) {
return key;
}
}

it/cavallium/dbengine/database/disk/LLLocalLuceneGroupedReactiveIterator.java

@@ -0,0 +1,86 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.util.Arrays;
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.Slice;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
public abstract class LLLocalLuceneGroupedReactiveIterator<T> extends Flux<List<T>> {
private static final byte[] EMPTY = new byte[0];
private final RocksDB db;
private final ColumnFamilyHandle cfh;
private final int prefixLength;
private final LLRange range;
private final ReadOptions readOptions;
private final boolean readValues;
public LLLocalLuceneGroupedReactiveIterator(RocksDB db,
ColumnFamilyHandle cfh,
int prefixLength,
LLRange range,
ReadOptions readOptions,
boolean readValues) {
this.db = db;
this.cfh = cfh;
this.prefixLength = prefixLength;
this.range = range;
this.readOptions = readOptions;
this.readValues = readValues;
}
@Override
public void subscribe(@NotNull CoreSubscriber<? super List<T>> actual) {
Flux<List<T>> flux = Flux
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax());
if (range.hasMin()) {
readOptions.setIterateLowerBound(new Slice(range.getMin()));
}
if (range.hasMax()) {
readOptions.setIterateUpperBound(new Slice(range.getMax()));
}
readOptions.setPrefixSameAsStart(true);
var rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
return rocksIterator;
}, (rocksIterator, sink) -> {
ObjectArrayList<T> values = new ObjectArrayList<>();
byte[] firstGroupKey = null;
while (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
if (firstGroupKey == null) {
firstGroupKey = key;
} else if (!Arrays.equals(firstGroupKey, 0, prefixLength, key, 0, prefixLength)) {
break;
}
byte[] value = readValues ? rocksIterator.value() : EMPTY;
rocksIterator.next();
values.add(getEntry(key, value));
}
if (!values.isEmpty()) {
sink.next(values);
} else {
sink.complete();
}
return rocksIterator;
}, tuple -> {});
flux.subscribe(actual);
}
public abstract T getEntry(byte[] key, byte[] value);
}

it/cavallium/dbengine/database/disk/LLLocalLuceneKeysReactiveIterator.java

@@ -0,0 +1,21 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
public class LLLocalLuceneKeysReactiveIterator extends LLLocalLuceneReactiveIterator<byte[]> {
public LLLocalLuceneKeysReactiveIterator(RocksDB db,
ColumnFamilyHandle cfh,
LLRange range,
ReadOptions readOptions) {
super(db, cfh, range, readOptions, false);
}
@Override
public byte[] getEntry(byte[] key, byte[] value) {
return key;
}
}

it/cavallium/dbengine/database/disk/LLLocalLuceneReactiveIterator.java

@@ -0,0 +1,69 @@
package it.cavallium.dbengine.database.disk;
import it.cavallium.dbengine.database.LLRange;
import org.jetbrains.annotations.NotNull;
import org.rocksdb.AbstractImmutableNativeReference;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.Slice;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
public abstract class LLLocalLuceneReactiveIterator<T> extends Flux<T> {
private static final byte[] EMPTY = new byte[0];
private final RocksDB db;
private final ColumnFamilyHandle cfh;
private final LLRange range;
private final ReadOptions readOptions;
private final boolean readValues;
public LLLocalLuceneReactiveIterator(RocksDB db,
ColumnFamilyHandle cfh,
LLRange range,
ReadOptions readOptions,
boolean readValues) {
this.db = db;
this.cfh = cfh;
this.range = range;
this.readOptions = readOptions;
this.readValues = readValues;
}
@Override
public void subscribe(@NotNull CoreSubscriber<? super T> actual) {
Flux<T> flux = Flux
.generate(() -> {
var readOptions = new ReadOptions(this.readOptions);
readOptions.setFillCache(range.hasMin() && range.hasMax());
if (range.hasMin()) {
readOptions.setIterateLowerBound(new Slice(range.getMin()));
}
if (range.hasMax()) {
readOptions.setIterateUpperBound(new Slice(range.getMax()));
}
var rocksIterator = db.newIterator(cfh, readOptions);
if (range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
return rocksIterator;
}, (rocksIterator, sink) -> {
if (rocksIterator.isValid()) {
byte[] key = rocksIterator.key();
byte[] value = readValues ? rocksIterator.value() : EMPTY;
rocksIterator.next();
sink.next(getEntry(key, value));
} else {
sink.complete();
}
return rocksIterator;
}, AbstractImmutableNativeReference::close);
flux.subscribe(actual);
}
public abstract T getEntry(byte[] key, byte[] value);
}
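Both reactive iterators are built on Reactor's synchronous Flux.generate contract: the state supplier opens a RocksIterator positioned at the start of the range, the generator may call sink.next at most once per invocation, and the state consumer releases the native handle. The same pattern stripped to its skeleton (a plain Java iterator stands in for the RocksIterator; illustrative only):

import java.util.Iterator;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;

public class GenerateSketch {
	public static void main(String[] args) {
		Flux<String> flux = Flux.generate(
				// State supplier: plays the role of opening and seeking the RocksIterator.
				() -> List.of("a", "b", "c").iterator(),
				// Generator: at most one sink.next(...) per call, exactly like the class above.
				(Iterator<String> it, SynchronousSink<String> sink) -> {
					if (it.hasNext()) {
						sink.next(it.next());
					} else {
						sink.complete();
					}
					return it;
				},
				// State consumer: plays the role of AbstractImmutableNativeReference::close.
				it -> {}
		);
		flux.subscribe(System.out::println);
	}
}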

it/cavallium/dbengine/database/serialization/Serializer.java

@@ -8,17 +8,19 @@ public interface Serializer<A, B> {
@NotNull B serialize(@NotNull A deserialized);
static Serializer<byte[], byte[]> noop() {
	return new Serializer<>() {
		@Override
		public byte @NotNull [] deserialize(byte @NotNull [] serialized) {
			return serialized;
		}
		@Override
		public byte @NotNull [] serialize(byte @NotNull [] deserialized) {
			return deserialized;
		}
	};
}
Serializer<byte[], byte[]> NOOP_SERIALIZER = new Serializer<>() {
	@Override
	public byte @NotNull [] deserialize(byte @NotNull [] serialized) {
		return serialized;
	}
	@Override
	public byte @NotNull [] serialize(byte @NotNull [] deserialized) {
		return deserialized;
	}
};
static Serializer<byte[], byte[]> noop() {
	return NOOP_SERIALIZER;
}
}

it/cavallium/dbengine/client/Database.java

@@ -1,5 +1,183 @@
package it.cavallium.dbengine.client;
import static it.cavallium.dbengine.client.CompositeDatabasePartLocation.CompositeDatabasePartType.KV_DATABASE;
import it.cavallium.dbengine.database.Column;
import it.cavallium.dbengine.database.LLKeyValueDatabase;
import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.SubStageGetterMap;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.util.function.Tuples;
public class Database {
@Test
public void testDeepDatabaseAddKeysAndConvertToLongerOnes() {
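// Summary added for clarity (not in the commit): write every super/sub key pair into
// a map with 3+4 byte keys, snapshot it, clear it, then copy each entry from the
// snapshot into a map with 6+7 byte keys under the "xxx" prefix, expecting one
// (newSuperKey, newSubKey) pair per original combination.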
LinkedHashSet<String> originalSuperKeys = new LinkedHashSet<>(List.of("K1a", "K1b", "K1c"));
LinkedHashSet<String> originalSubKeys = new LinkedHashSet<>(List.of("K2aa", "K2bb", "K2cc"));
String newPrefix = "xxx";
StepVerifier
.create(
tempDb()
.flatMapMany(db -> addKeysAndConvertToLongerOnes(db, originalSuperKeys, originalSubKeys, newPrefix))
)
.expectNextSequence(originalSuperKeys
.stream()
.flatMap(superKey -> originalSubKeys
.stream()
.map(subKey -> Map.entry(newPrefix + superKey, newPrefix + subKey)))
.collect(Collectors.toList())
)
.verifyComplete();
}
public static <U> Mono<? extends LLKeyValueDatabase> tempDb() {
var wrkspcPath = Path.of("/tmp/.cache/tempdb/");
return Mono
.fromCallable(() -> {
if (Files.exists(wrkspcPath)) {
Files.walk(wrkspcPath)
.sorted(Comparator.reverseOrder())
.forEach(file -> {
try {
Files.delete(file);
} catch (IOException ex) {
throw new CompletionException(ex);
}
});
}
Files.createDirectories(wrkspcPath);
return null;
})
.subscribeOn(Schedulers.boundedElastic())
.then(new LLLocalDatabaseConnection(wrkspcPath, true).connect())
.flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), false));
}
private static final byte[] DUMMY_VALUE = new byte[] {0x01, 0x03};
private Flux<Entry<String, String>> addKeysAndConvertToLongerOnes(LLKeyValueDatabase db,
LinkedHashSet<String> originalSuperKeys,
LinkedHashSet<String> originalSubKeys,
String newPrefix) {
return Flux
.defer(() -> Mono
.zip(
db
.getDictionary("testmap", UpdateMode.DISALLOW)
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
new FixedStringSerializer(3),
4,
new SubStageGetterMap<>(new FixedStringSerializer(4), Serializer.noop())
)),
db
.getDictionary("testmap", UpdateMode.DISALLOW)
.map(dictionary -> DatabaseMapDictionaryDeep.deepTail(dictionary,
new FixedStringSerializer(6),
7,
new SubStageGetterMap<>(new FixedStringSerializer(7), Serializer.noop())
))
)
.single()
.flatMap(tuple -> {
var db1 = tuple.getT1();
return Flux
.fromIterable(originalSuperKeys)
.flatMapSequential(superKey -> db1.at(null, superKey))
.flatMapSequential(at -> Flux
.fromIterable(originalSubKeys)
.flatMapSequential(subKey -> at.at(null, subKey))
.flatMapSequential(at2 -> at2.set(DUMMY_VALUE))
)
.then(db
.takeSnapshot()
.map(snapshot -> new CompositeSnapshot(Map.of(CompositeDatabasePartLocation.of(KV_DATABASE,
db.getDatabaseName()), snapshot)))
)
.map(snapshot -> Tuples.of(tuple.getT1(), tuple.getT2(), snapshot))
.single();
})
.single()
.flatMap(tuple -> tuple.getT1().clear().thenReturn(tuple))
.flatMap(tuple -> tuple
.getT1()
.leavesCount(null, false)
.flatMap(count -> count == 0 ? Mono.just(tuple) : Mono.error(new IllegalStateException(
"Failed to clear map. Remaining elements after clear: " + count)))
)
.flatMapMany(tuple -> {
var oldDb = tuple.getT1();
var newDb = tuple.getT2();
var snapshot = tuple.getT3();
return oldDb
.getAllStages(snapshot)
.flatMapSequential(parentEntry -> Mono
.fromCallable(() -> newPrefix + parentEntry.getKey())
.flatMapMany(newId1 -> parentEntry.getValue()
.getAllValues(snapshot)
.flatMapSequential(entry -> Mono
.fromCallable(() -> newPrefix + entry.getKey())
.flatMap(newId2 -> newDb
.at(null, newId1)
.flatMap(newStage -> newStage.putValue(newId2, entry.getValue()))
.thenReturn(Map.entry(newId1, newId2))
)
)
)
)
.concatWith(db
.releaseSnapshot(snapshot.getSnapshot(db))
.then(oldDb.close())
.then(newDb.close())
.then(Mono.empty())
);
})
);
}
private static class FixedStringSerializer implements SerializerFixedBinaryLength<String, byte[]> {
private final int size;
public FixedStringSerializer(int i) {
this.size = i;
}
@Override
public int getSerializedBinaryLength() {
return size;
}
@Override
public @NotNull String deserialize(byte @NotNull [] serialized) {
return new String(serialized, StandardCharsets.US_ASCII);
}
@Override
public byte @NotNull [] serialize(@NotNull String deserialized) {
return deserialized.getBytes(StandardCharsets.US_ASCII);
}
}
}