Support persistent cache, ensure that all snapshots are closed in time

Andrea Cavalli 2022-04-15 02:41:06 +02:00
parent 798262d72b
commit 046c08e5bf
4 changed files with 173 additions and 42 deletions

View File

@@ -21,6 +21,7 @@ interfacesData:
     partitionFilters: -boolean
     filter: -Filter
     blockSize: -int
+    persistentCacheId: -String
 # versions must have only numbers, lowercase letters, dots, dashes. Maximum: 99.999.9999
 versions:
   0.0.0:
@@ -248,6 +249,7 @@ versions:
         maxOpenFiles: -int
         blockCache: -long
         compressedBlockCache: -long
+        persistentCaches: PersistentCache[]
         writeBufferManager: -long
         spinning: boolean
         defaultColumnOptions: DefaultColumnOptions
@@ -261,6 +263,7 @@ versions:
         partitionFilters: -boolean
         filter: -Filter
         blockSize: -int
+        persistentCacheId: -String
     # Remember to update ColumnOptions common getters
     NamedColumnOptions:
       data:
@@ -271,10 +274,17 @@ versions:
         partitionFilters: -boolean
        filter: -Filter
         blockSize: -int
+        persistentCacheId: -String
     BloomFilter:
       data:
         bitsPerKey: int
         optimizeForHits: -boolean
+    PersistentCache:
+      data:
+        id: String
+        path: String
+        size: long
+        optimizeForNvm: boolean
     DatabaseVolume:
       data:
         volumePath: Path
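For context, the Java-side equivalent of the new schema entries, one named persistent cache plus a column that references it by id, could be built roughly as follows. This is a minimal sketch: the positional constructor of the generated PersistentCache data class and NullableString.of(...) are assumptions inferred from the generated classes used elsewhere in this commit, and the id, path, and size are made-up values.

import it.cavallium.data.generator.nativedata.NullableString;
import it.cavallium.dbengine.rpc.current.data.PersistentCache;

class PersistentCacheConfigExample {

	// One cache definition: id, path, size in bytes, optimizeForNvm
	static PersistentCache exampleCache() {
		return new PersistentCache("main-cache", "/var/cache/rocksdb", 1024L * 1024 * 1024, false);
	}

	// A column opts in by carrying the cache id in its persistentCacheId field
	static NullableString exampleColumnReference() {
		return NullableString.of("main-cache");
	}
}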

View File

@@ -1,5 +1,6 @@
 package it.cavallium.dbengine.client;
 
+import it.cavallium.data.generator.nativedata.NullableString;
 import it.cavallium.data.generator.nativedata.Nullableboolean;
 import it.cavallium.data.generator.nativedata.Nullableint;
 import it.cavallium.data.generator.nativedata.Nullablelong;
@@ -24,7 +25,8 @@ public class DefaultDatabaseOptions {
 			Nullableboolean.empty(),
 			Nullableboolean.empty(),
 			NullableFilter.empty(),
-			Nullableint.empty()
+			Nullableint.empty(),
+			NullableString.empty()
 	);
 
 	public static NamedColumnOptions DEFAULT_NAMED_COLUMN_OPTIONS = new NamedColumnOptions(
@@ -34,7 +36,8 @@ public class DefaultDatabaseOptions {
 			Nullableboolean.empty(),
 			Nullableboolean.empty(),
 			NullableFilter.empty(),
-			Nullableint.empty()
+			Nullableint.empty(),
+			NullableString.empty()
 	);
 
 	public static DatabaseOptions DEFAULT_DATABASE_OPTIONS = new DatabaseOptions(List.of(),
@@ -48,6 +51,7 @@ public class DefaultDatabaseOptions {
 			Nullableint.empty(),
 			Nullablelong.empty(),
 			Nullablelong.empty(),
+			Collections.emptyList(),
 			Nullablelong.empty(),
 			false,
 			DEFAULT_DEFAULT_COLUMN_OPTIONS,

View File

@@ -9,6 +9,7 @@ import io.micrometer.core.instrument.Tag;
 import io.micrometer.core.instrument.Timer;
 import io.netty5.buffer.api.BufferAllocator;
 import io.netty5.util.internal.PlatformDependent;
+import it.cavallium.data.generator.nativedata.NullableString;
 import it.cavallium.dbengine.client.MemoryStats;
 import it.cavallium.dbengine.database.ColumnUtils;
 import it.cavallium.dbengine.database.LLKeyValueDatabase;
@@ -42,6 +43,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.logging.log4j.LogManager;
@@ -60,11 +63,13 @@ import org.rocksdb.CompressionOptions;
 import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
 import org.rocksdb.DbPath;
+import org.rocksdb.Env;
 import org.rocksdb.FlushOptions;
 import org.rocksdb.IndexType;
 import org.rocksdb.InfoLogLevel;
 import org.rocksdb.LRUCache;
 import org.rocksdb.OptimisticTransactionDB;
+import org.rocksdb.PersistentCache;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.Snapshot;
@@ -112,8 +117,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 	private Cache standardCache;
 	private Cache compressedCache;
 	private final Map<Column, ColumnFamilyHandle> handles;
+	private final HashMap<String, PersistentCache> persistentCaches;
 	private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
 	private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
+	private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
+
 	private volatile boolean closed = false;
 
 	@SuppressWarnings("SwitchStatementWithTooFewBranches")
@@ -163,6 +171,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 			}
 		}
 
+		this.persistentCaches = new HashMap<>();
+
 		for (Column column : columns) {
 			var columnFamilyOptions = new ColumnFamilyOptions();
@@ -269,6 +279,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 					.setCacheIndexAndFilterBlocksWithHighPriority(true)
 					.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks)
 					// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
+					// Enabling partition filters increases reads by 2x
 					.setPartitionFilters(columnOptions.partitionFilters().orElse(false))
 					// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
 					.setIndexType(IndexType.kTwoLevelIndexSearch)
@@ -279,7 +290,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 					// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
 					.setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024))
 					.setBlockCacheCompressed(optionsWithCache.compressedCache())
-					.setBlockCache(optionsWithCache.standardCache());
+					.setBlockCache(optionsWithCache.standardCache())
+					.setPersistentCache(resolvePersistentCache(persistentCaches, rocksdbOptions, databaseOptions.persistentCaches(), columnOptions.persistentCacheId()));
 
 			columnFamilyOptions.setTableFormatConfig(tableOptions);
 			columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
@@ -446,6 +458,42 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 		registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive", false);
 	}
 
+	private synchronized PersistentCache resolvePersistentCache(HashMap<String, PersistentCache> caches,
+			DBOptions rocksdbOptions,
+			List<it.cavallium.dbengine.rpc.current.data.PersistentCache> persistentCaches,
+			NullableString persistentCacheId) throws RocksDBException {
+		if (persistentCacheId.isEmpty()) {
+			return null;
+		}
+		var existingPersistentCache = caches.get(persistentCacheId.get());
+		if (existingPersistentCache != null) {
+			return existingPersistentCache;
+		}
+		var foundCaches = persistentCaches
+				.stream()
+				.filter(cache -> cache.id().equals(persistentCacheId.get()))
+				.toList();
+		if (foundCaches.size() > 1) {
+			throw new IllegalArgumentException("There are " + foundCaches.size()
+					+ " defined persistent caches with the id \"" + persistentCacheId.get() + "\"");
+		}
+		for (it.cavallium.dbengine.rpc.current.data.PersistentCache foundCache : foundCaches) {
+			var persistentCache = new PersistentCache(Env.getDefault(),
+					foundCache.path(),
+					foundCache.size(),
+					new RocksLog4jLogger(rocksdbOptions, logger),
+					foundCache.optimizeForNvm()
+			);
+			var prev = caches.put(persistentCacheId.get(), persistentCache);
+			if (prev != null) {
+				throw new IllegalStateException();
+			}
+			return persistentCache;
+		}
+		throw new IllegalArgumentException("Persistent cache " + persistentCacheId.get() + " is not defined");
+	}
+
 	public Map<Column, ColumnFamilyHandle> getAllColumnFamilyHandles() {
 		return this.handles;
 	}
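resolvePersistentCache is a resolve-or-create step run once per column at startup: the first column that references a cache id creates the RocksDB PersistentCache, later columns receive the same instance, and unknown or duplicate ids fail fast. The same pattern reduced to its essentials, with a simplified value type standing in for the real cache (a sketch, not the method above):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ResolveOrCreateSketch {

	record CacheDef(String id, long size) {}

	private final Map<String, CacheDef> resolved = new HashMap<>();

	// synchronized for the same reason as the method above: columns are
	// resolved sequentially against one shared, non-thread-safe map
	synchronized CacheDef resolve(List<CacheDef> definitions, String id) {
		var existing = resolved.get(id);
		if (existing != null) {
			return existing; // reuse: two columns with the same id share one cache
		}
		var matches = definitions.stream().filter(d -> d.id().equals(id)).toList();
		if (matches.size() != 1) {
			throw new IllegalArgumentException(
					"Expected exactly one cache definition for \"" + id + "\", found " + matches.size());
		}
		resolved.put(id, matches.get(0));
		return matches.get(0);
	}
}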
@@ -491,33 +539,56 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 	private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List<ColumnFamilyHandle> handles)
 			throws RocksDBException {
-		if (db.isOwningHandle()) {
-			flushDb(db, handles);
-		}
-
-		for (ColumnFamilyHandle handle : handles) {
-			try {
-				handle.close();
-			} catch (Exception ex) {
-				logger.error("Can't close column family", ex);
-			}
-		}
-		snapshotsHandles.forEach((id, snapshot) -> {
-			try {
-				if (db.isOwningHandle() && snapshot.isOwningHandle()) {
-					db.releaseSnapshot(snapshot);
-				}
-			} catch (Exception ex2) {
-				// ignore exception
-				logger.debug("Failed to release snapshot " + id, ex2);
-			}
-		});
-		db.close();
-		if (compressedCache != null) {
-			compressedCache.close();
-		}
-		if (standardCache != null) {
-			standardCache.close();
+		var shutdownWriteLock = shutdownLock.writeLock();
+		shutdownWriteLock.lock();
+		try {
+			if (closed) {
+				return;
+			}
+			closed = true;
+			if (db.isOwningHandle()) {
+				flushDb(db, handles);
+			}
+			for (ColumnFamilyHandle handle : handles) {
+				try {
+					handle.close();
+				} catch (Exception ex) {
+					logger.error("Can't close column family", ex);
+				}
+			}
+			snapshotsHandles.forEach((id, snapshot) -> {
+				try {
+					if (db.isOwningHandle() && snapshot.isOwningHandle()) {
+						db.releaseSnapshot(snapshot);
+					}
+				} catch (Exception ex2) {
+					// ignore exception
+					logger.debug("Failed to release snapshot " + id, ex2);
+				}
+			});
+			snapshotsHandles.clear();
+			try {
+				db.closeE();
+			} catch (Exception ex) {
+				logger.error("Can't close database " + name + " at " + dbPath, ex);
+			}
+			if (compressedCache != null) {
+				compressedCache.close();
+			}
+			if (standardCache != null) {
+				standardCache.close();
+			}
+			for (PersistentCache persistentCache : persistentCaches.values()) {
+				try {
+					persistentCache.close();
+				} catch (Exception ex) {
+					logger.error("Can't close persistent cache", ex);
+				}
+			}
+		} finally {
+			shutdownWriteLock.unlock();
 		}
 	}
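The rewritten close path cooperates with the snapshot methods further down through shutdownLock: flushAndCloseDb takes the write lock and flips closed exactly once, while snapshot operations take the read lock and refuse to run after shutdown has begun. The skeleton of that guard, stripped of the RocksDB specifics (a sketch, not the production code):

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ShutdownGuardSketch {

	private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
	private volatile boolean closed = false;

	// Exclusive: no reader can hold the lock during teardown, so no
	// snapshot can be taken or released mid-close
	void close() {
		shutdownLock.writeLock().lock();
		try {
			if (closed) {
				return; // close() is idempotent
			}
			closed = true;
			// ... release snapshots, close column families, db, caches ...
		} finally {
			shutdownLock.writeLock().unlock();
		}
	}

	// Shared: many snapshot operations may run concurrently with each
	// other, but never concurrently with close()
	void useResource() {
		shutdownLock.readLock().lock();
		try {
			if (closed) {
				throw new IllegalStateException("Database closed");
			}
			// ... take or release a snapshot ...
		} finally {
			shutdownLock.readLock().unlock();
		}
	}
}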
@@ -692,7 +763,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 			options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors());
 		}
 
-		options.setRowCache(blockCache);
 		if (databaseOptions.writeBufferManager().isPresent()) {
 			options.setWriteBufferManager(new WriteBufferManager(writeBufferManagerSize, blockCache));
 		}
@@ -941,10 +1011,19 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 	public Mono<LLSnapshot> takeSnapshot() {
 		return Mono
 				.fromCallable(() -> snapshotTime.recordCallable(() -> {
-					var snapshot = db.getSnapshot();
-					long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
-					this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
-					return new LLSnapshot(currentSnapshotSequenceNumber);
+					var shutdownReadLock = shutdownLock.readLock();
+					shutdownReadLock.lock();
+					try {
+						if (closed) {
+							throw new IllegalStateException("Database closed");
+						}
+						var snapshot = db.getSnapshot();
+						long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
+						this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
+						return new LLSnapshot(currentSnapshotSequenceNumber);
+					} finally {
+						shutdownReadLock.unlock();
+					}
 				}))
 				.subscribeOn(dbRScheduler);
 	}
@@ -953,18 +1032,27 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 	public Mono<Void> releaseSnapshot(LLSnapshot snapshot) {
 		return Mono
 				.<Void>fromCallable(() -> {
-					Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber());
-					if (dbSnapshot == null) {
-						throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
-					}
-					if (!db.isOwningHandle()) {
-						return null;
-					}
-					if (!dbSnapshot.isOwningHandle()) {
-						return null;
-					}
-					db.releaseSnapshot(dbSnapshot);
-					return null;
+					var shutdownReadLock = shutdownLock.readLock();
+					shutdownReadLock.lock();
+					try {
+						if (closed) {
+							throw new IllegalStateException("Database closed");
+						}
+						Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber());
+						if (dbSnapshot == null) {
+							throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");
+						}
+						if (!db.isOwningHandle()) {
+							return null;
+						}
+						if (!dbSnapshot.isOwningHandle()) {
+							return null;
+						}
+						db.releaseSnapshot(dbSnapshot);
+						return null;
+					} finally {
+						shutdownReadLock.unlock();
+					}
 				})
 				.subscribeOn(dbRScheduler);
 	}
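Callers of the snapshot API are unchanged by the guard. A typical acquire-use-release flow could look like the following sketch, where readSomething is a hypothetical helper returning Mono&lt;Void&gt; and LLSnapshot is assumed to live in it.cavallium.dbengine.database alongside LLKeyValueDatabase:

import it.cavallium.dbengine.database.LLSnapshot;
import reactor.core.publisher.Mono;

class SnapshotUsageSketch {

	static Mono<Void> readWithSnapshot(LLLocalKeyValueDatabase db) {
		// usingWhen releases the snapshot on completion, error, and cancellation alike
		return Mono.usingWhen(
				db.takeSnapshot(),                  // acquire
				SnapshotUsageSketch::readSomething, // use
				db::releaseSnapshot                 // release
		);
	}

	static Mono<Void> readSomething(LLSnapshot snapshot) {
		return Mono.empty(); // placeholder for an actual snapshot read
	}
}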
@@ -1061,4 +1149,5 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
 			logger.error(MARKER_ROCKSDB, "Failed to delete unused log files", ex);
 		}
 	}
+
 }

View File

@@ -0,0 +1,28 @@
+package it.cavallium.dbengine.database.disk;
+
+import org.apache.logging.log4j.Logger;
+import org.rocksdb.DBOptions;
+import org.rocksdb.InfoLogLevel;
+
+class RocksLog4jLogger extends org.rocksdb.Logger {
+
+	private final Logger logger;
+
+	public RocksLog4jLogger(DBOptions rocksdbOptions, Logger logger) {
+		super(rocksdbOptions);
+		this.logger = logger;
+	}
+
+	@Override
+	protected void log(InfoLogLevel infoLogLevel, String logMsg) {
+		switch (infoLogLevel) {
+			case DEBUG_LEVEL -> logger.debug(logMsg);
+			case INFO_LEVEL -> logger.info(logMsg);
+			case WARN_LEVEL -> logger.warn(logMsg);
+			case ERROR_LEVEL -> logger.error(logMsg);
+			case FATAL_LEVEL -> logger.fatal(logMsg);
+			case HEADER_LEVEL -> logger.trace(logMsg);
+			default -> throw new UnsupportedOperationException(infoLogLevel + " level not supported");
+		}
+	}
+}
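Besides being handed to the PersistentCache constructor above, the adapter can plausibly be attached to the database options themselves so RocksDB's info log flows through Log4j. A sketch from the same package (the class is package-private); DBOptions.setLogger is assumed from the RocksDB Java API:

package it.cavallium.dbengine.database.disk;

import org.apache.logging.log4j.LogManager;
import org.rocksdb.DBOptions;

class RocksLoggerSetupSketch {

	static DBOptions withLog4j() {
		var rocksdbOptions = new DBOptions();
		// Forward RocksDB's internal log lines to a Log4j logger named "rocksdb"
		rocksdbOptions.setLogger(new RocksLog4jLogger(rocksdbOptions, LogManager.getLogger("rocksdb")));
		return rocksdbOptions;
	}
}

Mapping HEADER_LEVEL to trace is a sensible default: RocksDB's startup banner stays out of normal log output while remaining available at the most verbose level.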