Partial commit

This commit is contained in:
Andrea Cavalli 2023-12-07 00:14:28 +01:00
parent 9a360c2440
commit a505b3500f
28 changed files with 1036 additions and 212 deletions

View File

@ -1,20 +1,15 @@
package it.cavallium.rockserver.core;
import static it.cavallium.rockserver.core.client.EmbeddedConnection.PRIVATE_MEMORY_URL;
import static java.util.Objects.requireNonNull;
import inet.ipaddr.HostName;
import it.cavallium.rockserver.core.client.ClientBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnixDomainSocketAddress;
import java.net.spi.InetAddressResolver;
import java.net.spi.InetAddressResolverProvider;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -34,7 +29,7 @@ public class Main {
.description("RocksDB server core");
parser.addArgument("-u", "--url")
.type(String.class)
.setDefault("file://" + System.getProperty("user.home") + "/rockserver-core-db")
.setDefault(PRIVATE_MEMORY_URL.toString())
.help("Specify database rocksdb://hostname:port, or unix://<path>, or file://<path>");
parser.addArgument("-n", "--name")
.type(String.class)
@ -85,7 +80,8 @@ public class Main {
switch (url.getScheme()) {
case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(url.getPath())));
case "file" -> clientBuilder.setEmbedded(Path.of(url.getPath()));
case "file" -> clientBuilder.setEmbeddedPath(Path.of(url.getPath()));
case "memory" -> clientBuilder.setEmbeddedInMemory(true);
case "rocksdb" -> clientBuilder.setAddress(new HostName(url.getHost()).asInetSocketAddress());
default -> throw new IllegalArgumentException("Invalid scheme: " + url.getScheme());
}

View File

@ -8,12 +8,17 @@ public class ClientBuilder {
private InetSocketAddress iNetAddress;
private UnixDomainSocketAddress unixAddress;
private Path embedded;
private Path embeddedPath;
private String name;
private Path embeddedConfig;
private boolean embeddedInMemory;
public void setEmbedded(Path path) {
this.embedded = path;
public void setEmbeddedPath(Path path) {
this.embeddedPath = path;
}
public void setEmbeddedInMemory(boolean inMemory) {
this.embeddedInMemory = inMemory;
}
public void setUnixSocket(UnixDomainSocketAddress address) {
@ -33,8 +38,10 @@ public class ClientBuilder {
}
public RocksDBConnection build() {
if (embedded != null) {
return new EmbeddedConnection(embedded, name, embeddedConfig);
if (embeddedInMemory) {
return new EmbeddedConnection(null, name, embeddedConfig);
} else if (embeddedPath != null) {
return new EmbeddedConnection(embeddedPath, name, embeddedConfig);
} else if (unixAddress != null) {
return new SocketConnectionUnix(unixAddress, name);
} else if (iNetAddress != null) {

View File

@ -10,15 +10,18 @@ import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.net.URI;
import java.nio.file.Path;
import java.util.Optional;
import org.jetbrains.annotations.Nullable;
public class EmbeddedConnection extends BaseConnection {
private final EmbeddedDB db;
public static final URI PRIVATE_MEMORY_URL = URI.create("memory://private");
public EmbeddedConnection(Path path, String name, Path embeddedConfig) {
public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) {
super(name);
this.db = new EmbeddedDB(path, embeddedConfig);
this.db = new EmbeddedDB(path, name, embeddedConfig);
}
@Override
@ -29,7 +32,7 @@ public class EmbeddedConnection extends BaseConnection {
@Override
public URI getUrl() {
return db.getPath().toUri();
return Optional.ofNullable(db.getPath()).map(Path::toUri).orElse(PRIVATE_MEMORY_URL);
}
@Override

View File

@ -5,7 +5,7 @@ public class RocksDBException extends RuntimeException {
private final RocksDBErrorType errorUniqueId;
public enum RocksDBErrorType {
PUT_UNKNOWN, PUT_2, UNEXPECTED_NULL_VALUE, PUT_1, PUT_3, GET_1, COLUMN_EXISTS, COLUMN_CREATE_FAIL, COLUMN_NOT_FOUND, COLUMN_DELETE_FAIL, CONFIG_ERROR, VALUE_MUST_BE_NULL
PUT_UNKNOWN, PUT_2, UNEXPECTED_NULL_VALUE, PUT_1, PUT_3, GET_1, COLUMN_EXISTS, COLUMN_CREATE_FAIL, COLUMN_NOT_FOUND, COLUMN_DELETE_FAIL, CONFIG_ERROR, ROCKSDB_CONFIG_ERROR, VALUE_MUST_BE_NULL, DIRECTORY_DELETE, KEY_LENGTH_MISMATCH, UNSUPPORTED_HASH_SIZE, RAW_KEY_LENGTH_MISMATCH, KEYS_COUNT_MISMATCH, ROCKSDB_LOAD_ERROR
}

View File

@ -2,8 +2,17 @@ package it.cavallium.rockserver.core.common;
import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES;
import java.lang.foreign.Arena;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@ -40,11 +49,36 @@ public class Utils {
}
@NotNull
public static MemorySegment toMemorySegment(Arena arena, byte @Nullable [] array) {
public static MemorySegment toMemorySegment(byte @Nullable [] array) {
if (array != null) {
return arena.allocateArray(BIG_ENDIAN_BYTES, array);
return MemorySegment.ofArray(array);
} else {
return MemorySegment.NULL;
}
}
public static byte[] toByteArray(MemorySegment memorySegment) {
return memorySegment.toArray(BIG_ENDIAN_BYTES);
}
public static <T, U> List<U> mapList(Collection<T> input, Function<T, U> mapper) {
var result = new ArrayList<U>(input.size());
input.forEach(t -> result.add(mapper.apply(t)));
return result;
}
public static void deleteDirectory(String path) throws RocksDBException {
try (Stream<Path> pathStream = Files.walk(Path.of(path))) {
pathStream.sorted(Comparator.reverseOrder())
.forEach(f -> {
try {
Files.deleteIfExists(f);
} catch (IOException e) {
throw new RocksDBException(RocksDBException.RocksDBErrorType.DIRECTORY_DELETE, e);
}
});
} catch (IOException e) {
throw new RocksDBException(RocksDBException.RocksDBErrorType.DIRECTORY_DELETE, e);
}
}
}

View File

@ -1,9 +1,13 @@
package it.cavallium.rockserver.core.config;
import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.Nullable;
public interface BloomFilterConfig {
int bitsPerKey();
int bitsPerKey() throws GestaltException;
boolean optimizeForHits();
@Nullable
Boolean optimizeForHits() throws GestaltException;
}

View File

@ -0,0 +1,12 @@
package it.cavallium.rockserver.core.config;
import org.github.gestalt.config.exceptions.GestaltException;
import org.rocksdb.CompressionType;
public interface ColumnLevelConfig {
CompressionType compression() throws GestaltException;
DataSize maxDictBytes() throws GestaltException;
}

View File

@ -1,13 +1,12 @@
package it.cavallium.rockserver.core.config;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.github.gestalt.config.exceptions.GestaltException;
import java.util.*;
public class ConfigPrinter {
public static String stringifyBloomFilter(BloomFilterConfig o) {
public static String stringifyBloomFilter(BloomFilterConfig o) throws GestaltException {
return """
{
"bits-per-key": %d,
@ -16,14 +15,14 @@ public class ConfigPrinter {
""".formatted(o.bitsPerKey(), o.optimizeForHits());
}
public static String stringifyDatabase(DatabaseConfig o) {
public static String stringifyDatabase(DatabaseConfig o) throws GestaltException {
return """
{
"global": %s
}""".formatted(stringifyGlobalDatabase(o.global()));
}
public static String stringifyLevel(DatabaseLevel o) {
public static String stringifyLevel(ColumnLevelConfig o) throws GestaltException {
return """
{
"compression": "%s",
@ -32,29 +31,29 @@ public class ConfigPrinter {
""".formatted(o.compression(), o.maxDictBytes());
}
public static String stringifyFallbackColumn(FallbackColumnOptions o) {
return """
{
"levels": %s,
"memtable-memory-budget-bytes": "%s",
"cache-index-and-filter-blocks": %b,
"partition-filters": %s,
"bloom-filter": %s,
"block-size": "%s",
"write-buffer-size": "%s"
}\
""".formatted(Arrays.stream(Objects.requireNonNullElse(o.levels(), new DatabaseLevel[0]))
.map(ConfigPrinter::stringifyLevel).collect(Collectors.joining(",", "[", "]")),
o.memtableMemoryBudgetBytes(),
o.cacheIndexAndFilterBlocks(),
o.partitionFilters(),
stringifyBloomFilter(o.bloomFilter()),
o.blockSize(),
o.writeBufferSize()
);
private static List<VolumeConfig> getVolumeConfigs(GlobalDatabaseConfig g) throws GestaltException {
try {
return List.of(g.volumes());
} catch (GestaltException ex) {
if (ex.getMessage().equals("Failed to get proxy config while calling method: volumes in path: database.global.")) {
return List.of();
} else {
throw ex;
}
}
}
public static String stringifyGlobalDatabase(GlobalDatabaseConfig o) {
public static String stringifyGlobalDatabase(GlobalDatabaseConfig o) throws GestaltException {
StringJoiner joiner = new StringJoiner(",", "[", "]");
for (NamedColumnConfig namedColumnConfig : Objects.requireNonNullElse(o.columnOptions(), new NamedColumnConfig[0])) {
String s = stringifyNamedColumn(namedColumnConfig);
joiner.add(s);
}
StringJoiner result = new StringJoiner(",", "[", "]");
for (VolumeConfig volumeConfig : getVolumeConfigs(o)) {
String s = stringifyVolume(volumeConfig);
result.add(s);
}
return """
{
"spinning": %b,
@ -66,6 +65,9 @@ public class ConfigPrinter {
"block-cache": "%s",
"write-buffer-manager": "%s",
"log-path": "%s",
"wal-path": "%s",
"absolute-consistency": %b,
"volumes": %s,
"fallback-column-options": %s,
"column-options": %s
}\
@ -78,14 +80,69 @@ public class ConfigPrinter {
o.blockCache(),
o.writeBufferManager(),
o.logPath(),
o.walPath(),
o.absoluteConsistency(),
result.toString(),
stringifyFallbackColumn(o.fallbackColumnOptions()),
Arrays.stream(Objects.requireNonNullElse(o.columnOptions(), new NamedColumnOptions[0]))
.map(ConfigPrinter::stringifyNamedColumn)
.collect(Collectors.joining(",", "[", "]"))
joiner.toString()
);
}
public static String stringifyNamedColumn(NamedColumnOptions o) {
private static String stringifyVolume(VolumeConfig o) throws GestaltException {
return """
{
"volume-path": "%s",
"target-size-bytes": %b
}\
""".formatted(o.volumePath(),
o.targetSizeBytes()
);
}
public static String stringifyFallbackColumn(FallbackColumnConfig o) throws GestaltException {
StringJoiner joiner = new StringJoiner(",", "[", "]");
for (ColumnLevelConfig columnLevelConfig : Objects.requireNonNullElse(o.levels(), new ColumnLevelConfig[0])) {
String s = stringifyLevel(columnLevelConfig);
joiner.add(s);
}
String bloom = null;
BloomFilterConfig value = o.bloomFilter();
if (value != null) {
String s = stringifyBloomFilter(value);
if (s != null) bloom = s;
}
return """
{
"levels": %s,
"memtable-memory-budget-bytes": "%s",
"cache-index-and-filter-blocks": %b,
"partition-filters": %s,
"bloom-filter": %s,
"block-size": "%s",
"write-buffer-size": "%s"
}\
""".formatted(joiner.toString(),
o.memtableMemoryBudgetBytes(),
o.cacheIndexAndFilterBlocks(),
o.partitionFilters(),
bloom,
o.blockSize(),
o.writeBufferSize()
);
}
public static String stringifyNamedColumn(NamedColumnConfig o) throws GestaltException {
StringJoiner joiner = new StringJoiner(",", "[", "]");
for (ColumnLevelConfig columnLevelConfig : (o.levels() != null ? List.of(o.levels()) : List.<ColumnLevelConfig>of())) {
String s = stringifyLevel(columnLevelConfig);
joiner.add(s);
}
String bloom = null;
BloomFilterConfig value = o.bloomFilter();
if (value != null) {
String s = stringifyBloomFilter(value);
if (s != null) bloom = s;
}
return """
{
"name": "%s",
@ -98,12 +155,11 @@ public class ConfigPrinter {
"write-buffer-size": "%s"
}\
""".formatted(o.name(),
(o.levels() != null ? List.of(o.levels()) : List.<DatabaseLevel>of()).stream()
.map(ConfigPrinter::stringifyLevel).collect(Collectors.joining(",", "[", "]")),
joiner.toString(),
o.memtableMemoryBudgetBytes(),
o.cacheIndexAndFilterBlocks(),
o.partitionFilters(),
stringifyBloomFilter(o.bloomFilter()),
bloom,
o.blockSize(),
o.writeBufferSize()
);

View File

@ -1,7 +1,9 @@
package it.cavallium.rockserver.core.config;
import org.github.gestalt.config.exceptions.GestaltException;
public interface DatabaseConfig {
GlobalDatabaseConfig global();
GlobalDatabaseConfig global() throws GestaltException;
}

View File

@ -1,11 +0,0 @@
package it.cavallium.rockserver.core.config;
import org.rocksdb.CompressionType;
public interface DatabaseLevel {
CompressionType compression();
DataSize maxDictBytes();
}

View File

@ -0,0 +1,28 @@
package it.cavallium.rockserver.core.config;
import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.Nullable;
public interface FallbackColumnConfig {
ColumnLevelConfig[] levels() throws GestaltException;
@Nullable
DataSize memtableMemoryBudgetBytes() throws GestaltException;
@Nullable
Boolean cacheIndexAndFilterBlocks() throws GestaltException;
@Nullable
Boolean partitionFilters() throws GestaltException;
@Nullable
BloomFilterConfig bloomFilter() throws GestaltException;
@Nullable
DataSize blockSize() throws GestaltException;
@Nullable
DataSize writeBufferSize() throws GestaltException;
}

View File

@ -1,19 +0,0 @@
package it.cavallium.rockserver.core.config;
public interface FallbackColumnOptions {
DatabaseLevel[] levels();
DataSize memtableMemoryBudgetBytes();
boolean cacheIndexAndFilterBlocks();
boolean partitionFilters();
BloomFilterConfig bloomFilter();
DataSize blockSize();
DataSize writeBufferSize();
}

View File

@ -1,28 +1,42 @@
package it.cavallium.rockserver.core.config;
import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.Nullable;
import java.nio.file.Path;
public interface GlobalDatabaseConfig {
boolean spinning();
boolean spinning() throws GestaltException;
boolean checksum();
boolean checksum() throws GestaltException;
boolean useDirectIo();
boolean useDirectIo() throws GestaltException;
boolean allowRocksdbMemoryMapping();
boolean allowRocksdbMemoryMapping() throws GestaltException;
int maximumOpenFiles();
@Nullable
Integer maximumOpenFiles() throws GestaltException;
boolean optimistic();
boolean optimistic() throws GestaltException;
DataSize blockCache();
@Nullable
DataSize blockCache() throws GestaltException;
DataSize writeBufferManager();
@Nullable
DataSize writeBufferManager() throws GestaltException;
Path logPath();
@Nullable
Path logPath() throws GestaltException;
FallbackColumnOptions fallbackColumnOptions();
NamedColumnOptions[] columnOptions();
@Nullable
Path walPath() throws GestaltException;
boolean absoluteConsistency() throws GestaltException;
VolumeConfig[] volumes() throws GestaltException;
FallbackColumnConfig fallbackColumnOptions() throws GestaltException;
NamedColumnConfig[] columnOptions() throws GestaltException;
}

View File

@ -0,0 +1,9 @@
package it.cavallium.rockserver.core.config;
import org.github.gestalt.config.exceptions.GestaltException;
public interface NamedColumnConfig extends FallbackColumnConfig {
String name() throws GestaltException;
}

View File

@ -1,7 +0,0 @@
package it.cavallium.rockserver.core.config;
public interface NamedColumnOptions extends FallbackColumnOptions {
String name();
}

View File

@ -0,0 +1,12 @@
package it.cavallium.rockserver.core.config;
import org.github.gestalt.config.exceptions.GestaltException;
import java.nio.file.Path;
public interface VolumeConfig {
Path volumePath() throws GestaltException;
long targetSizeBytes() throws GestaltException;
}

View File

@ -82,10 +82,16 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
private MemorySegment computeKeyAt(Arena arena, int i, MemorySegment[] keys) {
if (i < schema.keys().length - schema.variableLengthKeysCount()) {
if (keys[i].byteSize() != schema.keys()[i]) {
throw new RocksDBException(RocksDBErrorType.KEY_LENGTH_MISMATCH,
"Key at index " + i + " has a different length than expected! Expected: " + schema.keys()[i]
+ ", received: " + keys[i].byteSize());
}
return keys[i];
} else {
if (schema.keys()[i] != Integer.BYTES) {
throw new UnsupportedOperationException("Hash size different than 32-bit is currently unsupported");
throw new RocksDBException(RocksDBErrorType.UNSUPPORTED_HASH_SIZE,
"Hash size different than 32-bit is currently unsupported");
} else {
return XXHash32.getInstance().hash(arena, keys[i], 0, 0, 0);
}
@ -94,17 +100,17 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
private void validateFinalKeySize(MemorySegment key) {
if (finalKeySizeBytes != key.byteSize()) {
throw new IllegalArgumentException(
"Keys size must be equal to the column keys size. Expected " + finalKeySizeBytes + ", got "
+ key.byteSize());
throw new RocksDBException(RocksDBErrorType.RAW_KEY_LENGTH_MISMATCH,
"Keys size must be equal to the column keys size. Expected: "
+ finalKeySizeBytes + ", got: " + key.byteSize());
}
}
private void validateKeyCount(MemorySegment[] keys) {
if (schema.keys().length != keys.length) {
throw new IllegalArgumentException(
"Keys count must be equal to the column keys count. Expected " + schema.keys().length + ", got "
+ keys.length);
throw new RocksDBException(RocksDBErrorType.KEYS_COUNT_MISMATCH,
"Keys count must be equal to the column keys count. Expected: " + schema.keys().length
+ ", got: " + keys.length);
}
}
@ -118,8 +124,10 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
long offset = 0;
for (MemorySegment keyI : variableKeys) {
var keyISize = keyI.byteSize();
bucketElementKey.set(BIG_ENDIAN_CHAR_UNALIGNED, offset += Character.BYTES, toCharExact(keyISize));
MemorySegment.copy(keyI, 0, bucketElementKey, offset += keyISize, keyISize);
bucketElementKey.set(BIG_ENDIAN_CHAR_UNALIGNED, offset, toCharExact(keyISize));
offset += Character.BYTES;
MemorySegment.copy(keyI, 0, bucketElementKey, offset, keyISize);
offset += keyISize;
}
assert offset == totalSize;
return bucketElementKey;
@ -135,11 +143,13 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
}
public void checkNullableValue(MemorySegment value) {
if (schema.hasValue() == (value == null || value == MemorySegment.NULL)) {
if (schema.hasValue() == (value == null)) {
if (schema.hasValue()) {
throw new RocksDBException(RocksDBErrorType.UNEXPECTED_NULL_VALUE, "Schema expects a value, but a null value has been passed");
throw new RocksDBException(RocksDBErrorType.UNEXPECTED_NULL_VALUE,
"Schema expects a value, but a null value has been passed");
} else {
throw new RocksDBException(RocksDBErrorType.VALUE_MUST_BE_NULL, "Schema expects no value, but a non-null value has been passed");
throw new RocksDBException(RocksDBErrorType.VALUE_MUST_BE_NULL,
"Schema expects no value, but a non-null value has been passed");
}
}
}
@ -149,7 +159,7 @@ public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int fi
checkNullableValue(computedBucketElementValue);
var keySize = computedBucketElementKey.byteSize();
var valueSize = computedBucketElementValue != null ? computedBucketElementValue.byteSize() : 0;
var totalSize = keySize + valueSize;
var totalSize = Integer.BYTES + keySize + valueSize;
var computedBucketElementKV = arena.allocate(totalSize);
computedBucketElementKV.set(BIG_ENDIAN_INT, 0, toIntExact(totalSize));
MemorySegment.copy(computedBucketElementKey, 0, computedBucketElementKV, Integer.BYTES, keySize);

View File

@ -24,7 +24,7 @@ class DataSizeDecoder implements Decoder<DataSize> {
@Override
public boolean matches(TypeCapture klass) {
return klass.isAssignableFrom(DataSize.class);
return klass != null && klass.isAssignableFrom(DataSize.class);
}
@Override

View File

@ -26,7 +26,7 @@ public class DbCompressionDecoder implements Decoder<CompressionType> {
@Override
public boolean matches(TypeCapture<?> klass) {
return klass.isAssignableFrom(CompressionType.class);
return klass != null && klass.isAssignableFrom(CompressionType.class);
}
@Override

View File

@ -19,6 +19,7 @@ import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.config.ConfigPrinter;
import it.cavallium.rockserver.core.config.DatabaseConfig;
import it.cavallium.rockserver.core.impl.rocksdb.REntry;
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBLoader;
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
@ -42,34 +43,31 @@ import org.github.gestalt.config.exceptions.GestaltException;
import org.github.gestalt.config.source.ClassPathConfigSource;
import org.github.gestalt.config.source.FileConfigSource;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.WriteOptions;
import org.rocksdb.*;
public class EmbeddedDB implements RocksDBAPI, Closeable {
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
private static final boolean USE_FAST_GET = true;
private final Logger logger;
private final Path path;
private final Path embeddedConfigPath;
private final @Nullable Path path;
private final @Nullable Path embeddedConfigPath;
private final DatabaseConfig config;
private TransactionalDB db;
private final NonBlockingHashMapLong<ColumnInstance> columns;
private final ConcurrentMap<String, Long> columnNamesIndex;
private final NonBlockingHashMapLong<REntry<Transaction>> txs;
private final NonBlockingHashMapLong<REntry<RocksIterator>> its;
private final SafeShutdown ops;
private final Object columnEditLock = new Object();
public EmbeddedDB(Path path, @Nullable Path embeddedConfigPath) {
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) {
this.path = path;
this.embeddedConfigPath = embeddedConfigPath;
this.logger = Logger.getLogger("db");
this.logger = Logger.getLogger("db." + name);
this.columns = new NonBlockingHashMapLong<>();
this.txs = new NonBlockingHashMapLong<>();
this.its = new NonBlockingHashMapLong<>();
this.columnNamesIndex = new ConcurrentHashMap<>();
this.ops = new SafeShutdown();
@ -88,7 +86,10 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
gestalt.loadConfigs();
this.config = gestalt.getConfig("database", DatabaseConfig.class);
logger.log(Level.INFO, "Database configuration: {0}", ConfigPrinter.stringifyDatabase(this.config));
this.db = RocksDBLoader.load(path, config, logger);
if (Boolean.parseBoolean(System.getProperty("rockserver.core.print-config", "true"))) {
logger.log(Level.INFO, "Database configuration: {0}", ConfigPrinter.stringifyDatabase(this.config));
}
} catch (GestaltException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.CONFIG_ERROR, e);
}
@ -143,6 +144,9 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
// Wait for 10 seconds
try {
ops.closeAndWait(10_000);
if (path == null) {
Utils.deleteDirectory(db.getPath());
}
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "Some operations lasted more than 10 seconds, forcing database shutdown...");
}
@ -150,61 +154,83 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
@Override
public long openTransaction(long timeoutMs) {
// Open the transaction operation, do not close until the transaction has been closed
ops.beginOp();
TransactionalOptions txOpts = db.createTransactionalOptions();
var writeOpts = new WriteOptions();
var tx = new REntry<>(db.beginTransaction(writeOpts, txOpts), new RocksDBObjects(writeOpts, txOpts));
return FastRandomUtils.allocateNewValue(txs, tx, Long.MIN_VALUE, -2);
try {
TransactionalOptions txOpts = db.createTransactionalOptions();
var writeOpts = new WriteOptions();
var tx = new REntry<>(db.beginTransaction(writeOpts, txOpts), new RocksDBObjects(writeOpts, txOpts));
return FastRandomUtils.allocateNewValue(txs, tx, Long.MIN_VALUE, -2);
} catch (Throwable ex) {
ops.endOp();
throw ex;
}
}
@Override
public void closeTransaction(long transactionId) {
var tx = txs.remove(transactionId);
if (tx != null) {
try {
tx.close();
} finally {
ops.endOp();
ops.beginOp();
try {
var tx = txs.remove(transactionId);
if (tx != null) {
try {
tx.close();
} finally {
// Close the transaction operation
ops.endOp();
}
} else {
throw new NoSuchElementException("Transaction not found: " + transactionId);
}
} else {
throw new NoSuchElementException("Transaction not found: " + transactionId);
} finally {
ops.endOp();
}
}
@Override
public long createColumn(String name, ColumnSchema schema) throws it.cavallium.rockserver.core.common.RocksDBException {
synchronized (columnEditLock) {
var colId = getColumnIdOrNull(name);
var col = colId != null ? getColumn(colId) : null;
if (col != null) {
if (schema.equals(col.schema())) {
return colId;
ops.beginOp();
try {
synchronized (columnEditLock) {
var colId = getColumnIdOrNull(name);
var col = colId != null ? getColumn(colId) : null;
if (col != null) {
if (schema.equals(col.schema())) {
return colId;
} else {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_EXISTS,
"Column exists, with a different schema: " + name
);
}
} else {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_EXISTS,
"Column exists, with a different schema: " + name
);
}
} else {
try {
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8)));
return registerColumn(new ColumnInstance(cf, schema));
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_CREATE_FAIL, e);
try {
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8)));
return registerColumn(new ColumnInstance(cf, schema));
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_CREATE_FAIL, e);
}
}
}
} finally {
ops.endOp();
}
}
@Override
public void deleteColumn(long columnId) throws it.cavallium.rockserver.core.common.RocksDBException {
synchronized (columnEditLock) {
var col = getColumn(columnId);
try {
db.get().dropColumnFamily(col.cfh());
unregisterColumn(columnId).close();
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_DELETE_FAIL, e);
ops.beginOp();
try {
synchronized (columnEditLock) {
var col = getColumn(columnId);
try {
db.get().dropColumnFamily(col.cfh());
unregisterColumn(columnId).close();
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_DELETE_FAIL, e);
}
}
} finally {
ops.endOp();
}
}
@ -255,10 +281,10 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
var bucketElementKeys = col.getBucketElementKeys(keys);
try (var readOptions = new ReadOptions()) {
var previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
MemorySegment previousRawBucket = Utils.toMemorySegment(arena, previousRawBucketByteArray);
MemorySegment previousRawBucket = toMemorySegment(previousRawBucketByteArray);
var bucket = new Bucket(col, previousRawBucket);
previousValue = bucket.addElement(bucketElementKeys, value);
tx.val().put(col.cfh(), toByteArray(calculatedKey), toByteArray(bucket.toSegment(arena)));
tx.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(bucket.toSegment(arena)));
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_1, e);
}
@ -281,7 +307,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
} else {
previousValueByteArray = db.get().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
}
previousValue = toMemorySegment(arena, previousValueByteArray);
previousValue = toMemorySegment(previousValueByteArray);
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_2, e);
}
@ -289,7 +315,7 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
previousValue = null;
}
if (tx != null) {
tx.val().put(col.cfh(), toByteArray(calculatedKey), toByteArray(requireNonNullElse(value, NULL)));
tx.val().put(col.cfh(), Utils.toByteArray(calculatedKey), Utils.toByteArray(requireNonNullElse(value, NULL)));
} else {
try (var w = new WriteOptions()) {
db.get().put(col.cfh(), w, calculatedKey.asByteBuffer(), requireNonNullElse(value, NULL).asByteBuffer());
@ -384,18 +410,45 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
@Nullable MemorySegment[] endKeysExclusive,
boolean reverse,
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
return 0;
// Open an operation that ends when the iterator is closed
ops.beginOp();
try {
var col = getColumn(columnId);
RocksIterator it;
var ro = new ReadOptions();
if (transactionId > 0L) {
//noinspection resource
it = getTransaction(transactionId).val().getIterator(ro, col.cfh());
} else {
it = db.get().newIterator(col.cfh());
}
var itEntry = new REntry<>(it, new RocksDBObjects(ro));
return FastRandomUtils.allocateNewValue(its, itEntry, 1, Long.MAX_VALUE);
} catch (Throwable ex) {
ops.endOp();
throw ex;
}
}
@Override
public void closeIterator(long iteratorId) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try {
// Should close the iterator operation
throw new UnsupportedOperationException();
} finally {
ops.endOp();
}
}
@Override
public void seekTo(long iterationId, MemorySegment[] keys)
throws it.cavallium.rockserver.core.common.RocksDBException {
public void seekTo(long iterationId, MemorySegment[] keys) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try {
throw new UnsupportedOperationException();
} finally {
ops.endOp();
}
}
@Override
@ -403,7 +456,12 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
long skipCount,
long takeCount,
IteratorCallback<? super MemorySegment> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try {
throw new UnsupportedOperationException();
} finally {
ops.endOp();
}
}
private MemorySegment dbGet(REntry<Transaction> tx,
@ -413,14 +471,14 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
MemorySegment calculatedKey) throws RocksDBException {
if (tx != null) {
var previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
return Utils.toMemorySegment(arena, previousRawBucketByteArray);
return toMemorySegment(previousRawBucketByteArray);
} else {
var db = this.db.get();
if (USE_FAST_GET) {
return dbGetDirect(arena, readOptions, calculatedKey);
} else {
var previousRawBucketByteArray = db.get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
return Utils.toMemorySegment(arena, previousRawBucketByteArray);
return toMemorySegment(previousRawBucketByteArray);
}
}
}
@ -463,10 +521,6 @@ public class EmbeddedDB implements RocksDBAPI, Closeable {
return MemorySegment.mismatch(previousValue, 0, previousValue.byteSize(), currentValue, 0, currentValue.byteSize()) != -1;
}
public static byte[] toByteArray(MemorySegment memorySegment) {
return memorySegment.toArray(BIG_ENDIAN_BYTES);
}
private ColumnInstance getColumn(long columnId) {
var col = columns.get(columnId);
if (col != null) {

View File

@ -0,0 +1,8 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import org.rocksdb.Cache;
public interface CacheFactory {
Cache newCache(long size);
}

View File

@ -0,0 +1,12 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import org.rocksdb.Cache;
import org.rocksdb.ClockCache;
public class ClockCacheFactory implements CacheFactory {
@Override
public Cache newCache(long size) {
return new ClockCache(size);
}
}

View File

@ -0,0 +1,12 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
public class LRUCacheFactory implements CacheFactory {
@Override
public Cache newCache(long size) {
return new LRUCache(size);
}
}

View File

@ -0,0 +1,547 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import it.cavallium.rockserver.core.config.*;
import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.*;
import org.rocksdb.util.SizeUnit;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
import static it.cavallium.rockserver.core.common.Utils.mapList;
import static java.lang.Boolean.parseBoolean;
import static java.util.Objects.requireNonNull;
import static org.rocksdb.ColumnFamilyOptionsInterface.DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET;
public class RocksDBLoader {
private static final boolean FOLLOW_ROCKSDB_OPTIMIZATIONS = true;
private static final boolean PARANOID_CHECKS
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.paranoid", "true"));
private static final boolean USE_CLOCK_CACHE
= Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.clockcache.enable", "false"));
private static final CacheFactory CACHE_FACTORY = USE_CLOCK_CACHE ? new ClockCacheFactory() : new LRUCacheFactory();
public static TransactionalDB load(@Nullable Path path, DatabaseConfig config, Logger logger) {
var refs = new RocksDBObjects();
var optionsWithCache = makeRocksDBOptions(path, config, refs, logger);
return loadDb(path, config, optionsWithCache, refs, logger);
}
record OptionsWithCache(DBOptions options, @Nullable Cache standardCache) {}
private static OptionsWithCache makeRocksDBOptions(@Nullable Path path, DatabaseConfig databaseOptions, RocksDBObjects refs, Logger logger) {
try {
// Get databases directory path
Path databasesDirPath;
if (path != null) {
databasesDirPath = path.toAbsolutePath().getParent();
// Create base directories
if (Files.notExists(databasesDirPath)) {
Files.createDirectories(databasesDirPath);
}
} else {
databasesDirPath = null;
}
List<VolumeConfig> volumeConfigs = getVolumeConfigs(databaseOptions);
// the Options class contains a set of configurable DB options
// that determines the behaviour of the database.
var options = new DBOptions();
refs.add(options);
options.setParanoidChecks(PARANOID_CHECKS);
options.setSkipCheckingSstFileSizesOnDbOpen(true);
options.setEnablePipelinedWrite(true);
var maxSubCompactions = Integer.parseInt(System.getProperty("it.cavallium.dbengine.compactions.max.sub", "-1"));
if (maxSubCompactions > 0) {
options.setMaxSubcompactions(maxSubCompactions);
}
var customWriteRate = Long.parseLong(System.getProperty("it.cavallium.dbengine.write.delayedrate", "-1"));
if (customWriteRate >= 0) {
options.setDelayedWriteRate(customWriteRate);
}
Optional.ofNullable(databaseOptions.global().logPath())
.map(Path::toString)
.ifPresent(options::setDbLogDir);
if (path != null) {
Optional.ofNullable(databaseOptions.global().walPath())
.map(Path::toString)
.ifPresent(options::setWalDir);
}
options.setCreateIfMissing(true);
options.setSkipStatsUpdateOnDbOpen(true);
options.setCreateMissingColumnFamilies(true);
options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
// todo: automatically flush every x seconds?
options.setManualWalFlush(true);
options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown
options.setAvoidFlushDuringRecovery(true); // Flush all WALs during startup
options.setWalRecoveryMode(databaseOptions.global().absoluteConsistency()
? WALRecoveryMode.AbsoluteConsistency
: WALRecoveryMode.PointInTimeRecovery); // Crash if the WALs are corrupted.Default: TolerateCorruptedTailRecords
options.setDeleteObsoleteFilesPeriodMicros(20 * 1000000); // 20 seconds
options.setKeepLogFileNum(10);
if (databasesDirPath != null) {
requireNonNull(databasesDirPath);
requireNonNull(path.getFileName());
List<DbPath> paths = mapList(convertPaths(databasesDirPath, path.getFileName(), volumeConfigs),
p -> new DbPath(p.path, p.targetSize)
);
options.setDbPaths(paths);
} else if (!volumeConfigs.isEmpty()) {
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "in-memory databases should not have any volume configured");
}
options.setMaxOpenFiles(Optional.ofNullable(databaseOptions.global().maximumOpenFiles()).orElse(-1));
options.setMaxFileOpeningThreads(Runtime.getRuntime().availableProcessors());
if (databaseOptions.global().spinning()) {
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
options.setUseFsync(false);
}
long writeBufferManagerSize = Optional.ofNullable(databaseOptions.global().writeBufferManager())
.map(DataSize::longValue)
.orElse(0L);
if (isDisableAutoCompactions()) {
options.setMaxBackgroundCompactions(0);
options.setMaxBackgroundJobs(0);
} else {
var backgroundJobs = Integer.parseInt(System.getProperty("it.cavallium.dbengine.jobs.background.num", "-1"));
if (backgroundJobs >= 0) {
options.setMaxBackgroundJobs(backgroundJobs);
}
}
Cache blockCache;
final boolean useDirectIO = path != null && databaseOptions.global().useDirectIo();
final boolean allowMmapReads = (path == null) || (!useDirectIO && databaseOptions.global().allowRocksdbMemoryMapping());
final boolean allowMmapWrites = (path != null) && (!useDirectIO && (databaseOptions.global().allowRocksdbMemoryMapping()
|| parseBoolean(System.getProperty("it.cavallium.dbengine.mmapwrites.enable", "false"))));
// todo: replace with a real option called database-write-buffer-size
// 0 = default = disabled
long dbWriteBufferSize = Long.parseLong(System.getProperty("it.cavallium.dbengine.dbwritebuffer.size", "0"));
options
.setDbWriteBufferSize(dbWriteBufferSize)
.setBytesPerSync(64 * SizeUnit.MB)
.setWalBytesPerSync(64 * SizeUnit.MB)
.setWalTtlSeconds(80) // Auto
.setWalSizeLimitMB(0) // Auto
.setMaxTotalWalSize(0) // AUto
;
if (path != null) {
blockCache = CACHE_FACTORY.newCache(writeBufferManagerSize + Optional.ofNullable(databaseOptions.global().blockCache()).map(DataSize::longValue).orElse( 512 * SizeUnit.MB));
refs.add(blockCache);
} else {
blockCache = null;
}
if (useDirectIO) {
options
// Option to enable readahead in compaction
// If not set, it will be set to 2MB internally
.setCompactionReadaheadSize(4 * SizeUnit.MB) // recommend at least 2MB
// Option to tune write buffer for direct writes
.setWritableFileMaxBufferSize(2 * SizeUnit.MB)
;
}
if (databaseOptions.global().spinning()) {
options
// method documentation
.setCompactionReadaheadSize(16 * SizeUnit.MB)
// guessed
.setWritableFileMaxBufferSize(8 * SizeUnit.MB);
}
options.setIncreaseParallelism(Runtime.getRuntime().availableProcessors());
if (path != null && writeBufferManagerSize > 0L) {
var writeBufferManager = new WriteBufferManager(writeBufferManagerSize, blockCache, false);
refs.add(writeBufferManager);
options.setWriteBufferManager(writeBufferManager);
}
if (useDirectIO) {
options
.setAllowMmapReads(false)
.setAllowMmapWrites(false)
.setUseDirectReads(true)
;
} else {
options
.setAllowMmapReads(allowMmapReads)
.setAllowMmapWrites(allowMmapWrites);
}
if (useDirectIO || !allowMmapWrites) {
options.setUseDirectIoForFlushAndCompaction(true);
}
return new OptionsWithCache(options, blockCache);
} catch (IOException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, e);
} catch (GestaltException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_CONFIG_ERROR, e);
}
}
private static List<VolumeConfig> getVolumeConfigs(DatabaseConfig databaseOptions) throws GestaltException {
try {
return List.of(databaseOptions.global().volumes());
} catch (GestaltException ex) {
if (ex.getMessage().equals("Failed to get proxy config while calling method: volumes in path: database.global.")) {
return List.of();
} else {
throw ex;
}
}
}
private static TransactionalDB loadDb(@Nullable Path path, DatabaseConfig databaseOptions, OptionsWithCache optionsWithCache, RocksDBObjects refs, Logger logger) {
var rocksdbOptions = optionsWithCache.options();
try {
List<VolumeConfig> volumeConfigs = getVolumeConfigs(databaseOptions);
List<ColumnFamilyDescriptor> descriptors = new ArrayList<>();
var defaultColumnOptions = new ColumnFamilyOptions();
refs.add(defaultColumnOptions);
descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultColumnOptions));
var rocksLogger = new RocksLogger(rocksdbOptions, logger);
var columnConfigs = databaseOptions.global().columnOptions();
SequencedMap<String, FallbackColumnConfig> columnConfigMap = new LinkedHashMap<>();
for (NamedColumnConfig columnConfig : columnConfigs) {
columnConfigMap.put(columnConfig.name(), columnConfig);
}
if (path != null) {
List<String> existingColumnFamilies;
try (var options = new Options()) {
options.setCreateIfMissing(true);
existingColumnFamilies = mapList(RocksDB.listColumnFamilies(options, path.toString()), b -> new String(b, StandardCharsets.UTF_8));
}
for (String existingColumnFamily : existingColumnFamilies) {
columnConfigMap.putIfAbsent(existingColumnFamily, databaseOptions.global().fallbackColumnOptions());
}
}
for (Map.Entry<String, FallbackColumnConfig> entry : columnConfigMap.entrySet()) {
String name = entry.getKey();
FallbackColumnConfig columnOptions = entry.getValue();
if (columnOptions instanceof NamedColumnConfig namedColumnConfig && !namedColumnConfig.name().equals(name)) {
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Wrong column config name: " + name);
}
var columnFamilyOptions = new ColumnFamilyOptions();
refs.add(columnFamilyOptions);
//noinspection ConstantConditions
if (columnOptions.memtableMemoryBudgetBytes() != null) {
// about 512MB of ram will be used for level style compaction
columnFamilyOptions.optimizeLevelStyleCompaction(Optional.ofNullable(columnOptions.memtableMemoryBudgetBytes())
.map(DataSize::longValue)
.orElse(DEFAULT_COMPACTION_MEMTABLE_MEMORY_BUDGET));
}
if (isDisableAutoCompactions()) {
columnFamilyOptions.setDisableAutoCompactions(true);
}
try {
columnFamilyOptions.setPrepopulateBlobCache(PrepopulateBlobCache.PREPOPULATE_BLOB_FLUSH_ONLY);
} catch (Throwable ex) {
logger.log(Level.SEVERE, "Failed to set prepopulate blob cache", ex);
}
// This option is not supported with multiple db paths
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
boolean dynamicLevelBytes = volumeConfigs.size() <= 1;
if (dynamicLevelBytes) {
columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true);
} else {
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setMaxBytesForLevelMultiplier(10);
}
if (isDisableAutoCompactions()) {
columnFamilyOptions.setLevel0FileNumCompactionTrigger(-1);
} else if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
// ArangoDB uses a value of 2: https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
// Higher values speed up writes, but slow down reads
columnFamilyOptions.setLevel0FileNumCompactionTrigger(2);
}
if (isDisableSlowdown()) {
columnFamilyOptions.setLevel0SlowdownWritesTrigger(-1);
columnFamilyOptions.setLevel0StopWritesTrigger(Integer.MAX_VALUE);
columnFamilyOptions.setHardPendingCompactionBytesLimit(Long.MAX_VALUE);
columnFamilyOptions.setSoftPendingCompactionBytesLimit(Long.MAX_VALUE);
}
{
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0SlowdownWritesTrigger(20);
// https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html
columnFamilyOptions.setLevel0StopWritesTrigger(36);
}
if (columnOptions.levels().length > 0) {
columnFamilyOptions.setNumLevels(columnOptions.levels().length);
var firstLevelOptions = getRocksLevelOptions(columnOptions.levels()[0], refs);
columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType);
columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions);
var lastLevelOptions = getRocksLevelOptions(columnOptions
.levels()[columnOptions.levels().length - 1], refs);
columnFamilyOptions.setBottommostCompressionType(lastLevelOptions.compressionType);
columnFamilyOptions.setBottommostCompressionOptions(lastLevelOptions.compressionOptions);
List<CompressionType> compressionPerLevel = new ArrayList<>();
for (ColumnLevelConfig columnLevelConfig : List.of(columnOptions.levels())) {
CompressionType compression = columnLevelConfig.compression();
compressionPerLevel.add(compression);
}
columnFamilyOptions.setCompressionPerLevel(compressionPerLevel);
} else {
columnFamilyOptions.setNumLevels(7);
List<CompressionType> compressionTypes = new ArrayList<>(7);
for (int i = 0; i < 7; i++) {
if (i < 2) {
compressionTypes.add(CompressionType.NO_COMPRESSION);
} else {
compressionTypes.add(CompressionType.LZ4_COMPRESSION);
}
}
columnFamilyOptions.setBottommostCompressionType(CompressionType.LZ4HC_COMPRESSION);
var compressionOptions = new CompressionOptions()
.setEnabled(true)
.setMaxDictBytes(32768);
refs.add(compressionOptions);
columnFamilyOptions.setBottommostCompressionOptions(compressionOptions);
columnFamilyOptions.setCompressionPerLevel(compressionTypes);
}
final BlockBasedTableConfig tableOptions = new BlockBasedTableConfig();
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB);
}
Optional.ofNullable(columnOptions.writeBufferSize())
.map(DataSize::longValue)
.ifPresent(columnFamilyOptions::setWriteBufferSize);
columnFamilyOptions.setMaxWriteBufferNumberToMaintain(1);
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setVerifyCompression(false);
}
// If OptimizeFiltersForHits == true: memory size = bitsPerKey * (totalKeys * 0.1)
// If OptimizeFiltersForHits == false: memory size = bitsPerKey * totalKeys
BloomFilterConfig filter = null;
BloomFilterConfig bloomFilterConfig = columnOptions.bloomFilter();
if (bloomFilterConfig != null) filter = bloomFilterConfig;
if (filter == null) {
if (path == null) {
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Please set a bloom filter. It's required for in-memory databases");
}
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setFilterPolicy(null);
}
} else {
final BloomFilter bloomFilter = new BloomFilter(filter.bitsPerKey());
refs.add(bloomFilter);
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig.setFilterPolicy(bloomFilter);
}
}
boolean cacheIndexAndFilterBlocks = path != null && Optional.ofNullable(columnOptions.cacheIndexAndFilterBlocks())
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.orElse(true);
if (databaseOptions.global().spinning()) {
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// cacheIndexAndFilterBlocks = true;
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMinWriteBufferNumberToMerge(3);
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setMaxWriteBufferNumber(4);
}
}
if (tableOptions instanceof BlockBasedTableConfig blockBasedTableConfig) {
blockBasedTableConfig
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
.setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash)
// http://rocksdb.org/blog/2018/08/23/data-block-hash-index.html
.setDataBlockHashTableUtilRatio(0.75)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setPinTopLevelIndexAndFilter(true)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setPinL0FilterAndIndexBlocksInCache(path != null)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setCacheIndexAndFilterBlocks(cacheIndexAndFilterBlocks)
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
// Enabling partition filters increase the reads by 2x
.setPartitionFilters(Optional.ofNullable(columnOptions.partitionFilters()).orElse(false))
// https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters
.setIndexType(path == null ? IndexType.kHashSearch : Optional.ofNullable(columnOptions.partitionFilters()).orElse(false) ? IndexType.kTwoLevelIndexSearch : IndexType.kBinarySearch)
.setChecksumType(path == null ? ChecksumType.kNoChecksum : ChecksumType.kXXH3)
// Spinning disks: 64KiB to 256KiB (also 512KiB). SSDs: 16KiB
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
.setBlockSize(path == null ? 4096 : Optional.ofNullable(columnOptions.blockSize()).map(DataSize::longValue).orElse((databaseOptions.global().spinning() ? 128 : 16) * 1024L))
.setBlockCache(optionsWithCache.standardCache())
.setNoBlockCache(optionsWithCache.standardCache() == null);
}
if (path == null) {
columnFamilyOptions.useCappedPrefixExtractor(4);
tableOptions.setBlockRestartInterval(4);
}
columnFamilyOptions.setTableFormatConfig(tableOptions);
columnFamilyOptions.setCompactionPriority(CompactionPriority.MinOverlappingRatio);
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions=
BloomFilterConfig bloomFilterOptions = columnOptions.bloomFilter();
if (bloomFilterOptions != null) {
// https://github.com/facebook/rocksdb/wiki/Tuning-RocksDB-on-Spinning-Disks
// https://github.com/EighteenZi/rocksdb_wiki/blob/master/RocksDB-Tuning-Guide.md#throughput-gap-between-random-read-vs-sequential-read-is-much-higher-in-spinning-disks-suggestions=
boolean optimizeForHits = databaseOptions.global().spinning();
Boolean value = bloomFilterOptions.optimizeForHits();
if (value != null) optimizeForHits = value;
columnFamilyOptions.setOptimizeFiltersForHits(optimizeForHits);
}
if (!FOLLOW_ROCKSDB_OPTIMIZATIONS) {
// // Increasing this value can reduce the frequency of compaction and reduce write amplification,
// // but it will also cause old data to be unable to be cleaned up in time, thus increasing read amplification.
// // This parameter is not easy to adjust. It is generally not recommended to set it above 256MB.
// https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html
columnFamilyOptions.setTargetFileSizeBase(64 * SizeUnit.MB);
// // For each level up, the threshold is multiplied by the factor target_file_size_multiplier
// // (but the default value is 1, which means that the maximum sstable of each level is the same).
columnFamilyOptions.setTargetFileSizeMultiplier(2);
}
descriptors.add(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.US_ASCII), columnFamilyOptions));
}
// Get databases directory path
String definitiveDbPathString;
if (path != null) {
Path databasesDirPath = path.toAbsolutePath().getParent();
definitiveDbPathString = databasesDirPath.toString() + File.separatorChar + path.getFileName();
} else {
try {
definitiveDbPathString = Files.createTempDirectory("temp-rocksdb").toString();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
var handles = new ArrayList<ColumnFamilyHandle>();
RocksDB db;
// a factory method that returns a RocksDB instance
if (databaseOptions.global().optimistic()) {
db = OptimisticTransactionDB.open(rocksdbOptions, definitiveDbPathString, descriptors, handles);
} else {
var transactionOptions = new TransactionDBOptions()
.setWritePolicy(TxnDBWritePolicy.WRITE_COMMITTED)
.setTransactionLockTimeout(5000)
.setDefaultLockTimeout(5000);
refs.add(transactionOptions);
db = TransactionDB.open(rocksdbOptions,
transactionOptions,
definitiveDbPathString,
descriptors,
handles
);
}
handles.forEach(refs::add);
try {
for (ColumnFamilyHandle cfh : handles) {
var props = db.getProperty(cfh, "rocksdb.stats");
logger.log(Level.FINEST, "Stats for database column {1}: {2}",
new Object[]{new String(cfh.getName(), StandardCharsets.UTF_8),
props}
);
}
} catch (RocksDBException ex) {
logger.log(Level.FINE, "Failed to obtain stats", ex);
}
return TransactionalDB.create(definitiveDbPathString, db);
} catch (RocksDBException ex) {
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
} catch (GestaltException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.CONFIG_ERROR, "Failed to load rocksdb", e);
}
}
record DbPathRecord(Path path, long targetSize) {}
private static List<DbPathRecord> convertPaths(Path databasesDirPath, Path path, List<VolumeConfig> volumes) throws GestaltException {
var paths = new ArrayList<DbPathRecord>(volumes.size());
if (volumes.isEmpty()) {
return List.of(new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_hot"),
0), // Legacy
new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_cold"),
0), // Legacy
new DbPathRecord(databasesDirPath.resolve(path.getFileName() + "_colder"),
1000L * 1024L * 1024L * 1024L) // 1000GiB
); // Legacy
}
for (var volume : volumes) {
Path volumePath;
if (volume.volumePath().isAbsolute()) {
volumePath = volume.volumePath();
} else {
volumePath = databasesDirPath.resolve(volume.volumePath());
}
paths.add(new DbPathRecord(volumePath, volume.targetSizeBytes()));
}
return paths;
}
public static boolean isDisableAutoCompactions() {
return parseBoolean(System.getProperty("it.cavallium.dbengine.compactions.auto.disable", "false"));
}
public static boolean isDisableSlowdown() {
return isDisableAutoCompactions()
|| parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false"));
}
private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {}
private static RocksLevelOptions getRocksLevelOptions(ColumnLevelConfig levelOptions, RocksDBObjects refs) throws GestaltException {
var compressionType = levelOptions.compression();
var compressionOptions = new CompressionOptions();
refs.add(compressionOptions);
if (compressionType != CompressionType.NO_COMPRESSION) {
compressionOptions.setEnabled(true);
compressionOptions.setMaxDictBytes(Math.toIntExact(levelOptions.maxDictBytes().longValue()));
} else {
compressionOptions.setEnabled(false);
}
return new RocksLevelOptions(compressionType, compressionOptions);
}
}

View File

@ -0,0 +1,11 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import org.rocksdb.DBOptions;
import java.util.logging.Logger;
public class RocksLogger {
public RocksLogger(DBOptions rocksdbOptions, Logger logger) {
}
}

View File

@ -13,16 +13,18 @@ import org.rocksdb.WriteOptions;
public sealed interface TransactionalDB extends Closeable {
static TransactionalDB create(RocksDB db) {
static TransactionalDB create(String path, RocksDB db) {
return switch (db) {
case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(optimisticTransactionDB);
case TransactionDB transactionDB -> new PessimisticTransactionalDB(transactionDB);
case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB);
case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB);
default -> throw new UnsupportedOperationException("This database is not transactional");
};
}
TransactionalOptions createTransactionalOptions();
String getPath();
RocksDB get();
/**
* Starts a new Transaction.
@ -89,9 +91,11 @@ public sealed interface TransactionalDB extends Closeable {
final class PessimisticTransactionalDB implements TransactionalDB {
private final String path;
private final TransactionDB db;
public PessimisticTransactionalDB(TransactionDB db) {
public PessimisticTransactionalDB(String path, TransactionDB db) {
this.path = path;
this.db = db;
}
@ -100,6 +104,11 @@ public sealed interface TransactionalDB extends Closeable {
return new TransactionalOptionsPessimistic(new TransactionOptions());
}
@Override
public String getPath() {
return path;
}
@Override
public RocksDB get() {
return db;
@ -153,9 +162,11 @@ public sealed interface TransactionalDB extends Closeable {
final class OptimisticTransactionalDB implements TransactionalDB {
private final String path;
private final OptimisticTransactionDB db;
public OptimisticTransactionalDB(OptimisticTransactionDB db) {
public OptimisticTransactionalDB(String path, OptimisticTransactionDB db) {
this.path = path;
this.db = db;
}
@ -164,6 +175,11 @@ public sealed interface TransactionalDB extends Closeable {
return new TransactionalOptionsOptimistic(new OptimisticTransactionOptions());
}
@Override
public String getPath() {
return path;
}
@Override
public RocksDB get() {
return db;

View File

@ -4,6 +4,8 @@ database: {
enable-column-bug: false
# Enable to adapt the database to spinning disk
spinning: false
# Enable to require absolute consistency after a crash. False to use the PointInTime recovery strategy
absolute-consistency: true
# Error checking
checksum: true
# Use direct I/O in RocksDB databases (Higher I/O read throughput but OS cache is not used, less swapping, less memory pressure)
@ -14,6 +16,9 @@ database: {
# If the maximum open files count is -1, the initial startup time will be slower.
# If "cacheIndexAndFilterBlocks" is false, the memory will rise when the number of open files rises.
maximum-open-files: -1
# RocksDB data volumes.
volumes: []
# Optimistic transactions
optimistic: true
# Database block cache size
block-cache: 512MiB
@ -85,6 +90,8 @@ database: {
# or if you want to use the "memtable-memory-budget-size" logic.
# Remember that there are "max-write-buffer-number" in memory, 2 by default
write-buffer-size: 200MiB
# Enable blob files
blob-files: false
}
column-options: [
${database.global.fallback-column-options} {

View File

@ -3,50 +3,57 @@ package it.cavallium.rockserver.core.impl.test;
import it.cavallium.rockserver.core.client.EmbeddedConnection;
import it.cavallium.rockserver.core.common.Callback.CallbackDelta;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Delta;
import java.io.File;
import it.cavallium.rockserver.core.common.Utils;
import org.junit.jupiter.api.Assertions;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;
import java.util.concurrent.atomic.AtomicInteger;
class EmbeddedDBTest {
private Path dir;
private EmbeddedConnection db;
private long colId;
@org.junit.jupiter.api.BeforeEach
void setUp() throws IOException {
this.dir = Files.createTempDirectory("db-test");
db = new EmbeddedConnection(dir, "test", null);
if (System.getProperty("rockserver.core.print-config", null) == null) {
System.setProperty("rockserver.core.print-config", "false");
}
db = new EmbeddedConnection(null, "test", null);
this.colId = db.createColumn("put-1", new ColumnSchema(new int[]{1, 2, 1, Integer.BYTES, Integer.BYTES}, 2, true));
}
@org.junit.jupiter.api.AfterEach
void tearDown() throws IOException {
try (Stream<Path> walk = Files.walk(dir)) {
db.close();
walk.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.peek(System.out::println)
.forEach(File::delete);
}
db.deleteColumn(colId);
db.close();
}
@org.junit.jupiter.api.Test
void put() {
var colId = db.createColumn("put-1", new ColumnSchema(new int[]{16, 16, 16, 32, 32}, 2, true));
db.put(0, colId, null, null, new CallbackDelta<MemorySegment>() {
@Override
public void onSuccess(Delta<MemorySegment> previous) {
}
var key = new MemorySegment[] {
Utils.toMemorySegment(new byte[] {3}),
Utils.toMemorySegment(new byte[] {4, 6}),
Utils.toMemorySegment(new byte[] {3}),
Utils.toMemorySegment(new byte[] {1, 2, 3}),
Utils.toMemorySegment(new byte[] {0, 0, 3, 6, 7, 8})
};
var value1 = MemorySegment.ofArray(new byte[] {0, 0, 3});
AtomicInteger callbackCalled = new AtomicInteger();
db.put(0, colId, key, value1, (CallbackDelta<MemorySegment>) prev -> {
callbackCalled.incrementAndGet();
Assertions.assertNull(prev);
});
Assertions.assertEquals(1, callbackCalled.get());
db.put(0, colId, key, MemorySegment.ofArray(new byte[] {0, 0, 5}), (CallbackDelta<MemorySegment>) prev -> {
callbackCalled.incrementAndGet();
Utils.(value1);
});
db.deleteColumn(colId);
}
@org.junit.jupiter.api.Test
void get() {
throw new UnsupportedOperationException();
}
}