diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cfd6805 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ + +.idea/ + +target/ diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..d0679a0 --- /dev/null +++ b/pom.xml @@ -0,0 +1,243 @@ + + + 4.0.0 + + it.cavallium.rockserver + rockserver-core + 1.0.0-SNAPSHOT + + + 21 + 21 + 0.9.28 + 0.22.0 + rockserver-core + it.cavallium.rockserver.core.Main + + + + + org.rocksdb + rocksdbjni + 8.8.1 + + + net.sourceforge.argparse4j + argparse4j + 0.9.0 + + + com.github.seancfoley + ipaddress + 5.4.0 + + + com.typesafe + config + 1.4.3 + + + org.jetbrains + annotations + 24.0.1 + compile + + + com.github.stephenc.high-scale-lib + high-scale-lib + 1.1.4 + + + com.github.gestalt-config + gestalt-core + ${gestalt.version} + + + com.github.gestalt-config + gestalt-hocon + ${gestalt.version} + + + + org.lz4 + lz4-java + 1.8.0 + test + + + org.junit.jupiter + junit-jupiter + 5.9.2 + test + + + + + + + org.codehaus.mojo + exec-maven-plugin + 3.0.0 + + + java + + java + + + it.cavallium.rockserver.core.Main + + --enable-preview + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + ${maven.compiler.source} + ${maven.compiler.source} + true + + --enable-preview + + + + + maven-surefire-plugin + + --enable-preview + + + + maven-failsafe-plugin + + --enable-preview + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + + true + it.cavallium.rockserver.core.Main + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + package + + single + + + + + + + true + it.cavallium.rockserver.core.Main + + + + jar-with-dependencies + + + + + + + + + native + + + + org.graalvm.buildtools + native-maven-plugin + ${native.maven.plugin.version} + true + + + build-native + + compile-no-fork + + package + + + test-native + + test + + test + + + + false + + true + + + --strict-image-heap + -march=native + -H:IncludeResourceBundles=net.sourceforge.argparse4j.internal.ArgumentParserImpl-en_US + -O1 + --enable-preview + --no-fallback + --gc=G1 + + + + + org.codehaus.mojo + exec-maven-plugin + 3.0.0 + + + java-agent + + exec + + + java + ${project.build.directory} + + --enable-preview + -classpath + + ${mainClass} + + + + + native + + exec + + + ${project.build.directory}/${imageName} + ${project.build.directory} + + + + + + + + + \ No newline at end of file diff --git a/src/main/java/it/cavallium/rockserver/core/Main.java b/src/main/java/it/cavallium/rockserver/core/Main.java new file mode 100644 index 0000000..52727c8 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/Main.java @@ -0,0 +1,102 @@ +package it.cavallium.rockserver.core; + +import static java.util.Objects.requireNonNull; + +import inet.ipaddr.HostName; +import it.cavallium.rockserver.core.client.ClientBuilder; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnixDomainSocketAddress; +import java.net.spi.InetAddressResolver; +import java.net.spi.InetAddressResolverProvider; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.logging.Level; +import java.util.logging.Logger; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import org.rocksdb.RocksDB; + +public class Main { + + private static final Logger LOG = Logger.getLogger("rockserver-core"); + + public static void main(String[] args) throws IOException, URISyntaxException { + ArgumentParser parser = ArgumentParsers.newFor("rockserver-core").build() + .defaultHelp(true) + .description("RocksDB server core"); + parser.addArgument("-u", "--url") + .type(String.class) + .setDefault("file://" + System.getProperty("user.home") + "/rockserver-core-db") + .help("Specify database rocksdb://hostname:port, or unix://, or file://"); + parser.addArgument("-n", "--name") + .type(String.class) + .setDefault("main") + .help("Specify database name"); + parser.addArgument("-c", "--config") + .type(Path.class) + .help("Specify the rockserver-core.conf file path. Do not set if the database is not local"); + parser.addArgument("-p", "--print-default-config") + .type(Boolean.class) + .setDefault(false) + .help("Print the default configs"); + Namespace ns = null; + try { + ns = parser.parseArgs(args); + } catch (ArgumentParserException e) { + parser.handleError(e); + System.exit(1); + } + var clientBuilder = new it.cavallium.rockserver.core.client.ClientBuilder(); + + if (ns.getBoolean("print_default_config")) { + requireNonNull(Main.class.getClassLoader() + .getResourceAsStream("it/cavallium/rockserver/core/resources/default.conf")) + .transferTo(System.out); + System.exit(0); + return; + } + + LOG.info("Starting..."); + RocksDB.loadLibrary(); + + var rawUrl = ns.getString("url"); + var name = ns.getString("name"); + var config = ns.getString("config"); + + var url = new URI(rawUrl); + + if (config != null) { + if (!url.getScheme().equals("file")) { + System.err.println("Do not set --config if the database is not local!"); + System.exit(1); + return; + } else { + clientBuilder.setEmbeddedConfig(Path.of(config)); + } + } + + switch (url.getScheme()) { + case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(url.getPath()))); + case "file" -> clientBuilder.setEmbedded(Path.of(url.getPath())); + case "rocksdb" -> clientBuilder.setAddress(new HostName(url.getHost()).asInetSocketAddress()); + default -> throw new IllegalArgumentException("Invalid scheme: " + url.getScheme()); + } + + clientBuilder.setName(name); + try (var connection = clientBuilder.build()) { + LOG.log(Level.INFO, "Connected to {0}", connection); + CountDownLatch shutdownLatch = new CountDownLatch(1); + Runtime.getRuntime().addShutdownHook(new Thread(shutdownLatch::countDown)); + LOG.info("Shutting down..."); + } + LOG.info("Shut down successfully"); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/client/BaseConnection.java b/src/main/java/it/cavallium/rockserver/core/client/BaseConnection.java new file mode 100644 index 0000000..0bba727 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/client/BaseConnection.java @@ -0,0 +1,22 @@ +package it.cavallium.rockserver.core.client; + +import java.io.IOException; + +public abstract class BaseConnection implements RocksDBConnection { + + private final String name; + + public BaseConnection(String name) { + this.name = name; + } + + @Override + public void close() throws IOException { + + } + + @Override + public String toString() { + return "db \"" + name + "\" (" + getUrl() + ")"; + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java b/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java new file mode 100644 index 0000000..77fe955 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java @@ -0,0 +1,46 @@ +package it.cavallium.rockserver.core.client; + +import java.net.InetSocketAddress; +import java.net.UnixDomainSocketAddress; +import java.nio.file.Path; + +public class ClientBuilder { + + private InetSocketAddress iNetAddress; + private UnixDomainSocketAddress unixAddress; + private Path embedded; + private String name; + private Path embeddedConfig; + + public void setEmbedded(Path path) { + this.embedded = path; + } + + public void setUnixSocket(UnixDomainSocketAddress address) { + this.unixAddress = address; + } + + public void setAddress(InetSocketAddress address) { + this.iNetAddress = address; + } + + public void setName(String name) { + this.name = name; + } + + public void setEmbeddedConfig(Path embeddedConfig) { + this.embeddedConfig = embeddedConfig; + } + + public RocksDBConnection build() { + if (embedded != null) { + return new EmbeddedConnection(embedded, name, embeddedConfig); + } else if (unixAddress != null) { + return new SocketConnectionUnix(unixAddress, name); + } else if (iNetAddress != null) { + return new SocketConnectionInet(iNetAddress, name); + } else { + throw new UnsupportedOperationException("Please set a connection type"); + } + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java new file mode 100644 index 0000000..c8f5f90 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -0,0 +1,102 @@ +package it.cavallium.rockserver.core.client; + +import it.cavallium.rockserver.core.common.Callback.GetCallback; +import it.cavallium.rockserver.core.common.Callback.IteratorCallback; +import it.cavallium.rockserver.core.common.Callback.PutCallback; +import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.RocksDBException; +import it.cavallium.rockserver.core.impl.EmbeddedDB; +import java.io.IOException; +import java.lang.foreign.MemorySegment; +import java.net.URI; +import java.nio.file.Path; +import org.jetbrains.annotations.Nullable; + +public class EmbeddedConnection extends BaseConnection { + + private final EmbeddedDB db; + + public EmbeddedConnection(Path path, String name, Path embeddedConfig) { + super(name); + this.db = new EmbeddedDB(path, embeddedConfig); + } + + @Override + public void close() throws IOException { + db.close(); + super.close(); + } + + @Override + public URI getUrl() { + return db.getPath().toUri(); + } + + @Override + public long openTransaction(long timeoutMs) { + return db.openTransaction(timeoutMs); + } + + @Override + public void closeTransaction(long transactionId) { + db.closeTransaction(transactionId); + } + + @Override + public long createColumn(String name, ColumnSchema schema) { + return db.createColumn(name, schema); + } + + @Override + public void deleteColumn(long columnId) throws RocksDBException { + db.deleteColumn(columnId); + } + + @Override + public long getColumnId(String name) { + return db.getColumnId(name); + } + + @Override + public void put(long transactionId, + long columnId, + MemorySegment[] keys, + @Nullable MemorySegment value, + PutCallback callback) throws RocksDBException { + db.put(transactionId, columnId, keys, value, callback); + } + + @Override + public void get(long transactionId, long columnId, MemorySegment[] keys, GetCallback callback) + throws RocksDBException { + db.get(transactionId, columnId, keys, callback); + } + + @Override + public long openIterator(long transactionId, + long columnId, + MemorySegment[] startKeysInclusive, + @Nullable MemorySegment[] endKeysExclusive, + boolean reverse, + long timeoutMs) throws RocksDBException { + return db.openIterator(transactionId, columnId, startKeysInclusive, endKeysExclusive, reverse, timeoutMs); + } + + @Override + public void closeIterator(long iteratorId) throws RocksDBException { + db.closeIterator(iteratorId); + } + + @Override + public void seekTo(long iterationId, MemorySegment[] keys) throws RocksDBException { + db.seekTo(iterationId, keys); + } + + @Override + public void subsequent(long iterationId, + long skipCount, + long takeCount, + IteratorCallback callback) throws RocksDBException { + db.subsequent(iterationId, skipCount, takeCount, callback); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/client/RocksDBConnection.java b/src/main/java/it/cavallium/rockserver/core/client/RocksDBConnection.java new file mode 100644 index 0000000..369aaee --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/client/RocksDBConnection.java @@ -0,0 +1,16 @@ +package it.cavallium.rockserver.core.client; + +import it.cavallium.rockserver.core.common.RocksDBAPI; +import java.io.Closeable; +import java.net.URI; +import org.jetbrains.annotations.Nullable; + +public interface RocksDBConnection extends Closeable, RocksDBAPI { + + /** + * Get connection url + * + * @return connection url + */ + URI getUrl(); +} diff --git a/src/main/java/it/cavallium/rockserver/core/client/SocketConnection.java b/src/main/java/it/cavallium/rockserver/core/client/SocketConnection.java new file mode 100644 index 0000000..63cd1c7 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/client/SocketConnection.java @@ -0,0 +1,97 @@ +package it.cavallium.rockserver.core.client; + +import it.cavallium.rockserver.core.common.Callback.GetCallback; +import it.cavallium.rockserver.core.common.Callback.IteratorCallback; +import it.cavallium.rockserver.core.common.Callback.PutCallback; +import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.RocksDBException; +import java.io.IOException; +import java.lang.foreign.MemorySegment; +import java.net.SocketAddress; +import org.jetbrains.annotations.Nullable; + +public abstract class SocketConnection extends BaseConnection { + + private final SocketAddress address; + + public SocketConnection(SocketAddress address, String name) { + super(name); + this.address = address; + } + + public SocketAddress getAddress() { + return address; + } + + @Override + public void close() throws IOException { + super.close(); + } + @Override + public long openTransaction(long timeoutMs) { + throw new UnsupportedOperationException(); + } + + @Override + public void closeTransaction(long transactionId) { + throw new UnsupportedOperationException(); + } + + @Override + public long createColumn(String name, ColumnSchema schema) { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteColumn(long columnId) throws RocksDBException { + throw new UnsupportedOperationException(); + } + + @Override + public long getColumnId(String name) { + throw new UnsupportedOperationException(); + } + + @Override + public void put(long transactionId, + long columnId, + MemorySegment[] keys, + @Nullable MemorySegment value, + PutCallback callback) throws RocksDBException { + throw new UnsupportedOperationException(); + } + + @Override + public void get(long transactionId, long columnId, MemorySegment[] keys, GetCallback callback) + throws RocksDBException { + throw new UnsupportedOperationException(); + } + + @Override + public long openIterator(long transactionId, + long columnId, + MemorySegment[] startKeysInclusive, + @Nullable MemorySegment[] endKeysExclusive, + boolean reverse, + long timeoutMs) throws RocksDBException { + throw new UnsupportedOperationException(); + } + + @Override + public void closeIterator(long iteratorId) throws RocksDBException { + throw new UnsupportedOperationException(); + } + + @Override + public void seekTo(long iterationId, MemorySegment[] keys) throws RocksDBException { + throw new UnsupportedOperationException(); + } + + @Override + public void subsequent(long iterationId, + long skipCount, + long takeCount, + IteratorCallback callback) throws RocksDBException { + throw new UnsupportedOperationException(); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionInet.java b/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionInet.java new file mode 100644 index 0000000..21eab70 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionInet.java @@ -0,0 +1,22 @@ +package it.cavallium.rockserver.core.client; + +import java.net.InetSocketAddress; +import java.net.URI; + +public class SocketConnectionInet extends SocketConnection { + + public SocketConnectionInet(InetSocketAddress address, String name) { + super(address, name); + } + + @Override + public InetSocketAddress getAddress() { + return (InetSocketAddress) super.getAddress(); + } + + @Override + public URI getUrl() { + return URI.create("rocksdb://" + getAddress().getHostString()); + } + +} diff --git a/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionUnix.java b/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionUnix.java new file mode 100644 index 0000000..119777e --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionUnix.java @@ -0,0 +1,30 @@ +package it.cavallium.rockserver.core.client; + +import java.io.IOException; +import java.net.SocketAddress; +import java.net.URI; +import java.net.UnixDomainSocketAddress; +import java.nio.file.Files; + +public class SocketConnectionUnix extends SocketConnection { + + public SocketConnectionUnix(UnixDomainSocketAddress address, String name) { + super(address, name); + } + + @Override + public UnixDomainSocketAddress getAddress() { + return (UnixDomainSocketAddress) super.getAddress(); + } + + @Override + public void close() throws IOException { + super.close(); + Files.deleteIfExists(getAddress().getPath()); + } + + @Override + public URI getUrl() { + return URI.create("unix://" + getAddress().getPath()); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/common/Callback.java b/src/main/java/it/cavallium/rockserver/core/common/Callback.java new file mode 100644 index 0000000..b1f3b6e --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/common/Callback.java @@ -0,0 +1,58 @@ +package it.cavallium.rockserver.core.common; + +import java.util.List; +import java.util.Map.Entry; +import org.jetbrains.annotations.Nullable; + +public sealed interface Callback { + + static boolean requiresGettingPreviousValue(PutCallback callback) { + return callback instanceof CallbackPrevious + || callback instanceof CallbackDelta + || callback instanceof CallbackChanged; + } + + static boolean requiresGettingCurrentValue(GetCallback callback) { + return callback instanceof CallbackCurrent; + } + + sealed interface PutCallback extends Callback {} + + sealed interface PatchCallback extends Callback {} + + sealed interface GetCallback extends Callback {} + + sealed interface IteratorCallback extends Callback {} + + non-sealed interface CallbackVoid extends PutCallback, PatchCallback, IteratorCallback, GetCallback {} + + non-sealed interface CallbackPrevious extends PutCallback { + + void onPrevious(@Nullable T previous); + } + + non-sealed interface CallbackCurrent extends GetCallback { + + void onCurrent(@Nullable T previous); + } + + non-sealed interface CallbackExists extends GetCallback, IteratorCallback { + + void onExists(boolean exists); + } + + non-sealed interface CallbackDelta extends PutCallback { + + void onSuccess(Delta previous); + } + + non-sealed interface CallbackMulti extends IteratorCallback { + + void onSuccess(List> elements); + } + + non-sealed interface CallbackChanged extends PutCallback, PatchCallback { + + void onChanged(boolean changed); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/common/ColumnSchema.java b/src/main/java/it/cavallium/rockserver/core/common/ColumnSchema.java new file mode 100644 index 0000000..18938bd --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/common/ColumnSchema.java @@ -0,0 +1,36 @@ +package it.cavallium.rockserver.core.common; + +public record ColumnSchema(int[] keys, int variableLengthKeysCount, boolean hasValue) { + public ColumnSchema { + if (variableLengthKeysCount > keys.length) { + throw new IllegalArgumentException("variable length keys count must be less or equal keysCount"); + } + for (int i = 0; i < keys.length - variableLengthKeysCount; i++) { + if (keys[i] <= 0) { + throw new UnsupportedOperationException("Key length must be > 0"); + } + } + for (int i = keys.length - variableLengthKeysCount; i < keys.length; i++) { + if (keys[i] <= 1) { + throw new UnsupportedOperationException("Key hash length must be > 1"); + } + } + } + + /** + * Keys with their length + * @return an array with the length of each key, variable-length keys must have the length of their hash + */ + @Override + public int[] keys() { + return keys; + } + + /** + * The last n keys that are variable-length + */ + @Override + public int variableLengthKeysCount() { + return variableLengthKeysCount; + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/common/Delta.java b/src/main/java/it/cavallium/rockserver/core/common/Delta.java new file mode 100644 index 0000000..1a11292 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/common/Delta.java @@ -0,0 +1,5 @@ +package it.cavallium.rockserver.core.common; + +import org.jetbrains.annotations.Nullable; + +public record Delta(@Nullable T previous, @Nullable T current) {} diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPI.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPI.java new file mode 100644 index 0000000..09e1c61 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBAPI.java @@ -0,0 +1,112 @@ +package it.cavallium.rockserver.core.common; + +import it.cavallium.rockserver.core.common.Callback.GetCallback; +import it.cavallium.rockserver.core.common.Callback.IteratorCallback; +import it.cavallium.rockserver.core.common.Callback.PutCallback; +import java.lang.foreign.MemorySegment; +import org.jetbrains.annotations.Nullable; + +public interface RocksDBAPI { + + /** + * Open a transaction + * @param timeoutMs timeout in milliseconds + * @return transaction id + */ + long openTransaction(long timeoutMs) throws RocksDBException; + + /** + * Close a transaction + * @param transactionId transaction id to close + */ + void closeTransaction(long transactionId) throws RocksDBException; + + /** + * Create a column + * @param name column name + * @param schema column key-value schema + * @return column id + */ + long createColumn(String name, ColumnSchema schema) throws RocksDBException; + + /** + * Delete a column + * @param columnId column id + */ + void deleteColumn(long columnId) throws RocksDBException; + + /** + * Get column id by name + * @param name column name + * @return column id + */ + long getColumnId(String name) throws RocksDBException; + + /** + * Put an element into the specified position + * @param transactionId transaction id, or 0 + * @param columnId column id + * @param keys column keys, or empty array if not needed + * @param value value, or null if not needed + * @param callback the callback will be executed on the same thread, exactly once. + */ + void put(long transactionId, + long columnId, + MemorySegment[] keys, + @Nullable MemorySegment value, + PutCallback callback) throws RocksDBException; + + /** + * Get an element from the specified position + * @param transactionId transaction id, or 0 + * @param columnId column id + * @param keys column keys, or empty array if not needed + * @param callback the callback will be executed on the same thread, exactly once. + */ + void get(long transactionId, + long columnId, + MemorySegment[] keys, + GetCallback callback) throws RocksDBException; + + /** + * Open an iterator + * @param transactionId transaction id, or 0 + * @param columnId column id + * @param startKeysInclusive start keys, inclusive. [] means "the beginning" + * @param endKeysExclusive end keys, exclusive. Null means "the end" + * @param reverse if true, seek in reverse direction + * @param timeoutMs timeout in milliseconds + * @return iterator id + */ + long openIterator(long transactionId, + long columnId, + MemorySegment[] startKeysInclusive, + @Nullable MemorySegment[] endKeysExclusive, + boolean reverse, + long timeoutMs) throws RocksDBException; + + /** + * Close an iterator + * @param iteratorId iterator id + */ + void closeIterator(long iteratorId) throws RocksDBException; + + /** + * Seek to the specific element during an iteration, or the subsequent one if not found + * @param iterationId iteration id + * @param keys keys, inclusive. [] means "the beginning" + */ + void seekTo(long iterationId, MemorySegment[] keys) throws RocksDBException; + + /** + * Get the subsequent element during an iteration + * @param iterationId iteration id + * @param skipCount number of elements to skip + * @param takeCount number of elements to take + * @param callback the callback will be executed on the same thread, exactly once. + */ + void subsequent(long iterationId, + long skipCount, + long takeCount, + IteratorCallback callback) throws RocksDBException; +} diff --git a/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java new file mode 100644 index 0000000..c89b26e --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/common/RocksDBException.java @@ -0,0 +1,39 @@ +package it.cavallium.rockserver.core.common; + +public class RocksDBException extends RuntimeException { + + private final RocksDBErrorType errorUniqueId; + + public enum RocksDBErrorType { + PUT_UNKNOWN, PUT_2, UNEXPECTED_NULL_VALUE, PUT_1, PUT_3, GET_1, COLUMN_EXISTS, COLUMN_CREATE_FAIL, COLUMN_NOT_FOUND, COLUMN_DELETE_FAIL, CONFIG_ERROR, VALUE_MUST_BE_NULL + + } + + public RocksDBException(RocksDBErrorType errorUniqueId, String message) { + super(message); + this.errorUniqueId = errorUniqueId; + } + + public RocksDBException(RocksDBErrorType errorUniqueId, String message, Throwable ex) { + super(message, ex); + this.errorUniqueId = errorUniqueId; + } + + public RocksDBException(RocksDBErrorType errorUniqueId, Throwable ex) { + super(ex.toString(), ex); + this.errorUniqueId = errorUniqueId; + } + + public RocksDBException(RocksDBErrorType errorUniqueId, org.rocksdb.RocksDBException ex) { + this(errorUniqueId, ex.getMessage()); + } + + public RocksDBErrorType getErrorUniqueId() { + return errorUniqueId; + } + + @Override + public String getLocalizedMessage() { + return "RocksDBError: [uid:" + errorUniqueId + "] " + getMessage(); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/common/Utils.java b/src/main/java/it/cavallium/rockserver/core/common/Utils.java new file mode 100644 index 0000000..d7ed266 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/common/Utils.java @@ -0,0 +1,50 @@ +package it.cavallium.rockserver.core.common; + +import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES; + +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class Utils { + + /** + * Returns the value of the {@code int} argument, throwing an exception if the value overflows an {@code char}. + * + * @param value the int value + * @return the argument as a char + * @throws ArithmeticException if the {@code argument} overflows a char + * @since 1.8 + */ + public static char toCharExact(int value) { + if ((char) value != value) { + throw new ArithmeticException("char overflow"); + } + return (char) value; + } + + /** + * Returns the value of the {@code long} argument, throwing an exception if the value overflows an {@code char}. + * + * @param value the long value + * @return the argument as a char + * @throws ArithmeticException if the {@code argument} overflows a char + * @since 1.8 + */ + public static char toCharExact(long value) { + if ((char) value != value) { + throw new ArithmeticException("char overflow"); + } + return (char) value; + } + + @NotNull + public static MemorySegment toMemorySegment(Arena arena, byte @Nullable [] array) { + if (array != null) { + return arena.allocateArray(BIG_ENDIAN_BYTES, array); + } else { + return MemorySegment.NULL; + } + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/BloomFilterConfig.java b/src/main/java/it/cavallium/rockserver/core/config/BloomFilterConfig.java new file mode 100644 index 0000000..f252416 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/BloomFilterConfig.java @@ -0,0 +1,17 @@ +package it.cavallium.rockserver.core.config; + +public interface BloomFilterConfig { + + int bitsPerKey(); + + boolean optimizeForHits(); + + static String stringify(BloomFilterConfig o) { + return """ + { + "bits-per-key": %d, + "optimize-for-hits": %b + }\ + """.formatted(o.bitsPerKey(), o.optimizeForHits()); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/DataSize.java b/src/main/java/it/cavallium/rockserver/core/config/DataSize.java new file mode 100644 index 0000000..7fb8940 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/DataSize.java @@ -0,0 +1,194 @@ +package it.cavallium.rockserver.core.config; + +import java.io.Serial; +import java.text.CharacterIterator; +import java.text.StringCharacterIterator; +import org.jetbrains.annotations.NotNull; + +@SuppressWarnings("unused") +public final class DataSize extends Number implements Comparable { + + @Serial + private static final long serialVersionUID = 7213411239846723568L; + + public static DataSize ZERO = new DataSize(0L); + public static DataSize ONE = new DataSize(1L); + public static DataSize KIB = new DataSize(1024L); + public static DataSize KB = new DataSize(1000L); + public static DataSize MIB = new DataSize(1024L * 1024); + public static DataSize MB = new DataSize(1000L * 1000); + public static DataSize GIB = new DataSize(1024L * 1024 * 1024); + public static DataSize GB = new DataSize(1000L * 1000 * 1000); + public static DataSize TIB = new DataSize(1024L * 1024 * 1024 * 1024); + public static DataSize TB = new DataSize(1000L * 1000 * 1000 * 1024); + public static DataSize PIB = new DataSize(1024L * 1024 * 1024 * 1024 * 1024); + public static DataSize PB = new DataSize(1000L * 1000 * 1000 * 1024 * 1024); + public static DataSize EIB = new DataSize(1024L * 1024 * 1024 * 1024 * 1024 * 1024); + public static DataSize EB = new DataSize(1000L * 1000 * 1000 * 1024 * 1024 * 1024); + public static DataSize MAX_VALUE = new DataSize(Long.MAX_VALUE); + + private final long size; + + public DataSize(long size) { + this.size = size; + } + + public DataSize(String size) { + size = size.replaceAll("\\s|_", ""); + switch (size) { + case "", "0", "-0", "+0" -> { + this.size = 0; + return; + } + case "∞", "inf", "infinite", "∞b" -> { + this.size = Long.MAX_VALUE; + return; + } + } + int numberStartOffset = 0; + int numberEndOffset = 0; + boolean negative = false; + { + boolean firstChar = true; + boolean numberMode = true; + for (char c : size.toCharArray()) { + if (c == '-') { + if (firstChar) { + negative = true; + numberStartOffset++; + numberEndOffset++; + } else { + throw new IllegalArgumentException("Found a minus character after index 0"); + } + } else if (Character.isDigit(c)) { + if (numberMode) { + numberEndOffset++; + } else { + throw new IllegalArgumentException("Found a number after the unit"); + } + } else if (Character.isLetter(c)) { + if (numberEndOffset - numberStartOffset <= 0) { + throw new IllegalArgumentException("No number found"); + } + if (numberMode) { + numberMode = false; + } + } else { + throw new IllegalArgumentException("Unsupported character"); + } + if (firstChar) { + firstChar = false; + } + } + } + var number = Long.parseUnsignedLong(size, numberStartOffset, numberEndOffset, 10); + if (numberEndOffset == size.length()) { + // No measurement + this.size = (negative ? -1 : 1) * number; + return; + } + // Measurements are like B, MB, or MiB, not longer + if (size.length() - numberEndOffset > 3) { + throw new IllegalArgumentException("Wrong measurement unit"); + } + var scaleChar = size.charAt(numberEndOffset); + boolean powerOf2 = numberEndOffset + 1 < size.length() && size.charAt(numberEndOffset + 1) == 'i'; + int k = powerOf2 ? 1024 : 1000; + var scale = switch (scaleChar) { + case 'B' -> 1; + case 'b' -> throw new IllegalArgumentException("Bits are not allowed"); + case 'K', 'k' -> k; + case 'M', 'm' -> k * k; + case 'G', 'g' -> k * k * k; + case 'T', 't' -> k * k * k * k; + case 'P', 'p' -> k * k * k * k * k; + case 'E', 'e' -> k * k * k * k * k * k; + case 'Z', 'z' -> k * k * k * k * k * k * k; + case 'Y', 'y' -> k * k * k * k * k * k * k * k; + default -> throw new IllegalStateException("Unexpected value: " + scaleChar); + }; + // if scale is 1, the unit should be "B", nothing more + if (scale == 1 && numberEndOffset + 1 != size.length()) { + throw new IllegalArgumentException("Invalid unit"); + } + this.size = (negative ? -1 : 1) * number * scale; + } + + public static Long get(DataSize value) { + if (value == null) { + return null; + } else { + return value.size; + } + } + + public static long getOrElse(DataSize value, @NotNull DataSize defaultValue) { + if (value == null) { + return defaultValue.size; + } else { + return value.size; + } + } + + + @Override + public int intValue() { + if (size >= Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } + return (int) size; + } + + @Override + public long longValue() { + return size; + } + + @Override + public float floatValue() { + return size; + } + + @Override + public double doubleValue() { + return size; + } + + @Override + public String toString() { + return toString(true); + } + + public String toString(boolean precise) { + boolean siUnits = size % 1000 == 0; + int k = siUnits ? 1000 : 1024; + long lSize = size; + CharacterIterator ci = new StringCharacterIterator((siUnits ? "k" : "K") + "MGTPEZY"); + while ((precise ? lSize % k == 0 : lSize > k) && lSize != 0) { + lSize /= k; + ci.next(); + } + if (lSize == size) { + return lSize + "B"; + } + return lSize + "" + ci.previous() + (siUnits ? "B" : "iB"); + } + + @Override + public int compareTo(@NotNull DataSize anotherLong) { + return Long.compare(this.size, anotherLong.size); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof DataSize) { + return size == ((DataSize)obj).size; + } + return false; + } + + @Override + public int hashCode() { + return Long.hashCode(size); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/DatabaseCompression.java b/src/main/java/it/cavallium/rockserver/core/config/DatabaseCompression.java new file mode 100644 index 0000000..1bbdedd --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/DatabaseCompression.java @@ -0,0 +1,11 @@ +package it.cavallium.rockserver.core.config; + +public enum DatabaseCompression { + PLAIN, + SNAPPY, + LZ4, + LZ4_HC, + ZSTD, + ZLIB, + BZLIB2 +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java b/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java new file mode 100644 index 0000000..206ba9a --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java @@ -0,0 +1,13 @@ +package it.cavallium.rockserver.core.config; + +public interface DatabaseConfig { + + GlobalDatabaseConfig global(); + + static String stringify(DatabaseConfig o) { + return """ + { + "global": %s + }""".formatted(GlobalDatabaseConfig.stringify(o.global())); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/DatabaseLevel.java b/src/main/java/it/cavallium/rockserver/core/config/DatabaseLevel.java new file mode 100644 index 0000000..f5838e8 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/DatabaseLevel.java @@ -0,0 +1,17 @@ +package it.cavallium.rockserver.core.config; + +public interface DatabaseLevel { + + DatabaseCompression compression(); + + DataSize maxDictBytes(); + + static String stringify(DatabaseLevel o) { + return """ + { + "compression": "%s", + "max-dict-bytes": "%s" + }\ + """.formatted(o.compression(), o.maxDictBytes()); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/FallbackColumnOptions.java b/src/main/java/it/cavallium/rockserver/core/config/FallbackColumnOptions.java new file mode 100644 index 0000000..bd30644 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/FallbackColumnOptions.java @@ -0,0 +1,44 @@ +package it.cavallium.rockserver.core.config; + +import java.util.Arrays; +import java.util.Objects; +import java.util.stream.Collectors; + +public interface FallbackColumnOptions { + + DatabaseLevel[] levels(); + + DataSize memtableMemoryBudgetBytes(); + + boolean cacheIndexAndFilterBlocks(); + + boolean partitionFilters(); + + BloomFilterConfig bloomFilter(); + + DataSize blockSize(); + + DataSize writeBufferSize(); + + static String stringify(FallbackColumnOptions o) { + return """ + { + "levels": %s, + "memtable-memory-budget-bytes": "%s", + "cache-index-and-filter-blocks": %b, + "partition-filters": %s, + "bloom-filter": %s, + "block-size": "%s", + "write-buffer-size": "%s" + }\ + """.formatted(Arrays.stream(Objects.requireNonNullElse(o.levels(), new DatabaseLevel[0])) + .map(DatabaseLevel::stringify).collect(Collectors.joining(",", "[", "]")), + o.memtableMemoryBudgetBytes(), + o.cacheIndexAndFilterBlocks(), + o.partitionFilters(), + BloomFilterConfig.stringify(o.bloomFilter()), + o.blockSize(), + o.writeBufferSize() + ); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java b/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java new file mode 100644 index 0000000..69a4352 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java @@ -0,0 +1,61 @@ +package it.cavallium.rockserver.core.config; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Objects; +import java.util.stream.Collectors; + +public interface GlobalDatabaseConfig { + + boolean spinning(); + + boolean checksum(); + + boolean useDirectIo(); + + boolean allowRocksdbMemoryMapping(); + + int maximumOpenFiles(); + + boolean optimistic(); + + DataSize blockCache(); + + DataSize writeBufferManager(); + + Path logPath(); + + FallbackColumnOptions fallbackColumnOptions(); + NamedColumnOptions[] columnOptions(); + + static String stringify(GlobalDatabaseConfig o) { + return """ + { + "spinning": %b, + "checksum": %b, + "use-direct-io": %b, + "allow-rocksdb-memory-mapping": %b, + "maximum-open-files": %d, + "optimistic": %b, + "block-cache": "%s", + "write-buffer-manager": "%s", + "log-path": "%s", + "fallback-column-options": %s, + "column-options": %s + }\ + """.formatted(o.spinning(), + o.checksum(), + o.useDirectIo(), + o.allowRocksdbMemoryMapping(), + o.maximumOpenFiles(), + o.optimistic(), + o.blockCache(), + o.writeBufferManager(), + o.logPath(), + FallbackColumnOptions.stringify(o.fallbackColumnOptions()), + Arrays.stream(Objects.requireNonNullElse(o.columnOptions(), new NamedColumnOptions[0])) + .map(NamedColumnOptions::stringify) + .collect(Collectors.joining(",", "[", "]")) + ); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/config/NamedColumnOptions.java b/src/main/java/it/cavallium/rockserver/core/config/NamedColumnOptions.java new file mode 100644 index 0000000..178988d --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/config/NamedColumnOptions.java @@ -0,0 +1,35 @@ +package it.cavallium.rockserver.core.config; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +public interface NamedColumnOptions extends FallbackColumnOptions { + + String name(); + + static String stringify(NamedColumnOptions o) { + return """ + { + "name": "%s", + "levels": %s, + "memtable-memory-budget-bytes": "%s", + "cache-index-and-filter-blocks": %b, + "partition-filters": %s, + "bloom-filter": %s, + "block-size": "%s", + "write-buffer-size": "%s" + }\ + """.formatted(o.name(), + (o.levels() != null ? List.of(o.levels()) : List.of()).stream() + .map(DatabaseLevel::stringify).collect(Collectors.joining(",", "[", "]")), + o.memtableMemoryBudgetBytes(), + o.cacheIndexAndFilterBlocks(), + o.partitionFilters(), + BloomFilterConfig.stringify(o.bloomFilter()), + o.blockSize(), + o.writeBufferSize() + ); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/Bucket.java b/src/main/java/it/cavallium/rockserver/core/impl/Bucket.java new file mode 100644 index 0000000..bc89c6e --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/Bucket.java @@ -0,0 +1,161 @@ +package it.cavallium.rockserver.core.impl; + +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.util.ArrayList; +import java.util.Map; +import java.util.Map.Entry; +import org.jetbrains.annotations.Nullable; + +public class Bucket { + + private final ColumnInstance col; + private final ArrayList> elements; + + public Bucket(ColumnInstance col) { + this(col, MemorySegment.NULL); + } + + public Bucket(ColumnInstance col, MemorySegment rawBucketSegment) { + this.col = col; + long offset = 0; + this.elements = new ArrayList<>(); + long rawBucketSegmentByteSize = rawBucketSegment.byteSize(); + if (rawBucketSegmentByteSize > 0) { + var elements = rawBucketSegment.get(ValueLayout.JAVA_INT_UNALIGNED, offset); + offset += Integer.BYTES; + int elementI = 0; + while (elementI < elements) { + var elementKVSize = rawBucketSegment.get(ValueLayout.JAVA_INT_UNALIGNED, offset); + offset += Integer.BYTES; + + MemorySegment[] bucketElementKeys; + { + int segmentOffset = 0; + var elementKVSegment = rawBucketSegment.asSlice(offset, elementKVSize, 1); + int readKeys = 0; + bucketElementKeys = new MemorySegment[col.schema().variableLengthKeysCount()]; + while (readKeys < col.schema().variableLengthKeysCount()) { + var keyISize = elementKVSegment.get(ValueLayout.JAVA_CHAR_UNALIGNED, segmentOffset); + segmentOffset += Character.BYTES; + var elementKeyISegment = elementKVSegment.asSlice(segmentOffset, keyISize); + bucketElementKeys[readKeys] = elementKeyISegment; + segmentOffset += keyISize; + readKeys++; + } + + MemorySegment bucketElementValues; + if (col.schema().hasValue()) { + bucketElementValues = elementKVSegment.asSlice(segmentOffset, elementKVSize - segmentOffset); + segmentOffset = elementKVSize; + } else { + bucketElementValues = MemorySegment.NULL; + assert segmentOffset == elementKVSize; + } + + var entry = Map.entry(bucketElementKeys, bucketElementValues); + + offset += segmentOffset; + } + elementI++; + } + assert offset == rawBucketSegmentByteSize; + } + } + + /** + * Add or replace an element + * @return Return the previous value ({@link MemorySegment#NULL} if no value is expected), + * return null if no element was present + */ + @Nullable + public MemorySegment addElement(MemorySegment[] bucketVariableKeys, @Nullable MemorySegment value) { + var element = Map.entry(bucketVariableKeys, value != null ? value : MemorySegment.NULL); + var i = indexOf(bucketVariableKeys); + if (i == -1) { + this.elements.add(element); + return null; + } else { + var val = this.elements.set(i, element).getValue(); + assert val != null; + return val; + } + } + + /** + * Remove an element + * @return Return the previous value ({@link MemorySegment#NULL} if no value is expected), + * return null if no element was present + */ + @Nullable + public MemorySegment removeElement(MemorySegment[] bucketVariableKeys) { + var i = indexOf(bucketVariableKeys); + if (i == -1) { + return null; + } else { + var val = this.elements.remove(i).getValue(); + assert val != null; + return val; + } + } + + /** + * Get an element + * @return Return the value ({@link MemorySegment#NULL} if no value is expected), + * return null if no element was present + */ + @Nullable + public MemorySegment getElement(MemorySegment[] bucketVariableKeys) { + var i = indexOf(bucketVariableKeys); + if (i == -1) { + return null; + } else { + var val = this.elements.get(i).getValue(); + assert val != null; + return val; + } + } + + private int indexOf(MemorySegment[] bucketVariableKeys) { + for (int i = 0; i < elements.size(); i++) { + var elem = elements.get(i); + var arrayKeys = elem.getKey(); + assert arrayKeys.length == bucketVariableKeys.length; + for (int j = 0; j < arrayKeys.length; j++) { + if (MemorySegment.mismatch(arrayKeys[j], 0, arrayKeys[j].byteSize(), bucketVariableKeys[j], 0, bucketVariableKeys[j].byteSize()) == -1) { + return i; + } + } + } + return -1; + } + + public MemorySegment toSegment(Arena arena) { + if (this.elements.isEmpty()) { + return MemorySegment.NULL; + } + MemorySegment[] serializedElements = new MemorySegment[this.elements.size()]; + ArrayList> entries = this.elements; + for (int i = 0; i < entries.size(); i++) { + Entry element = entries.get(i); + var computedBucketElementKey = col.computeBucketElementKey(arena, element.getKey()); + var computedBucketElementValue = col.computeBucketElementValue(element.getValue()); + serializedElements[i] = col.computeBucketElementKeyValue(arena, computedBucketElementKey, computedBucketElementValue); + } + long totalSize = Integer.BYTES; + for (MemorySegment serializedElement : serializedElements) { + totalSize += serializedElement.byteSize(); + } + var segment = arena.allocate(totalSize); + long offset = 0; + segment.set(ColumnInstance.BIG_ENDIAN_INT, offset, serializedElements.length); + offset += Integer.BYTES; + for (MemorySegment elementAtI : serializedElements) { + var elementSize = elementAtI.byteSize(); + MemorySegment.copy(elementAtI, 0, segment, offset, elementSize); + offset += elementSize; + } + return segment; + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java b/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java new file mode 100644 index 0000000..33cf761 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/ColumnInstance.java @@ -0,0 +1,171 @@ +package it.cavallium.rockserver.core.impl; + +import static it.cavallium.rockserver.core.common.Utils.toCharExact; +import static java.lang.Math.toIntExact; + +import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.RocksDBException; +import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout.OfByte; +import java.lang.foreign.ValueLayout.OfChar; +import java.lang.foreign.ValueLayout.OfInt; +import java.lang.foreign.ValueLayout.OfShort; +import java.nio.ByteOrder; +import java.util.Arrays; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.ColumnFamilyHandle; + +public record ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema, int finalKeySizeBytes) implements AutoCloseable { + + public static final OfByte BIG_ENDIAN_BYTES = OfByte.JAVA_BYTE.withOrder(ByteOrder.BIG_ENDIAN); + + public static final OfInt BIG_ENDIAN_INT = OfByte.JAVA_INT.withOrder(ByteOrder.BIG_ENDIAN); + + public static final OfShort BIG_ENDIAN_SHORT = OfByte.JAVA_SHORT.withOrder(ByteOrder.BIG_ENDIAN); + + public static final OfShort BIG_ENDIAN_SHORT_UNALIGNED = OfByte.JAVA_SHORT_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN); + + public static final OfChar BIG_ENDIAN_CHAR = OfByte.JAVA_CHAR.withOrder(ByteOrder.BIG_ENDIAN); + + public static final OfChar BIG_ENDIAN_CHAR_UNALIGNED = OfByte.JAVA_CHAR_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN); + public static final OfInt BIG_ENDIAN_INT_UNALIGNED = OfByte.JAVA_INT_UNALIGNED.withOrder(ByteOrder.BIG_ENDIAN); + + public ColumnInstance(ColumnFamilyHandle cfh, ColumnSchema schema) { + this(cfh, schema, calculateFinalKeySizeBytes(schema)); + } + + private static int calculateFinalKeySizeBytes(ColumnSchema schema) { + int total = 0; + for (int i : schema.keys()) { + total += i; + } + return total; + } + + @Override + public void close() { + cfh.close(); + } + + public boolean requiresWriteTransaction() { + return schema.variableLengthKeysCount() > 0; + } + + public boolean hasBuckets() { + return schema.variableLengthKeysCount() > 0; + } + + @NotNull + public MemorySegment calculateKey(Arena arena, MemorySegment[] keys) { + validateKeyCount(keys); + MemorySegment finalKey; + if (keys.length == 0) { + finalKey = MemorySegment.NULL; + } else if(keys.length == 1 && !hasBuckets()) { + finalKey = keys[0]; + } else { + finalKey = arena.allocate(finalKeySizeBytes); + long offsetBytes = 0; + for (int i = 0; i < schema.keys().length; i++) { + var computedKeyAtI = computeKeyAt(arena, i, keys); + var computedKeyAtISize = computedKeyAtI.byteSize(); + MemorySegment.copy(computedKeyAtI, 0, finalKey, offsetBytes, computedKeyAtISize); + offsetBytes += computedKeyAtISize; + } + } + validateFinalKeySize(finalKey); + return finalKey; + } + + private MemorySegment computeKeyAt(Arena arena, int i, MemorySegment[] keys) { + if (i < schema.keys().length - schema.variableLengthKeysCount()) { + return keys[i]; + } else { + if (schema.keys()[i] != Integer.BYTES) { + throw new UnsupportedOperationException("Hash size different than 32-bit is currently unsupported"); + } else { + return XXHash32.getInstance().hash(arena, keys[i], 0, 0, 0); + } + } + } + + private void validateFinalKeySize(MemorySegment key) { + if (finalKeySizeBytes != key.byteSize()) { + throw new IllegalArgumentException( + "Keys size must be equal to the column keys size. Expected " + finalKeySizeBytes + ", got " + + key.byteSize()); + } + } + + private void validateKeyCount(MemorySegment[] keys) { + if (schema.keys().length != keys.length) { + throw new IllegalArgumentException( + "Keys count must be equal to the column keys count. Expected " + schema.keys().length + ", got " + + keys.length); + } + } + + public MemorySegment computeBucketElementKey(Arena arena, MemorySegment[] variableKeys) { + long totalSize = 0L; + assert variableKeys.length == schema.variableLengthKeysCount(); + for (MemorySegment variableKey : variableKeys) { + totalSize += Character.BYTES + variableKey.byteSize(); + } + MemorySegment bucketElementKey = arena.allocate(totalSize); + long offset = 0; + for (MemorySegment keyI : variableKeys) { + var keyISize = keyI.byteSize(); + bucketElementKey.set(BIG_ENDIAN_CHAR_UNALIGNED, offset += Character.BYTES, toCharExact(keyISize)); + MemorySegment.copy(keyI, 0, bucketElementKey, offset += keyISize, keyISize); + } + assert offset == totalSize; + return bucketElementKey; + } + + public MemorySegment computeBucketElementValue(@Nullable MemorySegment value) { + checkNullableValue(value); + if (value != null) { + return value; + } else { + return MemorySegment.NULL; + } + } + + public void checkNullableValue(MemorySegment value) { + if (schema.hasValue() == (value == null || value == MemorySegment.NULL)) { + if (schema.hasValue()) { + throw new RocksDBException(RocksDBErrorType.UNEXPECTED_NULL_VALUE, "Schema expects a value, but a null value has been passed"); + } else { + throw new RocksDBException(RocksDBErrorType.VALUE_MUST_BE_NULL, "Schema expects no value, but a non-null value has been passed"); + } + } + } + + public MemorySegment computeBucketElementKeyValue(Arena arena, MemorySegment computedBucketElementKey, + @Nullable MemorySegment computedBucketElementValue) { + checkNullableValue(computedBucketElementValue); + var keySize = computedBucketElementKey.byteSize(); + var valueSize = computedBucketElementValue != null ? computedBucketElementValue.byteSize() : 0; + var totalSize = keySize + valueSize; + var computedBucketElementKV = arena.allocate(totalSize); + computedBucketElementKV.set(BIG_ENDIAN_INT, 0, toIntExact(totalSize)); + MemorySegment.copy(computedBucketElementKey, 0, computedBucketElementKV, Integer.BYTES, keySize); + if (computedBucketElementValue != null) { + MemorySegment.copy(computedBucketElementValue, 0, computedBucketElementKV, Integer.BYTES + keySize, valueSize); + } + return computedBucketElementKV; + } + + /** + * Get only the variable-length keys + */ + public MemorySegment[] getBucketElementKeys(MemorySegment[] keys) { + assert keys.length == schema.keys().length; + return Arrays.copyOfRange(keys, + schema.keys().length - schema.variableLengthKeysCount(), + schema.keys().length); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/DataSizeDecoder.java b/src/main/java/it/cavallium/rockserver/core/impl/DataSizeDecoder.java new file mode 100644 index 0000000..2a4ec08 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/DataSizeDecoder.java @@ -0,0 +1,38 @@ +package it.cavallium.rockserver.core.impl; + +import it.cavallium.rockserver.core.config.DataSize; +import java.util.List; +import org.github.gestalt.config.decoder.Decoder; +import org.github.gestalt.config.decoder.DecoderService; +import org.github.gestalt.config.decoder.Priority; +import org.github.gestalt.config.entity.ValidationError; +import org.github.gestalt.config.node.ConfigNode; +import org.github.gestalt.config.reflect.TypeCapture; +import org.github.gestalt.config.utils.ValidateOf; + +class DataSizeDecoder implements Decoder { + + @Override + public Priority priority() { + return Priority.LOW; + } + + @Override + public String name() { + return "DataSize"; + } + + @Override + public boolean matches(TypeCapture klass) { + return klass.isAssignableFrom(DataSize.class); + } + + @Override + public ValidateOf decode(String path, ConfigNode node, TypeCapture type, DecoderService decoderService) { + try { + return ValidateOf.validateOf(new DataSize(node.getValue().orElseThrow()), List.of()); + } catch (Exception ex) { + return ValidateOf.inValid(new ValidationError.DecodingNumberFormatException(path, node, name())); + } + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java new file mode 100644 index 0000000..40268b1 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -0,0 +1,491 @@ +package it.cavallium.rockserver.core.impl; + +import static it.cavallium.rockserver.core.common.Utils.toMemorySegment; +import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES; +import static java.lang.foreign.MemorySegment.NULL; +import static java.util.Objects.requireNonNullElse; +import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue; +import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue; + +import it.cavallium.rockserver.core.common.Callback.GetCallback; +import it.cavallium.rockserver.core.common.Callback.IteratorCallback; +import it.cavallium.rockserver.core.common.Callback.PutCallback; +import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.Callback; +import it.cavallium.rockserver.core.common.Delta; +import it.cavallium.rockserver.core.common.RocksDBAPI; +import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; +import it.cavallium.rockserver.core.common.Utils; +import it.cavallium.rockserver.core.config.DatabaseConfig; +import it.cavallium.rockserver.core.impl.rocksdb.REntry; +import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects; +import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB; +import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions; +import java.io.Closeable; +import java.io.IOException; +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.cliffc.high_scale_lib.NonBlockingHashMapLong; +import org.github.gestalt.config.builder.GestaltBuilder; +import org.github.gestalt.config.exceptions.GestaltException; +import org.github.gestalt.config.source.ClassPathConfigSource; +import org.github.gestalt.config.source.FileConfigSource; +import org.jetbrains.annotations.Nullable; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.Transaction; +import org.rocksdb.WriteOptions; + +public class EmbeddedDB implements RocksDBAPI, Closeable { + + private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096; + private static final boolean USE_FAST_GET = true; + private final Logger logger; + private final Path path; + private final Path embeddedConfigPath; + private final DatabaseConfig config; + private TransactionalDB db; + private final NonBlockingHashMapLong columns; + private final ConcurrentMap columnNamesIndex; + private final NonBlockingHashMapLong> txs; + private final SafeShutdown ops; + private final Object columnEditLock = new Object(); + + public EmbeddedDB(Path path, @Nullable Path embeddedConfigPath) { + this.path = path; + this.embeddedConfigPath = embeddedConfigPath; + this.logger = Logger.getLogger("db"); + this.columns = new NonBlockingHashMapLong<>(); + this.txs = new NonBlockingHashMapLong<>(); + this.columnNamesIndex = new ConcurrentHashMap<>(); + this.ops = new SafeShutdown(); + + var gsb = new GestaltBuilder(); + try { + gsb.addSource(new ClassPathConfigSource("it/cavallium/rockserver/core/resources/default.conf")); + if (embeddedConfigPath != null) { + gsb.addSource(new FileConfigSource(this.embeddedConfigPath)); + } + var gestalt = gsb + .addDecoder(new DataSizeDecoder()) + .addDefaultConfigLoaders() + .addDefaultDecoders() + .build(); + gestalt.loadConfigs(); + + this.config = gestalt.getConfig("database", DatabaseConfig.class); + logger.log(Level.INFO, "Database configuration: {0}", DatabaseConfig.stringify(this.config)); + } catch (GestaltException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.CONFIG_ERROR, e); + } + } + + /** + * The column must be registered once!!! + * Do not try to register a column that may already be registered + */ + private long registerColumn(ColumnInstance column) { + try { + var columnName = new String(column.cfh().getName(), StandardCharsets.UTF_8); + long id = FastRandomUtils.allocateNewValue(this.columns, column, 1, Long.MAX_VALUE); + Long previous = this.columnNamesIndex.putIfAbsent(columnName, id); + if (previous != null) { + //noinspection resource + this.columns.remove(id); + throw new UnsupportedOperationException("Column already registered!"); + } + return id; + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + + /** + * The column must be unregistered once!!! + * Do not try to unregister a column that may already be unregistered, or that may not be registered + */ + private ColumnInstance unregisterColumn(long id) { + var col = this.columns.remove(id); + Objects.requireNonNull(col, () -> "Column does not exist: " + id); + String name; + try { + name = new String(col.cfh().getName(), StandardCharsets.UTF_8); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + // Unregister the column name from the index avoiding race conditions + int retries = 0; + while (this.columnNamesIndex.remove(name) == null && retries++ < 5_000) { + Thread.yield(); + } + if (retries >= 5000) { + throw new IllegalStateException("Can't find column in column names index: " + name); + } + return col; + } + + @Override + public void close() throws IOException { + // Wait for 10 seconds + try { + ops.closeAndWait(10_000); + } catch (TimeoutException e) { + logger.log(Level.SEVERE, "Some operations lasted more than 10 seconds, forcing database shutdown..."); + } + } + + @Override + public long openTransaction(long timeoutMs) { + ops.beginOp(); + TransactionalOptions txOpts = db.createTransactionalOptions(); + var writeOpts = new WriteOptions(); + var tx = new REntry<>(db.beginTransaction(writeOpts, txOpts), new RocksDBObjects(writeOpts, txOpts)); + return FastRandomUtils.allocateNewValue(txs, tx, Long.MIN_VALUE, -2); + } + + @Override + public void closeTransaction(long transactionId) { + var tx = txs.remove(transactionId); + if (tx != null) { + try { + tx.close(); + } finally { + ops.endOp(); + } + } else { + throw new NoSuchElementException("Transaction not found: " + transactionId); + } + } + + @Override + public long createColumn(String name, ColumnSchema schema) throws it.cavallium.rockserver.core.common.RocksDBException { + synchronized (columnEditLock) { + var colId = getColumnIdOrNull(name); + var col = colId != null ? getColumn(colId) : null; + if (col != null) { + if (schema.equals(col.schema())) { + return colId; + } else { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_EXISTS, + "Column exists, with a different schema: " + name + ); + } + } else { + try { + var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8))); + return registerColumn(new ColumnInstance(cf, schema)); + } catch (RocksDBException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_CREATE_FAIL, e); + } + } + } + } + + @Override + public void deleteColumn(long columnId) throws it.cavallium.rockserver.core.common.RocksDBException { + synchronized (columnEditLock) { + var col = getColumn(columnId); + try { + db.get().dropColumnFamily(col.cfh()); + unregisterColumn(columnId).close(); + } catch (RocksDBException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_DELETE_FAIL, e); + } + } + } + + @Override + public long getColumnId(String name) { + var columnId = getColumnIdOrNull(name); + if (columnId == null) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_NOT_FOUND, + "Column not found: " + name); + } else { + return columnId; + } + } + + private Long getColumnIdOrNull(String name) { + var columnId = (long) columnNamesIndex.getOrDefault(name, -1L); + ColumnInstance col; + if (columnId == -1L || (col = columns.get(columnId)) == null || !col.cfh().isOwningHandle()) { + return null; + } else { + return columnId; + } + } + + @Override + public void put(long transactionId, + long columnId, + MemorySegment[] keys, + @Nullable MemorySegment value, + PutCallback callback) throws it.cavallium.rockserver.core.common.RocksDBException { + ops.beginOp(); + try (var arena = Arena.ofConfined()) { + // Column id + var col = getColumn(columnId); + // Check for null value + col.checkNullableValue(value); + + MemorySegment previousValue; + REntry tx; + if (transactionId != 0) { + tx = getTransaction(transactionId); + } else { + tx = null; + } + MemorySegment calculatedKey = col.calculateKey(arena, keys); + if (col.hasBuckets()) { + if (tx != null) { + var bucketElementKeys = col.getBucketElementKeys(keys); + try (var readOptions = new ReadOptions()) { + var previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); + MemorySegment previousRawBucket = Utils.toMemorySegment(arena, previousRawBucketByteArray); + var bucket = new Bucket(col, previousRawBucket); + previousValue = bucket.addElement(bucketElementKeys, value); + tx.val().put(col.cfh(), toByteArray(calculatedKey), toByteArray(bucket.toSegment(arena))); + } catch (RocksDBException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_1, e); + } + } else { + // Retry using a transaction: transactions are required to handle this kind of data + var newTransactionId = this.openTransaction(Long.MAX_VALUE); + try { + put(newTransactionId, columnId, keys, value, callback); + return; + } finally { + this.closeTransaction(newTransactionId); + } + } + } else { + if (Callback.requiresGettingPreviousValue(callback)) { + try (var readOptions = new ReadOptions()) { + byte[] previousValueByteArray; + if (tx != null) { + previousValueByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); + } else { + previousValueByteArray = db.get().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); + } + previousValue = toMemorySegment(arena, previousValueByteArray); + } catch (RocksDBException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_2, e); + } + } else { + previousValue = null; + } + if (tx != null) { + tx.val().put(col.cfh(), toByteArray(calculatedKey), toByteArray(requireNonNullElse(value, NULL))); + } else { + try (var w = new WriteOptions()) { + db.get().put(col.cfh(), w, calculatedKey.asByteBuffer(), requireNonNullElse(value, NULL).asByteBuffer()); + } + } + } + switch (callback) { + case Callback.CallbackVoid ignored -> {} + case Callback.CallbackPrevious c -> c.onPrevious(previousValue); + case Callback.CallbackChanged c -> c.onChanged(valueEquals(previousValue, value)); + case Callback.CallbackDelta c -> c.onSuccess(new Delta<>(previousValue, value)); + default -> throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_3, + "Unexpected value: " + callback); + } + } catch (it.cavallium.rockserver.core.common.RocksDBException ex) { + throw ex; + } catch (Exception ex) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN, ex); + } finally { + ops.endOp(); + } + } + + @Override + public void get(long transactionId, long columnId, MemorySegment[] keys, GetCallback callback) + throws it.cavallium.rockserver.core.common.RocksDBException { + ops.beginOp(); + try (var arena = Arena.ofConfined()) { + // Column id + var col = getColumn(columnId); + + MemorySegment foundValue; + boolean existsValue; + REntry tx; + if (transactionId != 0) { + tx = getTransaction(transactionId); + } else { + tx = null; + } + MemorySegment calculatedKey = col.calculateKey(arena, keys); + if (col.hasBuckets()) { + var bucketElementKeys = col.getBucketElementKeys(keys); + try (var readOptions = new ReadOptions()) { + MemorySegment previousRawBucket = dbGet(tx, col, arena, readOptions, calculatedKey); + var bucket = new Bucket(col, previousRawBucket); + foundValue = bucket.getElement(bucketElementKeys); + existsValue = foundValue != null; + } catch (RocksDBException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.GET_1, e); + } + } else { + boolean shouldGetCurrent = Callback.requiresGettingCurrentValue(callback) + || (tx != null && callback instanceof Callback.CallbackExists); + if (shouldGetCurrent) { + try (var readOptions = new ReadOptions()) { + foundValue = dbGet(tx, col, arena, readOptions, calculatedKey); + existsValue = foundValue != null; + } catch (RocksDBException e) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_2, e); + } + } else if (callback instanceof Callback.CallbackExists) { + // tx is always null here + //noinspection ConstantValue + assert tx == null; + foundValue = null; + existsValue = db.get().keyExists(calculatedKey.asByteBuffer()); + } else { + foundValue = null; + existsValue = false; + } + } + switch (callback) { + case Callback.CallbackVoid ignored -> {} + case Callback.CallbackCurrent c -> c.onCurrent(foundValue); + case Callback.CallbackExists c -> c.onExists(existsValue); + default -> throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_3, + "Unexpected value: " + callback); + } + } catch (it.cavallium.rockserver.core.common.RocksDBException ex) { + throw ex; + } catch (Exception ex) { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.PUT_UNKNOWN, ex); + } finally { + ops.endOp(); + } + } + + @Override + public long openIterator(long transactionId, + long columnId, + MemorySegment[] startKeysInclusive, + @Nullable MemorySegment[] endKeysExclusive, + boolean reverse, + long timeoutMs) throws it.cavallium.rockserver.core.common.RocksDBException { + return 0; + } + + @Override + public void closeIterator(long iteratorId) throws it.cavallium.rockserver.core.common.RocksDBException { + + } + + @Override + public void seekTo(long iterationId, MemorySegment[] keys) + throws it.cavallium.rockserver.core.common.RocksDBException { + + } + + @Override + public void subsequent(long iterationId, + long skipCount, + long takeCount, + IteratorCallback callback) throws it.cavallium.rockserver.core.common.RocksDBException { + + } + + private MemorySegment dbGet(REntry tx, + ColumnInstance col, + Arena arena, + ReadOptions readOptions, + MemorySegment calculatedKey) throws RocksDBException { + if (tx != null) { + var previousRawBucketByteArray = tx.val().get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); + return Utils.toMemorySegment(arena, previousRawBucketByteArray); + } else { + var db = this.db.get(); + if (USE_FAST_GET) { + return dbGetDirect(arena, readOptions, calculatedKey); + } else { + var previousRawBucketByteArray = db.get(col.cfh(), readOptions, calculatedKey.toArray(BIG_ENDIAN_BYTES)); + return Utils.toMemorySegment(arena, previousRawBucketByteArray); + } + } + } + + @Nullable + private MemorySegment dbGetDirect(Arena arena, ReadOptions readOptions, MemorySegment calculatedKey) + throws RocksDBException { + // Get the key nio buffer to pass to RocksDB + ByteBuffer keyNioBuffer = calculatedKey.asByteBuffer(); + + // Create a direct result buffer because RocksDB works only with direct buffers + var resultBuffer = arena.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES).asByteBuffer(); + var keyMayExist = this.db.get().keyMayExist(readOptions, keyNioBuffer.rewind(), resultBuffer.clear()); + return switch (keyMayExist.exists) { + case kNotExist -> null; + case kExistsWithValue, kExistsWithoutValue -> { + // At the beginning, size reflects the expected size, then it becomes the real data size + int size = keyMayExist.exists == kExistsWithValue ? keyMayExist.valueLength : -1; + if (keyMayExist.exists == kExistsWithoutValue || size > resultBuffer.limit()) { + if (size > resultBuffer.capacity()) { + resultBuffer = arena.allocate(size).asByteBuffer(); + } + size = this.db.get().get(readOptions, keyNioBuffer.rewind(), resultBuffer.clear()); + } + + if (size == RocksDB.NOT_FOUND) { + yield null; + } else if (size == resultBuffer.limit()) { + yield MemorySegment.ofBuffer(resultBuffer.position(0)); + } else { + throw new IllegalStateException("size (" + size + ") != read size (" + resultBuffer.limit() + ")"); + } + } + }; + } + + private boolean valueEquals(MemorySegment previousValue, MemorySegment currentValue) { + previousValue = requireNonNullElse(previousValue, NULL); + currentValue = requireNonNullElse(currentValue, NULL); + return MemorySegment.mismatch(previousValue, 0, previousValue.byteSize(), currentValue, 0, currentValue.byteSize()) != -1; + } + + public static byte[] toByteArray(MemorySegment memorySegment) { + return memorySegment.toArray(BIG_ENDIAN_BYTES); + } + + private ColumnInstance getColumn(long columnId) { + var col = columns.get(columnId); + if (col != null) { + return col; + } else { + throw new it.cavallium.rockserver.core.common.RocksDBException(RocksDBErrorType.COLUMN_NOT_FOUND, + "No column with id " + columnId); + } + } + + private REntry getTransaction(long transactionId) { + var tx = txs.get(transactionId); + if (tx != null) { + return tx; + } else { + throw new NoSuchElementException("No transaction with id " + transactionId); + } + } + + public Path getPath() { + return path; + } + +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/FastRandomUtils.java b/src/main/java/it/cavallium/rockserver/core/impl/FastRandomUtils.java new file mode 100644 index 0000000..71b2bfa --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/FastRandomUtils.java @@ -0,0 +1,19 @@ +package it.cavallium.rockserver.core.impl; + +import java.util.concurrent.ThreadLocalRandom; +import org.cliffc.high_scale_lib.NonBlockingHashMapLong; + +public class FastRandomUtils { + + public static long allocateNewValue(NonBlockingHashMapLong map, T value, long minId, long maxId) { + long newTransactionId; + do { + newTransactionId = getRandomId(minId, maxId); + } while (map.putIfAbsent(newTransactionId, value) != null); + return newTransactionId; + } + + private static long getRandomId(long minId, long maxId) { + return ThreadLocalRandom.current().nextLong(minId, maxId); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/SafeShutdown.java b/src/main/java/it/cavallium/rockserver/core/impl/SafeShutdown.java new file mode 100644 index 0000000..7e80cb4 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/SafeShutdown.java @@ -0,0 +1,55 @@ +package it.cavallium.rockserver.core.impl; + +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.LongAdder; + +public class SafeShutdown implements AutoCloseable { + + private volatile boolean closing; + + private final LongAdder pendingOps = new LongAdder(); + + public void beginOp() { + if (closing) { + throw new IllegalStateException("Closed"); + } + pendingOps.increment(); + } + + public void endOp() { + pendingOps.decrement(); + } + + public void closeAndWait(long timeoutMs) throws TimeoutException { + this.closing = true; + waitForExit(timeoutMs); + } + + public void waitForExit(long timeoutMs) throws TimeoutException { + try { + long startMs = System.nanoTime(); + while (pendingOps.sum() > 0 && System.nanoTime() - startMs < (timeoutMs * 1000000L)) { + //noinspection BusyWait + Thread.sleep(10); + } + if (pendingOps.sum() > 0) { + throw new TimeoutException(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + try { + closeAndWait(Long.MAX_VALUE); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + } + + public boolean isOpen() { + return !closing; + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/SlicingArena.java b/src/main/java/it/cavallium/rockserver/core/impl/SlicingArena.java new file mode 100644 index 0000000..f75e627 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/SlicingArena.java @@ -0,0 +1,28 @@ +package it.cavallium.rockserver.core.impl; + +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.SegmentAllocator; + +public class SlicingArena implements Arena { + + final Arena arena = Arena.ofConfined(); + final SegmentAllocator slicingAllocator; + + public SlicingArena(long size) { + slicingAllocator = SegmentAllocator.slicingAllocator(arena.allocate(size)); + } + + public MemorySegment allocate(long byteSize, long byteAlignment) { + return slicingAllocator.allocate(byteSize, byteAlignment); + } + + public MemorySegment.Scope scope() { + return arena.scope(); + } + + public void close() { + arena.close(); + } + +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/rockserver/core/impl/XXHash32.java b/src/main/java/it/cavallium/rockserver/core/impl/XXHash32.java new file mode 100644 index 0000000..122e1bf --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/XXHash32.java @@ -0,0 +1,48 @@ +package it.cavallium.rockserver.core.impl; + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; + +/** + * A 32-bits hash. + *

+ * Instances of this class are thread-safe. + */ +public abstract class XXHash32 { + + /** + * Compute the 32-bits hash of buf[off:off+len] using seed + * seed. + */ + public abstract int hash(byte[] buf, int off, int len, int seed); + + /** + * Compute the big-endian 32-bits hash of buf[off:off+len] using seed + * seed. + */ + public abstract MemorySegment hash(Arena arena, MemorySegment buf, int off, int len, int seed); + + @Override + public String toString() { + return getClass().getSimpleName(); + } + + public static XXHash32 getInstance() { + return XXHash32JavaSafe.INSTANCE; + } + +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/rockserver/core/impl/XXHash32JavaSafe.java b/src/main/java/it/cavallium/rockserver/core/impl/XXHash32JavaSafe.java new file mode 100644 index 0000000..6116286 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/XXHash32JavaSafe.java @@ -0,0 +1,222 @@ +package it.cavallium.rockserver.core.impl; + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME1; +import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME2; +import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME3; +import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME4; +import static it.cavallium.rockserver.core.impl.XXHashUtils.PRIME5; +import static java.lang.Integer.rotateLeft; + +import java.lang.foreign.Arena; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.lang.foreign.ValueLayout.OfByte; +import java.lang.foreign.ValueLayout.OfInt; +import java.nio.ByteOrder; + +/** + * Safe Java implementation of {@link XXHash32}. + */ +public final class XXHash32JavaSafe extends XXHash32 { + + + public static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder(); + public static final XXHash32 INSTANCE = new XXHash32JavaSafe(); + private static final OfInt INT_LE = ValueLayout.JAVA_INT.withOrder(ByteOrder.LITTLE_ENDIAN); + private static final OfInt INT_BE = ValueLayout.JAVA_INT.withOrder(ByteOrder.BIG_ENDIAN); + private static final OfByte BYTE_BE = ValueLayout.JAVA_BYTE.withOrder(ByteOrder.BIG_ENDIAN); + + @Override + public int hash(byte[] buf, int off, int len, int seed) { + checkRange(buf, off, len); + + final int end = off + len; + int h32; + + if (len >= 16) { + final int limit = end - 16; + int v1 = seed + PRIME1 + PRIME2; + int v2 = seed + PRIME2; + int v3 = seed + 0; + int v4 = seed - PRIME1; + do { + v1 += readIntLE(buf, off) * PRIME2; + v1 = rotateLeft(v1, 13); + v1 *= PRIME1; + off += 4; + + v2 += readIntLE(buf, off) * PRIME2; + v2 = rotateLeft(v2, 13); + v2 *= PRIME1; + off += 4; + + v3 += readIntLE(buf, off) * PRIME2; + v3 = rotateLeft(v3, 13); + v3 *= PRIME1; + off += 4; + + v4 += readIntLE(buf, off) * PRIME2; + v4 = rotateLeft(v4, 13); + v4 *= PRIME1; + off += 4; + } while (off <= limit); + + h32 = rotateLeft(v1, 1) + rotateLeft(v2, 7) + rotateLeft(v3, 12) + rotateLeft(v4, 18); + } else { + h32 = seed + PRIME5; + } + + h32 += len; + + while (off <= end - 4) { + h32 += readIntLE(buf, off) * PRIME3; + h32 = rotateLeft(h32, 17) * PRIME4; + off += 4; + } + + while (off < end) { + h32 += (buf[off] & 0xFF) * PRIME5; + h32 = rotateLeft(h32, 11) * PRIME1; + ++off; + } + + h32 ^= h32 >>> 15; + h32 *= PRIME2; + h32 ^= h32 >>> 13; + h32 *= PRIME3; + h32 ^= h32 >>> 16; + + return h32; + } + + @Override + public MemorySegment hash(Arena arena, MemorySegment buf, int off, int len, int seed) { + checkRange(buf, off, len); + + final int end = off + len; + int h32; + + if (len >= 16) { + final int limit = end - 16; + int v1 = seed + PRIME1 + PRIME2; + int v2 = seed + PRIME2; + int v3 = seed + 0; + int v4 = seed - PRIME1; + do { + v1 += readIntLE(buf, off) * PRIME2; + v1 = rotateLeft(v1, 13); + v1 *= PRIME1; + off += 4; + + v2 += readIntLE(buf, off) * PRIME2; + v2 = rotateLeft(v2, 13); + v2 *= PRIME1; + off += 4; + + v3 += readIntLE(buf, off) * PRIME2; + v3 = rotateLeft(v3, 13); + v3 *= PRIME1; + off += 4; + + v4 += readIntLE(buf, off) * PRIME2; + v4 = rotateLeft(v4, 13); + v4 *= PRIME1; + off += 4; + } while (off <= limit); + + h32 = rotateLeft(v1, 1) + rotateLeft(v2, 7) + rotateLeft(v3, 12) + rotateLeft(v4, 18); + } else { + h32 = seed + PRIME5; + } + + h32 += len; + + while (off <= end - 4) { + h32 += readIntLE(buf, off) * PRIME3; + h32 = rotateLeft(h32, 17) * PRIME4; + off += 4; + } + + while (off < end) { + h32 += (buf.get(BYTE_BE, off) & 0xFF) * PRIME5; + h32 = rotateLeft(h32, 11) * PRIME1; + ++off; + } + + h32 ^= h32 >>> 15; + h32 *= PRIME2; + h32 ^= h32 >>> 13; + h32 *= PRIME3; + h32 ^= h32 >>> 16; + + return arena.allocate(INT_BE, h32); + } + + private static void checkRange(byte[] buf, int off) { + if (off < 0 || off >= buf.length) { + throw new ArrayIndexOutOfBoundsException(off); + } + } + + private static void checkRange(MemorySegment buf, int off) { + if (off < 0 || off >= buf.byteSize()) { + throw new ArrayIndexOutOfBoundsException(off); + } + } + + private static void checkRange(byte[] buf, int off, int len) { + checkLength(len); + if (len > 0) { + checkRange(buf, off); + checkRange(buf, off + len - 1); + } + } + + private static void checkRange(MemorySegment buf, int off, int len) { + checkLength(len); + if (len > 0) { + checkRange(buf, off); + checkRange(buf, off + len - 1); + } + } + + private static void checkLength(int len) { + if (len < 0) { + throw new IllegalArgumentException("lengths must be >= 0"); + } + } + + private static int readIntBE(byte[] buf, int i) { + return ((buf[i] & 0xFF) << 24) | ((buf[i+1] & 0xFF) << 16) | ((buf[i+2] & 0xFF) << 8) | (buf[i+3] & 0xFF); + } + + private static int readIntLE(byte[] buf, int i) { + return (buf[i] & 0xFF) | ((buf[i+1] & 0xFF) << 8) | ((buf[i+2] & 0xFF) << 16) | ((buf[i+3] & 0xFF) << 24); + } + + private static int readIntLE(MemorySegment buf, int i) { + return buf.get(INT_LE, i); + } + + private static int readInt(byte[] buf, int i) { + if (NATIVE_BYTE_ORDER == ByteOrder.BIG_ENDIAN) { + return readIntBE(buf, i); + } else { + return readIntLE(buf, i); + } + } +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/rockserver/core/impl/XXHashUtils.java b/src/main/java/it/cavallium/rockserver/core/impl/XXHashUtils.java new file mode 100644 index 0000000..c96b9d6 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/XXHashUtils.java @@ -0,0 +1,26 @@ +package it.cavallium.rockserver.core.impl; + +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +enum XXHashUtils { + ; + + static final int PRIME1 = -1640531535; + static final int PRIME2 = -2048144777; + static final int PRIME3 = -1028477379; + static final int PRIME4 = 668265263; + static final int PRIME5 = 374761393; + +} \ No newline at end of file diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/REntry.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/REntry.java new file mode 100644 index 0000000..eb62771 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/REntry.java @@ -0,0 +1,13 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import java.io.Closeable; +import org.rocksdb.AbstractNativeReference; + +public record REntry(T val, RocksDBObjects objs) implements Closeable { + + @Override + public void close() { + val.close(); + objs.close(); + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBObjects.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBObjects.java new file mode 100644 index 0000000..364918c --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBObjects.java @@ -0,0 +1,37 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import java.util.ArrayList; +import java.util.List; + +public class RocksDBObjects implements AutoCloseable { + private final List refs; + + public RocksDBObjects(int size) { + this.refs = new ArrayList<>(size); + } + public RocksDBObjects() { + this.refs = new ArrayList<>(); + } + + public RocksDBObjects(AutoCloseable... refs) { + this(refs.length); + for (AutoCloseable ref : refs) { + add(ref); + } + } + + public void add(AutoCloseable ref) { + this.refs.add(ref); + } + + @Override + public void close() { + for (int i = refs.size() - 1; i >= 0; i--) { + try { + refs.get(i).close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java new file mode 100644 index 0000000..0966346 --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java @@ -0,0 +1,217 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import java.io.Closeable; +import java.io.IOException; +import org.rocksdb.OptimisticTransactionDB; +import org.rocksdb.OptimisticTransactionOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.Transaction; +import org.rocksdb.TransactionDB; +import org.rocksdb.TransactionOptions; +import org.rocksdb.WriteOptions; + +public sealed interface TransactionalDB extends Closeable { + + static TransactionalDB create(RocksDB db) { + return switch (db) { + case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(optimisticTransactionDB); + case TransactionDB transactionDB -> new PessimisticTransactionalDB(transactionDB); + default -> throw new UnsupportedOperationException("This database is not transactional"); + }; + } + + TransactionalOptions createTransactionalOptions(); + + RocksDB get(); + /** + * Starts a new Transaction. + *

+ * Caller is responsible for calling {@link #close()} on the returned + * transaction when it is no longer needed. + * + * @param writeOptions Any write options for the transaction + * @return a new transaction + */ + Transaction beginTransaction(final WriteOptions writeOptions); + + /** + * Starts a new Transaction. + *

+ * Caller is responsible for calling {@link #close()} on the returned + * transaction when it is no longer needed. + * + * @param writeOptions Any write options for the transaction + * @param transactionOptions Any options for the transaction + * @return a new transaction + */ + Transaction beginTransaction(final WriteOptions writeOptions, + final TransactionalOptions transactionOptions); + + /** + * Starts a new Transaction. + *

+ * Caller is responsible for calling {@link #close()} on the returned + * transaction when it is no longer needed. + * + * @param writeOptions Any write options for the transaction + * @param oldTransaction this Transaction will be reused instead of allocating + * a new one. This is an optimization to avoid extra allocations + * when repeatedly creating transactions. + * @return The oldTransaction which has been reinitialized as a new + * transaction + */ + Transaction beginTransaction(final WriteOptions writeOptions, + final Transaction oldTransaction); + + /** + * Starts a new Transaction. + *

+ * Caller is responsible for calling {@link #close()} on the returned + * transaction when it is no longer needed. + * + * @param writeOptions Any write options for the transaction + * @param transactionOptions Any options for the transaction + * @param oldTransaction this Transaction will be reused instead of allocating + * a new one. This is an optimization to avoid extra allocations + * when repeatedly creating transactions. + * @return The oldTransaction which has been reinitialized as a new + * transaction + */ + Transaction beginTransaction(final WriteOptions writeOptions, + final TransactionalOptions transactionOptions, final Transaction oldTransaction); + + sealed interface TransactionalOptions extends Closeable { + + @Override + void close(); + } + + final class PessimisticTransactionalDB implements TransactionalDB { + + private final TransactionDB db; + + public PessimisticTransactionalDB(TransactionDB db) { + this.db = db; + } + + @Override + public TransactionalOptions createTransactionalOptions() { + return new TransactionalOptionsPessimistic(new TransactionOptions()); + } + + @Override + public RocksDB get() { + return db; + } + + @Override + public Transaction beginTransaction(WriteOptions writeOptions) { + return db.beginTransaction(writeOptions); + } + + @Override + public Transaction beginTransaction(WriteOptions writeOptions, TransactionalOptions transactionOptions) { + return db.beginTransaction(writeOptions, + ((TransactionalOptionsPessimistic) transactionOptions).transactionOptions + ); + } + + @Override + public Transaction beginTransaction(WriteOptions writeOptions, Transaction oldTransaction) { + return db.beginTransaction(writeOptions, oldTransaction); + } + + @Override + public Transaction beginTransaction(WriteOptions writeOptions, + TransactionalOptions transactionOptions, + Transaction oldTransaction) { + return db.beginTransaction(writeOptions, + ((TransactionalOptionsPessimistic) transactionOptions).transactionOptions, + oldTransaction + ); + } + + @Override + public void close() throws IOException { + try { + db.closeE(); + } catch (RocksDBException e) { + throw new IOException(e); + } + } + + private record TransactionalOptionsPessimistic(TransactionOptions transactionOptions) implements + TransactionalOptions { + + @Override + public void close() { + transactionOptions.close(); + } + } + } + + final class OptimisticTransactionalDB implements TransactionalDB { + + private final OptimisticTransactionDB db; + + public OptimisticTransactionalDB(OptimisticTransactionDB db) { + this.db = db; + } + + @Override + public TransactionalOptions createTransactionalOptions() { + return new TransactionalOptionsOptimistic(new OptimisticTransactionOptions()); + } + + @Override + public RocksDB get() { + return db; + } + + @Override + public Transaction beginTransaction(WriteOptions writeOptions) { + return db.beginTransaction(writeOptions); + } + + @Override + public Transaction beginTransaction(WriteOptions writeOptions, TransactionalOptions transactionOptions) { + return db.beginTransaction(writeOptions, + ((TransactionalOptionsOptimistic) transactionOptions).transactionOptions + ); + } + + @Override + public Transaction beginTransaction(WriteOptions writeOptions, Transaction oldTransaction) { + return db.beginTransaction(writeOptions, oldTransaction); + } + + @Override + public Transaction beginTransaction(WriteOptions writeOptions, + TransactionalOptions transactionOptions, + Transaction oldTransaction) { + return db.beginTransaction(writeOptions, + ((TransactionalOptionsOptimistic) transactionOptions).transactionOptions, + oldTransaction + ); + } + + @Override + public void close() throws IOException { + try { + db.closeE(); + } catch (RocksDBException e) { + throw new IOException(e); + } + } + + private record TransactionalOptionsOptimistic(OptimisticTransactionOptions transactionOptions) implements + TransactionalOptions { + + @Override + public void close() { + transactionOptions.close(); + } + } + } +} diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java new file mode 100644 index 0000000..eebafa1 --- /dev/null +++ b/src/main/java/module-info.java @@ -0,0 +1,19 @@ +module rockserver.core { + requires rocksdbjni; + requires net.sourceforge.argparse4j; + requires inet.ipaddr; + requires java.logging; + requires typesafe.config; + requires org.jetbrains.annotations; + requires high.scale.lib; + requires org.github.gestalt.core; + requires org.github.gestalt.hocon; + + exports it.cavallium.rockserver.core.client; + exports it.cavallium.rockserver.core.common; + exports it.cavallium.rockserver.core.config; + opens it.cavallium.rockserver.core.resources; + opens it.cavallium.rockserver.core.config to org.github.gestalt.core, org.github.gestalt.hocon; + exports it.cavallium.rockserver.core.impl.rocksdb; + exports it.cavallium.rockserver.core.impl; +} \ No newline at end of file diff --git a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf new file mode 100644 index 0000000..cd993da --- /dev/null +++ b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf @@ -0,0 +1,95 @@ +database: { + global: { + # Keep false unless you have a legacy database + enable-column-bug: false + # Enable to adapt the database to spinning disk + spinning: false + # Error checking + checksum: true + # Use direct I/O in RocksDB databases (Higher I/O read throughput but OS cache is not used, less swapping, less memory pressure) + use-direct-io: true + # Allow memory mapped (mmap) RocksDB databases (High OS cache usage if direct I/O is not enabled) + allow-rocksdb-memory-mapping: false + # Maximum open files for each RocksDB database instance. -1 is infinite. + # If the maximum open files count is -1, the initial startup time will be slower. + # If "cacheIndexAndFilterBlocks" is false, the memory will rise when the number of open files rises. + maximum-open-files: -1 + optimistic: true + # Database block cache size + block-cache: 512MiB + # Database write buffer manager size + # You should enable this option if you are using direct I/O or spinning disks + write-buffer-manager: 64MiB + # Log data path + log-path: ./logs + # Write-Ahead-Log data path + wal-path: ./wal + fallback-column-options: { + # RocksDB data levels + # Available compression types: PLAIN, SNAPPY, LZ4, LZ4_HC, ZSTD, ZLIB, BZLIB2 + levels: [ + { + compression: LZ4 + max-dict-bytes: 0 + } + { + compression: LZ4 + max-dict-bytes: 0 + } + { + compression: ZSTD + max-dict-bytes: 0 + } + { + compression: ZSTD + max-dict-bytes: 0 + } + { + compression: ZSTD + max-dict-bytes: 0 + } + { + compression: ZSTD + max-dict-bytes: 0 + } + { + compression: ZSTD + # Maximum compression dictionary bytes per-sst + max-dict-bytes: 32KiB + } + ] + # Memtable memory budget for RocksDB + # Used to optimize compactions and avoid write stalls + memtable-memory-budget-bytes: 128MiB + # Disable to reduce IOWAIT and make the read/writes faster + # Enable to reduce ram usage + # If maximum-open-files is != -1, this option must be set to true, + # otherwise the indexes and filters will be unloaded often + cache-index-and-filter-blocks: true + # Disable to reduce IOWAIT and make the read/writes faster + # Enable to reduce ram usage + partition-filters: false + # Bloom filter. + bloom-filter: { + # Bits per key. This will determine bloom memory size: bitsPerKey * totalKeys + bits-per-key: 10 + # Disable bloom for the bottommost level, this reduces the memory size to 1/10 + optimize-for-hits: false + } + # Use relatively larger block sizes to reduce index block size. + # You should use at least 64KB block size. + # You can consider 256KB or even 512KB. + # The downside of using large blocks is that RAM is wasted in the block cache. + block-size: 16KiB + # This should be kept to null if write-buffer-manager is set, + # or if you want to use the "memtable-memory-budget-size" logic. + # Remember that there are "max-write-buffer-number" in memory, 2 by default + write-buffer-size: 200MiB + } + column-options: [ + ${database.global.fallback-column-options} { + name: "default" + } + ] + } +} diff --git a/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java new file mode 100644 index 0000000..b558454 --- /dev/null +++ b/src/test/java/it/cavallium/rockserver/core/impl/test/EmbeddedDBTest.java @@ -0,0 +1,52 @@ +package it.cavallium.rockserver.core.impl.test; + +import it.cavallium.rockserver.core.client.EmbeddedConnection; +import it.cavallium.rockserver.core.common.Callback.CallbackDelta; +import it.cavallium.rockserver.core.common.ColumnSchema; +import it.cavallium.rockserver.core.common.Delta; +import java.io.File; +import java.io.IOException; +import java.lang.foreign.MemorySegment; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.stream.Stream; + +class EmbeddedDBTest { + + private Path dir; + private EmbeddedConnection db; + + @org.junit.jupiter.api.BeforeEach + void setUp() throws IOException { + this.dir = Files.createTempDirectory("db-test"); + db = new EmbeddedConnection(dir, "test", null); + } + + @org.junit.jupiter.api.AfterEach + void tearDown() throws IOException { + try (Stream walk = Files.walk(dir)) { + db.close(); + walk.sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .peek(System.out::println) + .forEach(File::delete); + } + } + + @org.junit.jupiter.api.Test + void put() { + var colId = db.createColumn("put-1", new ColumnSchema(new int[]{16, 16, 16, 32, 32}, 2, true)); + db.put(0, colId, null, null, new CallbackDelta() { + @Override + public void onSuccess(Delta previous) { + + } + }); + db.deleteColumn(colId); + } + + @org.junit.jupiter.api.Test + void get() { + } +} \ No newline at end of file diff --git a/src/test/java/it/cavallium/rockserver/core/test/XXHash32Test.java b/src/test/java/it/cavallium/rockserver/core/test/XXHash32Test.java new file mode 100644 index 0000000..8c53501 --- /dev/null +++ b/src/test/java/it/cavallium/rockserver/core/test/XXHash32Test.java @@ -0,0 +1,34 @@ +package it.cavallium.rockserver.core.test; + +import it.cavallium.rockserver.core.impl.XXHash32; +import java.lang.foreign.Arena; +import java.lang.foreign.ValueLayout; +import java.lang.foreign.ValueLayout.OfByte; +import java.nio.ByteOrder; +import java.util.concurrent.ThreadLocalRandom; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class XXHash32Test { + + public static void main(String[] args) { + new XXHash32Test().test(); + } + + @Test + public void test() { + var safeXxhash32 = net.jpountz.xxhash.XXHashFactory.safeInstance().hash32(); + var myXxhash32 = XXHash32.getInstance(); + for (int runs = 0; runs < 3; runs++) { + for (int len = 0; len < 600; len++) { + byte[] bytes = new byte[len]; + ThreadLocalRandom.current().nextBytes(bytes); + var hash = safeXxhash32.hash(bytes, 0, bytes.length, Integer.MIN_VALUE); + var a = Arena.global(); + var result = myXxhash32.hash(a, a.allocateArray(OfByte.JAVA_BYTE, bytes), 0, bytes.length, Integer.MIN_VALUE); + var resultInt = result.get(ValueLayout.JAVA_INT.withOrder(ByteOrder.BIG_ENDIAN), 0); + Assertions.assertEquals(hash, resultInt); + } + } + } +} diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java new file mode 100644 index 0000000..f43f685 --- /dev/null +++ b/src/test/java/module-info.java @@ -0,0 +1,7 @@ +module rockserver.core.test { + requires org.lz4.java; + requires rockserver.core; + requires org.junit.jupiter.api; + opens it.cavallium.rockserver.core.test; + opens it.cavallium.rockserver.core.impl.test; +} \ No newline at end of file