From 5888bc96b408e539ee8ca74056b1ae341e244ebb Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 18 Apr 2024 14:48:16 +0200 Subject: [PATCH] Performance optimization and code cleanup - Refactor options - Update dependencies - Separate Read-Write pool --- pom.xml | 22 +++++-- src/main/data-generator/quic-rpc.yaml | 39 ++---------- .../client/DefaultDatabaseOptions.java | 23 ++----- .../dbengine/database/LLDictionary.java | 1 + .../database/LLKeyValueDatabaseStructure.java | 6 ++ .../database/LLMultiDatabaseConnection.java | 6 +- .../database/collections/DatabaseInt.java | 11 ++++ .../database/collections/DatabaseLong.java | 11 ++++ .../DatabaseMapDictionaryDeep.java | 11 ++++ .../DatabaseMapDictionaryHashed.java | 11 ++++ .../collections/DatabaseMapSingle.java | 11 ++++ .../collections/DatabaseSingleBucket.java | 11 ++++ .../collections/DatabaseSingleMapped.java | 11 ++++ .../collections/DatabaseSingleton.java | 11 ++++ .../database/collections/DatabaseStage.java | 4 ++ .../collections/DatabaseStageMap.java | 7 +-- .../database/disk/AbstractRocksDBColumn.java | 19 +++++- .../database/disk/LLLocalDictionary.java | 27 +++++--- .../disk/LLLocalKeyValueDatabase.java | 63 +++++++++++++++---- .../database/disk/LLLocalSingleton.java | 11 ++++ .../disk/OptimisticRocksDBColumn.java | 7 ++- .../disk/PessimisticRocksDBColumn.java | 7 ++- .../dbengine/database/disk/RocksDBColumn.java | 5 ++ .../database/disk/StandardRocksDBColumn.java | 7 ++- .../database/memory/LLMemoryDictionary.java | 11 ++++ .../memory/LLMemoryKeyValueDatabase.java | 11 ++++ .../database/memory/LLMemorySingleton.java | 11 ++++ .../cavallium/dbengine/utils/StreamUtils.java | 4 -- .../dbengine/tests/TestVersionsLeak.java | 3 +- 29 files changed, 285 insertions(+), 97 deletions(-) diff --git a/pom.xml b/pom.xml index 17c78ce..2460562 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ it.cavallium dbengine - 4.0.${revision} + 4.2.${revision} jar @@ -13,8 +13,8 @@ 0-SNAPSHOT false 1.10.4 - 9.9.1 - 8.10.0 + 9.10.0 + 9.0.0 5.9.0 1.0.26 @@ -66,11 +66,23 @@ mchv-release-distribution MCHV Release Apache Maven Packages Distribution https://mvn.mchv.eu/repository/mchv + + true + + + false + mchv-snapshot-distribution MCHV Snapshot Apache Maven Packages Distribution https://mvn.mchv.eu/repository/mchv-snapshot + + false + + + true + @@ -92,7 +104,7 @@ com.google.guava guava - 31.1-jre + 33.0.0-jre org.yaml @@ -159,7 +171,7 @@ org.apache.logging.log4j log4j-slf4j2-impl - 2.20.0 + 2.22.1 test diff --git a/src/main/data-generator/quic-rpc.yaml b/src/main/data-generator/quic-rpc.yaml index afee9c9..4b27277 100644 --- a/src/main/data-generator/quic-rpc.yaml +++ b/src/main/data-generator/quic-rpc.yaml @@ -13,20 +13,6 @@ interfacesData: extendInterfaces: [RPCEvent] ServerBoundResponse: extendInterfaces: [RPCEvent] - ColumnOptions: - commonGetters: - levels: DatabaseLevel[] - memtableMemoryBudgetBytes: -long - cacheIndexAndFilterBlocks: -boolean - partitionFilters: -boolean - filter: -Filter - blockSize: -int - persistentCacheId: -String - writeBufferSize: -long - blobFiles: boolean - minBlobSize: -long - blobFileSize: -long - blobCompressionType: -Compression superTypesData: RPCEvent: [ Empty, @@ -94,10 +80,6 @@ superTypesData: NoFilter, BloomFilter ] - ColumnOptions: [ - DefaultColumnOptions, - NamedColumnOptions - ] customTypesData: Path: javaClass: java.nio.file.Path @@ -251,14 +233,13 @@ baseTypesData: persistentCaches: PersistentCache[] writeBufferManager: -long spinning: boolean - defaultColumnOptions: DefaultColumnOptions + defaultColumnOptions: ColumnOptions columnOptions: NamedColumnOptions[] logPath: -String walPath: -String openAsSecondary: boolean secondaryDirectoryName: -String - # Remember to update ColumnOptions common getters - DefaultColumnOptions: + ColumnOptions: data: levels: DatabaseLevel[] memtableMemoryBudgetBytes: -long @@ -272,22 +253,10 @@ baseTypesData: minBlobSize: -long blobFileSize: -long blobCompressionType: -Compression - # Remember to update ColumnOptions common getters NamedColumnOptions: data: - columnName: String - levels: DatabaseLevel[] - memtableMemoryBudgetBytes: -long - cacheIndexAndFilterBlocks: -boolean - partitionFilters: -boolean - filter: -Filter - blockSize: -int - persistentCacheId: -String - writeBufferSize: -long - blobFiles: boolean - minBlobSize: -long - blobFileSize: -long - blobCompressionType: -Compression + name: String + options: ColumnOptions NoFilter: data: {} BloomFilter: diff --git a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java index 18567fe..58a8655 100644 --- a/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java +++ b/src/main/java/it/cavallium/dbengine/client/DefaultDatabaseOptions.java @@ -4,10 +4,10 @@ import it.cavallium.datagen.nativedata.NullableString; import it.cavallium.datagen.nativedata.Nullableboolean; import it.cavallium.datagen.nativedata.Nullableint; import it.cavallium.datagen.nativedata.Nullablelong; +import it.cavallium.dbengine.rpc.current.data.ColumnOptions; +import it.cavallium.dbengine.rpc.current.data.ColumnOptionsBuilder; import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import it.cavallium.dbengine.rpc.current.data.DatabaseOptionsBuilder; -import it.cavallium.dbengine.rpc.current.data.DefaultColumnOptions; -import it.cavallium.dbengine.rpc.current.data.DefaultColumnOptionsBuilder; import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions; import it.cavallium.dbengine.rpc.current.data.NamedColumnOptionsBuilder; import it.cavallium.dbengine.rpc.current.data.nullables.NullableCompression; @@ -20,7 +20,7 @@ import org.rocksdb.RocksDB; public class DefaultDatabaseOptions { - public static DefaultColumnOptions DEFAULT_DEFAULT_COLUMN_OPTIONS = new DefaultColumnOptions( + public static ColumnOptions DEFAULT_DEFAULT_COLUMN_OPTIONS = new ColumnOptions( Collections.emptyList(), Nullablelong.empty(), Nullableboolean.empty(), @@ -37,18 +37,7 @@ public class DefaultDatabaseOptions { public static NamedColumnOptions DEFAULT_NAMED_COLUMN_OPTIONS = new NamedColumnOptions( new String(RocksDB.DEFAULT_COLUMN_FAMILY, StandardCharsets.UTF_8), - Collections.emptyList(), - Nullablelong.empty(), - Nullableboolean.empty(), - Nullableboolean.empty(), - NullableFilter.empty(), - Nullableint.empty(), - NullableString.empty(), - Nullablelong.empty(), - false, - Nullablelong.empty(), - Nullablelong.empty(), - NullableCompression.empty() + DEFAULT_DEFAULT_COLUMN_OPTIONS ); public static DatabaseOptions DEFAULT_DATABASE_OPTIONS = new DatabaseOptions(List.of(), @@ -75,8 +64,8 @@ public class DefaultDatabaseOptions { return DatabaseOptionsBuilder.builder(DEFAULT_DATABASE_OPTIONS); } - public static DefaultColumnOptionsBuilder defaultColumnOptionsBuilder() { - return DefaultColumnOptionsBuilder.builder(DEFAULT_DEFAULT_COLUMN_OPTIONS); + public static ColumnOptionsBuilder defaultColumnOptionsBuilder() { + return ColumnOptionsBuilder.builder(DEFAULT_DEFAULT_COLUMN_OPTIONS); } public static NamedColumnOptionsBuilder namedColumnOptionsBuilder() { diff --git a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java index 8828ac9..f2e424b 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/LLDictionary.java @@ -6,6 +6,7 @@ import it.cavallium.dbengine.client.SSTVerificationProgress; import it.cavallium.dbengine.database.serialization.KVSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.List; +import java.util.concurrent.ForkJoinPool; import java.util.function.Function; import java.util.stream.Stream; import org.jetbrains.annotations.NotNull; diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabaseStructure.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabaseStructure.java index fe9470d..f132b6f 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabaseStructure.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabaseStructure.java @@ -1,6 +1,12 @@ package it.cavallium.dbengine.database; +import java.util.concurrent.ForkJoinPool; + public interface LLKeyValueDatabaseStructure { String getDatabaseName(); + + ForkJoinPool getDbReadPool(); + + ForkJoinPool getDbWritePool(); } diff --git a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java index d941514..7a08f93 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/LLMultiDatabaseConnection.java @@ -1,6 +1,6 @@ package it.cavallium.dbengine.database; -import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL; +import static it.cavallium.dbengine.utils.StreamUtils.collect; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.executing; @@ -88,7 +88,7 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection { @Override public LLDatabaseConnection connect() { - collectOn(ROCKSDB_POOL, allConnections.stream(), executing(connection -> { + collect(allConnections.stream(), executing(connection -> { try { connection.connect(); } catch (Exception ex) { @@ -166,7 +166,7 @@ public class LLMultiDatabaseConnection implements LLDatabaseConnection { @Override public void disconnect() { - collectOn(ROCKSDB_POOL, allConnections.stream(), executing(connection -> { + collect(allConnections.stream(), executing(connection -> { try { connection.disconnect(); } catch (Exception ex) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java index 9572c6f..08b7de4 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseInt.java @@ -6,6 +6,7 @@ import it.cavallium.dbengine.database.LLKeyValueDatabaseStructure; import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; +import java.util.concurrent.ForkJoinPool; import org.jetbrains.annotations.Nullable; public class DatabaseInt implements LLKeyValueDatabaseStructure { @@ -33,4 +34,14 @@ public class DatabaseInt implements LLKeyValueDatabaseStructure { public String getDatabaseName() { return singleton.getDatabaseName(); } + + @Override + public ForkJoinPool getDbReadPool() { + return singleton.getDbReadPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return singleton.getDbWritePool(); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java index c78e3fc..25ad1c7 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseLong.java @@ -8,6 +8,7 @@ import it.cavallium.dbengine.database.LLSingleton; import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializerFixedBinaryLength; +import java.util.concurrent.ForkJoinPool; import org.jetbrains.annotations.Nullable; public class DatabaseLong implements LLKeyValueDatabaseStructure { @@ -81,4 +82,14 @@ public class DatabaseLong implements LLKeyValueDatabaseStructure { public String getDatabaseName() { return singleton.getDatabaseName(); } + + @Override + public ForkJoinPool getDbReadPool() { + return singleton.getDbReadPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return singleton.getDbWritePool(); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java index d7fc254..b01e0b0 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryDeep.java @@ -24,6 +24,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletionException; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import org.apache.commons.lang3.function.TriFunction; @@ -310,6 +311,16 @@ public class DatabaseMapDictionaryDeep> implem return resourceStream(() -> this.getAllEntries(null, false), () -> setAllEntries(entries)); } + @Override + public ForkJoinPool getDbReadPool() { + return dictionary.getDbReadPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return dictionary.getDbWritePool(); + } + @Override public void clear() { if (range.isAll()) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java index 71678ad..a4686a7 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapDictionaryHashed.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ForkJoinPool; import java.util.function.Function; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; @@ -109,6 +110,16 @@ public class DatabaseMapDictionaryHashed implements DatabaseStageMap get(@Nullable CompositeSnapshot snapshot) { var v = subDictionary.get(snapshot); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java index dc8f8c9..2dc5e19 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseMapSingle.java @@ -17,6 +17,7 @@ import it.cavallium.dbengine.database.disk.CachedSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -67,6 +68,16 @@ public final class DatabaseMapSingle implements DatabaseStageEntry { return valBuf.asList(); } + @Override + public ForkJoinPool getDbReadPool() { + return dictionary.getDbReadPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return dictionary.getDbWritePool(); + } + @Override public U get(@Nullable CompositeSnapshot snapshot) { var result = dictionary.get(resolveSnapshot(snapshot), key); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java index 374f76c..53e5ead 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleBucket.java @@ -12,6 +12,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -37,6 +38,16 @@ public class DatabaseSingleBucket implements DatabaseStageEntry { this.bucketStage = (DatabaseStageEntry>>) bucketStage; } + @Override + public ForkJoinPool getDbReadPool() { + return bucketStage.getDbReadPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return bucketStage.getDbWritePool(); + } + @Override public V get(@Nullable CompositeSnapshot snapshot) { var entries = bucketStage.get(snapshot); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java index 8e4fb39..39e693f 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleMapped.java @@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.disk.CachedSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Stream; import org.jetbrains.annotations.Nullable; @@ -29,6 +30,16 @@ public class DatabaseSingleMapped implements DatabaseStageEntry { this.serializedSingle = (DatabaseStageEntry) serializedSingle; } + @Override + public ForkJoinPool getDbReadPool() { + return serializedSingle.getDbReadPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return serializedSingle.getDbWritePool(); + } + @Override public A get(@Nullable CompositeSnapshot snapshot) { var data = serializedSingle.get(snapshot); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java index 69dd3d4..5b6209e 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseSingleton.java @@ -14,6 +14,7 @@ import it.cavallium.dbengine.database.disk.CachedSerializationFunction; import it.cavallium.dbengine.database.serialization.SerializationException; import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.database.serialization.Serializer; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -65,6 +66,16 @@ public class DatabaseSingleton implements DatabaseStageEntry { return valBuf.asList(); } + @Override + public ForkJoinPool getDbReadPool() { + return this.singleton.getDbReadPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return this.singleton.getDbWritePool(); + } + @Override public U get(@Nullable CompositeSnapshot snapshot) { Buf result = singleton.get(resolveSnapshot(snapshot)); diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java index f1b13fa..e2daf83 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStage.java @@ -8,11 +8,15 @@ import it.cavallium.dbengine.database.LLUtils; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.util.Objects; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Stream; import org.jetbrains.annotations.Nullable; public interface DatabaseStage extends DatabaseStageWithEntry { + ForkJoinPool getDbReadPool(); + ForkJoinPool getDbWritePool(); + @Nullable T get(@Nullable CompositeSnapshot snapshot); default T getOrDefault(@Nullable CompositeSnapshot snapshot, T defaultValue, boolean existsAlmostCertainly) { diff --git a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java index 41d027e..0615127 100644 --- a/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java +++ b/src/main/java/it/cavallium/dbengine/database/collections/DatabaseStageMap.java @@ -1,6 +1,5 @@ package it.cavallium.dbengine.database.collections; -import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.count; import static it.cavallium.dbengine.utils.StreamUtils.executing; @@ -104,7 +103,7 @@ public interface DatabaseStageMap> extends Dat } default void putMulti(Stream> entries) { - collectOn(ROCKSDB_POOL, entries, executing(entry -> this.putValue(entry.getKey(), entry.getValue()))); + collectOn(getDbWritePool(), entries, executing(entry -> this.putValue(entry.getKey(), entry.getValue()))); } Stream> getAllStages(@Nullable CompositeSnapshot snapshot, boolean smallRange); @@ -149,7 +148,7 @@ public interface DatabaseStageMap> extends Dat this.setAllEntries(entries.map(entriesReplacer)); } } else { - collectOn(ROCKSDB_POOL, + collectOn(getDbWritePool(), this.getAllEntries(null, smallRange).map(entriesReplacer), executing(replacedEntry -> this.at(null, replacedEntry.getKey()).set(replacedEntry.getValue())) ); @@ -157,7 +156,7 @@ public interface DatabaseStageMap> extends Dat } default void replaceAll(Consumer> entriesReplacer) { - collectOn(ROCKSDB_POOL, this.getAllStages(null, false), executing(entriesReplacer)); + collectOn(getDbWritePool(), this.getAllStages(null, false), executing(entriesReplacer)); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 497bdb3..738d180 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.CompletionException; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.StampedLock; import java.util.stream.Stream; @@ -55,6 +56,8 @@ public sealed abstract class AbstractRocksDBColumn implements private final ColumnFamilyHandle cfh; protected final MeterRegistry meterRegistry; + private final ForkJoinPool dbReadPool; + private final ForkJoinPool dbWritePool; protected final StampedLock closeLock; protected final String columnName; @@ -89,7 +92,9 @@ public sealed abstract class AbstractRocksDBColumn implements String databaseName, ColumnFamilyHandle cfh, MeterRegistry meterRegistry, - StampedLock closeLock) { + StampedLock closeLock, + ForkJoinPool dbReadPool, + ForkJoinPool dbWritePool) { this.db = db; this.cfh = cfh; String columnName; @@ -100,6 +105,8 @@ public sealed abstract class AbstractRocksDBColumn implements } this.columnName = columnName; this.meterRegistry = meterRegistry; + this.dbReadPool = dbReadPool; + this.dbWritePool = dbWritePool; this.closeLock = closeLock; this.keyBufferSize = DistributionSummary @@ -273,6 +280,16 @@ public sealed abstract class AbstractRocksDBColumn implements return cfh; } + @Override + public ForkJoinPool getDbReadPool() { + return dbReadPool; + } + + @Override + public ForkJoinPool getDbWritePool() { + return dbWritePool; + } + protected void ensureOpen() { RocksDBUtils.ensureOpen(db, cfh); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 25830a8..76c9697 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -5,11 +5,9 @@ import static it.cavallium.dbengine.database.LLUtils.isBoundedRange; import static it.cavallium.dbengine.database.LLUtils.mapList; import static it.cavallium.dbengine.database.LLUtils.toStringSafe; import static it.cavallium.dbengine.database.disk.UpdateAtomicResultMode.DELTA; -import static it.cavallium.dbengine.utils.StreamUtils.ROCKSDB_POOL; import static it.cavallium.dbengine.utils.StreamUtils.collectOn; import static it.cavallium.dbengine.utils.StreamUtils.executing; import static it.cavallium.dbengine.utils.StreamUtils.fastSummingLong; -import static it.cavallium.dbengine.utils.StreamUtils.resourceStream; import static java.util.Objects.requireNonNull; import static it.cavallium.dbengine.utils.StreamUtils.batches; @@ -54,6 +52,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; @@ -200,6 +199,16 @@ public class LLLocalDictionary implements LLDictionary { return columnName; } + @Override + public ForkJoinPool getDbReadPool() { + return db.getDbReadPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return db.getDbWritePool(); + } + @NotNull private LLReadOptions generateReadOptionsOrNew(LLSnapshot snapshot) { return generateReadOptions(snapshot != null ? snapshotResolver.apply(snapshot) : null); @@ -355,9 +364,7 @@ public class LLLocalDictionary implements LLDictionary { } assert result != null; return switch (updateReturnMode) { - case NOTHING -> { - yield null; - } + case NOTHING -> null; case GET_NEW_VALUE -> ((UpdateAtomicResultCurrent) result).current(); case GET_OLD_VALUE -> ((UpdateAtomicResultPrevious) result).previous(); }; @@ -451,7 +458,7 @@ public class LLLocalDictionary implements LLDictionary { @Override public void putMulti(Stream entries) { - collectOn(ROCKSDB_POOL, + collectOn(getDbWritePool(), batches(entries, Math.min(MULTI_GET_WINDOW, CAPPED_WRITE_BATCH_CAP)), executing(entriesWindow -> { try (var writeOptions = new LLWriteOptions()) { @@ -778,7 +785,7 @@ public class LLLocalDictionary implements LLDictionary { throw new DBException("Failed to set a range: " + ex.getMessage()); } - collectOn(ROCKSDB_POOL, batches(entries, MULTI_GET_WINDOW), executing(entriesList -> { + collectOn(getDbWritePool(), batches(entries, MULTI_GET_WINDOW), executing(entriesList -> { try (var writeOptions = new LLWriteOptions()) { if (!USE_WRITE_BATCHES_IN_SET_RANGE) { for (LLEntry entry : entriesList) { @@ -814,7 +821,7 @@ public class LLLocalDictionary implements LLDictionary { if (USE_WRITE_BATCHES_IN_SET_RANGE) { throw new UnsupportedOperationException("Can't use write batches in setRange without window. Please fix the parameters"); } - collectOn(ROCKSDB_POOL, this.getRange(null, range, false, smallRange), executing(oldValue -> { + collectOn(getDbWritePool(), this.getRange(null, range, false, smallRange), executing(oldValue -> { try (var writeOptions = new LLWriteOptions()) { db.delete(writeOptions, oldValue.getKey()); } catch (RocksDBException ex) { @@ -822,7 +829,7 @@ public class LLLocalDictionary implements LLDictionary { } })); - collectOn(ROCKSDB_POOL, entries, executing(entry -> { + collectOn(getDbWritePool(), entries, executing(entry -> { if (entry.getKey() != null && entry.getValue() != null) { this.putInternal(entry.getKey(), entry.getValue()); } @@ -1142,7 +1149,7 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED); if (PARALLEL_EXACT_SIZE) { - return collectOn(ROCKSDB_POOL, parallelizeRange(LLRange.all()).map(range -> { + return collectOn(getDbReadPool(), parallelizeRange(LLRange.all()).map(range -> { long partialCount = 0; try (var rangeReadOpts = readOpts.copy()) { try { diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 43a5632..d38fccd 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -31,6 +31,7 @@ import it.cavallium.dbengine.rpc.current.data.DatabaseOptions; import it.cavallium.dbengine.rpc.current.data.DatabaseVolume; import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions; import it.cavallium.dbengine.rpc.current.data.NoFilter; +import it.cavallium.dbengine.utils.StreamUtils; import java.io.File; import java.io.IOException; import it.cavallium.dbengine.utils.DBException; @@ -48,6 +49,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -126,6 +128,8 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa protected static final Logger logger = LogManager.getLogger(LLLocalKeyValueDatabase.class); private final MeterRegistry meterRegistry; + private ForkJoinPool dbReadPool; + private ForkJoinPool dbWritePool; private final Timer snapshotTime; @@ -159,6 +163,8 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa DatabaseOptions databaseOptions) { this.name = name; this.meterRegistry = meterRegistry; + this.dbReadPool = StreamUtils.newNamedForkJoinPool("db-" + name, false); + this.dbWritePool = StreamUtils.newNamedForkJoinPool("db-" + name, false); this.snapshotTime = Timer .builder("db.snapshot.timer") @@ -186,9 +192,9 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa // Check column names validity for (NamedColumnOptions columnOption : databaseOptions.columnOptions()) { - if (columns.stream().map(Column::name).noneMatch(columnName -> columnName.equals(columnOption.columnName()))) { + if (columns.stream().map(Column::name).noneMatch(columnName -> columnName.equals(columnOption.name()))) { throw new IllegalArgumentException( - "Column " + columnOption.columnName() + " does not exist. Available columns: " + columns + "Column " + columnOption.name() + " does not exist. Available columns: " + columns .stream() .map(Column::name) .collect(Collectors.joining(", ", "[", "]"))); @@ -209,10 +215,10 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa var columnOptions = databaseOptions .columnOptions() .stream() - .filter(opts -> opts.columnName().equals(column.name())) + .filter(opts -> opts.name().equals(column.name())) .findFirst() - .map(opts -> (ColumnOptions) opts) - .orElse(databaseOptions.defaultColumnOptions()); + .map(NamedColumnOptions::options) + .orElseGet(databaseOptions::defaultColumnOptions); //noinspection ConstantConditions if (columnOptions.memtableMemoryBudgetBytes() != null) { @@ -794,6 +800,16 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa return name; } + @Override + public ForkJoinPool getDbReadPool() { + return dbReadPool; + } + + @Override + public ForkJoinPool getDbWritePool() { + return dbWritePool; + } + public StampedLock getCloseLock() { return closeLock; } @@ -1236,17 +1252,21 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa name, cfh, meterRegistry, - closeLock + closeLock, + dbReadPool, + dbWritePool ); } else if (db instanceof TransactionDB transactionDB) { return new PessimisticRocksDBColumn(transactionDB, name, cfh, meterRegistry, - closeLock + closeLock, + dbReadPool, + dbWritePool ); } else { - return new StandardRocksDBColumn(db, name, cfh, meterRegistry, closeLock); + return new StandardRocksDBColumn(db, name, cfh, meterRegistry, closeLock, dbReadPool, dbWritePool); } } @@ -1558,8 +1578,12 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa public void close() { closeRequested = true; if (statistics != null) { - statistics.close(); - statistics = null; + try { + statistics.close(); + statistics = null; + } catch (Exception ex) { + logger.error("Failed to close db statistics", ex); + } } try { flushAndCloseDb(db, @@ -1575,6 +1599,23 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa deleteUnusedOldLogFiles(); } catch (Exception e) { throw new DBException("Failed to close", e); + } finally { + if (dbReadPool != null) { + try { + dbReadPool.close(); + dbReadPool = null; + } catch (Exception ex) { + logger.error("Failed to close db pool", ex); + } + } + if (dbWritePool != null) { + try { + dbWritePool.close(); + dbWritePool = null; + } catch (Exception ex) { + logger.error("Failed to close db pool", ex); + } + } } } @@ -1590,7 +1631,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa private void resumeWrites() { try { db.continueBackgroundWork(); - db.enableFileDeletions(false); + db.enableFileDeletions(); } catch (RocksDBException e) { throw new DBException(e); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java index 7236dca..0ee0746 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalSingleton.java @@ -16,6 +16,7 @@ import it.cavallium.dbengine.utils.DBException; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.Callable; +import java.util.concurrent.ForkJoinPool; import java.util.function.Function; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -135,6 +136,16 @@ public class LLLocalSingleton implements LLSingleton { return databaseName; } + @Override + public ForkJoinPool getDbReadPool() { + return db.getDbReadPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return db.getDbWritePool(); + } + @Override public String getColumnName() { return columnName; diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java index b1c229c..e462876 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java @@ -13,6 +13,7 @@ import it.cavallium.dbengine.database.serialization.SerializationFunction; import it.cavallium.dbengine.lucene.ExponentialPageLimits; import it.cavallium.dbengine.utils.DBException; import java.io.IOException; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.StampedLock; @@ -38,8 +39,10 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn String dbName, ColumnFamilyHandle cfh, MeterRegistry meterRegistry, - StampedLock closeLock) { - super(db, dbName, cfh, meterRegistry, closeLock); + StampedLock closeLock, + ForkJoinPool dbReadPool, + ForkJoinPool dbWritePool) { + super(db, dbName, cfh, meterRegistry, closeLock, dbReadPool, dbWritePool); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java index e7e6651..95b54e7 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryDictionary.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -449,4 +450,14 @@ public class LLMemoryDictionary implements LLDictionary { public String getDatabaseName() { return databaseName; } + + @Override + public ForkJoinPool getDbReadPool() { + return ForkJoinPool.commonPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return ForkJoinPool.commonPool(); + } } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index da1d459..235378d 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; import org.jetbrains.annotations.Nullable; @@ -162,6 +163,16 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { return name; } + @Override + public ForkJoinPool getDbReadPool() { + return ForkJoinPool.commonPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return ForkJoinPool.commonPool(); + } + @Override public LLSnapshot takeSnapshot() { var snapshotNumber = nextSnapshotNumber.getAndIncrement(); diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java index 86a2eee..0a8906b 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemorySingleton.java @@ -8,6 +8,7 @@ import it.cavallium.dbengine.database.LLSnapshot; import it.cavallium.dbengine.database.UpdateReturnMode; import it.cavallium.dbengine.database.serialization.SerializationFunction; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ForkJoinPool; import org.jetbrains.annotations.Nullable; public class LLMemorySingleton implements LLSingleton { @@ -27,6 +28,16 @@ public class LLMemorySingleton implements LLSingleton { return dict.getDatabaseName(); } + @Override + public ForkJoinPool getDbReadPool() { + return dict.getDbReadPool(); + } + + @Override + public ForkJoinPool getDbWritePool() { + return dict.getDbWritePool(); + } + @Override public Buf get(@Nullable LLSnapshot snapshot) { return dict.get(snapshot, singletonName); diff --git a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java index d434b15..58d6dc3 100644 --- a/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java +++ b/src/main/java/it/cavallium/dbengine/utils/StreamUtils.java @@ -44,10 +44,6 @@ public class StreamUtils { public static final ForkJoinPool LUCENE_POOL = newNamedForkJoinPool("Lucene", false); - public static final ForkJoinPool GRAPH_POOL = newNamedForkJoinPool("Graph", false); - - public static final ForkJoinPool ROCKSDB_POOL = newNamedForkJoinPool("RocksDB", false); - private static final Collector TO_LIST_FAKE_COLLECTOR = new FakeCollector(); private static final Collector COUNT_FAKE_COLLECTOR = new FakeCollector(); private static final Collector FIRST_FAKE_COLLECTOR = new FakeCollector(); diff --git a/src/test/java/it/cavallium/dbengine/tests/TestVersionsLeak.java b/src/test/java/it/cavallium/dbengine/tests/TestVersionsLeak.java index 8b551f5..5686d7b 100644 --- a/src/test/java/it/cavallium/dbengine/tests/TestVersionsLeak.java +++ b/src/test/java/it/cavallium/dbengine/tests/TestVersionsLeak.java @@ -24,6 +24,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.concurrent.ForkJoinPool; import java.util.stream.Stream; import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; @@ -70,7 +71,7 @@ public class TestVersionsLeak { var keyF = key; toByteArray(key, keyBytes); - StreamUtils.collectOn(StreamUtils.ROCKSDB_POOL, + StreamUtils.collectOn(ForkJoinPool.commonPool(), Stream.of(1, 2, 3, 4).parallel(), StreamUtils.executing(x -> { dict.put(Buf.wrap(keyBytes), val, LLDictionaryResultType.PREVIOUS_VALUE);