From a505b3500f6c44757026a6f2c29434ec02ecef30 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Thu, 7 Dec 2023 00:14:28 +0100 Subject: [PATCH] Partial commit --- .../it/cavallium/rockserver/core/Main.java | 14 +- .../rockserver/core/client/ClientBuilder.java | 17 +- .../core/client/EmbeddedConnection.java | 9 +- .../core/common/RocksDBException.java | 2 +- .../rockserver/core/common/Utils.java | 40 +- .../core/config/BloomFilterConfig.java | 8 +- .../core/config/ColumnLevelConfig.java | 12 + .../rockserver/core/config/ConfigPrinter.java | 126 ++-- .../core/config/DatabaseConfig.java | 4 +- .../rockserver/core/config/DatabaseLevel.java | 11 - .../core/config/FallbackColumnConfig.java | 28 + .../core/config/FallbackColumnOptions.java | 19 - .../core/config/GlobalDatabaseConfig.java | 36 +- .../core/config/NamedColumnConfig.java | 9 + .../core/config/NamedColumnOptions.java | 7 - .../rockserver/core/config/VolumeConfig.java | 12 + .../rockserver/core/impl/ColumnInstance.java | 36 +- .../rockserver/core/impl/DataSizeDecoder.java | 2 +- .../core/impl/DbCompressionDecoder.java | 2 +- .../rockserver/core/impl/EmbeddedDB.java | 178 ++++-- .../core/impl/rocksdb/CacheFactory.java | 8 + .../core/impl/rocksdb/ClockCacheFactory.java | 12 + .../core/impl/rocksdb/LRUCacheFactory.java | 12 + .../core/impl/rocksdb/RocksDBLoader.java | 547 ++++++++++++++++++ .../core/impl/rocksdb/RocksLogger.java | 11 + .../core/impl/rocksdb/TransactionalDB.java | 26 +- .../rockserver/core/resources/default.conf | 7 + .../core/impl/test/EmbeddedDBTest.java | 53 +- 28 files changed, 1036 insertions(+), 212 deletions(-) create mode 100644 src/main/java/it/cavallium/rockserver/core/config/ColumnLevelConfig.java delete mode 100644 src/main/java/it/cavallium/rockserver/core/config/DatabaseLevel.java create mode 100644 src/main/java/it/cavallium/rockserver/core/config/FallbackColumnConfig.java delete mode 100644 src/main/java/it/cavallium/rockserver/core/config/FallbackColumnOptions.java create mode 
100644 src/main/java/it/cavallium/rockserver/core/config/NamedColumnConfig.java delete mode 100644 src/main/java/it/cavallium/rockserver/core/config/NamedColumnOptions.java create mode 100644 src/main/java/it/cavallium/rockserver/core/config/VolumeConfig.java create mode 100644 src/main/java/it/cavallium/rockserver/core/impl/rocksdb/CacheFactory.java create mode 100644 src/main/java/it/cavallium/rockserver/core/impl/rocksdb/ClockCacheFactory.java create mode 100644 src/main/java/it/cavallium/rockserver/core/impl/rocksdb/LRUCacheFactory.java create mode 100644 src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java create mode 100644 src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksLogger.java diff --git a/src/main/java/it/cavallium/rockserver/core/Main.java b/src/main/java/it/cavallium/rockserver/core/Main.java index 52727c8..4f6ccae 100644 --- a/src/main/java/it/cavallium/rockserver/core/Main.java +++ b/src/main/java/it/cavallium/rockserver/core/Main.java @@ -1,20 +1,15 @@ package it.cavallium.rockserver.core; +import static it.cavallium.rockserver.core.client.EmbeddedConnection.PRIVATE_MEMORY_URL; import static java.util.Objects.requireNonNull; import inet.ipaddr.HostName; -import it.cavallium.rockserver.core.client.ClientBuilder; + import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.net.UnixDomainSocketAddress; -import java.net.spi.InetAddressResolver; -import java.net.spi.InetAddressResolverProvider; -import java.nio.file.Files; import java.nio.file.Path; -import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.logging.Level; import java.util.logging.Logger; @@ -34,7 +29,7 @@ public class Main { .description("RocksDB server core"); parser.addArgument("-u", "--url") .type(String.class) - .setDefault("file://" + System.getProperty("user.home") + "/rockserver-core-db") + 
.setDefault(PRIVATE_MEMORY_URL.toString()) .help("Specify database rocksdb://hostname:port, or unix://, or file://"); parser.addArgument("-n", "--name") .type(String.class) @@ -85,7 +80,8 @@ public class Main { switch (url.getScheme()) { case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(url.getPath()))); - case "file" -> clientBuilder.setEmbedded(Path.of(url.getPath())); + case "file" -> clientBuilder.setEmbeddedPath(Path.of(url.getPath())); + case "memory" -> clientBuilder.setEmbeddedInMemory(true); case "rocksdb" -> clientBuilder.setAddress(new HostName(url.getHost()).asInetSocketAddress()); default -> throw new IllegalArgumentException("Invalid scheme: " + url.getScheme()); } diff --git a/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java b/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java index 77fe955..547c3c9 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java +++ b/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java @@ -8,12 +8,17 @@ public class ClientBuilder { private InetSocketAddress iNetAddress; private UnixDomainSocketAddress unixAddress; - private Path embedded; + private Path embeddedPath; private String name; private Path embeddedConfig; + private boolean embeddedInMemory; - public void setEmbedded(Path path) { - this.embedded = path; + public void setEmbeddedPath(Path path) { + this.embeddedPath = path; + } + + public void setEmbeddedInMemory(boolean inMemory) { + this.embeddedInMemory = inMemory; } public void setUnixSocket(UnixDomainSocketAddress address) { @@ -33,8 +38,10 @@ public class ClientBuilder { } public RocksDBConnection build() { - if (embedded != null) { - return new EmbeddedConnection(embedded, name, embeddedConfig); + if (embeddedInMemory) { + return new EmbeddedConnection(null, name, embeddedConfig); + } else if (embeddedPath != null) { + return new EmbeddedConnection(embeddedPath, name, embeddedConfig); } else if 
(unixAddress != null) { return new SocketConnectionUnix(unixAddress, name); } else if (iNetAddress != null) { diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index c8f5f90..9067263 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -10,15 +10,18 @@ import java.io.IOException; import java.lang.foreign.MemorySegment; import java.net.URI; import java.nio.file.Path; +import java.util.Optional; + import org.jetbrains.annotations.Nullable; public class EmbeddedConnection extends BaseConnection { private final EmbeddedDB db; + public static final URI PRIVATE_MEMORY_URL = URI.create("memory://private"); - public EmbeddedConnection(Path path, String name, Path embeddedConfig) { + public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) { super(name); - this.db = new EmbeddedDB(path, embeddedConfig); + this.db = new EmbeddedDB(path, name, embeddedConfig); } @Override @@ -29,7 +32,7 @@ public class EmbeddedConnection extends BaseConnection { @Override public URI getUrl() { - return db.getPath().toUri(); + return Optional.ofNullable(db.getPath()).map(Path::toUri).orElse(PRIVATE_MEMORY_URL); } @Override diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java index c89b26e..718d6b7 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java @@ -5,7 +5,7 @@ public class RocksDBException extends RuntimeException { private final RocksDBErrorType errorUniqueId; public enum RocksDBErrorType { - PUT_UNKNOWN, PUT_2, UNEXPECTED_NULL_VALUE, PUT_1, PUT_3, GET_1, COLUMN_EXISTS, COLUMN_CREATE_FAIL, COLUMN_NOT_FOUND, 
COLUMN_DELETE_FAIL, CONFIG_ERROR, VALUE_MUST_BE_NULL + PUT_UNKNOWN, PUT_2, UNEXPECTED_NULL_VALUE, PUT_1, PUT_3, GET_1, COLUMN_EXISTS, COLUMN_CREATE_FAIL, COLUMN_NOT_FOUND, COLUMN_DELETE_FAIL, CONFIG_ERROR, ROCKSDB_CONFIG_ERROR, VALUE_MUST_BE_NULL, DIRECTORY_DELETE, KEY_LENGTH_MISMATCH, UNSUPPORTED_HASH_SIZE, RAW_KEY_LENGTH_MISMATCH, KEYS_COUNT_MISMATCH, ROCKSDB_LOAD_ERROR } diff --git a/src/main/java/it/cavallium/rockserver/core/common/Utils.java b/src/main/java/it/cavallium/rockserver/core/common/Utils.java index d7ed266..a2a30de 100644 --- a/src/main/java/it/cavallium/rockserver/core/common/Utils.java +++ b/src/main/java/it/cavallium/rockserver/core/common/Utils.java @@ -2,8 +2,17 @@ package it.cavallium.rockserver.core.common; import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES; -import java.lang.foreign.Arena; +import java.io.IOException; import java.lang.foreign.MemorySegment; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Stream; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -40,11 +49,36 @@ public class Utils { } @NotNull - public static MemorySegment toMemorySegment(Arena arena, byte @Nullable [] array) { + public static MemorySegment toMemorySegment(byte @Nullable [] array) { if (array != null) { - return arena.allocateArray(BIG_ENDIAN_BYTES, array); + return MemorySegment.ofArray(array); } else { return MemorySegment.NULL; } } + + public static byte[] toByteArray(MemorySegment memorySegment) { + return memorySegment.toArray(BIG_ENDIAN_BYTES); + } + + public static List mapList(Collection input, Function mapper) { + var result = new ArrayList(input.size()); + input.forEach(t -> result.add(mapper.apply(t))); + return result; + } + + public static void deleteDirectory(String path) throws RocksDBException { 
+ try (Stream pathStream = Files.walk(Path.of(path))) { + pathStream.sorted(Comparator.reverseOrder()) + .forEach(f -> { + try { + Files.deleteIfExists(f); + } catch (IOException e) { + throw new RocksDBException(RocksDBException.RocksDBErrorType.DIRECTORY_DELETE, e); + } + }); + } catch (IOException e) { + throw new RocksDBException(RocksDBException.RocksDBErrorType.DIRECTORY_DELETE, e); + } + } } diff --git a/src/main/java/it/cavallium/rockserver/core/config/BloomFilterConfig.java b/src/main/java/it/cavallium/rockserver/core/config/BloomFilterConfig.java index 1ec6f03..ba15c50 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/BloomFilterConfig.java +++ b/src/main/java/it/cavallium/rockserver/core/config/BloomFilterConfig.java @@ -1,9 +1,13 @@ package it.cavallium.rockserver.core.config; +import org.github.gestalt.config.exceptions.GestaltException; +import org.jetbrains.annotations.Nullable; + public interface BloomFilterConfig { - int bitsPerKey(); + int bitsPerKey() throws GestaltException; - boolean optimizeForHits(); + @Nullable + Boolean optimizeForHits() throws GestaltException; } diff --git a/src/main/java/it/cavallium/rockserver/core/config/ColumnLevelConfig.java b/src/main/java/it/cavallium/rockserver/core/config/ColumnLevelConfig.java new file mode 100644 index 0000000..b6a57db --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/ColumnLevelConfig.java @@ -0,0 +1,12 @@ +package it.cavallium.rockserver.core.config; + +import org.github.gestalt.config.exceptions.GestaltException; +import org.rocksdb.CompressionType; + +public interface ColumnLevelConfig { + + CompressionType compression() throws GestaltException; + + DataSize maxDictBytes() throws GestaltException; + +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java b/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java index 17afde3..7855b5a 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java 
+++ b/src/main/java/it/cavallium/rockserver/core/config/ConfigPrinter.java @@ -1,13 +1,12 @@ package it.cavallium.rockserver.core.config; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; +import org.github.gestalt.config.exceptions.GestaltException; + +import java.util.*; public class ConfigPrinter { - public static String stringifyBloomFilter(BloomFilterConfig o) { + public static String stringifyBloomFilter(BloomFilterConfig o) throws GestaltException { return """ { "bits-per-key": %d, @@ -16,14 +15,14 @@ public class ConfigPrinter { """.formatted(o.bitsPerKey(), o.optimizeForHits()); } - public static String stringifyDatabase(DatabaseConfig o) { + public static String stringifyDatabase(DatabaseConfig o) throws GestaltException { return """ { "global": %s }""".formatted(stringifyGlobalDatabase(o.global())); } - public static String stringifyLevel(DatabaseLevel o) { + public static String stringifyLevel(ColumnLevelConfig o) throws GestaltException { return """ { "compression": "%s", @@ -32,29 +31,29 @@ public class ConfigPrinter { """.formatted(o.compression(), o.maxDictBytes()); } - public static String stringifyFallbackColumn(FallbackColumnOptions o) { - return """ - { - "levels": %s, - "memtable-memory-budget-bytes": "%s", - "cache-index-and-filter-blocks": %b, - "partition-filters": %s, - "bloom-filter": %s, - "block-size": "%s", - "write-buffer-size": "%s" - }\ - """.formatted(Arrays.stream(Objects.requireNonNullElse(o.levels(), new DatabaseLevel[0])) - .map(ConfigPrinter::stringifyLevel).collect(Collectors.joining(",", "[", "]")), - o.memtableMemoryBudgetBytes(), - o.cacheIndexAndFilterBlocks(), - o.partitionFilters(), - stringifyBloomFilter(o.bloomFilter()), - o.blockSize(), - o.writeBufferSize() - ); + private static List getVolumeConfigs(GlobalDatabaseConfig g) throws GestaltException { + try { + return List.of(g.volumes()); + } catch (GestaltException ex) { + if 
(ex.getMessage().equals("Failed to get proxy config while calling method: volumes in path: database.global.")) { + return List.of(); + } else { + throw ex; + } + } } - public static String stringifyGlobalDatabase(GlobalDatabaseConfig o) { + public static String stringifyGlobalDatabase(GlobalDatabaseConfig o) throws GestaltException { + StringJoiner joiner = new StringJoiner(",", "[", "]"); + for (NamedColumnConfig namedColumnConfig : Objects.requireNonNullElse(o.columnOptions(), new NamedColumnConfig[0])) { + String s = stringifyNamedColumn(namedColumnConfig); + joiner.add(s); + } + StringJoiner result = new StringJoiner(",", "[", "]"); + for (VolumeConfig volumeConfig : getVolumeConfigs(o)) { + String s = stringifyVolume(volumeConfig); + result.add(s); + } return """ { "spinning": %b, @@ -66,6 +65,9 @@ public class ConfigPrinter { "block-cache": "%s", "write-buffer-manager": "%s", "log-path": "%s", + "wal-path": "%s", + "absolute-consistency": %b, + "volumes": %s, "fallback-column-options": %s, "column-options": %s }\ @@ -78,14 +80,69 @@ public class ConfigPrinter { o.blockCache(), o.writeBufferManager(), o.logPath(), + o.walPath(), + o.absoluteConsistency(), + result.toString(), stringifyFallbackColumn(o.fallbackColumnOptions()), - Arrays.stream(Objects.requireNonNullElse(o.columnOptions(), new NamedColumnOptions[0])) - .map(ConfigPrinter::stringifyNamedColumn) - .collect(Collectors.joining(",", "[", "]")) + joiner.toString() ); } - public static String stringifyNamedColumn(NamedColumnOptions o) { + private static String stringifyVolume(VolumeConfig o) throws GestaltException { + return """ + { + "volume-path": "%s", + "target-size-bytes": %b + }\ + """.formatted(o.volumePath(), + o.targetSizeBytes() + ); + } + + public static String stringifyFallbackColumn(FallbackColumnConfig o) throws GestaltException { + StringJoiner joiner = new StringJoiner(",", "[", "]"); + for (ColumnLevelConfig columnLevelConfig : Objects.requireNonNullElse(o.levels(), new 
ColumnLevelConfig[0])) { + String s = stringifyLevel(columnLevelConfig); + joiner.add(s); + } + String bloom = null; + BloomFilterConfig value = o.bloomFilter(); + if (value != null) { + String s = stringifyBloomFilter(value); + if (s != null) bloom = s; + } + return """ + { + "levels": %s, + "memtable-memory-budget-bytes": "%s", + "cache-index-and-filter-blocks": %b, + "partition-filters": %s, + "bloom-filter": %s, + "block-size": "%s", + "write-buffer-size": "%s" + }\ + """.formatted(joiner.toString(), + o.memtableMemoryBudgetBytes(), + o.cacheIndexAndFilterBlocks(), + o.partitionFilters(), + bloom, + o.blockSize(), + o.writeBufferSize() + ); + } + + public static String stringifyNamedColumn(NamedColumnConfig o) throws GestaltException { + StringJoiner joiner = new StringJoiner(",", "[", "]"); + for (ColumnLevelConfig columnLevelConfig : (o.levels() != null ? List.of(o.levels()) : List.of())) { + String s = stringifyLevel(columnLevelConfig); + joiner.add(s); + } + String bloom = null; + BloomFilterConfig value = o.bloomFilter(); + if (value != null) { + String s = stringifyBloomFilter(value); + if (s != null) bloom = s; + } return """ { "name": "%s", @@ -98,12 +155,11 @@ public class ConfigPrinter { "write-buffer-size": "%s" }\ """.formatted(o.name(), - (o.levels() != null ? 
List.of(o.levels()) : List.of()).stream() - .map(ConfigPrinter::stringifyLevel).collect(Collectors.joining(",", "[", "]")), + joiner.toString(), o.memtableMemoryBudgetBytes(), o.cacheIndexAndFilterBlocks(), o.partitionFilters(), - stringifyBloomFilter(o.bloomFilter()), + bloom, o.blockSize(), o.writeBufferSize() ); diff --git a/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java b/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java index fc01d49..3534e36 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java +++ b/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java @@ -1,7 +1,9 @@ package it.cavallium.rockserver.core.config; +import org.github.gestalt.config.exceptions.GestaltException; + public interface DatabaseConfig { - GlobalDatabaseConfig global(); + GlobalDatabaseConfig global() throws GestaltException; } diff --git a/src/main/java/it/cavallium/rockserver/core/config/DatabaseLevel.java b/src/main/java/it/cavallium/rockserver/core/config/DatabaseLevel.java deleted file mode 100644 index b2a557d..0000000 --- a/src/main/java/it/cavallium/rockserver/core/config/DatabaseLevel.java +++ /dev/null @@ -1,11 +0,0 @@ -package it.cavallium.rockserver.core.config; - -import org.rocksdb.CompressionType; - -public interface DatabaseLevel { - - CompressionType compression(); - - DataSize maxDictBytes(); - -} diff --git a/src/main/java/it/cavallium/rockserver/core/config/FallbackColumnConfig.java b/src/main/java/it/cavallium/rockserver/core/config/FallbackColumnConfig.java new file mode 100644 index 0000000..c9b310e --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/FallbackColumnConfig.java @@ -0,0 +1,28 @@ +package it.cavallium.rockserver.core.config; + +import org.github.gestalt.config.exceptions.GestaltException; +import org.jetbrains.annotations.Nullable; + +public interface FallbackColumnConfig { + + ColumnLevelConfig[] levels() throws GestaltException; + + @Nullable + 
DataSize memtableMemoryBudgetBytes() throws GestaltException; + + @Nullable + Boolean cacheIndexAndFilterBlocks() throws GestaltException; + + @Nullable + Boolean partitionFilters() throws GestaltException; + + @Nullable + BloomFilterConfig bloomFilter() throws GestaltException; + + @Nullable + DataSize blockSize() throws GestaltException; + + @Nullable + DataSize writeBufferSize() throws GestaltException; + +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/FallbackColumnOptions.java b/src/main/java/it/cavallium/rockserver/core/config/FallbackColumnOptions.java deleted file mode 100644 index 040e8e7..0000000 --- a/src/main/java/it/cavallium/rockserver/core/config/FallbackColumnOptions.java +++ /dev/null @@ -1,19 +0,0 @@ -package it.cavallium.rockserver.core.config; - -public interface FallbackColumnOptions { - - DatabaseLevel[] levels(); - - DataSize memtableMemoryBudgetBytes(); - - boolean cacheIndexAndFilterBlocks(); - - boolean partitionFilters(); - - BloomFilterConfig bloomFilter(); - - DataSize blockSize(); - - DataSize writeBufferSize(); - -} diff --git a/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java b/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java index 028e683..cad7a5e 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java +++ b/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java @@ -1,28 +1,42 @@ package it.cavallium.rockserver.core.config; +import org.github.gestalt.config.exceptions.GestaltException; +import org.jetbrains.annotations.Nullable; + import java.nio.file.Path; public interface GlobalDatabaseConfig { - boolean spinning(); + boolean spinning() throws GestaltException; - boolean checksum(); + boolean checksum() throws GestaltException; - boolean useDirectIo(); + boolean useDirectIo() throws GestaltException; - boolean allowRocksdbMemoryMapping(); + boolean allowRocksdbMemoryMapping() throws GestaltException; - int 
maximumOpenFiles(); + @Nullable + Integer maximumOpenFiles() throws GestaltException; - boolean optimistic(); + boolean optimistic() throws GestaltException; - DataSize blockCache(); + @Nullable + DataSize blockCache() throws GestaltException; - DataSize writeBufferManager(); + @Nullable + DataSize writeBufferManager() throws GestaltException; - Path logPath(); + @Nullable + Path logPath() throws GestaltException; - FallbackColumnOptions fallbackColumnOptions(); - NamedColumnOptions[] columnOptions(); + @Nullable + Path walPath() throws GestaltException; + + boolean absoluteConsistency() throws GestaltException; + + VolumeConfig[] volumes() throws GestaltException; + + FallbackColumnConfig fallbackColumnOptions() throws GestaltException; + NamedColumnConfig[] columnOptions() throws GestaltException; } diff --git a/src/main/java/it/cavallium/rockserver/core/config/NamedColumnConfig.java b/src/main/java/it/cavallium/rockserver/core/config/NamedColumnConfig.java new file mode 100644 index 0000000..a2e08e8 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/NamedColumnConfig.java @@ -0,0 +1,9 @@ +package it.cavallium.rockserver.core.config; + +import org.github.gestalt.config.exceptions.GestaltException; + +public interface NamedColumnConfig extends FallbackColumnConfig { + + String name() throws GestaltException; + +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/NamedColumnOptions.java b/src/main/java/it/cavallium/rockserver/core/config/NamedColumnOptions.java deleted file mode 100644 index 91b62fa..0000000 --- a/src/main/java/it/cavallium/rockserver/core/config/NamedColumnOptions.java +++ /dev/null @@ -1,7 +0,0 @@ -package it.cavallium.rockserver.core.config; - -public interface NamedColumnOptions extends FallbackColumnOptions { - - String name(); - -} diff --git a/src/main/java/it/cavallium/rockserver/core/config/VolumeConfig.java b/src/main/java/it/cavallium/rockserver/core/config/VolumeConfig.java new file mode 100644 index 
0000000..ee6fe87 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/VolumeConfig.java @@ -0,0 +1,12 @@ +package it.cavallium.rockserver.core.config; + +import org.github.gestalt.config.exceptions.GestaltException; + +import java.nio.file.Path; + +public interface VolumeConfig { + + Path volumePath() throws GestaltException; + + long targetSizeBytes() throws GestaltException; +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java b/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java index 33cf761..3f6ef51 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java @@ -82,10 +82,16 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi private MemorySegment computeKeyAt(Arena arena, int i, MemorySegment[] keys) { if (i < schema.keys().length - schema.variableLengthKeysCount()) { + if (keys[i].byteSize() != schema.keys()[i]) { + throw new RocksDBException(RocksDBErrorType.KEY_LENGTH_MISMATCH, + "Key at index " + i + " has a different length than expected! Expected: " + schema.keys()[i] + + ", received: " + keys[i].byteSize()); + } return keys[i]; } else { if (schema.keys()[i] != Integer.BYTES) { - throw new UnsupportedOperationException("Hash size different than 32-bit is currently unsupported"); + throw new RocksDBException(RocksDBErrorType.UNSUPPORTED_HASH_SIZE, + "Hash size different than 32-bit is currently unsupported"); } else { return XXHash32.getInstance().hash(arena, keys[i], 0, 0, 0); } @@ -94,17 +100,17 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi private void validateFinalKeySize(MemorySegment key) { if (finalKeySizeBytes != key.byteSize()) { - throw new IllegalArgumentException( - "Keys size must be equal to the column keys size. 
Expected " + finalKeySizeBytes + ", got " - + key.byteSize()); + throw new RocksDBException(RocksDBErrorType.RAW_KEY_LENGTH_MISMATCH, + "Keys size must be equal to the column keys size. Expected: " + + finalKeySizeBytes + ", got: " + key.byteSize()); } } private void validateKeyCount(MemorySegment[] keys) { if (schema.keys().length != keys.length) { - throw new IllegalArgumentException( - "Keys count must be equal to the column keys count. Expected " + schema.keys().length + ", got " - + keys.length); + throw new RocksDBException(RocksDBErrorType.KEYS_COUNT_MISMATCH, + "Keys count must be equal to the column keys count. Expected: " + schema.keys().length + + ", got: " + keys.length); } } @@ -118,8 +124,10 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi long offset = 0; for (MemorySegment keyI : variableKeys) { var keyISize = keyI.byteSize(); - bucketElementKey.set(BIG_ENDIAN_CHAR_UNALIGNED, offset += Character.BYTES, toCharExact(keyISize)); - MemorySegment.copy(keyI, 0, bucketElementKey, offset += keyISize, keyISize); + bucketElementKey.set(BIG_ENDIAN_CHAR_UNALIGNED, offset, toCharExact(keyISize)); + offset += Character.BYTES; + MemorySegment.copy(keyI, 0, bucketElementKey, offset, keyISize); + offset += keyISize; } assert offset == totalSize; return bucketElementKey; @@ -135,11 +143,13 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi } public void checkNullableValue(MemorySegment value) { - if (schema.hasValue() == (value == null || value == MemorySegment.NULL)) { + if (schema.hasValue() == (value == null)) { if (schema.hasValue()) { - throw new RocksDBException(RocksDBErrorType.UNEXPECTED_NULL_VALUE, "Schema expects a value, but a null value has been passed"); + throw new RocksDBException(RocksDBErrorType.UNEXPECTED_NULL_VALUE, + "Schema expects a value, but a null value has been passed"); } else { - throw new RocksDBException(RocksDBErrorType.VALUE_MUST_BE_NULL, "Schema expects no value, but 
a non-null value has been passed"); + throw new RocksDBException(RocksDBErrorType.VALUE_MUST_BE_NULL, + "Schema expects no value, but a non-null value has been passed"); } } } @@ -149,7 +159,7 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi checkNullableValue(computedBucketElementValue); var keySize = computedBucketElementKey.byteSize(); var valueSize = computedBucketElementValue != null ? computedBucketElementValue.byteSize() : 0; - var totalSize = keySize + valueSize; + var totalSize = Integer.BYTES + keySize + valueSize; var computedBucketElementKV = arena.allocate(totalSize); computedBucketElementKV.set(BIG_ENDIAN_INT, 0, toIntExact(totalSize)); MemorySegment.copy(computedBucketElementKey, 0, computedBucketElementKV, Integer.BYTES, keySize); diff --git a/src/main/java/it/cavallium/rockserver/core/impl/DataSizeDecoder.java b/src/main/java/it/cavallium/rockserver/core/impl/DataSizeDecoder.java index 2a4ec08..293c712 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/DataSizeDecoder.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/DataSizeDecoder.java @@ -24,7 +24,7 @@ class DataSizeDecoder implements Decoder { @Override public boolean matches(TypeCapture klass) { - return klass.isAssignableFrom(DataSize.class); + return klass != null && klass.isAssignableFrom(DataSize.class); } @Override diff --git a/src/main/java/it/cavallium/rockserver/core/impl/DbCompressionDecoder.java b/src/main/java/it/cavallium/rockserver/core/impl/DbCompressionDecoder.java index eb511c6..03f9020 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/DbCompressionDecoder.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/DbCompressionDecoder.java @@ -26,7 +26,7 @@ public class DbCompressionDecoder implements Decoder { @Override public boolean matches(TypeCapture klass) { - return klass.isAssignableFrom(CompressionType.class); + return klass != null && klass.isAssignableFrom(CompressionType.class); } @Override diff --git 
a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index 4abf851..9ce0aad 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -19,6 +19,7 @@ import it.cavallium.rockserver.core.common.Utils; import it.cavallium.rockserver.core.config.ConfigPrinter; import it.cavallium.rockserver.core.config.DatabaseConfig; import it.cavallium.rockserver.core.impl.rocksdb.REntry; +import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader; import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions; @@ -42,34 +43,31 @@ import org.github.gestalt.config.exceptions.GestaltException; import org.github.gestalt.config.source.ClassPathConfigSource; import org.github.gestalt.config.source.FileConfigSource; import org.jetbrains.annotations.Nullable; -import org.rocksdb.ColumnFamilyDescriptor; -import org.rocksdb.ReadOptions; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.Transaction; -import org.rocksdb.WriteOptions; +import org.rocksdb.*; public class EmbeddedDB implements RocksDBAPI, Closeable { private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096; private static final boolean USE_FAST_GET = true; private final Logger logger; - private final Path path; - private final Path embeddedConfigPath; + private final @Nullable Path path; + private final @Nullable Path embeddedConfigPath; private final DatabaseConfig config; private TransactionalDB db; private final NonBlockingHashMapLong columns; private final ConcurrentMap columnNamesIndex; private final NonBlockingHashMapLong> txs; + private final NonBlockingHashMapLong> its; private final SafeShutdown ops; private final Object columnEditLock = new 
Object(); - public EmbeddedDB(Path path, @Nullable Path embeddedConfigPath) { + public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) { this.path = path; this.embeddedConfigPath = embeddedConfigPath; - this.logger = Logger.getLogger("db"); + this.logger = Logger.getLogger("db." + name); this.columns = new NonBlockingHashMapLong<>(); this.txs = new NonBlockingHashMapLong<>(); + this.its = new NonBlockingHashMapLong<>(); this.columnNamesIndex = new ConcurrentHashMap<>(); this.ops = new SafeShutdown(); @@ -88,7 +86,10 @@ public class EmbeddedDB implements RocksDBAPI, Closeable { gestalt.loadConfigs(); this.config = gestalt.getConfig("database", DatabaseConfig.class); - logger.log(Level.INFO, "Database configuration: {0}", ConfigPrinter.stringifyDatabase(this.config)); + this.db = RocksDBLoader.load(path, config, logger); + if (Boolean.parseBoolean(System.getProperty("rockserver.core.print-config", "true"))) { + logger.log(Level.INFO, "Database configuration: {0}", ConfigPrinter.stringifyDatabase(this.config)); + } } catch (GestaltException e) { throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.CONFIG_ERROR, e); } @@ -143,6 +144,9 @@ public class EmbeddedDB implements RocksDBAPI, Closeable { // Wait for 10 seconds try { ops.closeAndWait(10_000); + if (path == null) { + Utils.deleteDirectory(db.getPath()); + } } catch (TimeoutException e) { logger.log(Level.SEVERE, "Some operations lasted more than 10 seconds, forcing database shutdown..."); } @@ -150,61 +154,83 @@ public class EmbeddedDB implements RocksDBAPI, Closeable { @Override public long openTransaction(long timeoutMs) { + // Open the transaction operation, do not close until the transaction has been closed ops.beginOp(); - TransactionalOptions txOpts = db.createTransactionalOptions(); - var writeOpts = new WriteOptions(); - var tx = new REntry<>(db.beginTransaction(writeOpts, txOpts), new RocksDBObjects(writeOpts, txOpts)); - return 
FastRandomUtils.allocateNewValue(txs, tx, Long.MIN_VALUE, -2); + try { + TransactionalOptions txOpts = db.createTransactionalOptions(); + var writeOpts = new WriteOptions(); + var tx = new REntry<>(db.beginTransaction(writeOpts, txOpts), new RocksDBObjects(writeOpts, txOpts)); + return FastRandomUtils.allocateNewValue(txs, tx, Long.MIN_VALUE, -2); + } catch (Throwable ex) { + ops.endOp(); + throw ex; + } } @Override public void closeTransaction(long transactionId) { - var tx = txs.remove(transactionId); - if (tx != null) { - try { - tx.close(); - } finally { - ops.endOp(); + ops.beginOp(); + try { + var tx = txs.remove(transactionId); + if (tx != null) { + try { + tx.close(); + } finally { + // Close the transaction operation + ops.endOp(); + } + } else { + throw new NoSuchElementException("Transaction not found: " + transactionId); } - } else { - throw new NoSuchElementException("Transaction not found: " + transactionId); + } finally { + ops.endOp(); } } @Override public long createColumn(String name, ColumnSchema schema) throws it.cavallium.rockserver.core.common.RocksDBException { - synchronized (columnEditLock) { - var colId = getColumnIdOrNull(name); - var col = colId != null ? getColumn(colId) : null; - if (col != null) { - if (schema.equals(col.schema())) { - return colId; + ops.beginOp(); + try { + synchronized (columnEditLock) { + var colId = getColumnIdOrNull(name); + var col = colId != null ? 
getColumn(colId) : null; + if (col != null) { + if (schema.equals(col.schema())) { + return colId; + } else { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_EXISTS, + "Column exists, with a different schema: " + name + ); + } } else { - throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_EXISTS, - "Column exists, with a different schema: " + name - ); - } - } else { - try { - var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8))); - return registerColumn(new ColumnInstance(cf, schema)); - } catch (RocksDBException e) { - throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_CREATE_FAIL, e); + try { + var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8))); + return registerColumn(new ColumnInstance(cf, schema)); + } catch (RocksDBException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_CREATE_FAIL, e); + } } } + } finally { + ops.endOp(); } } @Override public void deleteColumn(long columnId) throws it.cavallium.rockserver.core.common.RocksDBException { - synchronized (columnEditLock) { - var col = getColumn(columnId); - try { - db.get().dropColumnFamily(col.cfh()); - unregisterColumn(columnId).close(); - } catch (RocksDBException e) { - throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_DELETE_FAIL, e); + ops.beginOp(); + try { + synchronized (columnEditLock) { + var col = getColumn(columnId); + try { + db.get().dropColumnFamily(col.cfh()); + unregisterColumn(columnId).close(); + } catch (RocksDBException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_DELETE_FAIL, e); + } } + } finally { + ops.endOp(); } } @@ -255,10 +281,10 @@ public class EmbeddedDB implements RocksDBAPI, Closeable { var bucketElementKeys = 
col.getBucketElementKeys(keys); try (var readOptions = new ReadOptions()) { var previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); - MemorySegment previousRawBucket = Utils.toMemorySegment(arena, previousRawBucketByteArray); + MemorySegment previousRawBucket = toMemorySegment(previousRawBucketByteArray); var bucket = new Bucket(col, previousRawBucket); previousValue = bucket.addElement(bucketElementKeys, value); - tx.val().put(col.cfh(), toByteArray(calculatedKey), toByteArray(bucket.toSegment(arena))); + tx.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(bucket.toSegment(arena))); } catch (RocksDBException e) { throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_1, e); } @@ -281,7 +307,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable { } else { previousValueByteArray = db.get().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); } - previousValue = toMemorySegment(arena, previousValueByteArray); + previousValue = toMemorySegment(previousValueByteArray); } catch (RocksDBException e) { throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_2, e); } @@ -289,7 +315,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable { previousValue = null; } if (tx != null) { - tx.val().put(col.cfh(), toByteArray(calculatedKey), toByteArray(requireNonNullElse(value, NULL))); + tx.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(requireNonNullElse(value, NULL))); } else { try (var w = new WriteOptions()) { db.get().put(col.cfh(), w, calculatedKey.asByteBuffer(), requireNonNullElse(value, NULL).asByteBuffer()); @@ -384,18 +410,45 @@ public class EmbeddedDB implements RocksDBAPI, Closeable { @Nullable MemorySegment[] endKeysExclusive, boolean reverse, long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { - return 0; + // Open an operation that ends when the 
iterator is closed + ops.beginOp(); + try { + var col = getColumn(columnId); + RocksIterator it; + var ro = new ReadOptions(); + if (transactionId > 0L) { + //noinspection resource + it = getTransaction(transactionId).val().getIterator(ro, col.cfh()); + } else { + it = db.get().newIterator(col.cfh()); + } + var itEntry = new REntry<>(it, new RocksDBObjects(ro)); + return FastRandomUtils.allocateNewValue(its, itEntry, 1, Long.MAX_VALUE); + } catch (Throwable ex) { + ops.endOp(); + throw ex; + } } @Override public void closeIterator(long iteratorId) throws it.cavallium.rockserver.core.common.RocksDBException { - + ops.beginOp(); + try { + // Should close the iterator operation + throw new UnsupportedOperationException(); + } finally { + ops.endOp(); + } } @Override - public void seekTo(long iterationId, MemorySegment[] keys) - throws it.cavallium.rockserver.core.common.RocksDBException { - + public void seekTo(long iterationId, MemorySegment[] keys) throws it.cavallium.rockserver.core.common.RocksDBException { + ops.beginOp(); + try { + throw new UnsupportedOperationException(); + } finally { + ops.endOp(); + } } @Override @@ -403,7 +456,12 @@ public class EmbeddedDB implements RocksDBAPI, Closeable { long skipCount, long takeCount, IteratorCallback callback) throws it.cavallium.rockserver.core.common.RocksDBException { - + ops.beginOp(); + try { + throw new UnsupportedOperationException(); + } finally { + ops.endOp(); + } } private MemorySegment dbGet(REntry tx, @@ -413,14 +471,14 @@ public class EmbeddedDB implements RocksDBAPI, Closeable { MemorySegment calculatedKey) throws RocksDBException { if (tx != null) { var previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); - return Utils.toMemorySegment(arena, previousRawBucketByteArray); + return toMemorySegment(previousRawBucketByteArray); } else { var db = this.db.get(); if (USE_FAST_GET) { return dbGetDirect(arena, readOptions, calculatedKey); } else { var 
previousRawBucketByteArray = db.get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); - return Utils.toMemorySegment(arena, previousRawBucketByteArray); + return toMemorySegment(previousRawBucketByteArray); } } } @@ -463,10 +521,6 @@ public class EmbeddedDB implements RocksDBAPI, Closeable { return MemorySegment.mismatch(previousValue, 0, previousValue.byteSize(), currentValue, 0, currentValue.byteSize()) != -1; } - public static byte[] toByteArray(MemorySegment memorySegment) { - return memorySegment.toArray(BIG_ENDIAN_BYTES); - } - private ColumnInstance getColumn(long columnId) { var col = columns.get(columnId); if (col != null) { diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/CacheFactory.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/CacheFactory.java new file mode 100644 index 0000000..bd53d93 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/CacheFactory.java @@ -0,0 +1,8 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import org.rocksdb.Cache; + +public interface CacheFactory { + + Cache newCache(long size); +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/ClockCacheFactory.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/ClockCacheFactory.java new file mode 100644 index 0000000..f04f050 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/ClockCacheFactory.java @@ -0,0 +1,12 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import org.rocksdb.Cache; +import org.rocksdb.ClockCache; + +public class ClockCacheFactory implements CacheFactory { + + @Override + public Cache newCache(long size) { + return new ClockCache(size); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/LRUCacheFactory.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/LRUCacheFactory.java new file mode 100644 index 0000000..9894549 --- /dev/null +++ 
b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/LRUCacheFactory.java @@ -0,0 +1,12 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import org.rocksdb.Cache; +import org.rocksdb.LRUCache; + +public class LRUCacheFactory implements CacheFactory { + + @Override + public Cache newCache(long size) { + return new LRUCache(size); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java new file mode 100644 index 0000000..7a6aa9b --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java @@ -0,0 +1,547 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import it.cavallium.rockserver.core.config.*; +import org.github.gestalt.config.exceptions.GestaltException; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.*; +import org.rocksdb.util.SizeUnit; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static it.cavallium.rockserver.core.common.Utils.mapList; +import static java.lang.Boolean.parseBoolean; +import static java.util.Objects.requireNonNull; +import static org.rocksdb.ColumnFamilyOptionsInterface.DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET; + +public class RocksDBLoader { + + private static final boolean FOLLOW_ROCKSDB_OPTIMIZATIONS = true; + + private static final boolean PARANOID_CHECKS + = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.paranoid", "true")); + private static final boolean USE_CLOCK_CACHE + = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.clockcache.enable", "false")); + private static final CacheFactory CACHE_FACTORY = USE_CLOCK_CACHE ? 
new ClockCacheFactory() : new LRUCacheFactory(); + + public static TransactionalDB load(@Nullable Path path, DatabaseConfig config, Logger logger) { + var refs = new RocksDBObjects(); + var optionsWithCache = makeRocksDBOptions(path, config, refs, logger); + return loadDb(path, config, optionsWithCache, refs, logger); + } + + record OptionsWithCache(DBOptions options, @Nullable Cache standardCache) {} + + private static OptionsWithCache makeRocksDBOptions(@Nullable Path path, DatabaseConfig databaseOptions, RocksDBObjects refs, Logger logger) { + try { + // Get databases directory path + Path databasesDirPath; + if (path != null) { + databasesDirPath = path.toAbsolutePath().getParent(); + // Create base directories + if (Files.notExists(databasesDirPath)) { + Files.createDirectories(databasesDirPath); + } + } else { + databasesDirPath = null; + } + + List volumeConfigs = getVolumeConfigs(databaseOptions); + + // the Options class contains a set of configurable DB options + // that determines the behaviour of the database. 
+ var options = new DBOptions(); + refs.add(options); + options.setParanoidChecks(PARANOID_CHECKS); + options.setSkipCheckingSstFileSizesOnDbOpen(true); + options.setEnablePipelinedWrite(true); + var maxSubCompactions = Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "-1")); + if (maxSubCompactions > 0) { + options.setMaxSubcompactions(maxSubCompactions); + } + var customWriteRate = Long.parseLong(System.getProperty("it.cavallium.dbengine.write.delayedrate", "-1")); + if (customWriteRate >= 0) { + options.setDelayedWriteRate(customWriteRate); + } + + Optional.ofNullable(databaseOptions.global().logPath()) + .map(Path::toString) + .ifPresent(options::setDbLogDir); + + if (path != null) { + Optional.ofNullable(databaseOptions.global().walPath()) + .map(Path::toString) + .ifPresent(options::setWalDir); + } + + options.setCreateIfMissing(true); + options.setSkipStatsUpdateOnDbOpen(true); + options.setCreateMissingColumnFamilies(true); + options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL); + // todo: automatically flush every x seconds? + + options.setManualWalFlush(true); + + options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown + options.setAvoidFlushDuringRecovery(true); // Flush all WALs during startup + options.setWalRecoveryMode(databaseOptions.global().absoluteConsistency() + ? 
WALRecoveryMode.AbsoluteConsistency + : WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted.Default: TolerateCorruptedTailRecords + options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds + options.setKeepLogFileNum(10); + + if (databasesDirPath != null) { + requireNonNull(databasesDirPath); + requireNonNull(path.getFileName()); + List paths = mapList(convertPaths(databasesDirPath, path.getFileName(), volumeConfigs), + p -> new DbPath(p.path, p.targetSize) + ); + options.setDbPaths(paths); + } else if (!volumeConfigs.isEmpty()) { + throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "in-memory databases should not have any volume configured"); + } + options.setMaxOpenFiles(Optional.ofNullable(databaseOptions.global().maximumOpenFiles()).orElse(-1)); + options.setMaxFileOpeningThreads(Runtime.getRuntime().availableProcessors()); + if (databaseOptions.global().spinning()) { + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + options.setUseFsync(false); + } + + long writeBufferManagerSize = Optional.ofNullable(databaseOptions.global().writeBufferManager()) + .map(DataSize::longValue) + .orElse(0L); + + if (isDisableAutoCompactions()) { + options.setMaxBackgroundCompactions(0); + options.setMaxBackgroundJobs(0); + } else { + var backgroundJobs = Integer.parseInt(System.getProperty("it.cavallium.dbengine.jobs.background.num", "-1")); + if (backgroundJobs >= 0) { + options.setMaxBackgroundJobs(backgroundJobs); + } + } + + Cache blockCache; + final boolean useDirectIO = path != null && databaseOptions.global().useDirectIo(); + final boolean allowMmapReads = (path == null) || (!useDirectIO && databaseOptions.global().allowRocksdbMemoryMapping()); + final boolean allowMmapWrites = (path != null) && (!useDirectIO && 
(databaseOptions.global().allowRocksdbMemoryMapping() + || parseBoolean(System.getProperty("it.cavallium.dbengine.mmapwrites.enable", "false")))); + + // todo: replace with a real option called database-write-buffer-size + // 0 = default = disabled + long dbWriteBufferSize = Long.parseLong(System.getProperty("it.cavallium.dbengine.dbwritebuffer.size", "0")); + + options + .setDbWriteBufferSize(dbWriteBufferSize) + .setBytesPerSync(64 * SizeUnit.MB) + .setWalBytesPerSync(64 * SizeUnit.MB) + + .setWalTtlSeconds(80) // Auto + .setWalSizeLimitMB(0) // Auto + .setMaxTotalWalSize(0) // AUto + ; + if (path != null) { + blockCache = CACHE_FACTORY.newCache(writeBufferManagerSize + Optional.ofNullable(databaseOptions.global().blockCache()).map(DataSize::longValue).orElse( 512 * SizeUnit.MB)); + refs.add(blockCache); + } else { + blockCache = null; + } + + if (useDirectIO) { + options + // Option to enable readahead in compaction + // If not set, it will be set to 2MB internally + .setCompactionReadaheadSize(4 * SizeUnit.MB) // recommend at least 2MB + // Option to tune write buffer for direct writes + .setWritableFileMaxBufferSize(2 * SizeUnit.MB) + ; + } + if (databaseOptions.global().spinning()) { + options + // method documentation + .setCompactionReadaheadSize(16 * SizeUnit.MB) + // guessed + .setWritableFileMaxBufferSize(8 * SizeUnit.MB); + } + options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors()); + + if (path != null && writeBufferManagerSize > 0L) { + var writeBufferManager = new WriteBufferManager(writeBufferManagerSize, blockCache, false); + refs.add(writeBufferManager); + options.setWriteBufferManager(writeBufferManager); + } + + if (useDirectIO) { + options + .setAllowMmapReads(false) + .setAllowMmapWrites(false) + .setUseDirectReads(true) + ; + } else { + options + .setAllowMmapReads(allowMmapReads) + .setAllowMmapWrites(allowMmapWrites); + } + + if (useDirectIO || !allowMmapWrites) { + options.setUseDirectIoForFlushAndCompaction(true); + } 
+ + return new OptionsWithCache(options, blockCache); + } catch (IOException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, e); + } catch (GestaltException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_CONFIG_ERROR, e); + } + } + + private static List getVolumeConfigs(DatabaseConfig databaseOptions) throws GestaltException { + try { + return List.of(databaseOptions.global().volumes()); + } catch (GestaltException ex) { + if (ex.getMessage().equals("Failed to get proxy config while calling method: volumes in path: database.global.")) { + return List.of(); + } else { + throw ex; + } + } + } + + private static TransactionalDB loadDb(@Nullable Path path, DatabaseConfig databaseOptions, OptionsWithCache optionsWithCache, RocksDBObjects refs, Logger logger) { + var rocksdbOptions = optionsWithCache.options(); + try { + List volumeConfigs = getVolumeConfigs(databaseOptions); + List descriptors = new ArrayList<>(); + + var defaultColumnOptions = new ColumnFamilyOptions(); + refs.add(defaultColumnOptions); + descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultColumnOptions)); + + var rocksLogger = new RocksLogger(rocksdbOptions, logger); + + var columnConfigs = databaseOptions.global().columnOptions(); + + SequencedMap columnConfigMap = new LinkedHashMap<>(); + + for (NamedColumnConfig columnConfig : columnConfigs) { + columnConfigMap.put(columnConfig.name(), columnConfig); + } + if (path != null) { + List existingColumnFamilies; + try (var options = new Options()) { + options.setCreateIfMissing(true); + existingColumnFamilies = mapList(RocksDB.listColumnFamilies(options, path.toString()), b -> new String(b, StandardCharsets.UTF_8)); + } + for (String existingColumnFamily : existingColumnFamilies) { + 
columnConfigMap.putIfAbsent(existingColumnFamily, databaseOptions.global().fallbackColumnOptions()); + } + } + + for (Map.Entry entry : columnConfigMap.entrySet()) { + String name = entry.getKey(); + FallbackColumnConfig columnOptions = entry.getValue(); + if (columnOptions instanceof NamedColumnConfig namedColumnConfig && !namedColumnConfig.name().equals(name)) { + throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Wrong column config name: " + name); + } + + var columnFamilyOptions = new ColumnFamilyOptions(); + refs.add(columnFamilyOptions); + + //noinspection ConstantConditions + if (columnOptions.memtableMemoryBudgetBytes() != null) { + // about 512MB of ram will be used for level style compaction + columnFamilyOptions.optimizeLevelStyleCompaction(Optional.ofNullable(columnOptions.memtableMemoryBudgetBytes()) + .map(DataSize::longValue) + .orElse(DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET)); + } + + if (isDisableAutoCompactions()) { + columnFamilyOptions.setDisableAutoCompactions(true); + } + try { + columnFamilyOptions.setPrepopulateBlobCache(PrepopulateBlobCache.PREPOPULATE_BLOB_FLUSH_ONLY); + } catch (Throwable ex) { + logger.log(Level.SEVERE, "Failed to set prepopulate blob cache", ex); + } + + // This option is not supported with multiple db paths + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + boolean dynamicLevelBytes = volumeConfigs.size() <= 1; + if (dynamicLevelBytes) { + columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true); + } else { + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB); + // 
https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + columnFamilyOptions.setMaxBytesForLevelMultiplier(10); + } + if (isDisableAutoCompactions()) { + columnFamilyOptions.setLevel0FileNumCompactionTrigger(-1); + } else if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { + // ArangoDB uses a value of 2: https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + // Higher values speed up writes, but slow down reads + columnFamilyOptions.setLevel0FileNumCompactionTrigger(2); + } + if (isDisableSlowdown()) { + columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1); + columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE); + columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE); + columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE); + } + { + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + columnFamilyOptions.setLevel0SlowdownWritesTrigger(20); + // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html + columnFamilyOptions.setLevel0StopWritesTrigger(36); + } + + if (columnOptions.levels().length > 0) { + columnFamilyOptions.setNumLevels(columnOptions.levels().length); + var firstLevelOptions = getRocksLevelOptions(columnOptions.levels()[0], refs); + columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType); + columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions); + + var lastLevelOptions = getRocksLevelOptions(columnOptions + .levels()[columnOptions.levels().length - 1], refs); + columnFamilyOptions.setBottommostCompressionType(lastLevelOptions.compressionType); + columnFamilyOptions.setBottommostCompressionOptions(lastLevelOptions.compressionOptions); + + List compressionPerLevel = new ArrayList<>(); + for (ColumnLevelConfig columnLevelConfig : List.of(columnOptions.levels())) { + CompressionType compression = columnLevelConfig.compression(); + compressionPerLevel.add(compression); + } + 
columnFamilyOptions.setCompressionPerLevel(compressionPerLevel); + } else { + columnFamilyOptions.setNumLevels(7); + List compressionTypes = new ArrayList<>(7); + for (int i = 0; i < 7; i++) { + if (i < 2) { + compressionTypes.add(CompressionType.NO_COMPRESSION); + } else { + compressionTypes.add(CompressionType.LZ4_COMPRESSION); + } + } + columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION); + var compressionOptions = new CompressionOptions() + .setEnabled(true) + .setMaxDictBytes(32768); + refs.add(compressionOptions); + columnFamilyOptions.setBottommostCompressionOptions(compressionOptions); + columnFamilyOptions.setCompressionPerLevel(compressionTypes); + } + + final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); + + if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { + columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB); + } + Optional.ofNullable(columnOptions.writeBufferSize()) + .map(DataSize::longValue) + .ifPresent(columnFamilyOptions::setWriteBufferSize); + + columnFamilyOptions.setMaxWriteBufferNumberToMaintain(1); + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig.setVerifyCompression(false); + } + // If OptimizeFiltersForHits == true: memory size = bitsPerKey * (totalKeys * 0.1) + // If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys + BloomFilterConfig filter = null; + BloomFilterConfig bloomFilterConfig = columnOptions.bloomFilter(); + if (bloomFilterConfig != null) filter = bloomFilterConfig; + if (filter == null) { + if (path == null) { + throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Please set a bloom filter. 
It's required for in-memory databases"); + } + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig.setFilterPolicy(null); + } + } else { + final BloomFilter bloomFilter = new BloomFilter(filter.bitsPerKey()); + refs.add(bloomFilter); + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig.setFilterPolicy(bloomFilter); + } + } + boolean cacheIndexAndFilterBlocks = path != null && Optional.ofNullable(columnOptions.cacheIndexAndFilterBlocks()) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .orElse(true); + if (databaseOptions.global().spinning()) { + if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + // cacheIndexAndFilterBlocks = true; + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + columnFamilyOptions.setMinWriteBufferNumberToMerge(3); + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + columnFamilyOptions.setMaxWriteBufferNumber(4); + } + } + if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) { + blockBasedTableConfig + // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html + .setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash) + // http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html + .setDataBlockHashTableUtilRatio(0.75) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setPinTopLevelIndexAndFilter(true) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setPinL0FilterAndIndexBlocksInCache(path != null) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setCacheIndexAndFilterBlocksWithHighPriority(true) + .setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks) + // 
https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + // Enabling partition filters increase the reads by 2x + .setPartitionFilters(Optional.ofNullable(columnOptions.partitionFilters()).orElse(false)) + // https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters + .setIndexType(path == null ? IndexType.kHashSearch : Optional.ofNullable(columnOptions.partitionFilters()).orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch) + .setChecksumType(path == null ? ChecksumType.kNoChecksum : ChecksumType.kXXH3) + // Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + .setBlockSize(path == null ? 4096 : Optional.ofNullable(columnOptions.blockSize()).map(DataSize::longValue).orElse((databaseOptions.global().spinning() ? 128 : 16) * 1024L)) + .setBlockCache(optionsWithCache.standardCache()) + .setNoBlockCache(optionsWithCache.standardCache() == null); + } + if (path == null) { + columnFamilyOptions.useCappedPrefixExtractor(4); + tableOptions.setBlockRestartInterval(4); + } + + columnFamilyOptions.setTableFormatConfig(tableOptions); + columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio); + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + // https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions= + BloomFilterConfig bloomFilterOptions = columnOptions.bloomFilter(); + if (bloomFilterOptions != null) { + // https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks + // 
https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions= + boolean optimizeForHits = databaseOptions.global().spinning(); + Boolean value = bloomFilterOptions.optimizeForHits(); + if (value != null) optimizeForHits = value; + columnFamilyOptions.setOptimizeFiltersForHits(optimizeForHits); + } + + if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) { + // // Increasing this value can reduce the frequency of compaction and reduce write amplification, + // // but it will also cause old data to be unable to be cleaned up in time, thus increasing read amplification. + // // This parameter is not easy to adjust. It is generally not recommended to set it above 256MB. + // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html + columnFamilyOptions.setTargetFileSizeBase(64 * SizeUnit.MB); + // // For each level up, the threshold is multiplied by the factor target_file_size_multiplier + // // (but the default value is 1, which means that the maximum sstable of each level is the same). 
+ columnFamilyOptions.setTargetFileSizeMultiplier(2); + } + + descriptors.add(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.US_ASCII), columnFamilyOptions)); + } + + // Get databases directory path + String definitiveDbPathString; + if (path != null) { + Path databasesDirPath = path.toAbsolutePath().getParent(); + definitiveDbPathString = databasesDirPath.toString() + File.separatorChar + path.getFileName(); + } else { + try { + definitiveDbPathString = Files.createTempDirectory("temp-rocksdb").toString(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + var handles = new ArrayList(); + RocksDB db; + // a factory method that returns a RocksDB instance + if (databaseOptions.global().optimistic()) { + db = OptimisticTransactionDB.open(rocksdbOptions, definitiveDbPathString, descriptors, handles); + } else { + var transactionOptions = new TransactionDBOptions() + .setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED) + .setTransactionLockTimeout(5000) + .setDefaultLockTimeout(5000); + refs.add(transactionOptions); + db = TransactionDB.open(rocksdbOptions, + transactionOptions, + definitiveDbPathString, + descriptors, + handles + ); + } + + handles.forEach(refs::add); + + try { + for (ColumnFamilyHandle cfh : handles) { + var props = db.getProperty(cfh, "rocksdb.stats"); + logger.log(Level.FINEST, "Stats for database column {1}: {2}", + new Object[]{new String(cfh.getName(), StandardCharsets.UTF_8), + props} + ); + } + } catch (RocksDBException ex) { + logger.log(Level.FINE, "Failed to obtain stats", ex); + } + return TransactionalDB.create(definitiveDbPathString, db); + } catch (RocksDBException ex) { + throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex); + } catch (GestaltException e) { + throw new 
it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Failed to load rocksdb", e); + } + } + + /** Pairs a database storage path with its target size; the values passed in this file are byte counts. */ record DbPathRecord(Path path, long targetSize) {} + + /** Builds the DbPathRecord list for the configured volumes. When volumes is empty, returns three legacy entries resolved against databasesDirPath and named after the file name of path with the suffixes "_hot", "_cold" and "_colder" (target sizes 0, 0 and 1000 GiB). Otherwise returns one entry per volume, keeping absolute volume paths as they are and resolving relative ones against databasesDirPath. NOTE(review): path is dereferenced in the empty-volumes branch; confirm callers never pass null there. */ private static List convertPaths(Path databasesDirPath, Path path, List volumes) throws GestaltException { + var paths = new ArrayList(volumes.size()); + if (volumes.isEmpty()) { + return List.of(new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_hot"), + 0), // Legacy + new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_cold"), + 0), // Legacy + new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_colder"), + 1000L * 1024L * 1024L * 1024L) // 1000GiB + ); // Legacy + } + for (var volume : volumes) { + Path volumePath; + if (volume.volumePath().isAbsolute()) { + volumePath = volume.volumePath(); + } else { + volumePath = databasesDirPath.resolve(volume.volumePath()); + } + paths.add(new DbPathRecord(volumePath, volume.targetSizeBytes())); + } + return paths; + } + + /** Returns the value of the system property "it.cavallium.dbengine.compactions.auto.disable" (default "false") as parsed by parseBoolean. */ public static boolean isDisableAutoCompactions() { + return parseBoolean(System.getProperty("it.cavallium.dbengine.compactions.auto.disable", "false")); + } + + /** Returns true when auto compactions are disabled, or when the system property "it.cavallium.dbengine.disableslowdown" (default "false") parses as true. */ public static boolean isDisableSlowdown() { + return isDisableAutoCompactions() + || parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false")); + } + + /** Result holder of getRocksLevelOptions: a compression type and its CompressionOptions. */ private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {} + /** Derives the compression settings from a ColumnLevelConfig. A new CompressionOptions is created and added to refs (presumably so it is released together with the other native objects; confirm in RocksDBObjects). For any type other than NO_COMPRESSION it is enabled and given the configured max dictionary size, which must fit in an int or Math.toIntExact throws ArithmeticException; for NO_COMPRESSION it is disabled. */ private static RocksLevelOptions getRocksLevelOptions(ColumnLevelConfig levelOptions, RocksDBObjects refs) throws GestaltException { + var compressionType = levelOptions.compression(); + var compressionOptions = new CompressionOptions(); + refs.add(compressionOptions); + if (compressionType != CompressionType.NO_COMPRESSION) { + compressionOptions.setEnabled(true); + compressionOptions.setMaxDictBytes(Math.toIntExact(levelOptions.maxDictBytes().longValue())); + } else { +
compressionOptions.setEnabled(false); + } + return new RocksLevelOptions(compressionType, compressionOptions); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksLogger.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksLogger.java new file mode 100644 index 0000000..0fa0fd8 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksLogger.java @@ -0,0 +1,11 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import org.rocksdb.DBOptions; + +import java.util.logging.Logger; + +/** Placeholder type: its only constructor has an empty body. */ public class RocksLogger { + /** Currently does nothing; rocksdbOptions and logger are accepted but not used. */ public RocksLogger(DBOptions rocksdbOptions, Logger logger) { + + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java index 0966346..50dd44e 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java @@ -13,16 +13,18 @@ import org.rocksdb.WriteOptions; public sealed interface TransactionalDB extends Closeable { - static TransactionalDB create(RocksDB db) { + static TransactionalDB create(String path, RocksDB db) { return switch (db) { - case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(optimisticTransactionDB); - case TransactionDB transactionDB -> new PessimisticTransactionalDB(transactionDB); + case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB); + case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB); default -> throw new UnsupportedOperationException("This database is not transactional"); }; } TransactionalOptions createTransactionalOptions(); + String getPath(); + RocksDB get(); /** * Starts a new Transaction.
@@ -89,9 +91,11 @@ public sealed interface TransactionalDB extends Closeable { final class PessimisticTransactionalDB implements TransactionalDB { + private final String path; private final TransactionDB db; - public PessimisticTransactionalDB(TransactionDB db) { + public PessimisticTransactionalDB(String path, TransactionDB db) { + this.path = path; this.db = db; } @@ -100,6 +104,11 @@ public sealed interface TransactionalDB extends Closeable { return new TransactionalOptionsPessimistic(new TransactionOptions()); } + @Override + public String getPath() { + return path; + } + @Override public RocksDB get() { return db; @@ -153,9 +162,11 @@ public sealed interface TransactionalDB extends Closeable { final class OptimisticTransactionalDB implements TransactionalDB { + private final String path; private final OptimisticTransactionDB db; - public OptimisticTransactionalDB(OptimisticTransactionDB db) { + public OptimisticTransactionalDB(String path, OptimisticTransactionDB db) { + this.path = path; this.db = db; } @@ -164,6 +175,11 @@ public sealed interface TransactionalDB extends Closeable { return new TransactionalOptionsOptimistic(new OptimisticTransactionOptions()); } + @Override + public String getPath() { + return path; + } + @Override public RocksDB get() { return db; diff --git a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf index cd993da..6e8c345 100644 --- a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf +++ b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf @@ -4,6 +4,8 @@ database: { enable-column-bug: false # Enable to adapt the database to spinning disk spinning: false + # Enable to require absolute consistency after a crash. 
False to use the PointInTime recovery strategy + absolute-consistency: true # Error checking checksum: true # Use direct I/O in RocksDB databases (Higher I/O read throughput but OS cache is not used, less swapping, less memory pressure) @@ -14,6 +16,9 @@ database: { # If the maximum open files count is -1, the initial startup time will be slower. # If "cacheIndexAndFilterBlocks" is false, the memory will rise when the number of open files rises. maximum-open-files: -1 + # RocksDB data volumes. + volumes: [] + # Optimistic transactions optimistic: true # Database block cache size block-cache: 512MiB @@ -85,6 +90,8 @@ database: { # or if you want to use the "memtable-memory-budget-size" logic. # Remember that there are "max-write-buffer-number" in memory, 2 by default write-buffer-size: 200MiB + # Enable blob files + blob-files: false } column-options: [ ${database.global.fallback-column-options} { diff --git a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java index b558454..04ffde8 100644 --- a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java +++ b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java @@ -3,50 +3,57 @@ package it.cavallium.rockserver.core.impl.test; import it.cavallium.rockserver.core.client.EmbeddedConnection; import it.cavallium.rockserver.core.common.Callback.CallbackDelta; import it.cavallium.rockserver.core.common.ColumnSchema; -import it.cavallium.rockserver.core.common.Delta; -import java.io.File; +import it.cavallium.rockserver.core.common.Utils; +import org.junit.jupiter.api.Assertions; + import java.io.IOException; import java.lang.foreign.MemorySegment; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Comparator; -import java.util.stream.Stream; +import java.util.concurrent.atomic.AtomicInteger; class EmbeddedDBTest { - private Path dir; private EmbeddedConnection db; + 
private long colId; @org.junit.jupiter.api.BeforeEach void setUp() throws IOException { - this.dir = Files.createTempDirectory("db-test"); - db = new EmbeddedConnection(dir, "test", null); + /* Default the "rockserver.core.print-config" system property to "false" unless it is already set. */ if (System.getProperty("rockserver.core.print-config", null) == null) { + System.setProperty("rockserver.core.print-config", "false"); + } + /* The first constructor argument is now null instead of a temp directory; presumably this selects a temporary database, confirm in EmbeddedConnection. */ db = new EmbeddedConnection(null, "test", null); + /* Create the "put-1" column for each test and remember its id for tearDown. */ this.colId = db.createColumn("put-1", new ColumnSchema(new int[]{1, 2, 1, Integer.BYTES, Integer.BYTES}, 2, true)); } @org.junit.jupiter.api.AfterEach void tearDown() throws IOException { - try (Stream walk = Files.walk(dir)) { - db.close(); - walk.sorted(Comparator.reverseOrder()) - .map(Path::toFile) - .peek(System.out::println) - .forEach(File::delete); - } + /* Delete the column created in setUp, then close the connection. */ db.deleteColumn(colId); + db.close(); } @org.junit.jupiter.api.Test void put() { - var colId = db.createColumn("put-1", new ColumnSchema(new int[]{16, 16, 16, 32, 32}, 2, true)); - db.put(0, colId, null, null, new CallbackDelta() { - @Override - public void onSuccess(Delta previous) { - - } + /* Key made of five memory segments. */ var key = new MemorySegment[] { + Utils.toMemorySegment(new byte[] {3}), + Utils.toMemorySegment(new byte[] {4, 6}), + Utils.toMemorySegment(new byte[] {3}), + Utils.toMemorySegment(new byte[] {1, 2, 3}), + Utils.toMemorySegment(new byte[] {0, 0, 3, 6, 7, 8}) + }; + var value1 = MemorySegment.ofArray(new byte[] {0, 0, 3}); + AtomicInteger callbackCalled = new AtomicInteger(); + /* First put: the callback must receive a null previous value, and the counter is asserted to be 1 right after the call. */ db.put(0, colId, key, value1, (CallbackDelta) prev -> { + callbackCalled.incrementAndGet(); + Assertions.assertNull(prev); + }); + Assertions.assertEquals(1, callbackCalled.get()); + /* Second put with the same key and a different value. */ db.put(0, colId, key, MemorySegment.ofArray(new byte[] {0, 0, 5}), (CallbackDelta) prev -> { + callbackCalled.incrementAndGet(); + /* NOTE(review): the method name is missing between "Utils." and "(" so this statement cannot compile as written; presumably a check of prev against value1 was intended. */ Utils.(value1); }); - db.deleteColumn(colId); } @org.junit.jupiter.api.Test void get() { + /* Test not implemented yet: always throws. */ throw new UnsupportedOperationException(); } } \ No newline at end of file