Exclusive range
This commit is contained in:
parent 79ba4d8dd2
commit 32d1d76f69
@@ -3,6 +3,9 @@ package it.cavallium.dbengine.database;
import java.util.Arrays;
import java.util.StringJoiner;

/**
 * Range of data, from min (inclusive), to max (exclusive)
 */
public class LLRange {

	private static final LLRange RANGE_ALL = new LLRange(null, null);
@@ -25,12 +25,35 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
	protected final int keyExtLength;
	protected final LLRange range;

	protected static byte[] firstKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
	private static byte[] incrementPrefix(byte[] key, int prefixLength) {
		boolean remainder = true;
		final byte ff = (byte) 0xFF;
		for (int i = prefixLength - 1; i >= 0; i--) {
			if (key[i] != ff) {
				key[i]++;
				remainder = false;
				break;
			} else {
				key[i] = 0x00;
				remainder = true;
			}
		}

		if (remainder) {
			Arrays.fill(key, 0, prefixLength, (byte) 0xFF);
			return Arrays.copyOf(key, key.length + 1);
		} else {
			return key;
		}
	}

	static byte[] firstRangeKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
		return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
	}

	protected static byte[] lastKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
		return fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
	static byte[] nextRangeKey(byte[] prefixKey, int prefixLength, int suffixLength, int extLength) {
		byte[] nonIncremented = fillKeySuffixAndExt(prefixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
		return incrementPrefix(nonIncremented, prefixLength);
	}

	protected static byte[] fillKeySuffixAndExt(byte[] prefixKey,
@@ -46,7 +69,7 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
		return result;
	}

	protected static byte[] firstKey(byte[] prefixKey,
	static byte[] firstRangeKey(byte[] prefixKey,
			byte[] suffixKey,
			int prefixLength,
			int suffixLength,
@@ -54,12 +77,13 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
		return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
	}

	protected static byte[] lastKey(byte[] prefixKey,
	static byte[] nextRangeKey(byte[] prefixKey,
			byte[] suffixKey,
			int prefixLength,
			int suffixLength,
			int extLength) {
		return fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0xFF);
		byte[] nonIncremented = fillKeyExt(prefixKey, suffixKey, prefixLength, suffixLength, extLength, (byte) 0x00);
		return incrementPrefix(nonIncremented, prefixLength + suffixLength);
	}

	protected static byte[] fillKeyExt(byte[] prefixKey,
@@ -114,9 +138,9 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
		this.keyPrefix = prefixKey;
		this.keySuffixLength = keySuffixSerializer.getSerializedBinaryLength();
		this.keyExtLength = keyExtLength;
		byte[] firstKey = firstKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
		byte[] lastKey = lastKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
		this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, lastKey);
		byte[] firstKey = firstRangeKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
		byte[] nextRangeKey = nextRangeKey(keyPrefix, keyPrefix.length, keySuffixLength, keyExtLength);
		this.range = keyPrefix.length == 0 ? LLRange.all() : LLRange.of(firstKey, nextRangeKey);
		assert subStageKeysConsistency(keyPrefix.length + keySuffixLength + keyExtLength);
	}

@@ -169,8 +193,8 @@ public class DatabaseMapDictionaryDeep<T, U, US extends DatabaseStage<U>> implem
	}

	protected LLRange toExtRange(byte[] keySuffix) {
		byte[] first = firstKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
		byte[] end = lastKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
		byte[] first = firstRangeKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
		byte[] end = nextRangeKey(keyPrefix, keySuffix, keyPrefix.length, keySuffixLength, keyExtLength);
		return LLRange.of(first, end);
	}

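
Note: a minimal standalone sketch (not part of this commit; the class and method names are invented) of the prefix-increment logic behind the new exclusive upper bound. nextRangeKey zero-fills the suffix and extension bytes and then increments the prefix: the last prefix byte that is not 0xFF is bumped and any trailing 0xFF bytes roll over to 0x00; only when the whole prefix is 0xFF does the key grow by one byte, which is what the new TestRanges test at the bottom of this commit asserts.

import java.util.Arrays;

// Illustration only, mirroring incrementPrefix in DatabaseMapDictionaryDeep.
public class ExclusiveBoundSketch {

	static byte[] nextPrefix(byte[] prefix) {
		byte[] key = Arrays.copyOf(prefix, prefix.length);
		for (int i = key.length - 1; i >= 0; i--) {
			if (key[i] != (byte) 0xFF) {
				key[i]++;      // bump the last byte that can still grow
				return key;
			}
			key[i] = 0x00;     // 0xFF rolls over, the carry moves one byte left
		}
		// The whole prefix was 0xFF: restore it and append one extra byte,
		// mirroring the "remainder" branch of incrementPrefix.
		Arrays.fill(key, (byte) 0xFF);
		return Arrays.copyOf(key, key.length + 1);
	}

	public static void main(String[] args) {
		// {0x00, 0x01, 0xFF} becomes {0x00, 0x02, 0x00}
		System.out.println(Arrays.toString(nextPrefix(new byte[] {0x00, 0x01, (byte) 0xFF})));
		// {0xFF, 0xFF, 0xFF} becomes {0xFF, 0xFF, 0xFF, 0x00}, one byte longer
		System.out.println(Arrays.toString(nextPrefix(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF})));
	}
}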
@@ -33,7 +33,7 @@ public class CappedWriteBatch implements WriteBatchInterface, AutoCloseable {
		this.writeBatch.setMaxBytes(maxWriteBatchSize);
	}

	private void flushIfNeeded(boolean force) throws RocksDBException {
	private synchronized void flushIfNeeded(boolean force) throws RocksDBException {
		if (this.writeBatch.count() >= (force ? 1 : cap)) {
			db.write(writeOptions, this.writeBatch);
			this.writeBatch.clear();
@@ -41,151 +41,151 @@ public class CappedWriteBatch implements WriteBatchInterface, AutoCloseable {
	}

	@Override
	public int count() {
	public synchronized int count() {
		return writeBatch.count();
	}

	@Override
	public void put(byte[] key, byte[] value) throws RocksDBException {
	public synchronized void put(byte[] key, byte[] value) throws RocksDBException {
		writeBatch.put(key, value);
		flushIfNeeded(false);
	}

	@Override
	public void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
	public synchronized void put(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
		writeBatch.put(columnFamilyHandle, key, value);
		flushIfNeeded(false);
	}

	@Override
	public void put(ByteBuffer key, ByteBuffer value) throws RocksDBException {
	public synchronized void put(ByteBuffer key, ByteBuffer value) throws RocksDBException {
		writeBatch.put(key, value);
		flushIfNeeded(false);
	}

	@Override
	public void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) throws RocksDBException {
	public synchronized void put(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key, ByteBuffer value) throws RocksDBException {
		writeBatch.put(columnFamilyHandle, key, value);
		flushIfNeeded(false);
	}

	@Override
	public void merge(byte[] key, byte[] value) throws RocksDBException {
	public synchronized void merge(byte[] key, byte[] value) throws RocksDBException {
		writeBatch.merge(key, value);
		flushIfNeeded(false);
	}

	@Override
	public void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
	public synchronized void merge(ColumnFamilyHandle columnFamilyHandle, byte[] key, byte[] value) throws RocksDBException {
		writeBatch.merge(columnFamilyHandle, key, value);
		flushIfNeeded(false);
	}

	@Deprecated
	@Override
	public void remove(byte[] key) throws RocksDBException {
	public synchronized void remove(byte[] key) throws RocksDBException {
		writeBatch.remove(key);
		flushIfNeeded(false);
	}

	@Deprecated
	@Override
	public void remove(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
	public synchronized void remove(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
		writeBatch.remove(columnFamilyHandle, key);
		flushIfNeeded(false);
	}

	@Override
	public void delete(byte[] key) throws RocksDBException {
	public synchronized void delete(byte[] key) throws RocksDBException {
		writeBatch.delete(key);
		flushIfNeeded(false);
	}

	@Override
	public void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
	public synchronized void delete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
		writeBatch.delete(columnFamilyHandle, key);
		flushIfNeeded(false);
	}

	@Override
	public void singleDelete(byte[] key) throws RocksDBException {
	public synchronized void singleDelete(byte[] key) throws RocksDBException {
		writeBatch.singleDelete(key);
		flushIfNeeded(false);
	}

	@Override
	public void singleDelete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
	public synchronized void singleDelete(ColumnFamilyHandle columnFamilyHandle, byte[] key) throws RocksDBException {
		writeBatch.singleDelete(columnFamilyHandle, key);
		flushIfNeeded(false);
	}

	@Override
	public void remove(ByteBuffer key) throws RocksDBException {
	public synchronized void remove(ByteBuffer key) throws RocksDBException {
		writeBatch.remove(key);
		flushIfNeeded(false);
	}

	@Override
	public void remove(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException {
	public synchronized void remove(ColumnFamilyHandle columnFamilyHandle, ByteBuffer key) throws RocksDBException {
		writeBatch.remove(columnFamilyHandle, key);
		flushIfNeeded(false);
	}

	@Override
	public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDBException {
	public synchronized void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDBException {
		writeBatch.deleteRange(beginKey, endKey);
		flushIfNeeded(false);
	}

	@Override
	public void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey)
	public synchronized void deleteRange(ColumnFamilyHandle columnFamilyHandle, byte[] beginKey, byte[] endKey)
			throws RocksDBException {
		writeBatch.deleteRange(columnFamilyHandle, beginKey, endKey);
		flushIfNeeded(false);
	}

	@Override
	public void putLogData(byte[] blob) throws RocksDBException {
	public synchronized void putLogData(byte[] blob) throws RocksDBException {
		writeBatch.putLogData(blob);
		flushIfNeeded(false);
	}

	@Override
	public void clear() {
	public synchronized void clear() {
		writeBatch.clear();
	}

	@Override
	public void setSavePoint() {
	public synchronized void setSavePoint() {
		writeBatch.setSavePoint();
	}

	@Override
	public void rollbackToSavePoint() throws RocksDBException {
	public synchronized void rollbackToSavePoint() throws RocksDBException {
		writeBatch.rollbackToSavePoint();
	}

	@Override
	public void popSavePoint() throws RocksDBException {
	public synchronized void popSavePoint() throws RocksDBException {
		writeBatch.popSavePoint();
	}

	@Override
	public void setMaxBytes(long maxBytes) {
	public synchronized void setMaxBytes(long maxBytes) {
		writeBatch.setMaxBytes(maxBytes);
	}

	@Override
	public WriteBatch getWriteBatch() {
	public synchronized WriteBatch getWriteBatch() {
		return writeBatch;
	}

	public void writeToDbAndClose() throws RocksDBException {
	public synchronized void writeToDbAndClose() throws RocksDBException {
		flushIfNeeded(true);
	}

	@Override
	public void close() {
	public synchronized void close() {
		writeBatch.close();
	}
}
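
Note: every CappedWriteBatch method is now synchronized, which lines up with the external synchronized (writeBatch) blocks being dropped from LLLocalDictionary further down. A hypothetical usage sketch follows; the capacity and size limits and the WriteOptions are placeholder values, and the constructor argument order simply follows the call visible later in setRange.

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

// Hypothetical caller: the batch now takes its own monitor on every call,
// so no external synchronized block is needed around it.
public class CappedWriteBatchUsageSketch {

	static void writeOne(RocksDB db, ColumnFamilyHandle cfh, byte[] key, byte[] value)
			throws RocksDBException {
		// Placeholder limits: cap of 10_000 operations, 2 MiB reserved, 16 MiB max.
		try (CappedWriteBatch batch = new CappedWriteBatch(db, 10_000, 2 * 1024 * 1024,
				16 * 1024 * 1024, new WriteOptions())) {
			// Previously callers had to do: synchronized (batch) { batch.put(...); }
			batch.put(cfh, key, value);
			batch.writeToDbAndClose(); // flush whatever is still buffered
		}
	}
}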
@@ -29,6 +29,7 @@ import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
import org.warp.commonutils.concurrency.atomicity.NotAtomic;
@@ -167,22 +168,17 @@ public class LLLocalDictionary implements LLDictionary {
	public Mono<Boolean> containsRange(@Nullable LLSnapshot snapshot, LLRange range) {
		return Mono
				.fromCallable(() -> {
					try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
						if (range.hasMin()) {
							iter.seek(range.getMin());
						} else {
							iter.seekToFirst();
						}
						if (!iter.isValid()) {
							return false;
						}

						if (range.hasMax()) {
							byte[] key1 = iter.key();
							return Arrays.compareUnsigned(key1, range.getMax()) <= 0;
						} else {
							return true;
						}
					var readOpts = resolveSnapshot(snapshot);
					readOpts.setVerifyChecksums(false);
					if (range.hasMin()) {
						readOpts.setIterateLowerBound(new Slice(range.getMin()));
					}
					if (range.hasMax()) {
						readOpts.setIterateUpperBound(new Slice(range.getMax()));
					}
					try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
						iter.seekToFirst();
						return iter.isValid();
					}
				})
				.onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause))
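
Note: the rewritten containsRange lets RocksDB clip the iterator instead of comparing keys manually. A minimal sketch of that pattern (hypothetical helper, not from the repo): setIterateLowerBound is inclusive and setIterateUpperBound is exclusive, which matches the [min, max) contract LLRange now documents.

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.Slice;

// Hypothetical helper showing the bounded-iterator pattern used above.
public class BoundedScanSketch {

	static boolean rangeHasAnyKey(RocksDB db, ColumnFamilyHandle cfh, byte[] min, byte[] max) {
		try (ReadOptions readOpts = new ReadOptions()) {
			if (min != null) {
				readOpts.setIterateLowerBound(new Slice(min)); // inclusive lower bound
			}
			if (max != null) {
				readOpts.setIterateUpperBound(new Slice(max)); // exclusive upper bound
			}
			try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
				iter.seekToFirst();    // the iterator only sees keys inside [min, max)
				return iter.isValid(); // valid => at least one key in the range
			}
		}
	}
}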
@@ -508,22 +504,22 @@ public class LLLocalDictionary implements LLDictionary {
	@NotNull
	private Mono<Entry<byte[], byte[]>> putEntryToWriteBatch(Entry<byte[], byte[]> newEntry, boolean getOldValues,
			CappedWriteBatch writeBatch) {
		return Mono.from(Mono
				.defer(() -> {
					if (getOldValues) {
						return get(null, newEntry.getKey());
					} else {
						return Mono.empty();
					}
				})
				.concatWith(Mono.<byte[]>fromCallable(() -> {
					synchronized (writeBatch) {
						writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
					}
					return null;
				})
				.subscribeOn(dbScheduler))
				.map(oldValue -> Map.entry(newEntry.getKey(), oldValue)));
		Mono<byte[]> getOldValueMono;
		if (getOldValues) {
			getOldValueMono = get(null, newEntry.getKey());
		} else {
			getOldValueMono = Mono.empty();
		}
		return getOldValueMono
				.concatWith(Mono
						.<byte[]>fromCallable(() -> {
							writeBatch.put(cfh, newEntry.getKey(), newEntry.getValue());
							return null;
						})
						.subscribeOn(dbScheduler)
				)
				.singleOrEmpty()
				.map(oldValue -> Map.entry(newEntry.getKey(), oldValue));
	}

	@Override
@@ -637,7 +633,6 @@ public class LLLocalDictionary implements LLDictionary {
		}.generateNonblocking(dbScheduler, 128);
	}

	//todo: replace implementation with a simple Flux.push
	@Override
	public Flux<Entry<byte[], byte[]>> setRange(LLRange range,
			Flux<Entry<byte[], byte[]>> entries,
@@ -646,51 +641,49 @@ public class LLLocalDictionary implements LLDictionary {
		if (range.isAll()) {
			return clear().thenMany(Flux.empty());
		} else {
			return Mono
					.fromCallable(() -> new CappedWriteBatch(db,
							CAPPED_WRITE_BATCH_CAP,
							RESERVED_WRITE_BATCH_SIZE,
							MAX_WRITE_BATCH_SIZE,
							BATCH_WRITE_OPTIONS
					))
					.subscribeOn(dbScheduler)
					.flatMapMany(writeBatch -> Mono
							.fromCallable(() -> {
								synchronized (writeBatch) {
									if (range.hasMin() && range.hasMax()) {
										writeBatch.deleteRange(cfh, range.getMin(), range.getMax());
										writeBatch.delete(cfh, range.getMax());
									} else if (range.hasMax()) {
										writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax());
										writeBatch.delete(cfh, range.getMax());
									} else {
										try (var it = db.newIterator(cfh, getReadOptions(null))) {
											it.seekToLast();
											if (it.isValid()) {
												writeBatch.deleteRange(cfh, range.getMin(), it.key());
												writeBatch.delete(cfh, it.key());
			return Flux
					.usingWhen(
							Mono
									.fromCallable(() -> new CappedWriteBatch(db,
											CAPPED_WRITE_BATCH_CAP,
											RESERVED_WRITE_BATCH_SIZE,
											MAX_WRITE_BATCH_SIZE,
											BATCH_WRITE_OPTIONS)
									)
									.subscribeOn(dbScheduler),
							writeBatch -> Mono
									.fromCallable(() -> {
										if (range.hasMin() && range.hasMax()) {
											writeBatch.deleteRange(cfh, range.getMin(), range.getMax());
										} else if (range.hasMax()) {
											writeBatch.deleteRange(cfh, FIRST_KEY, range.getMax());
										} else {
											// Delete from x to end of column
											var readOpts = getReadOptions(null);
											try (var it = db.newIterator(cfh, readOpts)) {
												it.seekToLast();
												if (it.isValid()) {
													writeBatch.deleteRange(cfh, range.getMin(), it.key());
													// Delete the last key because we are deleting everything from "min" onward
													writeBatch.delete(it.key());
												}
											}
										}
									}
								}
								return null;
							})
							.subscribeOn(dbScheduler)
							.thenMany(entries)
							.flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch))
							.concatWith(Mono.<Entry<byte[], byte[]>>fromCallable(() -> {
								synchronized (writeBatch) {
									writeBatch.writeToDbAndClose();
									writeBatch.close();
								}
								return null;
							}).subscribeOn(dbScheduler))
							.doFinally(signalType -> {
								synchronized (writeBatch) {
									writeBatch.close();
								}
							})
										return null;
									})
									.subscribeOn(dbScheduler)
									.thenMany(entries)
									.flatMap(newEntry -> putEntryToWriteBatch(newEntry, getOldValues, writeBatch)),
							writeBatch -> Mono
									.fromCallable(() -> {
										try (writeBatch) {
											writeBatch.writeToDbAndClose();
										}
										return null;
									})
									.subscribeOn(dbScheduler)
					)
					.subscribeOn(dbScheduler)
					.onErrorMap(cause -> new IOException("Failed to write range", cause));
		}
	});
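
Note: setRange now wraps the CappedWriteBatch in Flux.usingWhen, so the batch is acquired, used, and flushed as a reactive resource instead of being closed through concatWith and doFinally. A generic sketch of that operator, with an invented Resource type, assuming Reactor 3:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Generic illustration of Flux.usingWhen; Resource and its methods are invented.
public class UsingWhenSketch {

	static class Resource {
		Flux<String> useAll() {
			return Flux.just("a", "b", "c");
		}
		Mono<Void> release() {
			return Mono.fromRunnable(() -> System.out.println("released"));
		}
	}

	public static void main(String[] args) {
		Flux<String> values = Flux.usingWhen(
				Mono.fromCallable(Resource::new), // acquire the resource asynchronously
				Resource::useAll,                 // derive the data flux from it
				Resource::release                 // release it on complete, error or cancel
		);
		values.subscribe(System.out::println);
	}
}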
@@ -741,21 +734,23 @@ public class LLLocalDictionary implements LLDictionary {
		} else {
			return Mono
					.fromCallable(() -> {
						try (var iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
							if (range.hasMin()) {
								iter.seek(range.getMin());
							} else {
								iter.seekToFirst();
							}
						var readOpts = resolveSnapshot(snapshot);
						readOpts.setFillCache(false);
						readOpts.setVerifyChecksums(false);
						if (range.hasMin()) {
							readOpts.setIterateLowerBound(new Slice(range.getMin()));
						}
						if (range.hasMax()) {
							readOpts.setIterateUpperBound(new Slice(range.getMax()));
						}
						if (fast) {
							readOpts.setIgnoreRangeDeletions(true);
							readOpts.setPinData(false);
						}
						try (var iter = db.newIterator(cfh, readOpts)) {
							iter.seekToFirst();
							long i = 0;
							while (iter.isValid()) {
								if (range.hasMax()) {
									byte[] key1 = iter.key();
									if (Arrays.compareUnsigned(key1, range.getMax()) > 0) {
										break;
									}
								}

								iter.next();
								i++;
							}
@@ -773,18 +768,18 @@ public class LLLocalDictionary implements LLDictionary {
	public Mono<Entry<byte[], byte[]>> getOne(@Nullable LLSnapshot snapshot, LLRange range) {
		return Mono
				.fromCallable(() -> {
					try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
						if (range.hasMin()) {
							rocksIterator.seek(range.getMin());
						} else {
							rocksIterator.seekToFirst();
						}
					var readOpts = resolveSnapshot(snapshot);
					if (range.hasMin()) {
						readOpts.setIterateLowerBound(new Slice(range.getMin()));
					}
					if (range.hasMax()) {
						readOpts.setIterateUpperBound(new Slice(range.getMax()));
					}
					try (var rocksIterator = db.newIterator(cfh, readOpts)) {
						rocksIterator.seekToFirst();
						byte[] key;
						if (rocksIterator.isValid()) {
							key = rocksIterator.key();
							if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
								return null;
							}
							return Map.entry(key, rocksIterator.value());
						} else {
							return null;
@@ -798,18 +793,18 @@ public class LLLocalDictionary implements LLDictionary {
	public Mono<byte[]> getOneKey(@Nullable LLSnapshot snapshot, LLRange range) {
		return Mono
				.fromCallable(() -> {
					try (var rocksIterator = db.newIterator(cfh, resolveSnapshot(snapshot))) {
						if (range.hasMin()) {
							rocksIterator.seek(range.getMin());
						} else {
							rocksIterator.seekToFirst();
						}
					var readOpts = resolveSnapshot(snapshot);
					if (range.hasMin()) {
						readOpts.setIterateLowerBound(new Slice(range.getMin()));
					}
					if (range.hasMax()) {
						readOpts.setIterateUpperBound(new Slice(range.getMax()));
					}
					try (var rocksIterator = db.newIterator(cfh, readOpts)) {
						rocksIterator.seekToFirst();
						byte[] key;
						if (rocksIterator.isValid()) {
							key = rocksIterator.key();
							if (range.hasMax() && Arrays.compareUnsigned(key, range.getMax()) > 0) {
								return null;
							}
							return key;
						} else {
							return null;
@@ -820,7 +815,7 @@ public class LLLocalDictionary implements LLDictionary {
	}

	private long fastSizeAll(@Nullable LLSnapshot snapshot) {
		var rocksdbSnapshot = resolveSnapshot(snapshot);
		var rocksdbSnapshot = resolveSnapshot(snapshot).setFillCache(false).setVerifyChecksums(false);
		if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {
			try {
				return db.getLongProperty(cfh, "rocksdb.estimate-num-keys");
@@ -829,11 +824,15 @@ public class LLLocalDictionary implements LLDictionary {
				return 0;
			}
		} else {
			rocksdbSnapshot.setFillCache(false);
			rocksdbSnapshot.setVerifyChecksums(false);
			rocksdbSnapshot.setIgnoreRangeDeletions(true);
			rocksdbSnapshot.setPinData(false);
			long count = 0;
			try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) {
				iter.seekToFirst();
				// If it's a fast size of a snapshot, count only up to 1000 elements
				while (iter.isValid() && count < 1000) {
				// If it's a fast size of a snapshot, count only up to 1'000'000 elements
				while (iter.isValid() && count < 1_000_000) {
					count++;
					iter.next();
				}
@@ -843,8 +842,12 @@ public class LLLocalDictionary implements LLDictionary {
	}

	private long exactSizeAll(@Nullable LLSnapshot snapshot) {
		var readOpts = resolveSnapshot(snapshot);
		readOpts.setFillCache(false);
		readOpts.setVerifyChecksums(false);
		readOpts.setPinData(false);
		long count = 0;
		try (RocksIterator iter = db.newIterator(cfh, resolveSnapshot(snapshot))) {
		try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
			iter.seekToFirst();
			while (iter.isValid()) {
				count++;
@@ -858,18 +861,18 @@ public class LLLocalDictionary implements LLDictionary {
	public Mono<Entry<byte[], byte[]>> removeOne(LLRange range) {
		return Mono
				.fromCallable(() -> {
					try (RocksIterator iter = db.newIterator(cfh)) {
						if (range.hasMin()) {
							iter.seek(range.getMin());
						} else {
							iter.seekToFirst();
						}
					var readOpts = getReadOptions(null);
					if (range.hasMin()) {
						readOpts.setIterateLowerBound(new Slice(range.getMin()));
					}
					if (range.hasMax()) {
						readOpts.setIterateUpperBound(new Slice(range.getMax()));
					}
					try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
						iter.seekToFirst();
						if (!iter.isValid()) {
							return null;
						}
						if (range.hasMax() && Arrays.compareUnsigned(iter.key(), range.getMax()) > 0) {
							return null;
						}
						byte[] key = iter.key();
						byte[] value = iter.value();
						db.delete(cfh, key);
@@ -0,0 +1,98 @@
package it.cavallium.dbengine.database.collections;

import java.util.Arrays;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TestRanges {
	@Test
	public void testNextRangeKey() {
		testNextRangeKey(new byte[] {0x00, 0x00, 0x00});
		testNextRangeKey(new byte[] {0x00, 0x00, 0x01});
		testNextRangeKey(new byte[] {0x00, 0x00, 0x02});
		testNextRangeKey(new byte[] {0x00, 0x01, 0x02});
		testNextRangeKey(new byte[] {0x00, 0x00, (byte) 0xFF});
		testNextRangeKey(new byte[] {0x00, 0x01, (byte) 0xFF});
		testNextRangeKey(new byte[] {0x00, (byte) 0xFF, (byte) 0xFF});
		testNextRangeKey(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF});
		testNextRangeKey(new byte[] {(byte) 0xFF, (byte) 0, (byte) 0xFF});
		testNextRangeKey(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0});
	}

	public void testNextRangeKey(byte[] prefixKey) {

		byte[] firstRangeKey = DatabaseMapDictionaryDeep.firstRangeKey(prefixKey, prefixKey.length, 7, 3);
		byte[] nextRangeKey = DatabaseMapDictionaryDeep.nextRangeKey(prefixKey, prefixKey.length, 7, 3);

		if (Arrays.equals(prefixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF})) {
			Assertions.assertArrayEquals(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, nextRangeKey);
		} else {
			long biPrefix = 0;
			var s = 0;
			for (int i = prefixKey.length - 1; i >= 0; i--) {
				biPrefix += ((long) (prefixKey[i] & 0xFF)) << s;
				s += Byte.SIZE;
			}
			var nrPrefix = Arrays.copyOf(nextRangeKey, prefixKey.length);

			long biNextPrefix = 0;
			s = 0;
			for (int i = prefixKey.length - 1; i >= 0; i--) {
				biNextPrefix += ((long) (nrPrefix[i] & 0xFF)) << s;
				s += Byte.SIZE;
			}
			Assertions.assertEquals(biPrefix + 1, biNextPrefix);
			Assertions.assertArrayEquals(
					new byte[7 + 3],
					Arrays.copyOfRange(nextRangeKey, prefixKey.length, prefixKey.length + 7 + 3)
			);
		}
	}
	@Test
	public void testNextRangeKeyWithSuffix() {
		testNextRangeKeyWithSuffix(new byte[] {0x00, 0x01, (byte) 0xFF}, new byte[] {0x00, 0x00, 0x00});
		testNextRangeKeyWithSuffix(new byte[] {0x00, 0x00, 0x01}, new byte[] {0x00, 0x00, 0x01});
		testNextRangeKeyWithSuffix(new byte[] {0x00, 0x00, 0x02}, new byte[] {0x00, 0x00, 0x02});
		testNextRangeKeyWithSuffix(new byte[] {0x00, 0x01, 0x02}, new byte[] {0x00, 0x01, 0x02});
		testNextRangeKeyWithSuffix(new byte[] {0x00, 0x00, (byte) 0xFF}, new byte[] {0x00, 0x00, (byte) 0xFF});
		testNextRangeKeyWithSuffix(new byte[] {0x00, 0x01, (byte) 0xFF}, new byte[] {0x00, 0x01, (byte) 0xFF});
		testNextRangeKeyWithSuffix(new byte[] {0x00, (byte) 0xFF, (byte) 0xFF}, new byte[] {0x00, (byte) 0xFF, (byte) 0xFF});
		testNextRangeKeyWithSuffix(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF});
		testNextRangeKeyWithSuffix(new byte[] {(byte) 0xFF, (byte) 0, (byte) 0xFF}, new byte[] {(byte) 0xFF, (byte) 0, (byte) 0xFF});
		testNextRangeKeyWithSuffix(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0}, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0});
	}

	public void testNextRangeKeyWithSuffix(byte[] prefixKey, byte[] suffixKey) {

		byte[] firstRangeKey = DatabaseMapDictionaryDeep.firstRangeKey(prefixKey, suffixKey, prefixKey.length, 3, 7);
		byte[] nextRangeKey = DatabaseMapDictionaryDeep.nextRangeKey(prefixKey, suffixKey, prefixKey.length, 3, 7);

		if (Arrays.equals(prefixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF}) && Arrays.equals(suffixKey, new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF})) {
			Assertions.assertArrayEquals(new byte[] {(byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, 0, 0, 0, 0, 0, 0, 0, 0}, nextRangeKey);
		} else {
			long biPrefix = 0;
			var s = 0;
			for (int i = (suffixKey.length) - 1; i >= 0; i--) {
				biPrefix += ((long) (suffixKey[i] & 0xFF)) << s;
				s += Byte.SIZE;
			}
			for (int i = (prefixKey.length) - 1; i >= 0; i--) {
				biPrefix += ((long) (prefixKey[i] & 0xFF)) << s;
				s += Byte.SIZE;
			}
			var nrPrefix = Arrays.copyOf(nextRangeKey, prefixKey.length + suffixKey.length);

			long biNextPrefix = 0;
			s = 0;
			for (int i = (prefixKey.length + suffixKey.length) - 1; i >= 0; i--) {
				biNextPrefix += ((long) (nrPrefix[i] & 0xFF)) << s;
				s += Byte.SIZE;
			}
			Assertions.assertEquals(biPrefix + 1, biNextPrefix);
			Assertions.assertArrayEquals(
					new byte[7],
					Arrays.copyOfRange(nextRangeKey, prefixKey.length + suffixKey.length, prefixKey.length + suffixKey.length + 7)
			);
		}
	}
}