Fix double-free, close all properties
This commit is contained in:
parent
fe31f9b1c7
commit
96de3023a0
@ -809,8 +809,8 @@ public class LLUtils {
|
||||
public static <U> Mono<Delta<U>> mapLLDelta(Mono<LLDelta> mono,
|
||||
SerializationFunction<@NotNull Buffer, @Nullable U> mapper) {
|
||||
return Mono.usingWhen(mono, delta -> Mono.fromCallable(() -> {
|
||||
try (Buffer prev = delta.previousUnsafe();
|
||||
Buffer curr = delta.currentUnsafe()) {
|
||||
Buffer prev = delta.previousUnsafe();
|
||||
Buffer curr = delta.currentUnsafe();
|
||||
U newPrev;
|
||||
U newCurr;
|
||||
if (prev != null) {
|
||||
@ -824,7 +824,6 @@ public class LLUtils {
|
||||
newCurr = null;
|
||||
}
|
||||
return new Delta<>(newPrev, newCurr);
|
||||
}
|
||||
}), delta -> Mono.fromRunnable(delta::close));
|
||||
}
|
||||
|
||||
|
@ -9,7 +9,6 @@ import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.Tag;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import io.netty5.buffer.api.BufferAllocator;
|
||||
import io.netty5.buffer.api.internal.ResourceSupport;
|
||||
import io.netty5.util.internal.PlatformDependent;
|
||||
import it.cavallium.data.generator.nativedata.NullableString;
|
||||
import it.cavallium.dbengine.client.MemoryStats;
|
||||
@ -42,7 +41,6 @@ import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -121,6 +119,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
private final DatabaseOptions databaseOptions;
|
||||
|
||||
private final boolean enableColumnsBug;
|
||||
private final RocksDBRefs refs = new RocksDBRefs();
|
||||
private RocksDB db;
|
||||
private Statistics statistics;
|
||||
private Cache standardCache;
|
||||
@ -170,12 +169,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
}
|
||||
}
|
||||
|
||||
OptionsWithCache optionsWithCache = openRocksDb(path, databaseOptions);
|
||||
OptionsWithCache optionsWithCache = openRocksDb(path, databaseOptions, refs);
|
||||
var rocksdbOptions = optionsWithCache.options();
|
||||
try {
|
||||
List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
|
||||
|
||||
descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
|
||||
var defaultColumnOptions = new ColumnFamilyOptions();
|
||||
refs.track(defaultColumnOptions);
|
||||
descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultColumnOptions));
|
||||
|
||||
// Check column names validity
|
||||
for (NamedColumnOptions columnOption : databaseOptions.columnOptions()) {
|
||||
@ -192,6 +193,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
|
||||
for (Column column : columns) {
|
||||
var columnFamilyOptions = new ColumnFamilyOptions();
|
||||
refs.track(columnFamilyOptions);
|
||||
|
||||
var columnOptions = databaseOptions
|
||||
.columnOptions()
|
||||
@ -250,13 +252,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
|
||||
if (!columnOptions.levels().isEmpty()) {
|
||||
columnFamilyOptions.setNumLevels(columnOptions.levels().size());
|
||||
var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0));
|
||||
var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0), refs);
|
||||
columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType);
|
||||
columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions);
|
||||
|
||||
var lastLevelOptions = getRocksLevelOptions(columnOptions
|
||||
.levels()
|
||||
.get(columnOptions.levels().size() - 1));
|
||||
.get(columnOptions.levels().size() - 1), refs);
|
||||
columnFamilyOptions.setBottommostCompressionType(lastLevelOptions.compressionType);
|
||||
columnFamilyOptions.setBottommostCompressionOptions(lastLevelOptions.compressionOptions);
|
||||
|
||||
@ -276,9 +278,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
}
|
||||
}
|
||||
columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION);
|
||||
columnFamilyOptions.setBottommostCompressionOptions(new CompressionOptions()
|
||||
var compressionOptions = new CompressionOptions()
|
||||
.setEnabled(true)
|
||||
.setMaxDictBytes(32768));
|
||||
.setMaxDictBytes(32768);
|
||||
refs.track(compressionOptions);
|
||||
columnFamilyOptions.setBottommostCompressionOptions(compressionOptions);
|
||||
columnFamilyOptions.setCompressionPerLevel(compressionTypes);
|
||||
}
|
||||
|
||||
@ -300,6 +304,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
// If OptimizeFiltersForHits == true: memory size = bitsPerKey * (totalKeys * 0.1)
|
||||
// If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys
|
||||
final BloomFilter bloomFilter = new BloomFilter(bloomFilterOptions.bitsPerKey());
|
||||
refs.track(bloomFilter);
|
||||
tableOptions.setFilterPolicy(bloomFilter);
|
||||
}
|
||||
}
|
||||
@ -341,7 +346,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
.setBlockSize(columnOptions.blockSize().orElse((databaseOptions.spinning() ? 128 : 16) * 1024))
|
||||
.setBlockCacheCompressed(optionsWithCache.compressedCache())
|
||||
.setBlockCache(optionsWithCache.standardCache())
|
||||
.setPersistentCache(resolvePersistentCache(persistentCaches, rocksdbOptions, databaseOptions.persistentCaches(), columnOptions.persistentCacheId()));
|
||||
.setPersistentCache(resolvePersistentCache(persistentCaches,
|
||||
rocksdbOptions,
|
||||
databaseOptions.persistentCaches(),
|
||||
columnOptions.persistentCacheId(),
|
||||
refs
|
||||
));
|
||||
|
||||
columnFamilyOptions.setTableFormatConfig(tableOptions);
|
||||
columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
|
||||
@ -437,11 +447,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
if (databaseOptions.optimistic()) {
|
||||
this.db = OptimisticTransactionDB.open(rocksdbOptions, dbPathString, descriptors, handles);
|
||||
} else {
|
||||
var transactionOptions = new TransactionDBOptions()
|
||||
.setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED)
|
||||
.setTransactionLockTimeout(5000)
|
||||
.setDefaultLockTimeout(5000);
|
||||
refs.track(transactionOptions);
|
||||
this.db = TransactionDB.open(rocksdbOptions,
|
||||
new TransactionDBOptions()
|
||||
.setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED)
|
||||
.setTransactionLockTimeout(5000)
|
||||
.setDefaultLockTimeout(5000),
|
||||
transactionOptions,
|
||||
dbPathString,
|
||||
descriptors,
|
||||
handles
|
||||
@ -480,6 +492,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
}
|
||||
}
|
||||
|
||||
handles.forEach(refs::track);
|
||||
|
||||
// compactDb(db, handles);
|
||||
flushDb(db, handles);
|
||||
} catch (RocksDBException ex) {
|
||||
@ -534,7 +548,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
private synchronized PersistentCache resolvePersistentCache(HashMap<String, PersistentCache> caches,
|
||||
DBOptions rocksdbOptions,
|
||||
List<it.cavallium.dbengine.rpc.current.data.PersistentCache> persistentCaches,
|
||||
NullableString persistentCacheId) throws RocksDBException {
|
||||
NullableString persistentCacheId,
|
||||
RocksDBRefs refs) throws RocksDBException {
|
||||
if (persistentCacheId.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
@ -558,6 +573,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
new RocksLog4jLogger(rocksdbOptions, logger),
|
||||
foundCache.optimizeForNvm()
|
||||
);
|
||||
refs.track(persistentCache);
|
||||
var prev = caches.put(persistentCacheId.get(), persistentCache);
|
||||
if (prev != null) {
|
||||
throw new IllegalStateException();
|
||||
@ -625,9 +641,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
}
|
||||
|
||||
private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {}
|
||||
private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions) {
|
||||
private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions, RocksDBRefs refs) {
|
||||
var compressionType = levelOptions.compression().getType();
|
||||
var compressionOptions = new CompressionOptions();
|
||||
refs.track(compressionOptions);
|
||||
if (compressionType != CompressionType.NO_COMPRESSION) {
|
||||
compressionOptions.setEnabled(true);
|
||||
compressionOptions.setMaxDictBytes(levelOptions.maxDictBytes());
|
||||
@ -752,6 +769,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
logger.error("Can't close persistent cache", ex);
|
||||
}
|
||||
}
|
||||
refs.close();
|
||||
} finally {
|
||||
closeLock.unlockWrite(closeWriteLock);
|
||||
}
|
||||
@ -809,7 +827,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
|
||||
record OptionsWithCache(DBOptions options, @Nullable Cache standardCache, @Nullable Cache compressedCache) {}
|
||||
|
||||
private static OptionsWithCache openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions) throws IOException {
|
||||
private static OptionsWithCache openRocksDb(@Nullable Path path, DatabaseOptions databaseOptions, RocksDBRefs refs)
|
||||
throws IOException {
|
||||
// Get databases directory path
|
||||
Path databasesDirPath;
|
||||
if (path != null) {
|
||||
@ -825,6 +844,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
// the Options class contains a set of configurable DB options
|
||||
// that determines the behaviour of the database.
|
||||
var options = new DBOptions();
|
||||
refs.track(options);
|
||||
options.setEnablePipelinedWrite(true);
|
||||
var maxSubCompactions = Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "-1"));
|
||||
if (maxSubCompactions >= 0) {
|
||||
@ -897,8 +917,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
;
|
||||
// DO NOT USE ClockCache! IT'S BROKEN!
|
||||
blockCache = new LRUCache(writeBufferManagerSize + databaseOptions.blockCache().orElse( 8L * SizeUnit.MB));
|
||||
refs.track(blockCache);
|
||||
if (databaseOptions.compressedBlockCache().isPresent()) {
|
||||
compressedCache = new LRUCache(databaseOptions.compressedBlockCache().get());
|
||||
refs.track(compressedCache);
|
||||
} else {
|
||||
compressedCache = null;
|
||||
}
|
||||
@ -932,8 +954,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
;
|
||||
// DO NOT USE ClockCache! IT'S BROKEN!
|
||||
blockCache = new LRUCache(writeBufferManagerSize + databaseOptions.blockCache().orElse( 512 * SizeUnit.MB));
|
||||
refs.track(blockCache);
|
||||
if (databaseOptions.compressedBlockCache().isPresent()) {
|
||||
compressedCache = new LRUCache(databaseOptions.compressedBlockCache().get());
|
||||
refs.track(compressedCache);
|
||||
} else {
|
||||
compressedCache = null;
|
||||
}
|
||||
@ -951,7 +975,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
||||
}
|
||||
|
||||
if (databaseOptions.writeBufferManager().isPresent()) {
|
||||
options.setWriteBufferManager(new WriteBufferManager(writeBufferManagerSize, blockCache, false));
|
||||
var writeBufferManager = new WriteBufferManager(writeBufferManagerSize, blockCache, false);
|
||||
refs.track(writeBufferManager);
|
||||
options.setWriteBufferManager(writeBufferManager);
|
||||
}
|
||||
|
||||
if (databaseOptions.useDirectIO()) {
|
||||
|
@ -0,0 +1,41 @@
|
||||
package it.cavallium.dbengine.database.disk;
|
||||
|
||||
import it.cavallium.dbengine.database.SafeCloseable;
|
||||
import java.util.ArrayList;
|
||||
import org.rocksdb.AbstractImmutableNativeReference;
|
||||
|
||||
public final class RocksDBRefs implements SafeCloseable {
|
||||
|
||||
private final ArrayList<AbstractImmutableNativeReference> list = new ArrayList<>();
|
||||
private boolean closed;
|
||||
|
||||
public RocksDBRefs() {
|
||||
}
|
||||
|
||||
public RocksDBRefs(Iterable<? extends AbstractImmutableNativeReference> it) {
|
||||
it.forEach(list::add);
|
||||
}
|
||||
|
||||
public synchronized void track(AbstractImmutableNativeReference ref) {
|
||||
if (closed) {
|
||||
throw new IllegalStateException("Closed");
|
||||
}
|
||||
list.add(ref);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() {
|
||||
if (!closed) {
|
||||
closed = true;
|
||||
for (int i = list.size() - 1; i >= 0; i--) {
|
||||
var ref = list.get(i);
|
||||
|
||||
if (ref.isOwningHandle()) {
|
||||
ref.close();
|
||||
}
|
||||
}
|
||||
list.clear();
|
||||
list.trimToSize();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user