Database options record

This commit is contained in:
Andrea Cavalli 2021-06-27 15:40:56 +02:00
parent 507101e453
commit c6d5beb33b
6 changed files with 80 additions and 60 deletions

View File

@ -3,6 +3,7 @@ package it.cavallium.dbengine.database;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import it.cavallium.dbengine.client.IndicizerAnalyzers; import it.cavallium.dbengine.client.IndicizerAnalyzers;
import it.cavallium.dbengine.client.IndicizerSimilarities; import it.cavallium.dbengine.client.IndicizerSimilarities;
import it.cavallium.dbengine.database.disk.DatabaseOptions;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer; import it.cavallium.dbengine.lucene.analyzer.TextFieldsAnalyzer;
import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity; import it.cavallium.dbengine.lucene.analyzer.TextFieldsSimilarity;
import java.time.Duration; import java.time.Duration;
@ -19,9 +20,7 @@ public interface LLDatabaseConnection {
Mono<? extends LLKeyValueDatabase> getDatabase(String name, Mono<? extends LLKeyValueDatabase> getDatabase(String name,
List<Column> columns, List<Column> columns,
Map<String, String> extraFlags, DatabaseOptions databaseOptions);
boolean lowMemory,
boolean inMemory);
Mono<? extends LLLuceneIndex> getLuceneIndex(String name, Mono<? extends LLLuceneIndex> getLuceneIndex(String name,
int instancesCount, int instancesCount,

View File

@ -0,0 +1,14 @@
package it.cavallium.dbengine.database.disk;
import io.soabase.recordbuilder.core.RecordBuilder;
import it.cavallium.dbengine.database.Column;
import java.util.List;
import java.util.Map;
@RecordBuilder
public record DatabaseOptions(Map<String, String> extraFlags,
boolean absoluteConsistency,
boolean lowMemory,
boolean inMemory,
boolean useDirectIO,
boolean allowMemoryMapping) {}

View File

@ -26,12 +26,10 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
private final ByteBufAllocator allocator; private final ByteBufAllocator allocator;
private final Path basePath; private final Path basePath;
private final boolean crashIfWalError;
public LLLocalDatabaseConnection(ByteBufAllocator allocator, Path basePath, boolean crashIfWalError) { public LLLocalDatabaseConnection(ByteBufAllocator allocator, Path basePath) {
this.allocator = allocator; this.allocator = allocator;
this.basePath = basePath; this.basePath = basePath;
this.crashIfWalError = crashIfWalError;
} }
@Override @Override
@ -54,9 +52,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
@Override @Override
public Mono<LLLocalKeyValueDatabase> getDatabase(String name, public Mono<LLLocalKeyValueDatabase> getDatabase(String name,
List<Column> columns, List<Column> columns,
Map<String, String> extraFlags, DatabaseOptions databaseOptions) {
boolean lowMemory,
boolean inMemory) {
return Mono return Mono
.fromCallable(() -> new LLLocalKeyValueDatabase( .fromCallable(() -> new LLLocalKeyValueDatabase(
allocator, allocator,
@ -64,10 +60,7 @@ public class LLLocalDatabaseConnection implements LLDatabaseConnection {
basePath.resolve("database_" + name), basePath.resolve("database_" + name),
columns, columns,
new LinkedList<>(), new LinkedList<>(),
extraFlags, databaseOptions
crashIfWalError,
lowMemory,
inMemory
)) ))
.subscribeOn(Schedulers.boundedElastic()); .subscribeOn(Schedulers.boundedElastic());
} }

View File

@ -20,7 +20,6 @@ import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -43,12 +42,10 @@ import org.rocksdb.RateLimiter;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException; import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot; import org.rocksdb.Snapshot;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WALRecoveryMode; import org.rocksdb.WALRecoveryMode;
import org.rocksdb.WriteBufferManager; import org.rocksdb.WriteBufferManager;
import org.warp.commonutils.log.Logger; import org.warp.commonutils.log.Logger;
import org.warp.commonutils.log.LoggerFactory; import org.warp.commonutils.log.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
@ -59,33 +56,35 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
RocksDB.loadLibrary(); RocksDB.loadLibrary();
} }
private static final boolean USE_DIRECT_IO = true;
protected static final Logger logger = LoggerFactory.getLogger(LLLocalKeyValueDatabase.class); protected static final Logger logger = LoggerFactory.getLogger(LLLocalKeyValueDatabase.class);
private static final ColumnFamilyDescriptor DEFAULT_COLUMN_FAMILY = new ColumnFamilyDescriptor( private static final ColumnFamilyDescriptor DEFAULT_COLUMN_FAMILY = new ColumnFamilyDescriptor(
RocksDB.DEFAULT_COLUMN_FAMILY); RocksDB.DEFAULT_COLUMN_FAMILY);
private final ByteBufAllocator allocator; private final ByteBufAllocator allocator;
private final Scheduler dbScheduler; private final Scheduler dbScheduler;
// Configurations
private final Path dbPath; private final Path dbPath;
private final boolean inMemory;
private final String name; private final String name;
private final DatabaseOptions databaseOptions;
private final boolean enableColumnsBug; private final boolean enableColumnsBug;
private RocksDB db; private RocksDB db;
private final Map<Column, ColumnFamilyHandle> handles; private final Map<Column, ColumnFamilyHandle> handles;
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1); private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
@SuppressWarnings("SwitchStatementWithTooFewBranches")
public LLLocalKeyValueDatabase(ByteBufAllocator allocator, public LLLocalKeyValueDatabase(ByteBufAllocator allocator,
String name, String name,
Path path, Path path,
List<Column> columns, List<Column> columns,
List<ColumnFamilyHandle> handles, List<ColumnFamilyHandle> handles,
Map<String, String> extraFlags, DatabaseOptions databaseOptions) throws IOException {
boolean crashIfWalError, this.name = name;
boolean lowMemory,
boolean inMemory) throws IOException {
this.allocator = allocator; this.allocator = allocator;
Options options = openRocksDb(path, crashIfWalError, lowMemory); Options rocksdbOptions = openRocksDb(path, databaseOptions);
try { try {
List<ColumnFamilyDescriptor> descriptors = new LinkedList<>(); List<ColumnFamilyDescriptor> descriptors = new LinkedList<>();
descriptors descriptors
@ -97,32 +96,38 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// Get databases directory path // Get databases directory path
Path databasesDirPath = path.toAbsolutePath().getParent(); Path databasesDirPath = path.toAbsolutePath().getParent();
String dbPathString = databasesDirPath.toString() + File.separatorChar + path.getFileName(); String dbPathString = databasesDirPath.toString() + File.separatorChar + path.getFileName();
Path dbPath = Paths.get(dbPathString); Path dbPath = Paths.get(dbPathString);
this.dbPath = dbPath; this.dbPath = dbPath;
this.inMemory = inMemory;
this.name = name; // Set options
this.dbScheduler = Schedulers.newBoundedElastic(lowMemory ? Runtime.getRuntime().availableProcessors() this.databaseOptions = databaseOptions;
: Math.max(8, Runtime.getRuntime().availableProcessors()),
int threadCap;
if (databaseOptions.lowMemory()) {
threadCap = Math.max(8, Runtime.getRuntime().availableProcessors());
} else {
threadCap = Runtime.getRuntime().availableProcessors();
}
this.dbScheduler = Schedulers.newBoundedElastic(threadCap,
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
"db-" + name, "db-" + name,
60, 60,
true true
); );
this.enableColumnsBug = "true".equals(extraFlags.getOrDefault("enableColumnBug", "false")); this.enableColumnsBug = "true".equals(databaseOptions.extraFlags().getOrDefault("enableColumnBug", "false"));
createIfNotExists(descriptors, options, inMemory, this.dbPath, dbPathString); createIfNotExists(descriptors, rocksdbOptions, databaseOptions, dbPath, dbPathString);
// Create all column families that don't exist // Create all column families that don't exist
createAllColumns(descriptors, options, inMemory, dbPathString); createAllColumns(descriptors, rocksdbOptions, databaseOptions, dbPathString);
while (true) { while (true) {
try { try {
// a factory method that returns a RocksDB instance // a factory method that returns a RocksDB instance
this.db = RocksDB.open(new DBOptions(options), this.db = RocksDB.open(new DBOptions(rocksdbOptions),
dbPathString, dbPathString,
inMemory ? List.of(DEFAULT_COLUMN_FAMILY) : descriptors, databaseOptions.inMemory() ? List.of(DEFAULT_COLUMN_FAMILY) : descriptors,
handles handles
); );
break; break;
@ -130,19 +135,19 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
switch (ex.getMessage()) { switch (ex.getMessage()) {
case "Direct I/O is not supported by the specified DB." -> { case "Direct I/O is not supported by the specified DB." -> {
logger.warn(ex.getLocalizedMessage()); logger.warn(ex.getLocalizedMessage());
options rocksdbOptions
.setUseDirectReads(false) .setUseDirectReads(false)
.setUseDirectIoForFlushAndCompaction(false) .setUseDirectIoForFlushAndCompaction(false)
.setAllowMmapReads(true) .setAllowMmapReads(databaseOptions.allowMemoryMapping())
.setAllowMmapWrites(true); .setAllowMmapWrites(databaseOptions.allowMemoryMapping());
} }
default -> throw ex; default -> throw ex;
} }
} }
} }
createInMemoryColumns(descriptors, inMemory, handles); createInMemoryColumns(descriptors, databaseOptions, handles);
this.handles = new HashMap<>(); this.handles = new HashMap<>();
if (enableColumnsBug) { if (enableColumnsBug && !databaseOptions.inMemory()) {
for (int i = 0; i < columns.size(); i++) { for (int i = 0; i < columns.size(); i++) {
this.handles.put(columns.get(i), handles.get(i)); this.handles.put(columns.get(i), handles.get(i));
} }
@ -220,8 +225,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
@SuppressWarnings({"CommentedOutCode", "PointlessArithmeticExpression"}) @SuppressWarnings({"CommentedOutCode", "PointlessArithmeticExpression"})
private static Options openRocksDb(Path path, boolean crashIfWalError, boolean lowMemory) private static Options openRocksDb(Path path, DatabaseOptions databaseOptions) throws IOException {
throws IOException {
// Get databases directory path // Get databases directory path
Path databasesDirPath = path.toAbsolutePath().getParent(); Path databasesDirPath = path.toAbsolutePath().getParent();
// Create base directories // Create base directories
@ -246,7 +250,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
options.setMaxWriteBufferNumber(4); options.setMaxWriteBufferNumber(4);
options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown
options.setAvoidFlushDuringRecovery(false); // Flush all WALs during startup options.setAvoidFlushDuringRecovery(false); // Flush all WALs during startup
options.setWalRecoveryMode(crashIfWalError options.setWalRecoveryMode(databaseOptions.absoluteConsistency()
? WALRecoveryMode.AbsoluteConsistency ? WALRecoveryMode.AbsoluteConsistency
: WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted.Default: TolerateCorruptedTailRecords : WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted.Default: TolerateCorruptedTailRecords
options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds
@ -268,7 +272,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
//options.setWritableFileMaxBufferSize(1024 * 1024); // 1MB by default //options.setWritableFileMaxBufferSize(1024 * 1024); // 1MB by default
//options.setCompactionReadaheadSize(2 * 1024 * 1024); // recommend at least 2MB //options.setCompactionReadaheadSize(2 * 1024 * 1024); // recommend at least 2MB
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
if (lowMemory) { if (databaseOptions.lowMemory()) {
// LOW MEMORY // LOW MEMORY
options options
.setLevelCompactionDynamicLevelBytes(false) .setLevelCompactionDynamicLevelBytes(false)
@ -315,10 +319,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
tableOptions.setFilterPolicy(bloomFilter); tableOptions.setFilterPolicy(bloomFilter);
options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new LRUCache(128L * 1024L * 1024L))); // 128MiB options.setWriteBufferManager(new WriteBufferManager(256L * 1024L * 1024L, new LRUCache(128L * 1024L * 1024L))); // 128MiB
if (USE_DIRECT_IO) { if (databaseOptions.useDirectIO()) {
options options
.setAllowMmapReads(false) .setAllowMmapReads(databaseOptions.allowMemoryMapping())
.setAllowMmapWrites(false) .setAllowMmapWrites(databaseOptions.allowMemoryMapping())
.setUseDirectIoForFlushAndCompaction(true) .setUseDirectIoForFlushAndCompaction(true)
.setUseDirectReads(true) .setUseDirectReads(true)
// Option to enable readahead in compaction // Option to enable readahead in compaction
@ -329,8 +333,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
; ;
} else { } else {
options options
.setAllowMmapReads(true) .setAllowMmapReads(databaseOptions.allowMemoryMapping())
.setAllowMmapWrites(true); .setAllowMmapWrites(databaseOptions.allowMemoryMapping());
} }
} }
@ -341,8 +345,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return options; return options;
} }
private void createAllColumns(List<ColumnFamilyDescriptor> totalDescriptors, Options options, boolean inMemory, String dbPathString) throws RocksDBException { private void createAllColumns(List<ColumnFamilyDescriptor> totalDescriptors,
if (inMemory) { Options options,
DatabaseOptions databaseOptions,
String dbPathString) throws RocksDBException {
if (databaseOptions.inMemory()) {
return; return;
} }
List<byte[]> columnFamiliesToCreate = new LinkedList<>(); List<byte[]> columnFamiliesToCreate = new LinkedList<>();
@ -385,10 +392,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} }
private void createInMemoryColumns(List<ColumnFamilyDescriptor> totalDescriptors, private void createInMemoryColumns(List<ColumnFamilyDescriptor> totalDescriptors,
boolean inMemory, DatabaseOptions databaseOptions,
List<ColumnFamilyHandle> handles) List<ColumnFamilyHandle> handles)
throws RocksDBException { throws RocksDBException {
if (!inMemory) { if (!databaseOptions.inMemory()) {
return; return;
} }
List<byte[]> columnFamiliesToCreate = new LinkedList<>(); List<byte[]> columnFamiliesToCreate = new LinkedList<>();
@ -407,10 +414,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private void createIfNotExists(List<ColumnFamilyDescriptor> descriptors, private void createIfNotExists(List<ColumnFamilyDescriptor> descriptors,
Options options, Options options,
boolean inMemory, DatabaseOptions databaseOptions,
Path dbPath, Path dbPath,
String dbPathString) throws RocksDBException { String dbPathString) throws RocksDBException {
if (inMemory) { if (databaseOptions.inMemory()) {
return; return;
} }
if (Files.notExists(dbPath)) { if (Files.notExists(dbPath)) {
@ -436,9 +443,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
handles.add(db.createColumnFamily(columnFamilyDescriptor)); handles.add(db.createColumnFamily(columnFamilyDescriptor));
} }
if (!inMemory) { flushAndCloseDb(db, handles);
flushAndCloseDb(db, handles);
}
} }
} }
@ -481,6 +486,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return cfh; return cfh;
} }
public DatabaseOptions getDatabaseOptions() {
return databaseOptions;
}
@Override @Override
public Mono<Long> getProperty(String propertyName) { public Mono<Long> getProperty(String propertyName) {
return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName)) return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName))

View File

@ -14,6 +14,7 @@ import it.cavallium.dbengine.database.collections.DatabaseStageEntry;
import it.cavallium.dbengine.database.collections.DatabaseStageMap; import it.cavallium.dbengine.database.collections.DatabaseStageMap;
import it.cavallium.dbengine.database.collections.SubStageGetterHashMap; import it.cavallium.dbengine.database.collections.SubStageGetterHashMap;
import it.cavallium.dbengine.database.collections.SubStageGetterMap; import it.cavallium.dbengine.database.collections.SubStageGetterMap;
import it.cavallium.dbengine.database.disk.DatabaseOptions;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
@ -54,10 +55,10 @@ public class DbTestUtils {
return null; return null;
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.then(new LLLocalDatabaseConnection(DbTestUtils.ALLOCATOR, wrkspcPath, true).connect()) .then(new LLLocalDatabaseConnection(DbTestUtils.ALLOCATOR, wrkspcPath).connect())
.flatMap(conn -> conn.getDatabase("testdb", .flatMap(conn -> conn.getDatabase("testdb",
List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")), List.of(Column.dictionary("testmap"), Column.special("ints"), Column.special("longs")),
Map.of(), false, true new DatabaseOptions(Map.of(), true, false, true, false, true)
)), )),
action, action,
db -> db.close().then(Mono.fromCallable(() -> { db -> db.close().then(Mono.fromCallable(() -> {

View File

@ -13,6 +13,7 @@ import it.cavallium.dbengine.database.UpdateMode;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionary; import it.cavallium.dbengine.database.collections.DatabaseMapDictionary;
import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep; import it.cavallium.dbengine.database.collections.DatabaseMapDictionaryDeep;
import it.cavallium.dbengine.database.collections.SubStageGetterMap; import it.cavallium.dbengine.database.collections.SubStageGetterMap;
import it.cavallium.dbengine.database.disk.DatabaseOptions;
import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection; import it.cavallium.dbengine.database.disk.LLLocalDatabaseConnection;
import it.cavallium.dbengine.database.serialization.Serializer; import it.cavallium.dbengine.database.serialization.Serializer;
import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength;
@ -131,8 +132,11 @@ public class OldDatabaseTests {
return null; return null;
}) })
.subscribeOn(Schedulers.boundedElastic()) .subscribeOn(Schedulers.boundedElastic())
.then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath, true).connect()) .then(new LLLocalDatabaseConnection(PooledByteBufAllocator.DEFAULT, wrkspcPath).connect())
.flatMap(conn -> conn.getDatabase("testdb", List.of(Column.dictionary("testmap")), Map.of(), false, true)); .flatMap(conn -> conn.getDatabase("testdb",
List.of(Column.dictionary("testmap")),
new DatabaseOptions(Map.of(), true, false, true, false, true)
));
} }
private static final ByteBuf DUMMY_VALUE; private static final ByteBuf DUMMY_VALUE;