Add client, server

This commit is contained in:
Andrea Cavalli 2023-12-06 00:43:35 +01:00
parent 86afb28db0
commit 584aed3063
42 changed files with 3109 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
.idea/
target/

243
pom.xml Normal file
View File

@ -0,0 +1,243 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>it.cavallium.rockserver</groupId>
<artifactId>rockserver-core</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<native.maven.plugin.version>0.9.28</native.maven.plugin.version>
<gestalt.version>0.22.0</gestalt.version>
<imageName>rockserver-core</imageName>
<mainClass>it.cavallium.rockserver.core.Main</mainClass>
</properties>
<dependencies>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>8.8.1</version>
</dependency>
<dependency>
<groupId>net.sourceforge.argparse4j</groupId>
<artifactId>argparse4j</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>com.github.seancfoley</groupId>
<artifactId>ipaddress</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<version>1.4.3</version>
</dependency>
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>24.0.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.github.stephenc.high-scale-lib</groupId>
<artifactId>high-scale-lib</artifactId>
<version>1.1.4</version>
</dependency>
<dependency>
<groupId>com.github.gestalt-config</groupId>
<artifactId>gestalt-core</artifactId>
<version>${gestalt.version}</version>
</dependency>
<dependency>
<groupId>com.github.gestalt-config</groupId>
<artifactId>gestalt-hocon</artifactId>
<version>${gestalt.version}</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>1.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.9.2</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>java</id>
<goals>
<goal>java</goal>
</goals>
<configuration>
<mainClass>it.cavallium.rockserver.core.Main</mainClass>
<arguments>
<argument>--enable-preview</argument>
</arguments>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.source}</target>
<enablePreview>true</enablePreview>
<compilerArgs>
<arg>--enable-preview</arg>
</compilerArgs>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>--enable-preview</argLine>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<argLine>--enable-preview</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>it.cavallium.rockserver.core.Main</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>it.cavallium.rockserver.core.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>native</id>
<build>
<plugins>
<plugin>
<groupId>org.graalvm.buildtools</groupId>
<artifactId>native-maven-plugin</artifactId>
<version>${native.maven.plugin.version}</version>
<extensions>true</extensions>
<executions>
<execution>
<id>build-native</id>
<goals>
<goal>compile-no-fork</goal>
</goals>
<phase>package</phase>
</execution>
<execution>
<id>test-native</id>
<goals>
<goal>test</goal>
</goals>
<phase>test</phase>
</execution>
</executions>
<configuration>
<fallback>false</fallback>
<agent>
<enabled>true</enabled>
</agent>
<buildArgs>
<buildArg>--strict-image-heap</buildArg>
<buildArg>-march=native</buildArg>
<buildArg>-H:IncludeResourceBundles=net.sourceforge.argparse4j.internal.ArgumentParserImpl-en_US</buildArg>
<buildArg>-O1</buildArg>
<buildArg>--enable-preview</buildArg>
<buildArg>--no-fallback</buildArg>
<buildArg>--gc=G1</buildArg>
</buildArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>java-agent</id>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>java</executable>
<workingDirectory>${project.build.directory}</workingDirectory>
<arguments>
<argument>--enable-preview</argument>
<argument>-classpath</argument>
<classpath/>
<argument>${mainClass}</argument>
</arguments>
</configuration>
</execution>
<execution>
<id>native</id>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<executable>${project.build.directory}/${imageName}</executable>
<workingDirectory>${project.build.directory}</workingDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -0,0 +1,102 @@
package it.cavallium.rockserver.core;
import static java.util.Objects.requireNonNull;
import inet.ipaddr.HostName;
import it.cavallium.rockserver.core.client.ClientBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnixDomainSocketAddress;
import java.net.spi.InetAddressResolver;
import java.net.spi.InetAddressResolverProvider;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.rocksdb.RocksDB;
public class Main {
private static final Logger LOG = Logger.getLogger("rockserver-core");
public static void main(String[] args) throws IOException, URISyntaxException {
ArgumentParser parser = ArgumentParsers.newFor("rockserver-core").build()
.defaultHelp(true)
.description("RocksDB server core");
parser.addArgument("-u", "--url")
.type(String.class)
.setDefault("file://" + System.getProperty("user.home") + "/rockserver-core-db")
.help("Specify database rocksdb://hostname:port, or unix://<path>, or file://<path>");
parser.addArgument("-n", "--name")
.type(String.class)
.setDefault("main")
.help("Specify database name");
parser.addArgument("-c", "--config")
.type(Path.class)
.help("Specify the rockserver-core.conf file path. Do not set if the database is not local");
parser.addArgument("-p", "--print-default-config")
.type(Boolean.class)
.setDefault(false)
.help("Print the default configs");
Namespace ns = null;
try {
ns = parser.parseArgs(args);
} catch (ArgumentParserException e) {
parser.handleError(e);
System.exit(1);
}
var clientBuilder = new it.cavallium.rockserver.core.client.ClientBuilder();
if (ns.getBoolean("print_default_config")) {
requireNonNull(Main.class.getClassLoader()
.getResourceAsStream("it/cavallium/rockserver/core/resources/default.conf"))
.transferTo(System.out);
System.exit(0);
return;
}
LOG.info("Starting...");
RocksDB.loadLibrary();
var rawUrl = ns.getString("url");
var name = ns.getString("name");
var config = ns.getString("config");
var url = new URI(rawUrl);
if (config != null) {
if (!url.getScheme().equals("file")) {
System.err.println("Do not set --config if the database is not local!");
System.exit(1);
return;
} else {
clientBuilder.setEmbeddedConfig(Path.of(config));
}
}
switch (url.getScheme()) {
case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(url.getPath())));
case "file" -> clientBuilder.setEmbedded(Path.of(url.getPath()));
case "rocksdb" -> clientBuilder.setAddress(new HostName(url.getHost()).asInetSocketAddress());
default -> throw new IllegalArgumentException("Invalid scheme: " + url.getScheme());
}
clientBuilder.setName(name);
try (var connection = clientBuilder.build()) {
LOG.log(Level.INFO, "Connected to {0}", connection);
CountDownLatch shutdownLatch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(shutdownLatch::countDown));
LOG.info("Shutting down...");
}
LOG.info("Shut down successfully");
}
}

View File

@ -0,0 +1,22 @@
package it.cavallium.rockserver.core.client;
import java.io.IOException;
public abstract class BaseConnection implements RocksDBConnection {
private final String name;
public BaseConnection(String name) {
this.name = name;
}
@Override
public void close() throws IOException {
}
@Override
public String toString() {
return "db \"" + name + "\" (" + getUrl() + ")";
}
}

View File

@ -0,0 +1,46 @@
package it.cavallium.rockserver.core.client;
import java.net.InetSocketAddress;
import java.net.UnixDomainSocketAddress;
import java.nio.file.Path;
public class ClientBuilder {
private InetSocketAddress iNetAddress;
private UnixDomainSocketAddress unixAddress;
private Path embedded;
private String name;
private Path embeddedConfig;
public void setEmbedded(Path path) {
this.embedded = path;
}
public void setUnixSocket(UnixDomainSocketAddress address) {
this.unixAddress = address;
}
public void setAddress(InetSocketAddress address) {
this.iNetAddress = address;
}
public void setName(String name) {
this.name = name;
}
public void setEmbeddedConfig(Path embeddedConfig) {
this.embeddedConfig = embeddedConfig;
}
public RocksDBConnection build() {
if (embedded != null) {
return new EmbeddedConnection(embedded, name, embeddedConfig);
} else if (unixAddress != null) {
return new SocketConnectionUnix(unixAddress, name);
} else if (iNetAddress != null) {
return new SocketConnectionInet(iNetAddress, name);
} else {
throw new UnsupportedOperationException("Please set a connection type");
}
}
}

View File

@ -0,0 +1,102 @@
package it.cavallium.rockserver.core.client;
import it.cavallium.rockserver.core.common.Callback.GetCallback;
import it.cavallium.rockserver.core.common.Callback.IteratorCallback;
import it.cavallium.rockserver.core.common.Callback.PutCallback;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.RocksDBException;
import it.cavallium.rockserver.core.impl.EmbeddedDB;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.net.URI;
import java.nio.file.Path;
import org.jetbrains.annotations.Nullable;
public class EmbeddedConnection extends BaseConnection {
private final EmbeddedDB db;
public EmbeddedConnection(Path path, String name, Path embeddedConfig) {
super(name);
this.db = new EmbeddedDB(path, embeddedConfig);
}
@Override
public void close() throws IOException {
db.close();
super.close();
}
@Override
public URI getUrl() {
return db.getPath().toUri();
}
@Override
public long openTransaction(long timeoutMs) {
return db.openTransaction(timeoutMs);
}
@Override
public void closeTransaction(long transactionId) {
db.closeTransaction(transactionId);
}
@Override
public long createColumn(String name, ColumnSchema schema) {
return db.createColumn(name, schema);
}
@Override
public void deleteColumn(long columnId) throws RocksDBException {
db.deleteColumn(columnId);
}
@Override
public long getColumnId(String name) {
return db.getColumnId(name);
}
@Override
public void put(long transactionId,
long columnId,
MemorySegment[] keys,
@Nullable MemorySegment value,
PutCallback<? super MemorySegment> callback) throws RocksDBException {
db.put(transactionId, columnId, keys, value, callback);
}
@Override
public void get(long transactionId, long columnId, MemorySegment[] keys, GetCallback<? super MemorySegment> callback)
throws RocksDBException {
db.get(transactionId, columnId, keys, callback);
}
@Override
public long openIterator(long transactionId,
long columnId,
MemorySegment[] startKeysInclusive,
@Nullable MemorySegment[] endKeysExclusive,
boolean reverse,
long timeoutMs) throws RocksDBException {
return db.openIterator(transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, timeoutMs);
}
@Override
public void closeIterator(long iteratorId) throws RocksDBException {
db.closeIterator(iteratorId);
}
@Override
public void seekTo(long iterationId, MemorySegment[] keys) throws RocksDBException {
db.seekTo(iterationId, keys);
}
@Override
public void subsequent(long iterationId,
long skipCount,
long takeCount,
IteratorCallback<? super MemorySegment> callback) throws RocksDBException {
db.subsequent(iterationId, skipCount, takeCount, callback);
}
}

View File

@ -0,0 +1,16 @@
package it.cavallium.rockserver.core.client;
import it.cavallium.rockserver.core.common.RocksDBAPI;
import java.io.Closeable;
import java.net.URI;
import org.jetbrains.annotations.Nullable;
public interface RocksDBConnection extends Closeable, RocksDBAPI {
/**
* Get connection url
*
* @return connection url
*/
URI getUrl();
}

View File

@ -0,0 +1,97 @@
package it.cavallium.rockserver.core.client;
import it.cavallium.rockserver.core.common.Callback.GetCallback;
import it.cavallium.rockserver.core.common.Callback.IteratorCallback;
import it.cavallium.rockserver.core.common.Callback.PutCallback;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.RocksDBException;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.net.SocketAddress;
import org.jetbrains.annotations.Nullable;
public abstract class SocketConnection extends BaseConnection {
private final SocketAddress address;
public SocketConnection(SocketAddress address, String name) {
super(name);
this.address = address;
}
public SocketAddress getAddress() {
return address;
}
@Override
public void close() throws IOException {
super.close();
}
@Override
public long openTransaction(long timeoutMs) {
throw new UnsupportedOperationException();
}
@Override
public void closeTransaction(long transactionId) {
throw new UnsupportedOperationException();
}
@Override
public long createColumn(String name, ColumnSchema schema) {
throw new UnsupportedOperationException();
}
@Override
public void deleteColumn(long columnId) throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public long getColumnId(String name) {
throw new UnsupportedOperationException();
}
@Override
public void put(long transactionId,
long columnId,
MemorySegment[] keys,
@Nullable MemorySegment value,
PutCallback<? super MemorySegment> callback) throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public void get(long transactionId, long columnId, MemorySegment[] keys, GetCallback<? super MemorySegment> callback)
throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public long openIterator(long transactionId,
long columnId,
MemorySegment[] startKeysInclusive,
@Nullable MemorySegment[] endKeysExclusive,
boolean reverse,
long timeoutMs) throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public void closeIterator(long iteratorId) throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public void seekTo(long iterationId, MemorySegment[] keys) throws RocksDBException {
throw new UnsupportedOperationException();
}
@Override
public void subsequent(long iterationId,
long skipCount,
long takeCount,
IteratorCallback<? super MemorySegment> callback) throws RocksDBException {
throw new UnsupportedOperationException();
}
}

View File

@ -0,0 +1,22 @@
package it.cavallium.rockserver.core.client;
import java.net.InetSocketAddress;
import java.net.URI;
public class SocketConnectionInet extends SocketConnection {
public SocketConnectionInet(InetSocketAddress address, String name) {
super(address, name);
}
@Override
public InetSocketAddress getAddress() {
return (InetSocketAddress) super.getAddress();
}
@Override
public URI getUrl() {
return URI.create("rocksdb://" + getAddress().getHostString());
}
}

View File

@ -0,0 +1,30 @@
package it.cavallium.rockserver.core.client;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.net.UnixDomainSocketAddress;
import java.nio.file.Files;
public class SocketConnectionUnix extends SocketConnection {
public SocketConnectionUnix(UnixDomainSocketAddress address, String name) {
super(address, name);
}
@Override
public UnixDomainSocketAddress getAddress() {
return (UnixDomainSocketAddress) super.getAddress();
}
@Override
public void close() throws IOException {
super.close();
Files.deleteIfExists(getAddress().getPath());
}
@Override
public URI getUrl() {
return URI.create("unix://" + getAddress().getPath());
}
}

View File

@ -0,0 +1,58 @@
package it.cavallium.rockserver.core.common;
import java.util.List;
import java.util.Map.Entry;
import org.jetbrains.annotations.Nullable;
public sealed interface Callback<T> {
static boolean requiresGettingPreviousValue(PutCallback<?> callback) {
return callback instanceof CallbackPrevious<?>
|| callback instanceof CallbackDelta<?>
|| callback instanceof CallbackChanged;
}
static boolean requiresGettingCurrentValue(GetCallback<?> callback) {
return callback instanceof CallbackCurrent<?>;
}
sealed interface PutCallback<T> extends Callback<T> {}
sealed interface PatchCallback<T> extends Callback<T> {}
sealed interface GetCallback<T> extends Callback<T> {}
sealed interface IteratorCallback<T> extends Callback<T> {}
non-sealed interface CallbackVoid<T> extends PutCallback<T>, PatchCallback<T>, IteratorCallback<T>, GetCallback<T> {}
non-sealed interface CallbackPrevious<T> extends PutCallback<T> {
void onPrevious(@Nullable T previous);
}
non-sealed interface CallbackCurrent<T> extends GetCallback<T> {
void onCurrent(@Nullable T previous);
}
non-sealed interface CallbackExists<T> extends GetCallback<T>, IteratorCallback<T> {
void onExists(boolean exists);
}
non-sealed interface CallbackDelta<T> extends PutCallback<T> {
void onSuccess(Delta<T> previous);
}
non-sealed interface CallbackMulti<T> extends IteratorCallback<T> {
void onSuccess(List<Entry<T, T>> elements);
}
non-sealed interface CallbackChanged extends PutCallback<Object>, PatchCallback<Object> {
void onChanged(boolean changed);
}
}

View File

@ -0,0 +1,36 @@
package it.cavallium.rockserver.core.common;
public record ColumnSchema(int[] keys, int variableLengthKeysCount, boolean hasValue) {
public ColumnSchema {
if (variableLengthKeysCount > keys.length) {
throw new IllegalArgumentException("variable length keys count must be less or equal keysCount");
}
for (int i = 0; i < keys.length - variableLengthKeysCount; i++) {
if (keys[i] <= 0) {
throw new UnsupportedOperationException("Key length must be > 0");
}
}
for (int i = keys.length - variableLengthKeysCount; i < keys.length; i++) {
if (keys[i] <= 1) {
throw new UnsupportedOperationException("Key hash length must be > 1");
}
}
}
/**
* Keys with their length
* @return an array with the length of each key, variable-length keys must have the length of their hash
*/
@Override
public int[] keys() {
return keys;
}
/**
* The last n keys that are variable-length
*/
@Override
public int variableLengthKeysCount() {
return variableLengthKeysCount;
}
}

View File

@ -0,0 +1,5 @@
package it.cavallium.rockserver.core.common;
import org.jetbrains.annotations.Nullable;
public record Delta<T>(@Nullable T previous, @Nullable T current) {}

View File

@ -0,0 +1,112 @@
package it.cavallium.rockserver.core.common;
import it.cavallium.rockserver.core.common.Callback.GetCallback;
import it.cavallium.rockserver.core.common.Callback.IteratorCallback;
import it.cavallium.rockserver.core.common.Callback.PutCallback;
import java.lang.foreign.MemorySegment;
import org.jetbrains.annotations.Nullable;
public interface RocksDBAPI {
/**
* Open a transaction
* @param timeoutMs timeout in milliseconds
* @return transaction id
*/
long openTransaction(long timeoutMs) throws RocksDBException;
/**
* Close a transaction
* @param transactionId transaction id to close
*/
void closeTransaction(long transactionId) throws RocksDBException;
/**
* Create a column
* @param name column name
* @param schema column key-value schema
* @return column id
*/
long createColumn(String name, ColumnSchema schema) throws RocksDBException;
/**
* Delete a column
* @param columnId column id
*/
void deleteColumn(long columnId) throws RocksDBException;
/**
* Get column id by name
* @param name column name
* @return column id
*/
long getColumnId(String name) throws RocksDBException;
/**
* Put an element into the specified position
* @param transactionId transaction id, or 0
* @param columnId column id
* @param keys column keys, or empty array if not needed
* @param value value, or null if not needed
* @param callback the callback will be executed on the same thread, exactly once.
*/
void put(long transactionId,
long columnId,
MemorySegment[] keys,
@Nullable MemorySegment value,
PutCallback<? super MemorySegment> callback) throws RocksDBException;
/**
* Get an element from the specified position
* @param transactionId transaction id, or 0
* @param columnId column id
* @param keys column keys, or empty array if not needed
* @param callback the callback will be executed on the same thread, exactly once.
*/
void get(long transactionId,
long columnId,
MemorySegment[] keys,
GetCallback<? super MemorySegment> callback) throws RocksDBException;
/**
* Open an iterator
* @param transactionId transaction id, or 0
* @param columnId column id
* @param startKeysInclusive start keys, inclusive. [] means "the beginning"
* @param endKeysExclusive end keys, exclusive. Null means "the end"
* @param reverse if true, seek in reverse direction
* @param timeoutMs timeout in milliseconds
* @return iterator id
*/
long openIterator(long transactionId,
long columnId,
MemorySegment[] startKeysInclusive,
@Nullable MemorySegment[] endKeysExclusive,
boolean reverse,
long timeoutMs) throws RocksDBException;
/**
* Close an iterator
* @param iteratorId iterator id
*/
void closeIterator(long iteratorId) throws RocksDBException;
/**
* Seek to the specific element during an iteration, or the subsequent one if not found
* @param iterationId iteration id
* @param keys keys, inclusive. [] means "the beginning"
*/
void seekTo(long iterationId, MemorySegment[] keys) throws RocksDBException;
/**
* Get the subsequent element during an iteration
* @param iterationId iteration id
* @param skipCount number of elements to skip
* @param takeCount number of elements to take
* @param callback the callback will be executed on the same thread, exactly once.
*/
void subsequent(long iterationId,
long skipCount,
long takeCount,
IteratorCallback<? super MemorySegment> callback) throws RocksDBException;
}

View File

@ -0,0 +1,39 @@
package it.cavallium.rockserver.core.common;
public class RocksDBException extends RuntimeException {
private final RocksDBErrorType errorUniqueId;
public enum RocksDBErrorType {
PUT_UNKNOWN, PUT_2, UNEXPECTED_NULL_VALUE, PUT_1, PUT_3, GET_1, COLUMN_EXISTS, COLUMN_CREATE_FAIL, COLUMN_NOT_FOUND, COLUMN_DELETE_FAIL, CONFIG_ERROR, VALUE_MUST_BE_NULL
}
public RocksDBException(RocksDBErrorType errorUniqueId, String message) {
super(message);
this.errorUniqueId = errorUniqueId;
}
public RocksDBException(RocksDBErrorType errorUniqueId, String message, Throwable ex) {
super(message, ex);
this.errorUniqueId = errorUniqueId;
}
public RocksDBException(RocksDBErrorType errorUniqueId, Throwable ex) {
super(ex.toString(), ex);
this.errorUniqueId = errorUniqueId;
}
public RocksDBException(RocksDBErrorType errorUniqueId, org.rocksdb.RocksDBException ex) {
this(errorUniqueId, ex.getMessage());
}
public RocksDBErrorType getErrorUniqueId() {
return errorUniqueId;
}
@Override
public String getLocalizedMessage() {
return "RocksDBError: [uid:" + errorUniqueId + "] " + getMessage();
}
}

View File

@ -0,0 +1,50 @@
package it.cavallium.rockserver.core.common;
import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
public class Utils {
/**
* Returns the value of the {@code int} argument, throwing an exception if the value overflows an {@code char}.
*
* @param value the int value
* @return the argument as a char
* @throws ArithmeticException if the {@code argument} overflows a char
* @since 1.8
*/
public static char toCharExact(int value) {
if ((char) value != value) {
throw new ArithmeticException("char overflow");
}
return (char) value;
}
/**
* Returns the value of the {@code long} argument, throwing an exception if the value overflows an {@code char}.
*
* @param value the long value
* @return the argument as a char
* @throws ArithmeticException if the {@code argument} overflows a char
* @since 1.8
*/
public static char toCharExact(long value) {
if ((char) value != value) {
throw new ArithmeticException("char overflow");
}
return (char) value;
}
@NotNull
public static MemorySegment toMemorySegment(Arena arena, byte @Nullable [] array) {
if (array != null) {
return arena.allocateArray(BIG_ENDIAN_BYTES, array);
} else {
return MemorySegment.NULL;
}
}
}

View File

@ -0,0 +1,17 @@
package it.cavallium.rockserver.core.config;
public interface BloomFilterConfig {
int bitsPerKey();
boolean optimizeForHits();
static String stringify(BloomFilterConfig o) {
return """
{
"bits-per-key": %d,
"optimize-for-hits": %b
}\
""".formatted(o.bitsPerKey(), o.optimizeForHits());
}
}

View File

@ -0,0 +1,194 @@
package it.cavallium.rockserver.core.config;
import java.io.Serial;
import java.text.CharacterIterator;
import java.text.StringCharacterIterator;
import org.jetbrains.annotations.NotNull;
@SuppressWarnings("unused")
public final class DataSize extends Number implements Comparable<DataSize> {
@Serial
private static final long serialVersionUID = 7213411239846723568L;
public static DataSize ZERO = new DataSize(0L);
public static DataSize ONE = new DataSize(1L);
public static DataSize KIB = new DataSize(1024L);
public static DataSize KB = new DataSize(1000L);
public static DataSize MIB = new DataSize(1024L * 1024);
public static DataSize MB = new DataSize(1000L * 1000);
public static DataSize GIB = new DataSize(1024L * 1024 * 1024);
public static DataSize GB = new DataSize(1000L * 1000 * 1000);
public static DataSize TIB = new DataSize(1024L * 1024 * 1024 * 1024);
public static DataSize TB = new DataSize(1000L * 1000 * 1000 * 1024);
public static DataSize PIB = new DataSize(1024L * 1024 * 1024 * 1024 * 1024);
public static DataSize PB = new DataSize(1000L * 1000 * 1000 * 1024 * 1024);
public static DataSize EIB = new DataSize(1024L * 1024 * 1024 * 1024 * 1024 * 1024);
public static DataSize EB = new DataSize(1000L * 1000 * 1000 * 1024 * 1024 * 1024);
public static DataSize MAX_VALUE = new DataSize(Long.MAX_VALUE);
private final long size;
public DataSize(long size) {
this.size = size;
}
public DataSize(String size) {
size = size.replaceAll("\\s|_", "");
switch (size) {
case "", "0", "-0", "+0" -> {
this.size = 0;
return;
}
case "", "inf", "infinite", "∞b" -> {
this.size = Long.MAX_VALUE;
return;
}
}
int numberStartOffset = 0;
int numberEndOffset = 0;
boolean negative = false;
{
boolean firstChar = true;
boolean numberMode = true;
for (char c : size.toCharArray()) {
if (c == '-') {
if (firstChar) {
negative = true;
numberStartOffset++;
numberEndOffset++;
} else {
throw new IllegalArgumentException("Found a minus character after index 0");
}
} else if (Character.isDigit(c)) {
if (numberMode) {
numberEndOffset++;
} else {
throw new IllegalArgumentException("Found a number after the unit");
}
} else if (Character.isLetter(c)) {
if (numberEndOffset - numberStartOffset <= 0) {
throw new IllegalArgumentException("No number found");
}
if (numberMode) {
numberMode = false;
}
} else {
throw new IllegalArgumentException("Unsupported character");
}
if (firstChar) {
firstChar = false;
}
}
}
var number = Long.parseUnsignedLong(size, numberStartOffset, numberEndOffset, 10);
if (numberEndOffset == size.length()) {
// No measurement
this.size = (negative ? -1 : 1) * number;
return;
}
// Measurements are like B, MB, or MiB, not longer
if (size.length() - numberEndOffset > 3) {
throw new IllegalArgumentException("Wrong measurement unit");
}
var scaleChar = size.charAt(numberEndOffset);
boolean powerOf2 = numberEndOffset + 1 < size.length() && size.charAt(numberEndOffset + 1) == 'i';
int k = powerOf2 ? 1024 : 1000;
var scale = switch (scaleChar) {
case 'B' -> 1;
case 'b' -> throw new IllegalArgumentException("Bits are not allowed");
case 'K', 'k' -> k;
case 'M', 'm' -> k * k;
case 'G', 'g' -> k * k * k;
case 'T', 't' -> k * k * k * k;
case 'P', 'p' -> k * k * k * k * k;
case 'E', 'e' -> k * k * k * k * k * k;
case 'Z', 'z' -> k * k * k * k * k * k * k;
case 'Y', 'y' -> k * k * k * k * k * k * k * k;
default -> throw new IllegalStateException("Unexpected value: " + scaleChar);
};
// if scale is 1, the unit should be "B", nothing more
if (scale == 1 && numberEndOffset + 1 != size.length()) {
throw new IllegalArgumentException("Invalid unit");
}
this.size = (negative ? -1 : 1) * number * scale;
}
public static Long get(DataSize value) {
if (value == null) {
return null;
} else {
return value.size;
}
}
public static long getOrElse(DataSize value, @NotNull DataSize defaultValue) {
if (value == null) {
return defaultValue.size;
} else {
return value.size;
}
}
@Override
public int intValue() {
if (size >= Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int) size;
}
@Override
public long longValue() {
return size;
}
@Override
public float floatValue() {
return size;
}
@Override
public double doubleValue() {
return size;
}
@Override
public String toString() {
return toString(true);
}
public String toString(boolean precise) {
boolean siUnits = size % 1000 == 0;
int k = siUnits ? 1000 : 1024;
long lSize = size;
CharacterIterator ci = new StringCharacterIterator((siUnits ? "k" : "K") + "MGTPEZY");
while ((precise ? lSize % k == 0 : lSize > k) && lSize != 0) {
lSize /= k;
ci.next();
}
if (lSize == size) {
return lSize + "B";
}
return lSize + "" + ci.previous() + (siUnits ? "B" : "iB");
}
@Override
public int compareTo(@NotNull DataSize anotherLong) {
return Long.compare(this.size, anotherLong.size);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof DataSize) {
return size == ((DataSize)obj).size;
}
return false;
}
@Override
public int hashCode() {
return Long.hashCode(size);
}
}

View File

@ -0,0 +1,11 @@
package it.cavallium.rockserver.core.config;
public enum DatabaseCompression {
PLAIN,
SNAPPY,
LZ4,
LZ4_HC,
ZSTD,
ZLIB,
BZLIB2
}

View File

@ -0,0 +1,13 @@
package it.cavallium.rockserver.core.config;
public interface DatabaseConfig {
GlobalDatabaseConfig global();
static String stringify(DatabaseConfig o) {
return """
{
"global": %s
}""".formatted(GlobalDatabaseConfig.stringify(o.global()));
}
}

View File

@ -0,0 +1,17 @@
package it.cavallium.rockserver.core.config;
public interface DatabaseLevel {
DatabaseCompression compression();
DataSize maxDictBytes();
static String stringify(DatabaseLevel o) {
return """
{
"compression": "%s",
"max-dict-bytes": "%s"
}\
""".formatted(o.compression(), o.maxDictBytes());
}
}

View File

@ -0,0 +1,44 @@
package it.cavallium.rockserver.core.config;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;
public interface FallbackColumnOptions {
DatabaseLevel[] levels();
DataSize memtableMemoryBudgetBytes();
boolean cacheIndexAndFilterBlocks();
boolean partitionFilters();
BloomFilterConfig bloomFilter();
DataSize blockSize();
DataSize writeBufferSize();
static String stringify(FallbackColumnOptions o) {
return """
{
"levels": %s,
"memtable-memory-budget-bytes": "%s",
"cache-index-and-filter-blocks": %b,
"partition-filters": %s,
"bloom-filter": %s,
"block-size": "%s",
"write-buffer-size": "%s"
}\
""".formatted(Arrays.stream(Objects.requireNonNullElse(o.levels(), new DatabaseLevel[0]))
.map(DatabaseLevel::stringify).collect(Collectors.joining(",", "[", "]")),
o.memtableMemoryBudgetBytes(),
o.cacheIndexAndFilterBlocks(),
o.partitionFilters(),
BloomFilterConfig.stringify(o.bloomFilter()),
o.blockSize(),
o.writeBufferSize()
);
}
}

View File

@ -0,0 +1,61 @@
package it.cavallium.rockserver.core.config;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;
public interface GlobalDatabaseConfig {
boolean spinning();
boolean checksum();
boolean useDirectIo();
boolean allowRocksdbMemoryMapping();
int maximumOpenFiles();
boolean optimistic();
DataSize blockCache();
DataSize writeBufferManager();
Path logPath();
FallbackColumnOptions fallbackColumnOptions();
NamedColumnOptions[] columnOptions();
static String stringify(GlobalDatabaseConfig o) {
return """
{
"spinning": %b,
"checksum": %b,
"use-direct-io": %b,
"allow-rocksdb-memory-mapping": %b,
"maximum-open-files": %d,
"optimistic": %b,
"block-cache": "%s",
"write-buffer-manager": "%s",
"log-path": "%s",
"fallback-column-options": %s,
"column-options": %s
}\
""".formatted(o.spinning(),
o.checksum(),
o.useDirectIo(),
o.allowRocksdbMemoryMapping(),
o.maximumOpenFiles(),
o.optimistic(),
o.blockCache(),
o.writeBufferManager(),
o.logPath(),
FallbackColumnOptions.stringify(o.fallbackColumnOptions()),
Arrays.stream(Objects.requireNonNullElse(o.columnOptions(), new NamedColumnOptions[0]))
.map(NamedColumnOptions::stringify)
.collect(Collectors.joining(",", "[", "]"))
);
}
}

View File

@ -0,0 +1,35 @@
package it.cavallium.rockserver.core.config;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
public interface NamedColumnOptions extends FallbackColumnOptions {
String name();
static String stringify(NamedColumnOptions o) {
return """
{
"name": "%s",
"levels": %s,
"memtable-memory-budget-bytes": "%s",
"cache-index-and-filter-blocks": %b,
"partition-filters": %s,
"bloom-filter": %s,
"block-size": "%s",
"write-buffer-size": "%s"
}\
""".formatted(o.name(),
(o.levels() != null ? List.of(o.levels()) : List.<DatabaseLevel>of()).stream()
.map(DatabaseLevel::stringify).collect(Collectors.joining(",", "[", "]")),
o.memtableMemoryBudgetBytes(),
o.cacheIndexAndFilterBlocks(),
o.partitionFilters(),
BloomFilterConfig.stringify(o.bloomFilter()),
o.blockSize(),
o.writeBufferSize()
);
}
}

View File

@ -0,0 +1,161 @@
package it.cavallium.rockserver.core.impl;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.ArrayList;
import java.util.Map;
import java.util.Map.Entry;
import org.jetbrains.annotations.Nullable;
public class Bucket {
private final ColumnInstance col;
private final ArrayList<Entry<MemorySegment[], MemorySegment>> elements;
public Bucket(ColumnInstance col) {
this(col, MemorySegment.NULL);
}
public Bucket(ColumnInstance col, MemorySegment rawBucketSegment) {
this.col = col;
long offset = 0;
this.elements = new ArrayList<>();
long rawBucketSegmentByteSize = rawBucketSegment.byteSize();
if (rawBucketSegmentByteSize > 0) {
var elements = rawBucketSegment.get(ValueLayout.JAVA_INT_UNALIGNED, offset);
offset += Integer.BYTES;
int elementI = 0;
while (elementI < elements) {
var elementKVSize = rawBucketSegment.get(ValueLayout.JAVA_INT_UNALIGNED, offset);
offset += Integer.BYTES;
MemorySegment[] bucketElementKeys;
{
int segmentOffset = 0;
var elementKVSegment = rawBucketSegment.asSlice(offset, elementKVSize, 1);
int readKeys = 0;
bucketElementKeys = new MemorySegment[col.schema().variableLengthKeysCount()];
while (readKeys < col.schema().variableLengthKeysCount()) {
var keyISize = elementKVSegment.get(ValueLayout.JAVA_CHAR_UNALIGNED, segmentOffset);
segmentOffset += Character.BYTES;
var elementKeyISegment = elementKVSegment.asSlice(segmentOffset, keyISize);
bucketElementKeys[readKeys] = elementKeyISegment;
segmentOffset += keyISize;
readKeys++;
}
MemorySegment bucketElementValues;
if (col.schema().hasValue()) {
bucketElementValues = elementKVSegment.asSlice(segmentOffset, elementKVSize - segmentOffset);
segmentOffset = elementKVSize;
} else {
bucketElementValues = MemorySegment.NULL;
assert segmentOffset == elementKVSize;
}
var entry = Map.entry(bucketElementKeys, bucketElementValues);
offset += segmentOffset;
}
elementI++;
}
assert offset == rawBucketSegmentByteSize;
}
}
/**
* Add or replace an element
* @return Return the previous value ({@link MemorySegment#NULL} if no value is expected),
* return null if no element was present
*/
@Nullable
public MemorySegment addElement(MemorySegment[] bucketVariableKeys, @Nullable MemorySegment value) {
var element = Map.entry(bucketVariableKeys, value != null ? value : MemorySegment.NULL);
var i = indexOf(bucketVariableKeys);
if (i == -1) {
this.elements.add(element);
return null;
} else {
var val = this.elements.set(i, element).getValue();
assert val != null;
return val;
}
}
/**
* Remove an element
* @return Return the previous value ({@link MemorySegment#NULL} if no value is expected),
* return null if no element was present
*/
@Nullable
public MemorySegment removeElement(MemorySegment[] bucketVariableKeys) {
var i = indexOf(bucketVariableKeys);
if (i == -1) {
return null;
} else {
var val = this.elements.remove(i).getValue();
assert val != null;
return val;
}
}
/**
* Get an element
* @return Return the value ({@link MemorySegment#NULL} if no value is expected),
* return null if no element was present
*/
@Nullable
public MemorySegment getElement(MemorySegment[] bucketVariableKeys) {
var i = indexOf(bucketVariableKeys);
if (i == -1) {
return null;
} else {
var val = this.elements.get(i).getValue();
assert val != null;
return val;
}
}
private int indexOf(MemorySegment[] bucketVariableKeys) {
for (int i = 0; i < elements.size(); i++) {
var elem = elements.get(i);
var arrayKeys = elem.getKey();
assert arrayKeys.length == bucketVariableKeys.length;
for (int j = 0; j < arrayKeys.length; j++) {
if (MemorySegment.mismatch(arrayKeys[j], 0, arrayKeys[j].byteSize(), bucketVariableKeys[j], 0, bucketVariableKeys[j].byteSize()) == -1) {
return i;
}
}
}
return -1;
}
public MemorySegment toSegment(Arena arena) {
if (this.elements.isEmpty()) {
return MemorySegment.NULL;
}
MemorySegment[] serializedElements = new MemorySegment[this.elements.size()];
ArrayList<Entry<MemorySegment[], MemorySegment>> entries = this.elements;
for (int i = 0; i < entries.size(); i++) {
Entry<MemorySegment[], MemorySegment> element = entries.get(i);
var computedBucketElementKey = col.computeBucketElementKey(arena, element.getKey());
var computedBucketElementValue = col.computeBucketElementValue(element.getValue());
serializedElements[i] = col.computeBucketElementKeyValue(arena, computedBucketElementKey, computedBucketElementValue);
}
long totalSize = Integer.BYTES;
for (MemorySegment serializedElement : serializedElements) {
totalSize += serializedElement.byteSize();
}
var segment = arena.allocate(totalSize);
long offset = 0;
segment.set(ColumnInstance.BIG_ENDIAN_INT, offset, serializedElements.length);
offset += Integer.BYTES;
for (MemorySegment elementAtI : serializedElements) {
var elementSize = elementAtI.byteSize();
MemorySegment.copy(elementAtI, 0, segment, offset, elementSize);
offset += elementSize;
}
return segment;
}
}

View File

@ -0,0 +1,171 @@
package it.cavallium.rockserver.core.impl;
import static it.cavallium.rockserver.core.common.Utils.toCharExact;
import static java.lang.Math.toIntExact;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.RocksDBException;
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout.OfByte;
import java.lang.foreign.ValueLayout.OfChar;
import java.lang.foreign.ValueLayout.OfInt;
import java.lang.foreign.ValueLayout.OfShort;
import java.nio.ByteOrder;
import java.util.Arrays;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int finalKeySizeBytes) implements AutoCloseable {
public static final OfByte BIG_ENDIAN_BYTES = OfByte.JAVA_BYTE.withOrder(ByteOrder.BIG_ENDIAN);
public static final OfInt BIG_ENDIAN_INT = OfByte.JAVA_INT.withOrder(ByteOrder.BIG_ENDIAN);
public static final OfShort BIG_ENDIAN_SHORT = OfByte.JAVA_SHORT.withOrder(ByteOrder.BIG_ENDIAN);
public static final OfShort BIG_ENDIAN_SHORT_UNALIGNED = OfByte.JAVA_SHORT_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN);
public static final OfChar BIG_ENDIAN_CHAR = OfByte.JAVA_CHAR.withOrder(ByteOrder.BIG_ENDIAN);
public static final OfChar BIG_ENDIAN_CHAR_UNALIGNED = OfByte.JAVA_CHAR_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN);
public static final OfInt BIG_ENDIAN_INT_UNALIGNED = OfByte.JAVA_INT_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN);
public ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema) {
this(cfh, schema, calculateFinalKeySizeBytes(schema));
}
private static int calculateFinalKeySizeBytes(ColumnSchema schema) {
int total = 0;
for (int i : schema.keys()) {
total += i;
}
return total;
}
@Override
public void close() {
cfh.close();
}
public boolean requiresWriteTransaction() {
return schema.variableLengthKeysCount() > 0;
}
public boolean hasBuckets() {
return schema.variableLengthKeysCount() > 0;
}
@NotNull
public MemorySegment calculateKey(Arena arena, MemorySegment[] keys) {
validateKeyCount(keys);
MemorySegment finalKey;
if (keys.length == 0) {
finalKey = MemorySegment.NULL;
} else if(keys.length == 1 && !hasBuckets()) {
finalKey = keys[0];
} else {
finalKey = arena.allocate(finalKeySizeBytes);
long offsetBytes = 0;
for (int i = 0; i < schema.keys().length; i++) {
var computedKeyAtI = computeKeyAt(arena, i, keys);
var computedKeyAtISize = computedKeyAtI.byteSize();
MemorySegment.copy(computedKeyAtI, 0, finalKey, offsetBytes, computedKeyAtISize);
offsetBytes += computedKeyAtISize;
}
}
validateFinalKeySize(finalKey);
return finalKey;
}
private MemorySegment computeKeyAt(Arena arena, int i, MemorySegment[] keys) {
if (i < schema.keys().length - schema.variableLengthKeysCount()) {
return keys[i];
} else {
if (schema.keys()[i] != Integer.BYTES) {
throw new UnsupportedOperationException("Hash size different than 32-bit is currently unsupported");
} else {
return XXHash32.getInstance().hash(arena, keys[i], 0, 0, 0);
}
}
}
private void validateFinalKeySize(MemorySegment key) {
if (finalKeySizeBytes != key.byteSize()) {
throw new IllegalArgumentException(
"Keys size must be equal to the column keys size. Expected " + finalKeySizeBytes + ", got "
+ key.byteSize());
}
}
private void validateKeyCount(MemorySegment[] keys) {
if (schema.keys().length != keys.length) {
throw new IllegalArgumentException(
"Keys count must be equal to the column keys count. Expected " + schema.keys().length + ", got "
+ keys.length);
}
}
public MemorySegment computeBucketElementKey(Arena arena, MemorySegment[] variableKeys) {
long totalSize = 0L;
assert variableKeys.length == schema.variableLengthKeysCount();
for (MemorySegment variableKey : variableKeys) {
totalSize += Character.BYTES + variableKey.byteSize();
}
MemorySegment bucketElementKey = arena.allocate(totalSize);
long offset = 0;
for (MemorySegment keyI : variableKeys) {
var keyISize = keyI.byteSize();
bucketElementKey.set(BIG_ENDIAN_CHAR_UNALIGNED, offset += Character.BYTES, toCharExact(keyISize));
MemorySegment.copy(keyI, 0, bucketElementKey, offset += keyISize, keyISize);
}
assert offset == totalSize;
return bucketElementKey;
}
public MemorySegment computeBucketElementValue(@Nullable MemorySegment value) {
checkNullableValue(value);
if (value != null) {
return value;
} else {
return MemorySegment.NULL;
}
}
public void checkNullableValue(MemorySegment value) {
if (schema.hasValue() == (value == null || value == MemorySegment.NULL)) {
if (schema.hasValue()) {
throw new RocksDBException(RocksDBErrorType.UNEXPECTED_NULL_VALUE, "Schema expects a value, but a null value has been passed");
} else {
throw new RocksDBException(RocksDBErrorType.VALUE_MUST_BE_NULL, "Schema expects no value, but a non-null value has been passed");
}
}
}
public MemorySegment computeBucketElementKeyValue(Arena arena, MemorySegment computedBucketElementKey,
@Nullable MemorySegment computedBucketElementValue) {
checkNullableValue(computedBucketElementValue);
var keySize = computedBucketElementKey.byteSize();
var valueSize = computedBucketElementValue != null ? computedBucketElementValue.byteSize() : 0;
var totalSize = keySize + valueSize;
var computedBucketElementKV = arena.allocate(totalSize);
computedBucketElementKV.set(BIG_ENDIAN_INT, 0, toIntExact(totalSize));
MemorySegment.copy(computedBucketElementKey, 0, computedBucketElementKV, Integer.BYTES, keySize);
if (computedBucketElementValue != null) {
MemorySegment.copy(computedBucketElementValue, 0, computedBucketElementKV, Integer.BYTES + keySize, valueSize);
}
return computedBucketElementKV;
}
/**
* Get only the variable-length keys
*/
public MemorySegment[] getBucketElementKeys(MemorySegment[] keys) {
assert keys.length == schema.keys().length;
return Arrays.copyOfRange(keys,
schema.keys().length - schema.variableLengthKeysCount(),
schema.keys().length);
}
}

View File

@ -0,0 +1,38 @@
package it.cavallium.rockserver.core.impl;
import it.cavallium.rockserver.core.config.DataSize;
import java.util.List;
import org.github.gestalt.config.decoder.Decoder;
import org.github.gestalt.config.decoder.DecoderService;
import org.github.gestalt.config.decoder.Priority;
import org.github.gestalt.config.entity.ValidationError;
import org.github.gestalt.config.node.ConfigNode;
import org.github.gestalt.config.reflect.TypeCapture;
import org.github.gestalt.config.utils.ValidateOf;
class DataSizeDecoder implements Decoder<DataSize> {
@Override
public Priority priority() {
return Priority.LOW;
}
@Override
public String name() {
return "DataSize";
}
@Override
public boolean matches(TypeCapture klass) {
return klass.isAssignableFrom(DataSize.class);
}
@Override
public ValidateOf<DataSize> decode(String path, ConfigNode node, TypeCapture type, DecoderService decoderService) {
try {
return ValidateOf.validateOf(new DataSize(node.getValue().orElseThrow()), List.of());
} catch (Exception ex) {
return ValidateOf.inValid(new ValidationError.DecodingNumberFormatException(path, node, name()));
}
}
}

View File

@ -0,0 +1,491 @@
package it.cavallium.rockserver.core.impl;
import static it.cavallium.rockserver.core.common.Utils.toMemorySegment;
import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES;
import static java.lang.foreign.MemorySegment.NULL;
import static java.util.Objects.requireNonNullElse;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
import it.cavallium.rockserver.core.common.Callback.GetCallback;
import it.cavallium.rockserver.core.common.Callback.IteratorCallback;
import it.cavallium.rockserver.core.common.Callback.PutCallback;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Callback;
import it.cavallium.rockserver.core.common.Delta;
import it.cavallium.rockserver.core.common.RocksDBAPI;
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
import it.cavallium.rockserver.core.common.Utils;
import it.cavallium.rockserver.core.config.DatabaseConfig;
import it.cavallium.rockserver.core.impl.rocksdb.REntry;
import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
import java.io.Closeable;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
import org.github.gestalt.config.builder.GestaltBuilder;
import org.github.gestalt.config.exceptions.GestaltException;
import org.github.gestalt.config.source.ClassPathConfigSource;
import org.github.gestalt.config.source.FileConfigSource;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.WriteOptions;
public class EmbeddedDB implements RocksDBAPI, Closeable {
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
private static final boolean USE_FAST_GET = true;
private final Logger logger;
private final Path path;
private final Path embeddedConfigPath;
private final DatabaseConfig config;
private TransactionalDB db;
private final NonBlockingHashMapLong<ColumnInstance> columns;
private final ConcurrentMap<String, Long> columnNamesIndex;
private final NonBlockingHashMapLong<REntry<Transaction>> txs;
private final SafeShutdown ops;
private final Object columnEditLock = new Object();
public EmbeddedDB(Path path, @Nullable Path embeddedConfigPath) {
this.path = path;
this.embeddedConfigPath = embeddedConfigPath;
this.logger = Logger.getLogger("db");
this.columns = new NonBlockingHashMapLong<>();
this.txs = new NonBlockingHashMapLong<>();
this.columnNamesIndex = new ConcurrentHashMap<>();
this.ops = new SafeShutdown();
var gsb = new GestaltBuilder();
try {
gsb.addSource(new ClassPathConfigSource("it/cavallium/rockserver/core/resources/default.conf"));
if (embeddedConfigPath != null) {
gsb.addSource(new FileConfigSource(this.embeddedConfigPath));
}
var gestalt = gsb
.addDecoder(new DataSizeDecoder())
.addDefaultConfigLoaders()
.addDefaultDecoders()
.build();
gestalt.loadConfigs();
this.config = gestalt.getConfig("database", DatabaseConfig.class);
logger.log(Level.INFO, "Database configuration: {0}", DatabaseConfig.stringify(this.config));
} catch (GestaltException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.CONFIG_ERROR, e);
}
}
/**
* The column must be registered once!!!
* Do not try to register a column that may already be registered
*/
private long registerColumn(ColumnInstance column) {
try {
var columnName = new String(column.cfh().getName(), StandardCharsets.UTF_8);
long id = FastRandomUtils.allocateNewValue(this.columns, column, 1, Long.MAX_VALUE);
Long previous = this.columnNamesIndex.putIfAbsent(columnName, id);
if (previous != null) {
//noinspection resource
this.columns.remove(id);
throw new UnsupportedOperationException("Column already registered!");
}
return id;
} catch (RocksDBException e) {
throw new RuntimeException(e);
}
}
/**
* The column must be unregistered once!!!
* Do not try to unregister a column that may already be unregistered, or that may not be registered
*/
private ColumnInstance unregisterColumn(long id) {
var col = this.columns.remove(id);
Objects.requireNonNull(col, () -> "Column does not exist: " + id);
String name;
try {
name = new String(col.cfh().getName(), StandardCharsets.UTF_8);
} catch (RocksDBException e) {
throw new RuntimeException(e);
}
// Unregister the column name from the index avoiding race conditions
int retries = 0;
while (this.columnNamesIndex.remove(name) == null && retries++ < 5_000) {
Thread.yield();
}
if (retries >= 5000) {
throw new IllegalStateException("Can't find column in column names index: " + name);
}
return col;
}
@Override
public void close() throws IOException {
// Wait for 10 seconds
try {
ops.closeAndWait(10_000);
} catch (TimeoutException e) {
logger.log(Level.SEVERE, "Some operations lasted more than 10 seconds, forcing database shutdown...");
}
}
@Override
public long openTransaction(long timeoutMs) {
ops.beginOp();
TransactionalOptions txOpts = db.createTransactionalOptions();
var writeOpts = new WriteOptions();
var tx = new REntry<>(db.beginTransaction(writeOpts, txOpts), new RocksDBObjects(writeOpts, txOpts));
return FastRandomUtils.allocateNewValue(txs, tx, Long.MIN_VALUE, -2);
}
@Override
public void closeTransaction(long transactionId) {
var tx = txs.remove(transactionId);
if (tx != null) {
try {
tx.close();
} finally {
ops.endOp();
}
} else {
throw new NoSuchElementException("Transaction not found: " + transactionId);
}
}
@Override
public long createColumn(String name, ColumnSchema schema) throws it.cavallium.rockserver.core.common.RocksDBException {
synchronized (columnEditLock) {
var colId = getColumnIdOrNull(name);
var col = colId != null ? getColumn(colId) : null;
if (col != null) {
if (schema.equals(col.schema())) {
return colId;
} else {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_EXISTS,
"Column exists, with a different schema: " + name
);
}
} else {
try {
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8)));
return registerColumn(new ColumnInstance(cf, schema));
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_CREATE_FAIL, e);
}
}
}
}
@Override
public void deleteColumn(long columnId) throws it.cavallium.rockserver.core.common.RocksDBException {
synchronized (columnEditLock) {
var col = getColumn(columnId);
try {
db.get().dropColumnFamily(col.cfh());
unregisterColumn(columnId).close();
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_DELETE_FAIL, e);
}
}
}
@Override
public long getColumnId(String name) {
var columnId = getColumnIdOrNull(name);
if (columnId == null) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_NOT_FOUND,
"Column not found: " + name);
} else {
return columnId;
}
}
private Long getColumnIdOrNull(String name) {
var columnId = (long) columnNamesIndex.getOrDefault(name, -1L);
ColumnInstance col;
if (columnId == -1L || (col = columns.get(columnId)) == null || !col.cfh().isOwningHandle()) {
return null;
} else {
return columnId;
}
}
@Override
public void put(long transactionId,
long columnId,
MemorySegment[] keys,
@Nullable MemorySegment value,
PutCallback<? super MemorySegment> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try (var arena = Arena.ofConfined()) {
// Column id
var col = getColumn(columnId);
// Check for null value
col.checkNullableValue(value);
MemorySegment previousValue;
REntry<Transaction> tx;
if (transactionId != 0) {
tx = getTransaction(transactionId);
} else {
tx = null;
}
MemorySegment calculatedKey = col.calculateKey(arena, keys);
if (col.hasBuckets()) {
if (tx != null) {
var bucketElementKeys = col.getBucketElementKeys(keys);
try (var readOptions = new ReadOptions()) {
var previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
MemorySegment previousRawBucket = Utils.toMemorySegment(arena, previousRawBucketByteArray);
var bucket = new Bucket(col, previousRawBucket);
previousValue = bucket.addElement(bucketElementKeys, value);
tx.val().put(col.cfh(), toByteArray(calculatedKey), toByteArray(bucket.toSegment(arena)));
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_1, e);
}
} else {
// Retry using a transaction: transactions are required to handle this kind of data
var newTransactionId = this.openTransaction(Long.MAX_VALUE);
try {
put(newTransactionId, columnId, keys, value, callback);
return;
} finally {
this.closeTransaction(newTransactionId);
}
}
} else {
if (Callback.requiresGettingPreviousValue(callback)) {
try (var readOptions = new ReadOptions()) {
byte[] previousValueByteArray;
if (tx != null) {
previousValueByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
} else {
previousValueByteArray = db.get().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
}
previousValue = toMemorySegment(arena, previousValueByteArray);
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_2, e);
}
} else {
previousValue = null;
}
if (tx != null) {
tx.val().put(col.cfh(), toByteArray(calculatedKey), toByteArray(requireNonNullElse(value, NULL)));
} else {
try (var w = new WriteOptions()) {
db.get().put(col.cfh(), w, calculatedKey.asByteBuffer(), requireNonNullElse(value, NULL).asByteBuffer());
}
}
}
switch (callback) {
case Callback.CallbackVoid<? super MemorySegment> ignored -> {}
case Callback.CallbackPrevious<? super MemorySegment> c -> c.onPrevious(previousValue);
case Callback.CallbackChanged c -> c.onChanged(valueEquals(previousValue, value));
case Callback.CallbackDelta<? super MemorySegment> c -> c.onSuccess(new Delta<>(previousValue, value));
default -> throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_3,
"Unexpected value: " + callback);
}
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex;
} catch (Exception ex) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN, ex);
} finally {
ops.endOp();
}
}
@Override
public void get(long transactionId, long columnId, MemorySegment[] keys, GetCallback<? super MemorySegment> callback)
throws it.cavallium.rockserver.core.common.RocksDBException {
ops.beginOp();
try (var arena = Arena.ofConfined()) {
// Column id
var col = getColumn(columnId);
MemorySegment foundValue;
boolean existsValue;
REntry<Transaction> tx;
if (transactionId != 0) {
tx = getTransaction(transactionId);
} else {
tx = null;
}
MemorySegment calculatedKey = col.calculateKey(arena, keys);
if (col.hasBuckets()) {
var bucketElementKeys = col.getBucketElementKeys(keys);
try (var readOptions = new ReadOptions()) {
MemorySegment previousRawBucket = dbGet(tx, col, arena, readOptions, calculatedKey);
var bucket = new Bucket(col, previousRawBucket);
foundValue = bucket.getElement(bucketElementKeys);
existsValue = foundValue != null;
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.GET_1, e);
}
} else {
boolean shouldGetCurrent = Callback.requiresGettingCurrentValue(callback)
|| (tx != null && callback instanceof Callback.CallbackExists<? super MemorySegment>);
if (shouldGetCurrent) {
try (var readOptions = new ReadOptions()) {
foundValue = dbGet(tx, col, arena, readOptions, calculatedKey);
existsValue = foundValue != null;
} catch (RocksDBException e) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_2, e);
}
} else if (callback instanceof Callback.CallbackExists<? super MemorySegment>) {
// tx is always null here
//noinspection ConstantValue
assert tx == null;
foundValue = null;
existsValue = db.get().keyExists(calculatedKey.asByteBuffer());
} else {
foundValue = null;
existsValue = false;
}
}
switch (callback) {
case Callback.CallbackVoid<? super MemorySegment> ignored -> {}
case Callback.CallbackCurrent<? super MemorySegment> c -> c.onCurrent(foundValue);
case Callback.CallbackExists<? super MemorySegment> c -> c.onExists(existsValue);
default -> throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_3,
"Unexpected value: " + callback);
}
} catch (it.cavallium.rockserver.core.common.RocksDBException ex) {
throw ex;
} catch (Exception ex) {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN, ex);
} finally {
ops.endOp();
}
}
@Override
public long openIterator(long transactionId,
long columnId,
MemorySegment[] startKeysInclusive,
@Nullable MemorySegment[] endKeysExclusive,
boolean reverse,
long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException {
return 0;
}
@Override
public void closeIterator(long iteratorId) throws it.cavallium.rockserver.core.common.RocksDBException {
}
@Override
public void seekTo(long iterationId, MemorySegment[] keys)
throws it.cavallium.rockserver.core.common.RocksDBException {
}
@Override
public void subsequent(long iterationId,
long skipCount,
long takeCount,
IteratorCallback<? super MemorySegment> callback) throws it.cavallium.rockserver.core.common.RocksDBException {
}
private MemorySegment dbGet(REntry<Transaction> tx,
ColumnInstance col,
Arena arena,
ReadOptions readOptions,
MemorySegment calculatedKey) throws RocksDBException {
if (tx != null) {
var previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
return Utils.toMemorySegment(arena, previousRawBucketByteArray);
} else {
var db = this.db.get();
if (USE_FAST_GET) {
return dbGetDirect(arena, readOptions, calculatedKey);
} else {
var previousRawBucketByteArray = db.get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES));
return Utils.toMemorySegment(arena, previousRawBucketByteArray);
}
}
}
@Nullable
private MemorySegment dbGetDirect(Arena arena, ReadOptions readOptions, MemorySegment calculatedKey)
throws RocksDBException {
// Get the key nio buffer to pass to RocksDB
ByteBuffer keyNioBuffer = calculatedKey.asByteBuffer();
// Create a direct result buffer because RocksDB works only with direct buffers
var resultBuffer = arena.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES).asByteBuffer();
var keyMayExist = this.db.get().keyMayExist(readOptions, keyNioBuffer.rewind(), resultBuffer.clear());
return switch (keyMayExist.exists) {
case kNotExist -> null;
case kExistsWithValue, kExistsWithoutValue -> {
// At the beginning, size reflects the expected size, then it becomes the real data size
int size = keyMayExist.exists == kExistsWithValue ? keyMayExist.valueLength : -1;
if (keyMayExist.exists == kExistsWithoutValue || size > resultBuffer.limit()) {
if (size > resultBuffer.capacity()) {
resultBuffer = arena.allocate(size).asByteBuffer();
}
size = this.db.get().get(readOptions, keyNioBuffer.rewind(), resultBuffer.clear());
}
if (size == RocksDB.NOT_FOUND) {
yield null;
} else if (size == resultBuffer.limit()) {
yield MemorySegment.ofBuffer(resultBuffer.position(0));
} else {
throw new IllegalStateException("size (" + size + ") != read size (" + resultBuffer.limit() + ")");
}
}
};
}
private boolean valueEquals(MemorySegment previousValue, MemorySegment currentValue) {
previousValue = requireNonNullElse(previousValue, NULL);
currentValue = requireNonNullElse(currentValue, NULL);
return MemorySegment.mismatch(previousValue, 0, previousValue.byteSize(), currentValue, 0, currentValue.byteSize()) != -1;
}
public static byte[] toByteArray(MemorySegment memorySegment) {
return memorySegment.toArray(BIG_ENDIAN_BYTES);
}
private ColumnInstance getColumn(long columnId) {
var col = columns.get(columnId);
if (col != null) {
return col;
} else {
throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_NOT_FOUND,
"No column with id " + columnId);
}
}
private REntry<Transaction> getTransaction(long transactionId) {
var tx = txs.get(transactionId);
if (tx != null) {
return tx;
} else {
throw new NoSuchElementException("No transaction with id " + transactionId);
}
}
public Path getPath() {
return path;
}
}

View File

@ -0,0 +1,19 @@
package it.cavallium.rockserver.core.impl;
import java.util.concurrent.ThreadLocalRandom;
import org.cliffc.high_scale_lib.NonBlockingHashMapLong;
public class FastRandomUtils {
public static <T> long allocateNewValue(NonBlockingHashMapLong<T> map, T value, long minId, long maxId) {
long newTransactionId;
do {
newTransactionId = getRandomId(minId, maxId);
} while (map.putIfAbsent(newTransactionId, value) != null);
return newTransactionId;
}
private static long getRandomId(long minId, long maxId) {
return ThreadLocalRandom.current().nextLong(minId, maxId);
}
}

View File

@ -0,0 +1,55 @@
package it.cavallium.rockserver.core.impl;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
public class SafeShutdown implements AutoCloseable {
private volatile boolean closing;
private final LongAdder pendingOps = new LongAdder();
public void beginOp() {
if (closing) {
throw new IllegalStateException("Closed");
}
pendingOps.increment();
}
public void endOp() {
pendingOps.decrement();
}
public void closeAndWait(long timeoutMs) throws TimeoutException {
this.closing = true;
waitForExit(timeoutMs);
}
public void waitForExit(long timeoutMs) throws TimeoutException {
try {
long startMs = System.nanoTime();
while (pendingOps.sum() > 0 && System.nanoTime() - startMs < (timeoutMs * 1000000L)) {
//noinspection BusyWait
Thread.sleep(10);
}
if (pendingOps.sum() > 0) {
throw new TimeoutException();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() {
try {
closeAndWait(Long.MAX_VALUE);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}
public boolean isOpen() {
return !closing;
}
}

View File

@ -0,0 +1,28 @@
package it.cavallium.rockserver.core.impl;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.SegmentAllocator;
public class SlicingArena implements Arena {
final Arena arena = Arena.ofConfined();
final SegmentAllocator slicingAllocator;
public SlicingArena(long size) {
slicingAllocator = SegmentAllocator.slicingAllocator(arena.allocate(size));
}
public MemorySegment allocate(long byteSize, long byteAlignment) {
return slicingAllocator.allocate(byteSize, byteAlignment);
}
public MemorySegment.Scope scope() {
return arena.scope();
}
public void close() {
arena.close();
}
}

View File

@ -0,0 +1,48 @@
package it.cavallium.rockserver.core.impl;
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
/**
* A 32-bits hash.
* <p>
* Instances of this class are thread-safe.
*/
public abstract class XXHash32 {
/**
* Compute the 32-bits hash of <code>buf[off:off+len]</code> using seed
* <code>seed</code>.
*/
public abstract int hash(byte[] buf, int off, int len, int seed);
/**
* Compute the big-endian 32-bits hash of <code>buf[off:off+len]</code> using seed
* <code>seed</code>.
*/
public abstract MemorySegment hash(Arena arena, MemorySegment buf, int off, int len, int seed);
@Override
public String toString() {
return getClass().getSimpleName();
}
public static XXHash32 getInstance() {
return XXHash32JavaSafe.INSTANCE;
}
}

View File

@ -0,0 +1,222 @@
package it.cavallium.rockserver.core.impl;
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME1;
import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME2;
import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME3;
import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME4;
import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME5;
import static java.lang.Integer.rotateLeft;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.lang.foreign.ValueLayout.OfByte;
import java.lang.foreign.ValueLayout.OfInt;
import java.nio.ByteOrder;
/**
* Safe Java implementation of {@link XXHash32}.
*/
public final class XXHash32JavaSafe extends XXHash32 {
public static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder();
public static final XXHash32 INSTANCE = new XXHash32JavaSafe();
private static final OfInt INT_LE = ValueLayout.JAVA_INT.withOrder(ByteOrder.LITTLE_ENDIAN);
private static final OfInt INT_BE = ValueLayout.JAVA_INT.withOrder(ByteOrder.BIG_ENDIAN);
private static final OfByte BYTE_BE = ValueLayout.JAVA_BYTE.withOrder(ByteOrder.BIG_ENDIAN);
@Override
public int hash(byte[] buf, int off, int len, int seed) {
checkRange(buf, off, len);
final int end = off + len;
int h32;
if (len >= 16) {
final int limit = end - 16;
int v1 = seed + PRIME1 + PRIME2;
int v2 = seed + PRIME2;
int v3 = seed + 0;
int v4 = seed - PRIME1;
do {
v1 += readIntLE(buf, off) * PRIME2;
v1 = rotateLeft(v1, 13);
v1 *= PRIME1;
off += 4;
v2 += readIntLE(buf, off) * PRIME2;
v2 = rotateLeft(v2, 13);
v2 *= PRIME1;
off += 4;
v3 += readIntLE(buf, off) * PRIME2;
v3 = rotateLeft(v3, 13);
v3 *= PRIME1;
off += 4;
v4 += readIntLE(buf, off) * PRIME2;
v4 = rotateLeft(v4, 13);
v4 *= PRIME1;
off += 4;
} while (off <= limit);
h32 = rotateLeft(v1, 1) + rotateLeft(v2, 7) + rotateLeft(v3, 12) + rotateLeft(v4, 18);
} else {
h32 = seed + PRIME5;
}
h32 += len;
while (off <= end - 4) {
h32 += readIntLE(buf, off) * PRIME3;
h32 = rotateLeft(h32, 17) * PRIME4;
off += 4;
}
while (off < end) {
h32 += (buf[off] & 0xFF) * PRIME5;
h32 = rotateLeft(h32, 11) * PRIME1;
++off;
}
h32 ^= h32 >>> 15;
h32 *= PRIME2;
h32 ^= h32 >>> 13;
h32 *= PRIME3;
h32 ^= h32 >>> 16;
return h32;
}
@Override
public MemorySegment hash(Arena arena, MemorySegment buf, int off, int len, int seed) {
checkRange(buf, off, len);
final int end = off + len;
int h32;
if (len >= 16) {
final int limit = end - 16;
int v1 = seed + PRIME1 + PRIME2;
int v2 = seed + PRIME2;
int v3 = seed + 0;
int v4 = seed - PRIME1;
do {
v1 += readIntLE(buf, off) * PRIME2;
v1 = rotateLeft(v1, 13);
v1 *= PRIME1;
off += 4;
v2 += readIntLE(buf, off) * PRIME2;
v2 = rotateLeft(v2, 13);
v2 *= PRIME1;
off += 4;
v3 += readIntLE(buf, off) * PRIME2;
v3 = rotateLeft(v3, 13);
v3 *= PRIME1;
off += 4;
v4 += readIntLE(buf, off) * PRIME2;
v4 = rotateLeft(v4, 13);
v4 *= PRIME1;
off += 4;
} while (off <= limit);
h32 = rotateLeft(v1, 1) + rotateLeft(v2, 7) + rotateLeft(v3, 12) + rotateLeft(v4, 18);
} else {
h32 = seed + PRIME5;
}
h32 += len;
while (off <= end - 4) {
h32 += readIntLE(buf, off) * PRIME3;
h32 = rotateLeft(h32, 17) * PRIME4;
off += 4;
}
while (off < end) {
h32 += (buf.get(BYTE_BE, off) & 0xFF) * PRIME5;
h32 = rotateLeft(h32, 11) * PRIME1;
++off;
}
h32 ^= h32 >>> 15;
h32 *= PRIME2;
h32 ^= h32 >>> 13;
h32 *= PRIME3;
h32 ^= h32 >>> 16;
return arena.allocate(INT_BE, h32);
}
private static void checkRange(byte[] buf, int off) {
if (off < 0 || off >= buf.length) {
throw new ArrayIndexOutOfBoundsException(off);
}
}
private static void checkRange(MemorySegment buf, int off) {
if (off < 0 || off >= buf.byteSize()) {
throw new ArrayIndexOutOfBoundsException(off);
}
}
private static void checkRange(byte[] buf, int off, int len) {
checkLength(len);
if (len > 0) {
checkRange(buf, off);
checkRange(buf, off + len - 1);
}
}
private static void checkRange(MemorySegment buf, int off, int len) {
checkLength(len);
if (len > 0) {
checkRange(buf, off);
checkRange(buf, off + len - 1);
}
}
private static void checkLength(int len) {
if (len < 0) {
throw new IllegalArgumentException("lengths must be >= 0");
}
}
private static int readIntBE(byte[] buf, int i) {
return ((buf[i] & 0xFF) << 24) | ((buf[i+1] & 0xFF) << 16) | ((buf[i+2] & 0xFF) << 8) | (buf[i+3] & 0xFF);
}
private static int readIntLE(byte[] buf, int i) {
return (buf[i] & 0xFF) | ((buf[i+1] & 0xFF) << 8) | ((buf[i+2] & 0xFF) << 16) | ((buf[i+3] & 0xFF) << 24);
}
private static int readIntLE(MemorySegment buf, int i) {
return buf.get(INT_LE, i);
}
private static int readInt(byte[] buf, int i) {
if (NATIVE_BYTE_ORDER == ByteOrder.BIG_ENDIAN) {
return readIntBE(buf, i);
} else {
return readIntLE(buf, i);
}
}
}

View File

@ -0,0 +1,26 @@
package it.cavallium.rockserver.core.impl;
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
enum XXHashUtils {
;
static final int PRIME1 = -1640531535;
static final int PRIME2 = -2048144777;
static final int PRIME3 = -1028477379;
static final int PRIME4 = 668265263;
static final int PRIME5 = 374761393;
}

View File

@ -0,0 +1,13 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import java.io.Closeable;
import org.rocksdb.AbstractNativeReference;
public record REntry<T extends AbstractNativeReference>(T val, RocksDBObjects objs) implements Closeable {
@Override
public void close() {
val.close();
objs.close();
}
}

View File

@ -0,0 +1,37 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import java.util.ArrayList;
import java.util.List;
public class RocksDBObjects implements AutoCloseable {
private final List<AutoCloseable> refs;
public RocksDBObjects(int size) {
this.refs = new ArrayList<>(size);
}
public RocksDBObjects() {
this.refs = new ArrayList<>();
}
public RocksDBObjects(AutoCloseable... refs) {
this(refs.length);
for (AutoCloseable ref : refs) {
add(ref);
}
}
public void add(AutoCloseable ref) {
this.refs.add(ref);
}
@Override
public void close() {
for (int i = refs.size() - 1; i >= 0; i--) {
try {
refs.get(i).close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -0,0 +1,217 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import java.io.Closeable;
import java.io.IOException;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.OptimisticTransactionOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Transaction;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionOptions;
import org.rocksdb.WriteOptions;
public sealed interface TransactionalDB extends Closeable {
static TransactionalDB create(RocksDB db) {
return switch (db) {
case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(optimisticTransactionDB);
case TransactionDB transactionDB -> new PessimisticTransactionalDB(transactionDB);
default -> throw new UnsupportedOperationException("This database is not transactional");
};
}
TransactionalOptions createTransactionalOptions();
RocksDB get();
/**
* Starts a new Transaction.
* <p>
* Caller is responsible for calling {@link #close()} on the returned
* transaction when it is no longer needed.
*
* @param writeOptions Any write options for the transaction
* @return a new transaction
*/
Transaction beginTransaction(final WriteOptions writeOptions);
/**
* Starts a new Transaction.
* <p>
* Caller is responsible for calling {@link #close()} on the returned
* transaction when it is no longer needed.
*
* @param writeOptions Any write options for the transaction
* @param transactionOptions Any options for the transaction
* @return a new transaction
*/
Transaction beginTransaction(final WriteOptions writeOptions,
final TransactionalOptions transactionOptions);
/**
* Starts a new Transaction.
* <p>
* Caller is responsible for calling {@link #close()} on the returned
* transaction when it is no longer needed.
*
* @param writeOptions Any write options for the transaction
* @param oldTransaction this Transaction will be reused instead of allocating
* a new one. This is an optimization to avoid extra allocations
* when repeatedly creating transactions.
* @return The oldTransaction which has been reinitialized as a new
* transaction
*/
Transaction beginTransaction(final WriteOptions writeOptions,
final Transaction oldTransaction);
/**
* Starts a new Transaction.
* <p>
* Caller is responsible for calling {@link #close()} on the returned
* transaction when it is no longer needed.
*
* @param writeOptions Any write options for the transaction
* @param transactionOptions Any options for the transaction
* @param oldTransaction this Transaction will be reused instead of allocating
* a new one. This is an optimization to avoid extra allocations
* when repeatedly creating transactions.
* @return The oldTransaction which has been reinitialized as a new
* transaction
*/
Transaction beginTransaction(final WriteOptions writeOptions,
final TransactionalOptions transactionOptions, final Transaction oldTransaction);
sealed interface TransactionalOptions extends Closeable {
@Override
void close();
}
final class PessimisticTransactionalDB implements TransactionalDB {
private final TransactionDB db;
public PessimisticTransactionalDB(TransactionDB db) {
this.db = db;
}
@Override
public TransactionalOptions createTransactionalOptions() {
return new TransactionalOptionsPessimistic(new TransactionOptions());
}
@Override
public RocksDB get() {
return db;
}
@Override
public Transaction beginTransaction(WriteOptions writeOptions) {
return db.beginTransaction(writeOptions);
}
@Override
public Transaction beginTransaction(WriteOptions writeOptions, TransactionalOptions transactionOptions) {
return db.beginTransaction(writeOptions,
((TransactionalOptionsPessimistic) transactionOptions).transactionOptions
);
}
@Override
public Transaction beginTransaction(WriteOptions writeOptions, Transaction oldTransaction) {
return db.beginTransaction(writeOptions, oldTransaction);
}
@Override
public Transaction beginTransaction(WriteOptions writeOptions,
TransactionalOptions transactionOptions,
Transaction oldTransaction) {
return db.beginTransaction(writeOptions,
((TransactionalOptionsPessimistic) transactionOptions).transactionOptions,
oldTransaction
);
}
@Override
public void close() throws IOException {
try {
db.closeE();
} catch (RocksDBException e) {
throw new IOException(e);
}
}
private record TransactionalOptionsPessimistic(TransactionOptions transactionOptions) implements
TransactionalOptions {
@Override
public void close() {
transactionOptions.close();
}
}
}
final class OptimisticTransactionalDB implements TransactionalDB {
private final OptimisticTransactionDB db;
public OptimisticTransactionalDB(OptimisticTransactionDB db) {
this.db = db;
}
@Override
public TransactionalOptions createTransactionalOptions() {
return new TransactionalOptionsOptimistic(new OptimisticTransactionOptions());
}
@Override
public RocksDB get() {
return db;
}
@Override
public Transaction beginTransaction(WriteOptions writeOptions) {
return db.beginTransaction(writeOptions);
}
@Override
public Transaction beginTransaction(WriteOptions writeOptions, TransactionalOptions transactionOptions) {
return db.beginTransaction(writeOptions,
((TransactionalOptionsOptimistic) transactionOptions).transactionOptions
);
}
@Override
public Transaction beginTransaction(WriteOptions writeOptions, Transaction oldTransaction) {
return db.beginTransaction(writeOptions, oldTransaction);
}
@Override
public Transaction beginTransaction(WriteOptions writeOptions,
TransactionalOptions transactionOptions,
Transaction oldTransaction) {
return db.beginTransaction(writeOptions,
((TransactionalOptionsOptimistic) transactionOptions).transactionOptions,
oldTransaction
);
}
@Override
public void close() throws IOException {
try {
db.closeE();
} catch (RocksDBException e) {
throw new IOException(e);
}
}
private record TransactionalOptionsOptimistic(OptimisticTransactionOptions transactionOptions) implements
TransactionalOptions {
@Override
public void close() {
transactionOptions.close();
}
}
}
}

View File

@ -0,0 +1,19 @@
module rockserver.core {
requires rocksdbjni;
requires net.sourceforge.argparse4j;
requires inet.ipaddr;
requires java.logging;
requires typesafe.config;
requires org.jetbrains.annotations;
requires high.scale.lib;
requires org.github.gestalt.core;
requires org.github.gestalt.hocon;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;
exports it.cavallium.rockserver.core.config;
opens it.cavallium.rockserver.core.resources;
opens it.cavallium.rockserver.core.config to org.github.gestalt.core, org.github.gestalt.hocon;
exports it.cavallium.rockserver.core.impl.rocksdb;
exports it.cavallium.rockserver.core.impl;
}

View File

@ -0,0 +1,95 @@
database: {
global: {
# Keep false unless you have a legacy database
enable-column-bug: false
# Enable to adapt the database to spinning disk
spinning: false
# Error checking
checksum: true
# Use direct I/O in RocksDB databases (Higher I/O read throughput but OS cache is not used, less swapping, less memory pressure)
use-direct-io: true
# Allow memory mapped (mmap) RocksDB databases (High OS cache usage if direct I/O is not enabled)
allow-rocksdb-memory-mapping: false
# Maximum open files for each RocksDB database instance. -1 is infinite.
# If the maximum open files count is -1, the initial startup time will be slower.
# If "cacheIndexAndFilterBlocks" is false, the memory will rise when the number of open files rises.
maximum-open-files: -1
optimistic: true
# Database block cache size
block-cache: 512MiB
# Database write buffer manager size
# You should enable this option if you are using direct I/O or spinning disks
write-buffer-manager: 64MiB
# Log data path
log-path: ./logs
# Write-Ahead-Log data path
wal-path: ./wal
fallback-column-options: {
# RocksDB data levels
# Available compression types: PLAIN, SNAPPY, LZ4, LZ4_HC, ZSTD, ZLIB, BZLIB2
levels: [
{
compression: LZ4
max-dict-bytes: 0
}
{
compression: LZ4
max-dict-bytes: 0
}
{
compression: ZSTD
max-dict-bytes: 0
}
{
compression: ZSTD
max-dict-bytes: 0
}
{
compression: ZSTD
max-dict-bytes: 0
}
{
compression: ZSTD
max-dict-bytes: 0
}
{
compression: ZSTD
# Maximum compression dictionary bytes per-sst
max-dict-bytes: 32KiB
}
]
# Memtable memory budget for RocksDB
# Used to optimize compactions and avoid write stalls
memtable-memory-budget-bytes: 128MiB
# Disable to reduce IOWAIT and make the read/writes faster
# Enable to reduce ram usage
# If maximum-open-files is != -1, this option must be set to true,
# otherwise the indexes and filters will be unloaded often
cache-index-and-filter-blocks: true
# Disable to reduce IOWAIT and make the read/writes faster
# Enable to reduce ram usage
partition-filters: false
# Bloom filter.
bloom-filter: {
# Bits per key. This will determine bloom memory size: bitsPerKey * totalKeys
bits-per-key: 10
# Disable bloom for the bottommost level, this reduces the memory size to 1/10
optimize-for-hits: false
}
# Use relatively larger block sizes to reduce index block size.
# You should use at least 64KB block size.
# You can consider 256KB or even 512KB.
# The downside of using large blocks is that RAM is wasted in the block cache.
block-size: 16KiB
# This should be kept to null if write-buffer-manager is set,
# or if you want to use the "memtable-memory-budget-size" logic.
# Remember that there are "max-write-buffer-number" in memory, 2 by default
write-buffer-size: 200MiB
}
column-options: [
${database.global.fallback-column-options} {
name: "default"
}
]
}
}

View File

@ -0,0 +1,52 @@
package it.cavallium.rockserver.core.impl.test;
import it.cavallium.rockserver.core.client.EmbeddedConnection;
import it.cavallium.rockserver.core.common.Callback.CallbackDelta;
import it.cavallium.rockserver.core.common.ColumnSchema;
import it.cavallium.rockserver.core.common.Delta;
import java.io.File;
import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;
class EmbeddedDBTest {
private Path dir;
private EmbeddedConnection db;
@org.junit.jupiter.api.BeforeEach
void setUp() throws IOException {
this.dir = Files.createTempDirectory("db-test");
db = new EmbeddedConnection(dir, "test", null);
}
@org.junit.jupiter.api.AfterEach
void tearDown() throws IOException {
try (Stream<Path> walk = Files.walk(dir)) {
db.close();
walk.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.peek(System.out::println)
.forEach(File::delete);
}
}
@org.junit.jupiter.api.Test
void put() {
var colId = db.createColumn("put-1", new ColumnSchema(new int[]{16, 16, 16, 32, 32}, 2, true));
db.put(0, colId, null, null, new CallbackDelta<MemorySegment>() {
@Override
public void onSuccess(Delta<MemorySegment> previous) {
}
});
db.deleteColumn(colId);
}
@org.junit.jupiter.api.Test
void get() {
}
}

View File

@ -0,0 +1,34 @@
package it.cavallium.rockserver.core.test;
import it.cavallium.rockserver.core.impl.XXHash32;
import java.lang.foreign.Arena;
import java.lang.foreign.ValueLayout;
import java.lang.foreign.ValueLayout.OfByte;
import java.nio.ByteOrder;
import java.util.concurrent.ThreadLocalRandom;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class XXHash32Test {
public static void main(String[] args) {
new XXHash32Test().test();
}
@Test
public void test() {
var safeXxhash32 = net.jpountz.xxhash.XXHashFactory.safeInstance().hash32();
var myXxhash32 = XXHash32.getInstance();
for (int runs = 0; runs < 3; runs++) {
for (int len = 0; len < 600; len++) {
byte[] bytes = new byte[len];
ThreadLocalRandom.current().nextBytes(bytes);
var hash = safeXxhash32.hash(bytes, 0, bytes.length, Integer.MIN_VALUE);
var a = Arena.global();
var result = myXxhash32.hash(a, a.allocateArray(OfByte.JAVA_BYTE, bytes), 0, bytes.length, Integer.MIN_VALUE);
var resultInt = result.get(ValueLayout.JAVA_INT.withOrder(ByteOrder.BIG_ENDIAN), 0);
Assertions.assertEquals(hash, resultInt);
}
}
}
}

View File

@ -0,0 +1,7 @@
module rockserver.core.test {
requires org.lz4.java;
requires rockserver.core;
requires org.junit.jupiter.api;
opens it.cavallium.rockserver.core.test;
opens it.cavallium.rockserver.core.impl.test;
}