From 96de3023a0d5e4561b04c5f941ac773d2de67a88 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sun, 22 May 2022 16:48:08 +0200 Subject: [PATCH] Fix double-free, close all properties --- .../cavallium/dbengine/database/LLUtils.java | 5 +- .../disk/LLLocalKeyValueDatabase.java | 60 +++++++++++++------ .../dbengine/database/disk/RocksDBRefs.java | 41 +++++++++++++ 3 files changed, 86 insertions(+), 20 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/RocksDBRefs.java diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java index 5f21710..c87e86f 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java @@ -809,8 +809,8 @@ public class LLUtils { public static Mono> mapLLDelta(Mono mono, SerializationFunction<@NotNull Buffer, @Nullable U> mapper) { return Mono.usingWhen(mono, delta -> Mono.fromCallable(() -> { - try (Buffer prev = delta.previousUnsafe(); - Buffer curr = delta.currentUnsafe()) { + Buffer prev = delta.previousUnsafe(); + Buffer curr = delta.currentUnsafe(); U newPrev; U newCurr; if (prev != null) { @@ -824,7 +824,6 @@ public class LLUtils { newCurr = null; } return new Delta<>(newPrev, newCurr); - } }), delta -> Mono.fromRunnable(delta::close)); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 2be906b..2626516 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -9,7 +9,6 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; import io.micrometer.core.instrument.Timer; import io.netty5.buffer.api.BufferAllocator; -import io.netty5.buffer.api.internal.ResourceSupport; import io.netty5.util.internal.PlatformDependent; import it.cavallium.data.generator.nativedata.NullableString; import it.cavallium.dbengine.client.MemoryStats; @@ -42,7 +41,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -121,6 +119,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private final DatabaseOptions databaseOptions; private final boolean enableColumnsBug; + private final RocksDBRefs refs = new RocksDBRefs(); private RocksDB db; private Statistics statistics; private Cache standardCache; @@ -170,12 +169,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } } - OptionsWithCache optionsWithCache = openRocksDb(path, databaseOptions); + OptionsWithCache optionsWithCache = openRocksDb(path, databaseOptions, refs); var rocksdbOptions = optionsWithCache.options(); try { List descriptors = new ArrayList<>(); - descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); + var defaultColumnOptions = new ColumnFamilyOptions(); + refs.track(defaultColumnOptions); + descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultColumnOptions)); // Check column names validity for (NamedColumnOptions columnOption : databaseOptions.columnOptions()) { @@ -192,6 +193,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { for (Column column : columns) { var columnFamilyOptions = new ColumnFamilyOptions(); + refs.track(columnFamilyOptions); var columnOptions = databaseOptions .columnOptions() @@ -250,13 +252,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { if (!columnOptions.levels().isEmpty()) { columnFamilyOptions.setNumLevels(columnOptions.levels().size()); - var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0)); + var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0), refs); columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType); columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions); var lastLevelOptions = getRocksLevelOptions(columnOptions .levels() - .get(columnOptions.levels().size() - 1)); + .get(columnOptions.levels().size() - 1), refs); columnFamilyOptions.setBottommostCompressionType(lastLevelOptions.compressionType); columnFamilyOptions.setBottommostCompressionOptions(lastLevelOptions.compressionOptions); @@ -276,9 +278,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } } columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION); - columnFamilyOptions.setBottommostCompressionOptions(new CompressionOptions() + var compressionOptions = new CompressionOptions() .setEnabled(true) - .setMaxDictBytes(32768)); + .setMaxDictBytes(32768); + refs.track(compressionOptions); + columnFamilyOptions.setBottommostCompressionOptions(compressionOptions); columnFamilyOptions.setCompressionPerLevel(compressionTypes); } @@ -300,6 +304,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // If OptimizeFiltersForHits == true: memory size = bitsPerKey * (totalKeys * 0.1) // If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys final BloomFilter bloomFilter = new BloomFilter(bloomFilterOptions.bitsPerKey()); + refs.track(bloomFilter); tableOptions.setFilterPolicy(bloomFilter); } } @@ -341,7 +346,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024)) .setBlockCacheCompressed(optionsWithCache.compressedCache()) .setBlockCache(optionsWithCache.standardCache()) - .setPersistentCache(resolvePersistentCache(persistentCaches, rocksdbOptions, databaseOptions.persistentCaches(), columnOptions.persistentCacheId())); + .setPersistentCache(resolvePersistentCache(persistentCaches, + rocksdbOptions, + databaseOptions.persistentCaches(), + columnOptions.persistentCacheId(), + refs + )); columnFamilyOptions.setTableFormatConfig(tableOptions); columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio); @@ -437,11 +447,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { if (databaseOptions.optimistic()) { this.db = OptimisticTransactionDB.open(rocksdbOptions, dbPathString, descriptors, handles); } else { + var transactionOptions = new TransactionDBOptions() + .setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED) + .setTransactionLockTimeout(5000) + .setDefaultLockTimeout(5000); + refs.track(transactionOptions); this.db = TransactionDB.open(rocksdbOptions, - new TransactionDBOptions() - .setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED) - .setTransactionLockTimeout(5000) - .setDefaultLockTimeout(5000), + transactionOptions, dbPathString, descriptors, handles @@ -480,6 +492,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } } + handles.forEach(refs::track); + // compactDb(db, handles); flushDb(db, handles); } catch (RocksDBException ex) { @@ -534,7 +548,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private synchronized PersistentCache resolvePersistentCache(HashMap caches, DBOptions rocksdbOptions, List persistentCaches, - NullableString persistentCacheId) throws RocksDBException { + NullableString persistentCacheId, + RocksDBRefs refs) throws RocksDBException { if (persistentCacheId.isEmpty()) { return null; } @@ -558,6 +573,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { new RocksLog4jLogger(rocksdbOptions, logger), foundCache.optimizeForNvm() ); + refs.track(persistentCache); var prev = caches.put(persistentCacheId.get(), persistentCache); if (prev != null) { throw new IllegalStateException(); @@ -625,9 +641,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {} - private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions) { + private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions, RocksDBRefs refs) { var compressionType = levelOptions.compression().getType(); var compressionOptions = new CompressionOptions(); + refs.track(compressionOptions); if (compressionType != CompressionType.NO_COMPRESSION) { compressionOptions.setEnabled(true); compressionOptions.setMaxDictBytes(levelOptions.maxDictBytes()); @@ -752,6 +769,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { logger.error("Can't close persistent cache", ex); } } + refs.close(); } finally { closeLock.unlockWrite(closeWriteLock); } @@ -809,7 +827,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { record OptionsWithCache(DBOptions options, @Nullable Cache standardCache, @Nullable Cache compressedCache) {} - private static OptionsWithCache openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions) throws IOException { + private static OptionsWithCache openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions, RocksDBRefs refs) + throws IOException { // Get databases directory path Path databasesDirPath; if (path != null) { @@ -825,6 +844,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // the Options class contains a set of configurable DB options // that determines the behaviour of the database. var options = new DBOptions(); + refs.track(options); options.setEnablePipelinedWrite(true); var maxSubCompactions = Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "-1")); if (maxSubCompactions >= 0) { @@ -897,8 +917,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { ; // DO NOT USE ClockCache! IT'S BROKEN! blockCache = new LRUCache(writeBufferManagerSize + databaseOptions.blockCache().orElse( 8L * SizeUnit.MB)); + refs.track(blockCache); if (databaseOptions.compressedBlockCache().isPresent()) { compressedCache = new LRUCache(databaseOptions.compressedBlockCache().get()); + refs.track(compressedCache); } else { compressedCache = null; } @@ -932,8 +954,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { ; // DO NOT USE ClockCache! IT'S BROKEN! blockCache = new LRUCache(writeBufferManagerSize + databaseOptions.blockCache().orElse( 512 * SizeUnit.MB)); + refs.track(blockCache); if (databaseOptions.compressedBlockCache().isPresent()) { compressedCache = new LRUCache(databaseOptions.compressedBlockCache().get()); + refs.track(compressedCache); } else { compressedCache = null; } @@ -951,7 +975,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } if (databaseOptions.writeBufferManager().isPresent()) { - options.setWriteBufferManager(new WriteBufferManager(writeBufferManagerSize, blockCache, false)); + var writeBufferManager = new WriteBufferManager(writeBufferManagerSize, blockCache, false); + refs.track(writeBufferManager); + options.setWriteBufferManager(writeBufferManager); } if (databaseOptions.useDirectIO()) { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBRefs.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBRefs.java new file mode 100644 index 0000000..36c0c63 --- /dev/null +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBRefs.java @@ -0,0 +1,41 @@ +package it.cavallium.dbengine.database.disk; + +import it.cavallium.dbengine.database.SafeCloseable; +import java.util.ArrayList; +import org.rocksdb.AbstractImmutableNativeReference; + +public final class RocksDBRefs implements SafeCloseable { + + private final ArrayList list = new ArrayList<>(); + private boolean closed; + + public RocksDBRefs() { + } + + public RocksDBRefs(Iterable it) { + it.forEach(list::add); + } + + public synchronized void track(AbstractImmutableNativeReference ref) { + if (closed) { + throw new IllegalStateException("Closed"); + } + list.add(ref); + } + + @Override + public synchronized void close() { + if (!closed) { + closed = true; + for (int i = list.size() - 1; i >= 0; i--) { + var ref = list.get(i); + + if (ref.isOwningHandle()) { + ref.close(); + } + } + list.clear(); + list.trimToSize(); + } + } +}