This commit is contained in:
Andrea Cavalli 2022-04-15 16:49:01 +02:00
parent 80ef0394b1
commit 4b627664aa
6 changed files with 49 additions and 22 deletions

View File

@ -322,7 +322,7 @@
<dependency> <dependency>
<groupId>org.rocksdb</groupId> <groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId> <artifactId>rocksdbjni</artifactId>
<version>7.0.4</version> <version>7.1.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.lucene</groupId> <groupId>org.apache.lucene</groupId>

View File

@ -86,6 +86,9 @@ public class LLUtils {
public static final AtomicBoolean hookRegistered = new AtomicBoolean(); public static final AtomicBoolean hookRegistered = new AtomicBoolean();
public static final boolean MANUAL_READAHEAD = false; public static final boolean MANUAL_READAHEAD = false;
public static final boolean FORCE_DISABLE_CHECKSUM_VERIFICATION
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checksum.disable.force", "false"));
static { static {
for (int i1 = 0; i1 < 256; i1++) { for (int i1 = 0; i1 < 256; i1++) {
var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1]; var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1];
@ -742,6 +745,10 @@ public class LLUtils {
readOptions.setVerifyChecksums(false); readOptions.setVerifyChecksums(false);
} }
if (FORCE_DISABLE_CHECKSUM_VERIFICATION) {
readOptions.setVerifyChecksums(false);
}
return readOptions; return readOptions;
} }

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database.disk;
import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP; import static io.netty5.buffer.api.StandardAllocationTypes.OFF_HEAP;
import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB; import static it.cavallium.dbengine.database.LLUtils.MARKER_ROCKSDB;
import static it.cavallium.dbengine.database.LLUtils.fromByteArray; import static it.cavallium.dbengine.database.LLUtils.fromByteArray;
import static it.cavallium.dbengine.database.LLUtils.isBoundedRange;
import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect; import static it.cavallium.dbengine.database.LLUtils.isReadOnlyDirect;
import static it.cavallium.dbengine.database.LLUtils.toStringSafe; import static it.cavallium.dbengine.database.LLUtils.toStringSafe;
import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA; import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA;
@ -82,7 +83,7 @@ public class LLLocalDictionary implements LLDictionary {
* It used to be false, * It used to be false,
* now it's true to avoid crashes during iterations on completely corrupted files * now it's true to avoid crashes during iterations on completely corrupted files
*/ */
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false; static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = !LLUtils.FORCE_DISABLE_CHECKSUM_VERIFICATION;
/** /**
* Default: true. Use false to debug problems with windowing. * Default: true. Use false to debug problems with windowing.
*/ */
@ -229,7 +230,7 @@ public class LLLocalDictionary implements LLDictionary {
*/ */
private ReadOptions getReadOptions(Snapshot snapshot) { private ReadOptions getReadOptions(Snapshot snapshot) {
if (snapshot != null) { if (snapshot != null) {
return new ReadOptions().setSnapshot(snapshot); return LLUtils.generateCustomReadOptions(null, true, true, true).setSnapshot(snapshot);
} else { } else {
return EMPTY_READ_OPTIONS; return EMPTY_READ_OPTIONS;
} }
@ -290,7 +291,11 @@ public class LLLocalDictionary implements LLDictionary {
AbstractSlice<?> slice1 = null; AbstractSlice<?> slice1 = null;
AbstractSlice<?> slice2 = null; AbstractSlice<?> slice2 = null;
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = LLUtils.generateCustomReadOptions(resolveSnapshot(snapshot),
true,
isBoundedRange(range),
true
)) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
readOpts.setFillCache(fillCache); readOpts.setFillCache(fillCache);
if (range.hasMin()) { if (range.hasMin()) {
@ -881,7 +886,11 @@ public class LLLocalDictionary implements LLDictionary {
.<BadBlock>create(sink -> { .<BadBlock>create(sink -> {
var range = rangeSend.receive(); var range = rangeSend.receive();
sink.onDispose(range::close); sink.onDispose(range::close);
try (var ro = new ReadOptions(getReadOptions(null))) { try (var ro = LLUtils.generateCustomReadOptions(getReadOptions(null),
false,
isBoundedRange(range),
false
)) {
ro.setFillCache(false); ro.setFillCache(false);
if (!range.isSingle()) { if (!range.isSingle()) {
if (LLUtils.MANUAL_READAHEAD) { if (LLUtils.MANUAL_READAHEAD) {
@ -968,7 +977,11 @@ public class LLLocalDictionary implements LLDictionary {
assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread"; assert !Schedulers.isInNonBlockingThread() : "Called setRange in a nonblocking thread";
if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) { if (!USE_WRITE_BATCH_IN_SET_RANGE_DELETE || !USE_WRITE_BATCHES_IN_SET_RANGE) {
assert EMPTY_READ_OPTIONS.isOwningHandle(); assert EMPTY_READ_OPTIONS.isOwningHandle();
try (var opts = new ReadOptions(EMPTY_READ_OPTIONS)) { try (var opts = LLUtils.generateCustomReadOptions(EMPTY_READ_OPTIONS,
true,
isBoundedRange(range),
smallRange
)) {
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, opts, IterateBound.LOWER, range.getMinUnsafe()); minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, opts, IterateBound.LOWER, range.getMinUnsafe());
@ -1187,7 +1200,7 @@ public class LLLocalDictionary implements LLDictionary {
private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, Send<LLRange> rangeToReceive) private void deleteSmallRangeWriteBatch(WriteBatch writeBatch, Send<LLRange> rangeToReceive)
throws RocksDBException { throws RocksDBException {
try (var range = rangeToReceive.receive()) { try (var range = rangeToReceive.receive()) {
try (var readOpts = new ReadOptions(getReadOptions(null))) { try (var readOpts = LLUtils.generateCustomReadOptions(getReadOptions(null), true, isBoundedRange(range), true)) {
readOpts.setFillCache(false); readOpts.setFillCache(false);
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
@ -1251,7 +1264,7 @@ public class LLLocalDictionary implements LLDictionary {
.<Void>fromCallable(() -> { .<Void>fromCallable(() -> {
assert !Schedulers.isInNonBlockingThread() : "Called clear in a nonblocking thread"; assert !Schedulers.isInNonBlockingThread() : "Called clear in a nonblocking thread";
boolean shouldCompactLater = false; boolean shouldCompactLater = false;
try (var readOpts = new ReadOptions(getReadOptions(null))) { try (var readOpts = LLUtils.generateCustomReadOptions(getReadOptions(null), false, false, false)) {
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
// readOpts.setIgnoreRangeDeletions(true); // readOpts.setIgnoreRangeDeletions(true);
@ -1323,7 +1336,7 @@ public class LLLocalDictionary implements LLDictionary {
if (range.isAll()) { if (range.isAll()) {
sink.next(fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot)); sink.next(fast ? fastSizeAll(snapshot) : exactSizeAll(snapshot));
} else { } else {
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = LLUtils.generateCustomReadOptions(resolveSnapshot(snapshot), false, isBoundedRange(range), false)) {
readOpts.setFillCache(false); readOpts.setFillCache(false);
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
ReleasableSlice minBound; ReleasableSlice minBound;
@ -1384,7 +1397,7 @@ public class LLLocalDictionary implements LLDictionary {
return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> { return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) { try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread"; assert !Schedulers.isInNonBlockingThread() : "Called getOne in a nonblocking thread";
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = LLUtils.generateCustomReadOptions(resolveSnapshot(snapshot), true, true, true)) {
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
@ -1439,7 +1452,7 @@ public class LLLocalDictionary implements LLDictionary {
return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> { return rangeMono.publishOn(dbRScheduler).handle((rangeSend, sink) -> {
try (var range = rangeSend.receive()) { try (var range = rangeSend.receive()) {
assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread"; assert !Schedulers.isInNonBlockingThread() : "Called getOneKey in a nonblocking thread";
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = LLUtils.generateCustomReadOptions(resolveSnapshot(snapshot), true, true, true)) {
ReleasableSlice minBound; ReleasableSlice minBound;
if (range.hasMin()) { if (range.hasMin()) {
minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe()); minBound = AbstractRocksDBColumn.setIterateBound(nettyDirect, readOpts, IterateBound.LOWER, range.getMinUnsafe());
@ -1518,8 +1531,7 @@ public class LLLocalDictionary implements LLDictionary {
if (Schedulers.isInNonBlockingThread()) { if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called exactSizeAll in a nonblocking thread"); throw new UnsupportedOperationException("Called exactSizeAll in a nonblocking thread");
} }
try (var readOpts = new ReadOptions(resolveSnapshot(snapshot))) { try (var readOpts = LLUtils.generateCustomReadOptions(resolveSnapshot(snapshot), false, false, false)) {
readOpts.setFillCache(false);
if (LLUtils.MANUAL_READAHEAD) { if (LLUtils.MANUAL_READAHEAD) {
readOpts.setReadaheadSize(128 * 1024); // 128KiB readOpts.setReadaheadSize(128 * 1024); // 128KiB
} }

View File

@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -153,6 +154,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
} }
this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false"));
if (!enableColumnsBug) {
if (columns.stream().noneMatch(column -> column.name().equals("default"))) {
columns = Stream.concat(Stream.of(Column.of("default")), columns.stream()).toList();
}
}
OptionsWithCache optionsWithCache = openRocksDb(path, databaseOptions); OptionsWithCache optionsWithCache = openRocksDb(path, databaseOptions);
var rocksdbOptions = optionsWithCache.options(); var rocksdbOptions = optionsWithCache.options();
try { try {
@ -246,7 +255,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
if (!databaseOptions.lowMemory()) { if (!databaseOptions.lowMemory()) {
tableOptions.setOptimizeFiltersForMemory(true); // tableOptions.setOptimizeFiltersForMemory(true);
} }
tableOptions.setVerifyCompression(false); tableOptions.setVerifyCompression(false);
if (columnOptions.filter().isPresent()) { if (columnOptions.filter().isPresent()) {
@ -264,7 +273,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.orElse(true); .orElse(true);
if (databaseOptions.spinning()) { if (databaseOptions.spinning()) {
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
cacheIndexAndFilterBlocks = true; // cacheIndexAndFilterBlocks = true;
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMinWriteBufferNumberToMerge(3); columnFamilyOptions.setMinWriteBufferNumberToMerge(3);
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
@ -282,7 +291,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// Enabling partition filters increase the reads by 2x // Enabling partition filters increase the reads by 2x
.setPartitionFilters(columnOptions.partitionFilters().orElse(false)) .setPartitionFilters(columnOptions.partitionFilters().orElse(false))
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setIndexType(IndexType.kTwoLevelIndexSearch) .setIndexType(columnOptions.partitionFilters().orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch)
//todo: replace with kxxhash3 //todo: replace with kxxhash3
.setChecksumType(ChecksumType.kCRC32c) .setChecksumType(ChecksumType.kCRC32c)
// Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB // Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB
@ -371,7 +380,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
); );
} }
} }
this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false"));
createIfNotExists(descriptors, rocksdbOptions, standardCache, compressedCache, inMemory, dbPath, dbPathString); createIfNotExists(descriptors, rocksdbOptions, standardCache, compressedCache, inMemory, dbPath, dbPathString);
@ -790,11 +798,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
var paths = new ArrayList<DbPath>(volumes.size()); var paths = new ArrayList<DbPath>(volumes.size());
if (volumes.isEmpty()) { if (volumes.isEmpty()) {
return List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"), return List.of(new DbPath(databasesDirPath.resolve(path.getFileName() + "_hot"),
100L * 1024L * 1024L * 1024L), // 100GiB 0), // Legacy
new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"), new DbPath(databasesDirPath.resolve(path.getFileName() + "_cold"),
0), // Legacy 0), // Legacy
new DbPath(databasesDirPath.resolve(path.getFileName() + "_colder"), new DbPath(databasesDirPath.resolve(path.getFileName() + "_colder"),
0) 1000L * 1024L * 1024L * 1024L) // 1000GiB
); // Legacy ); // Legacy
} }
for (DatabaseVolume volume : volumes) { for (DatabaseVolume volume : volumes) {

View File

@ -8,6 +8,7 @@ import io.netty5.buffer.api.Send;
import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLDelta;
import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSingleton;
import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.LLSnapshot;
import it.cavallium.dbengine.database.LLUtils;
import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.UpdateReturnMode;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -72,7 +73,7 @@ public class LLLocalSingleton implements LLSingleton {
private ReadOptions resolveSnapshot(LLSnapshot snapshot) { private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
if (snapshot != null) { if (snapshot != null) {
return new ReadOptions().setSnapshot(snapshotResolver.apply(snapshot)); return LLUtils.generateCustomReadOptions(null, true, true, true).setSnapshot(snapshotResolver.apply(snapshot));
} else { } else {
return EMPTY_READ_OPTIONS; return EMPTY_READ_OPTIONS;
} }

View File

@ -52,8 +52,7 @@ public sealed interface RocksDBColumn permits AbstractRocksDBColumn {
boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException; boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException;
void put(@NotNull WriteOptions writeOptions, Buffer key, void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException;
Buffer value) throws RocksDBException;
default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value) default void put(@NotNull WriteOptions writeOptions, byte[] key, byte[] value)
throws RocksDBException { throws RocksDBException {