Add immutable read/write options

This commit is contained in:
Andrea Cavalli 2021-06-19 21:55:20 +02:00
parent 1bd1cb87de
commit 924dbf6bf6
3 changed files with 427 additions and 257 deletions

View File

@ -68,9 +68,9 @@ public class LLLocalDictionary implements LLDictionary {
static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
static final int MULTI_GET_WINDOW = 500;
static final ReadOptions EMPTY_READ_OPTIONS = new ReadOptions();
static final WriteOptions EMPTY_WRITE_OPTIONS = new WriteOptions();
static final WriteOptions BATCH_WRITE_OPTIONS = new WriteOptions();
static final ReadOptions EMPTY_READ_OPTIONS = new UnmodifiableReadOptions();
static final WriteOptions EMPTY_WRITE_OPTIONS = new UnmodifiableWriteOptions();
static final WriteOptions BATCH_WRITE_OPTIONS = new UnmodifiableWriteOptions();
static final boolean PREFER_SEEK_TO_FIRST = false;
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false;
public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true;
@ -153,6 +153,9 @@ public class LLLocalDictionary implements LLDictionary {
return databaseName;
}
/**
* Please don't modify the returned ReadOptions! If you want to modify it, wrap it into a new ReadOptions!
*/
private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
if (snapshot != null) {
return getReadOptions(snapshotResolver.apply(snapshot));
@ -161,6 +164,9 @@ public class LLLocalDictionary implements LLDictionary {
}
}
/**
* Please don't modify the returned ReadOptions! If you want to modify it, wrap it into a new ReadOptions!
*/
private ReadOptions getReadOptions(Snapshot snapshot) {
if (snapshot != null) {
return new ReadOptions().setSnapshot(snapshot);
@ -388,7 +394,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
return Mono
.fromCallable(() -> {
var readOpts = resolveSnapshot(snapshot);
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(false);
if (range.hasMin()) {
@ -423,6 +429,7 @@ public class LLLocalDictionary implements LLDictionary {
}
return rocksIterator.isValid();
}
}
})
.onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause))
.subscribeOn(dbScheduler)
@ -451,11 +458,12 @@ public class LLLocalDictionary implements LLDictionary {
int size = RocksDB.NOT_FOUND;
byte[] keyBytes = LLUtils.toArray(key);
Holder<byte[]> data = new Holder<>();
if (db.keyMayExist(cfh, resolveSnapshot(snapshot), keyBytes, data)) {
var unmodifiableReadOpts = resolveSnapshot(snapshot);
if (db.keyMayExist(cfh, unmodifiableReadOpts, keyBytes, data)) {
if (data.getValue() != null) {
size = data.getValue().length;
} else {
size = db.get(cfh, resolveSnapshot(snapshot), keyBytes, NO_DATA);
size = db.get(cfh, unmodifiableReadOpts, keyBytes, NO_DATA);
}
}
return size != RocksDB.NOT_FOUND;
@ -1296,7 +1304,7 @@ public class LLLocalDictionary implements LLDictionary {
return Mono
.<Void>fromCallable(() -> {
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
var opts = new ReadOptions(EMPTY_READ_OPTIONS);
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(opts, IterateBound.LOWER, range.getMin().retain());
@ -1326,6 +1334,7 @@ public class LLLocalDictionary implements LLDictionary {
} finally {
minBound.release();
}
}
} else if (USE_CAPPED_WRITE_BATCH_IN_SET_RANGE) {
try (var batch = new CappedWriteBatch(db,
CAPPED_WRITE_BATCH_CAP,
@ -1445,8 +1454,7 @@ public class LLLocalDictionary implements LLDictionary {
//todo: this is broken, check why
private void deleteSmallRangeWriteBatch(CappedWriteBatch writeBatch, LLRange range)
throws RocksDBException {
try {
var readOpts = getReadOptions(null);
try (var readOpts = new ReadOptions(getReadOptions(null))) {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
@ -1484,8 +1492,7 @@ public class LLLocalDictionary implements LLDictionary {
private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, LLRange range)
throws RocksDBException {
try {
var readOpts = getReadOptions(null);
try (var readOpts = new ReadOptions(getReadOptions(null))) {
readOpts.setFillCache(false);
ReleasableSlice minBound;
if (range.hasMin()) {
@ -1656,7 +1663,7 @@ public class LLLocalDictionary implements LLDictionary {
} else {
return Mono
.fromCallable(() -> {
var readOpts = resolveSnapshot(snapshot);
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
ReleasableSlice minBound;
@ -1696,6 +1703,7 @@ public class LLLocalDictionary implements LLDictionary {
} finally {
minBound.release();
}
}
})
.onErrorMap(cause -> new IOException("Failed to get size of range "
+ range.toString(), cause))
@ -1714,7 +1722,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
return Mono
.fromCallable(() -> {
var readOpts = resolveSnapshot(snapshot);
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
@ -1755,6 +1763,7 @@ public class LLLocalDictionary implements LLDictionary {
} finally {
minBound.release();
}
}
})
.subscribeOn(dbScheduler)
.doFirst(range::retain)
@ -1769,7 +1778,7 @@ public class LLLocalDictionary implements LLDictionary {
try {
return Mono
.fromCallable(() -> {
var readOpts = resolveSnapshot(snapshot);
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
@ -1802,6 +1811,7 @@ public class LLLocalDictionary implements LLDictionary {
} finally {
minBound.release();
}
}
})
.subscribeOn(dbScheduler)
.doFirst(range::retain)
@ -1812,7 +1822,7 @@ public class LLLocalDictionary implements LLDictionary {
}
private long fastSizeAll(@Nullable LLSnapshot snapshot) {
var rocksdbSnapshot = resolveSnapshot(snapshot);
try (var rocksdbSnapshot = new ReadOptions(resolveSnapshot(snapshot))) {
if (USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS || rocksdbSnapshot.snapshot() == null) {
try {
return db.getLongProperty(cfh, "rocksdb.estimate-num-keys");
@ -1838,9 +1848,10 @@ public class LLLocalDictionary implements LLDictionary {
}
}
}
}
private long exactSizeAll(@Nullable LLSnapshot snapshot) {
var readOpts = resolveSnapshot(snapshot);
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) {
readOpts.setFillCache(false);
readOpts.setReadaheadSize(32 * 1024); // 32KiB
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
@ -1855,7 +1866,7 @@ public class LLLocalDictionary implements LLDictionary {
))
.map(range -> (Callable<Long>) () -> {
long partialCount = 0;
var rangeReadOpts = new ReadOptions(readOpts);
try (var rangeReadOpts = new ReadOptions(readOpts)) {
Slice sliceBegin;
if (range.getKey() != null) {
sliceBegin = new Slice(range.getKey());
@ -1891,6 +1902,7 @@ public class LLLocalDictionary implements LLDictionary {
sliceEnd.close();
}
}
}
})
.map(commonPool::submit)
.collect(Collectors.toList());
@ -1911,13 +1923,14 @@ public class LLLocalDictionary implements LLDictionary {
}
}
}
}
@Override
public Mono<Entry<ByteBuf, ByteBuf>> removeOne(LLRange range) {
try {
return Mono
.fromCallable(() -> {
var readOpts = getReadOptions(null);
try (var readOpts = new ReadOptions(getReadOptions(null))) {
ReleasableSlice minBound;
if (range.hasMin()) {
minBound = setIterateBound(readOpts, IterateBound.LOWER, range.getMin().retain());
@ -1950,6 +1963,7 @@ public class LLLocalDictionary implements LLDictionary {
} finally {
minBound.release();
}
}
})
.onErrorMap(cause -> new IOException("Failed to delete " + range.toString(), cause))
.subscribeOn(dbScheduler)

View File

@ -0,0 +1,108 @@
package it.cavallium.dbengine.database.disk;
import org.rocksdb.AbstractSlice;
import org.rocksdb.AbstractTableFilter;
import org.rocksdb.ReadOptions;
import org.rocksdb.ReadTier;
import org.rocksdb.Snapshot;
import org.warp.commonutils.range.UnmodifiableRange;
public class UnmodifiableReadOptions extends ReadOptions {
public UnmodifiableReadOptions() {
}
public UnmodifiableReadOptions(ReadOptions readOptions) {
super(readOptions);
}
@Override
public ReadOptions setBackgroundPurgeOnIteratorCleanup(boolean b) {
throw uoe();
}
@Override
public ReadOptions setFillCache(boolean b) {
throw uoe();
}
@Override
public ReadOptions setSnapshot(Snapshot snapshot) {
throw uoe();
}
@Override
public ReadOptions setReadTier(ReadTier readTier) {
throw uoe();
}
@Override
public ReadOptions setTailing(boolean b) {
throw uoe();
}
@Override
public ReadOptions setVerifyChecksums(boolean b) {
throw uoe();
}
@Override
public ReadOptions setManaged(boolean b) {
throw uoe();
}
@Override
public ReadOptions setTotalOrderSeek(boolean b) {
throw uoe();
}
@Override
public ReadOptions setPrefixSameAsStart(boolean b) {
throw uoe();
}
@Override
public ReadOptions setPinData(boolean b) {
throw uoe();
}
@Override
public ReadOptions setReadaheadSize(long l) {
throw uoe();
}
@Override
public ReadOptions setMaxSkippableInternalKeys(long l) {
throw uoe();
}
@Override
public ReadOptions setIgnoreRangeDeletions(boolean b) {
throw uoe();
}
@Override
public ReadOptions setIterateLowerBound(AbstractSlice<?> abstractSlice) {
throw uoe();
}
@Override
public ReadOptions setIterateUpperBound(AbstractSlice<?> abstractSlice) {
throw uoe();
}
@Override
public ReadOptions setIterStartSeqnum(long l) {
throw uoe();
}
@Override
public ReadOptions setTableFilter(AbstractTableFilter abstractTableFilter) {
throw uoe();
}
private UnsupportedOperationException uoe() {
return new UnsupportedOperationException("Unmodifiable read options");
}
}

View File

@ -0,0 +1,48 @@
package it.cavallium.dbengine.database.disk;
import org.rocksdb.AbstractSlice;
import org.rocksdb.AbstractTableFilter;
import org.rocksdb.ReadOptions;
import org.rocksdb.ReadTier;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteOptions;
public class UnmodifiableWriteOptions extends WriteOptions {
public UnmodifiableWriteOptions() {
}
public UnmodifiableWriteOptions(WriteOptions writeOptions) {
super(writeOptions);
}
@Override
public WriteOptions setDisableWAL(boolean b) {
throw uoe();
}
@Override
public WriteOptions setIgnoreMissingColumnFamilies(boolean b) {
throw uoe();
}
@Override
public WriteOptions setLowPri(boolean b) {
throw uoe();
}
@Override
public WriteOptions setNoSlowdown(boolean b) {
throw uoe();
}
@Override
public WriteOptions setSync(boolean b) {
throw uoe();
}
private UnsupportedOperationException uoe() {
return new UnsupportedOperationException("Unmodifiable read options");
}
}