From 046c08e5bf1217effc807e6339386e4339f90def Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 15 Apr 2022 02:41:06 +0200 Subject: [PATCH] Support persistent cache, ensure that all snapshots are closed in time --- src/main/data-generator/quic-rpc.yaml | 10 ++ .../client/DefaultDatabaseOptions.java | 8 +- .../disk/LLLocalKeyValueDatabase.java | 169 +++++++++++++----- .../database/disk/RocksLog4jLogger.java | 28 +++ 4 files changed, 173 insertions(+), 42 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/RocksLog4jLogger.java diff --git a/src/main/data-generator/quic-rpc.yaml b/src/main/data-generator/quic-rpc.yaml index 3b3f434..1c5fa50 100644 --- a/src/main/data-generator/quic-rpc.yaml +++ b/src/main/data-generator/quic-rpc.yaml @@ -21,6 +21,7 @@ interfacesData: partitionFilters: -boolean filter: -Filter blockSize: -int + persistentCacheId: -String # versions must have only numbers, lowercase letters, dots, dashes. Maximum: 99.999.9999 versions: 0.0.0: @@ -248,6 +249,7 @@ versions: maxOpenFiles: -int blockCache: -long compressedBlockCache: -long + persistentCaches: PersistentCache[] writeBufferManager: -long spinning: boolean defaultColumnOptions: DefaultColumnOptions @@ -261,6 +263,7 @@ versions: partitionFilters: -boolean filter: -Filter blockSize: -int + persistentCacheId: -String # Remember to update ColumnOptions common getters NamedColumnOptions: data: @@ -271,10 +274,17 @@ versions: partitionFilters: -boolean filter: -Filter blockSize: -int + persistentCacheId: -String BloomFilter: data: bitsPerKey: int optimizeForHits: -boolean + PersistentCache: + data: + id: String + path: String + size: long + optimizeForNvm: boolean DatabaseVolume: data: volumePath: Path diff --git a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java index 217df66..aaad1a0 100644 --- a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java 
+++ b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java @@ -1,5 +1,6 @@ package it.cavallium.dbengine.client; +import it.cavallium.data.generator.nativedata.NullableString; import it.cavallium.data.generator.nativedata.Nullableboolean; import it.cavallium.data.generator.nativedata.Nullableint; import it.cavallium.data.generator.nativedata.Nullablelong; @@ -24,7 +25,8 @@ public class DefaultDatabaseOptions { Nullableboolean.empty(), Nullableboolean.empty(), NullableFilter.empty(), - Nullableint.empty() + Nullableint.empty(), + NullableString.empty() ); public static NamedColumnOptions DEFAULT_NAMED_COLUMN_OPTIONS = new NamedColumnOptions( @@ -34,7 +36,8 @@ public class DefaultDatabaseOptions { Nullableboolean.empty(), Nullableboolean.empty(), NullableFilter.empty(), - Nullableint.empty() + Nullableint.empty(), + NullableString.empty() ); public static DatabaseOptions DEFAULT_DATABASE_OPTIONS = new DatabaseOptions(List.of(), @@ -48,6 +51,7 @@ public class DefaultDatabaseOptions { Nullableint.empty(), Nullablelong.empty(), Nullablelong.empty(), + Collections.emptyList(), Nullablelong.empty(), false, DEFAULT_DEFAULT_COLUMN_OPTIONS, diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 11b8bf3..78a02d0 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -9,6 +9,7 @@ import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; import io.netty5.buffer.api.BufferAllocator; import io.netty5.util.internal.PlatformDependent; +import it.cavallium.data.generator.nativedata.NullableString; import it.cavallium.dbengine.client.MemoryStats; import it.cavallium.dbengine.database.ColumnUtils; import it.cavallium.dbengine.database.LLKeyValueDatabase; @@ -42,6 +43,8 @@ import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import org.apache.commons.lang3.time.StopWatch; import org.apache.logging.log4j.LogManager; @@ -60,11 +63,13 @@ import org.rocksdb.CompressionOptions; import org.rocksdb.CompressionType; import org.rocksdb.DBOptions; import org.rocksdb.DbPath; +import org.rocksdb.Env; import org.rocksdb.FlushOptions; import org.rocksdb.IndexType; import org.rocksdb.InfoLogLevel; import org.rocksdb.LRUCache; import org.rocksdb.OptimisticTransactionDB; +import org.rocksdb.PersistentCache; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.Snapshot; @@ -112,8 +117,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private Cache standardCache; private Cache compressedCache; private final Map handles; + + private final HashMap persistentCaches; private final ConcurrentHashMap snapshotsHandles = new ConcurrentHashMap<>(); private final AtomicLong nextSnapshotNumbers = new AtomicLong(1); + private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock(); private volatile boolean closed = false; @SuppressWarnings("SwitchStatementWithTooFewBranches") @@ -163,6 +171,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } } + this.persistentCaches = new HashMap<>(); + for (Column column : columns) { var columnFamilyOptions = new ColumnFamilyOptions(); @@ -269,6 +279,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setCacheIndexAndFilterBlocksWithHighPriority(true) .setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks) // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + // Enabling partition filters increases the reads by 2x 
.setPartitionFilters(columnOptions.partitionFilters().orElse(false)) // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters .setIndexType(IndexType.kTwoLevelIndexSearch) @@ -279,7 +290,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html .setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024)) .setBlockCacheCompressed(optionsWithCache.compressedCache()) - .setBlockCache(optionsWithCache.standardCache()); + .setBlockCache(optionsWithCache.standardCache()) + .setPersistentCache(resolvePersistentCache(persistentCaches, rocksdbOptions, databaseOptions.persistentCaches(), columnOptions.persistentCacheId())); columnFamilyOptions.setTableFormatConfig(tableOptions); columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio); @@ -446,6 +458,42 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { registerGauge(meterRegistry, name, "rocksdb.bloom.filter.full.true.positive", false); } + private synchronized PersistentCache resolvePersistentCache(HashMap caches, + DBOptions rocksdbOptions, + List persistentCaches, + NullableString persistentCacheId) throws RocksDBException { + if (persistentCacheId.isEmpty()) { + return null; + } + var existingPersistentCache = caches.get(persistentCacheId.get()); + if (existingPersistentCache != null) { + return existingPersistentCache; + } + + var foundCaches = persistentCaches + .stream() + .filter(cache -> cache.id().equals(persistentCacheId.get())) + .toList(); + if (foundCaches.size() > 1) { + throw new IllegalArgumentException("There are " + foundCaches.size() + + " defined persistent caches with the id \"" + persistentCacheId.get() + "\""); + } + for (it.cavallium.dbengine.rpc.current.data.PersistentCache foundCache : foundCaches) { + var persistentCache = new 
PersistentCache(Env.getDefault(), + foundCache.path(), + foundCache.size(), + new RocksLog4jLogger(rocksdbOptions, logger), + foundCache.optimizeForNvm() + ); + var prev = caches.put(persistentCacheId.get(), persistentCache); + if (prev != null) { + throw new IllegalStateException(); + } + return persistentCache; + } + throw new IllegalArgumentException("Persistent cache " + persistentCacheId.get() + " is not defined"); + } + public Map getAllColumnFamilyHandles() { return this.handles; } @@ -491,33 +539,56 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List handles) throws RocksDBException { - if (db.isOwningHandle()) { - flushDb(db, handles); - } + var shutdownWriteLock = shutdownLock.writeLock(); + shutdownWriteLock.lock(); + try { + if (closed) { + return; + } + closed = true; + if (db.isOwningHandle()) { + flushDb(db, handles); + } - for (ColumnFamilyHandle handle : handles) { - try { - handle.close(); - } catch (Exception ex) { - logger.error("Can't close column family", ex); - } - } - snapshotsHandles.forEach((id, snapshot) -> { - try { - if (db.isOwningHandle() && snapshot.isOwningHandle()) { - db.releaseSnapshot(snapshot); + for (ColumnFamilyHandle handle : handles) { + try { + handle.close(); + } catch (Exception ex) { + logger.error("Can't close column family", ex); } - } catch (Exception ex2) { - // ignore exception - logger.debug("Failed to release snapshot " + id, ex2); } - }); - db.close(); - if (compressedCache != null) { - compressedCache.close(); - } - if (standardCache != null) { - standardCache.close(); + snapshotsHandles.forEach((id, snapshot) -> { + try { + if (db.isOwningHandle() && snapshot.isOwningHandle()) { + db.releaseSnapshot(snapshot); + } + } catch (Exception ex2) { + // ignore exception + logger.debug("Failed to release snapshot " + id, ex2); + } + }); + snapshotsHandles.clear(); + try { + db.closeE(); + } catch (Exception 
ex) { + logger.error("Can't close database " + name + " at " + dbPath, ex); + } + if (compressedCache != null) { + compressedCache.close(); + } + if (standardCache != null) { + standardCache.close(); + } + + for (PersistentCache persistentCache : persistentCaches.values()) { + try { + persistentCache.close(); + } catch (Exception ex) { + logger.error("Can't close persistent cache", ex); + } + } + } finally { + shutdownWriteLock.unlock(); } } @@ -692,7 +763,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors()); } - options.setRowCache(blockCache); if (databaseOptions.writeBufferManager().isPresent()) { options.setWriteBufferManager(new WriteBufferManager(writeBufferManagerSize, blockCache)); } @@ -941,10 +1011,19 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono takeSnapshot() { return Mono .fromCallable(() -> snapshotTime.recordCallable(() -> { - var snapshot = db.getSnapshot(); - long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement(); - this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); - return new LLSnapshot(currentSnapshotSequenceNumber); + var shutdownReadLock = shutdownLock.readLock(); + shutdownReadLock.lock(); + try { + if (closed) { + throw new IllegalStateException("Database closed"); + } + var snapshot = db.getSnapshot(); + long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement(); + this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); + return new LLSnapshot(currentSnapshotSequenceNumber); + } finally { + shutdownReadLock.unlock(); + } })) .subscribeOn(dbRScheduler); } @@ -953,18 +1032,27 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono releaseSnapshot(LLSnapshot snapshot) { return Mono .fromCallable(() -> { - Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber()); - if (dbSnapshot == null) { - 
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); - } - if (!db.isOwningHandle()) { + var shutdownReadLock = shutdownLock.readLock(); + shutdownReadLock.lock(); + try { + if (closed) { + throw new IllegalStateException("Database closed"); + } + Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber()); + if (dbSnapshot == null) { + throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); + } + if (!db.isOwningHandle()) { + return null; + } + if (!dbSnapshot.isOwningHandle()) { + return null; + } + db.releaseSnapshot(dbSnapshot); return null; + } finally { + shutdownReadLock.unlock(); } - if (!dbSnapshot.isOwningHandle()) { - return null; - } - db.releaseSnapshot(dbSnapshot); - return null; }) .subscribeOn(dbRScheduler); } @@ -1061,4 +1149,5 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { logger.error(MARKER_ROCKSDB, "Failed to delete unused log files", ex); } } + } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksLog4jLogger.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksLog4jLogger.java new file mode 100644 index 0000000..2d72b71 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksLog4jLogger.java @@ -0,0 +1,28 @@ +package it.cavallium.dbengine.database.disk; + +import org.apache.logging.log4j.Logger; +import org.rocksdb.DBOptions; +import org.rocksdb.InfoLogLevel; + +class RocksLog4jLogger extends org.rocksdb.Logger { + + private final Logger logger; + + public RocksLog4jLogger(DBOptions rocksdbOptions, Logger logger) { + super(rocksdbOptions); + this.logger = logger; + } + + @Override + protected void log(InfoLogLevel infoLogLevel, String logMsg) { + switch (infoLogLevel) { + case DEBUG_LEVEL -> logger.debug(logMsg); + case INFO_LEVEL -> logger.info(logMsg); + case WARN_LEVEL -> logger.warn(logMsg); + case ERROR_LEVEL -> logger.error(logMsg); + case FATAL_LEVEL -> logger.fatal(logMsg); + case 
HEADER_LEVEL -> logger.trace(logMsg); + default -> throw new UnsupportedOperationException(infoLogLevel + " level not supported"); + } + } +}