diff --git a/generate-api.sh b/generate-api.sh
new file mode 100755
index 0000000..2645138
--- /dev/null
+++ b/generate-api.sh
@@ -0,0 +1,2 @@
+#!/bin/bash -e
+thrift -r --gen java:generated_annotations=suppress -out src/main/java src/main/resources/it/cavallium/rockserver/core/resources/rocksdb.thrift
diff --git a/pom.xml b/pom.xml
index 6364377..11a566a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,8 +9,8 @@
1.0.0-SNAPSHOT
- 21
- 21
+ 22
+ 22
0.9.28
0.25.3
9.0.0
@@ -18,6 +18,32 @@
it.cavallium.rockserver.core.Main
+
+
+ sonatype-snapshot
+ Sonatype OSS Snapshots
+
+ false
+
+
+ true
+
+ https://oss.sonatype.org/content/repositories/snapshots
+
+
+
+
+
+
+ io.netty
+ netty5-bom
+ 5.0.0.Alpha6-SNAPSHOT
+ pom
+ import
+
+
+
+
org.rocksdb
@@ -31,8 +57,8 @@
it.unimi.dsi
- fastutil-core
- 8.5.12
+ fastutil
+ 8.5.13
com.github.seancfoley
@@ -60,6 +86,27 @@
gestalt-hocon
${gestalt.version}
+
+ io.netty
+ netty5-buffer
+
+
+ io.netty
+ netty5-codec-http2
+
+
+ io.netty
+ netty5-transport-native-io_uring
+
+
+ io.netty
+ netty5-transport-classes-io_uring
+
+
+ org.apache.thrift
+ libthrift
+ 0.20.0
+
org.lz4
@@ -99,9 +146,6 @@
it.cavallium.rockserver.core.Main
-
- --enable-preview
-
@@ -113,23 +157,13 @@
${maven.compiler.source}
- true
-
- --enable-preview
-
maven-surefire-plugin
-
- --enable-preview
-
maven-failsafe-plugin
-
- --enable-preview
-
@@ -209,7 +243,6 @@
-march=native
-H:IncludeResourceBundles=net.sourceforge.argparse4j.internal.ArgumentParserImpl
-O1
- --enable-preview
-H:+StaticExecutableWithDynamicLibC
-H:+JNI
-H:IncludeResources=librocksdbjni-linux64.so
@@ -235,7 +268,6 @@
java
${project.build.directory}
- --enable-preview
-classpath
${mainClass}
diff --git a/src/main/java/it/cavallium/rockserver/core/Main.java b/src/main/java/it/cavallium/rockserver/core/Main.java
index 2f6d187..9a8c2d0 100644
--- a/src/main/java/it/cavallium/rockserver/core/Main.java
+++ b/src/main/java/it/cavallium/rockserver/core/Main.java
@@ -27,10 +27,14 @@ public class Main {
ArgumentParser parser = ArgumentParsers.newFor("rockserver-core").build()
.defaultHelp(true)
.description("RocksDB server core");
- parser.addArgument("-u", "--url")
+ parser.addArgument("-u", "--database-url")
.type(String.class)
.setDefault(PRIVATE_MEMORY_URL.toString())
.help("Specify database rocksdb://hostname:port, or unix://, or file://");
+ parser.addArgument("-l", "--listen-url")
+ .type(String.class)
+ .setDefault("http://127.0.0.1:5332")
+ .help("Specify database http://hostname:port, or unix://, or file://");
parser.addArgument("-n", "--name")
.type(String.class)
.setDefault("main")
@@ -50,6 +54,7 @@ public class Main {
System.exit(1);
}
var clientBuilder = new it.cavallium.rockserver.core.client.ClientBuilder();
+ var serverBuilder = new it.cavallium.rockserver.core.server.ServerBuilder();
if (ns.getBoolean("print_default_config")) {
requireNonNull(Main.class.getClassLoader()
@@ -62,14 +67,16 @@ public class Main {
LOG.info("Starting...");
RocksDBLoader.loadLibrary();
- var rawUrl = ns.getString("url");
+ var rawDatabaseUrl = ns.getString("database_url");
+ var rawListenUrl = ns.getString("listen_url");
var name = ns.getString("name");
var config = ns.getString("config");
- var url = new URI(rawUrl);
+ var databaseUrl = new URI(rawDatabaseUrl);
+ var listenUrl = new URI(rawListenUrl);
if (config != null) {
- if (!url.getScheme().equals("file")) {
+ if (!databaseUrl.getScheme().equals("file")) {
System.err.println("Do not set --config if the database is not local!");
System.exit(1);
return;
@@ -78,20 +85,36 @@ public class Main {
}
}
- switch (url.getScheme()) {
- case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(url.getPath())));
- case "file" -> clientBuilder.setEmbeddedPath(Path.of((url.getAuthority() != null ? url.getAuthority() : "") + url.getPath()).normalize());
+ switch (databaseUrl.getScheme()) {
+ case "unix" -> clientBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(databaseUrl.getPath())));
+ case "file" -> clientBuilder.setEmbeddedPath(Path.of((databaseUrl.getAuthority() != null ? databaseUrl.getAuthority() : "") + databaseUrl.getPath()).normalize());
case "memory" -> clientBuilder.setEmbeddedInMemory(true);
- case "rocksdb" -> clientBuilder.setAddress(new HostName(url.getHost()).asInetSocketAddress());
- default -> throw new IllegalArgumentException("Invalid scheme: " + url.getScheme());
+ case "rocksdb" -> clientBuilder.setAddress(new HostName(databaseUrl.getHost()).asInetSocketAddress());
+ default -> throw new IllegalArgumentException("Invalid scheme: " + databaseUrl.getScheme());
+ }
+
+ switch (listenUrl.getScheme()) {
+ case "unix" -> serverBuilder.setUnixSocket(UnixDomainSocketAddress.of(Path.of(listenUrl.getPath())));
+ case "http" -> serverBuilder.setHttpAddress(listenUrl.getHost(), listenUrl.getPort());
+ case "rocksdb" -> serverBuilder.setAddress(new HostName(listenUrl.getHost()).asInetSocketAddress());
+ default -> throw new IllegalArgumentException("Invalid scheme: " + listenUrl.getScheme());
}
clientBuilder.setName(name);
try (var connection = clientBuilder.build()) {
LOG.log(Level.INFO, "Connected to {0}", connection);
+
+ serverBuilder.setClient(connection);
+
CountDownLatch shutdownLatch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(shutdownLatch::countDown));
- LOG.info("Shutting down...");
+
+ try (var server = serverBuilder.build()) {
+ shutdownLatch.await();
+ LOG.info("Shutting down...");
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
}
LOG.info("Shut down successfully");
}
diff --git a/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java b/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java
index 547c3c9..8818c22 100644
--- a/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java
+++ b/src/main/java/it/cavallium/rockserver/core/client/ClientBuilder.java
@@ -1,5 +1,6 @@
package it.cavallium.rockserver.core.client;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnixDomainSocketAddress;
import java.nio.file.Path;
@@ -37,15 +38,15 @@ public class ClientBuilder {
this.embeddedConfig = embeddedConfig;
}
- public RocksDBConnection build() {
+ public RocksDBConnection build() throws IOException {
if (embeddedInMemory) {
return new EmbeddedConnection(null, name, embeddedConfig);
} else if (embeddedPath != null) {
return new EmbeddedConnection(embeddedPath, name, embeddedConfig);
} else if (unixAddress != null) {
- return new SocketConnectionUnix(unixAddress, name);
+ throw new UnsupportedOperationException("Not implemented: unix socket");
} else if (iNetAddress != null) {
- return new SocketConnectionInet(iNetAddress, name);
+ throw new UnsupportedOperationException("Not implemented: inet address");
} else {
throw new UnsupportedOperationException("Please set a connection type");
}
diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java
index d47898b..8e54d69 100644
--- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java
+++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java
@@ -1,10 +1,14 @@
package it.cavallium.rockserver.core.client;
-import it.cavallium.rockserver.core.common.Callback.GetCallback;
-import it.cavallium.rockserver.core.common.Callback.IteratorCallback;
-import it.cavallium.rockserver.core.common.Callback.PutCallback;
+import it.cavallium.rockserver.core.common.RequestType;
+import it.cavallium.rockserver.core.common.RequestType.RequestGet;
+import it.cavallium.rockserver.core.common.RequestType.RequestPut;
import it.cavallium.rockserver.core.common.ColumnSchema;
+import it.cavallium.rockserver.core.common.RocksDBAPI;
+import it.cavallium.rockserver.core.common.RocksDBAPICommand;
+import it.cavallium.rockserver.core.common.RocksDBAsyncAPI;
import it.cavallium.rockserver.core.common.RocksDBException;
+import it.cavallium.rockserver.core.common.RocksDBSyncAPI;
import it.cavallium.rockserver.core.impl.EmbeddedDB;
import java.io.IOException;
import java.lang.foreign.Arena;
@@ -13,10 +17,11 @@ import java.net.URI;
import java.nio.file.Path;
import java.util.Optional;
+import java.util.concurrent.CompletionStage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-public class EmbeddedConnection extends BaseConnection {
+public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
private final EmbeddedDB db;
public static final URI PRIVATE_MEMORY_URL = URI.create("memory://private");
@@ -37,6 +42,16 @@ public class EmbeddedConnection extends BaseConnection {
return Optional.ofNullable(db.getPath()).map(Path::toUri).orElse(PRIVATE_MEMORY_URL);
}
+ @Override
+ public RocksDBSyncAPI getSyncApi() {
+ return this;
+ }
+
+ @Override
+ public RocksDBAsyncAPI getAsyncApi() {
+ return this;
+ }
+
@Override
public long openTransaction(long timeoutMs) {
return db.openTransaction(timeoutMs);
@@ -67,14 +82,24 @@ public class EmbeddedConnection extends BaseConnection {
return db.getColumnId(name);
}
+ @Override
+ public R requestSync(RocksDBAPICommand req) {
+ return req.handleSync(this);
+ }
+
+ @Override
+ public CompletionStage requestAsync(RocksDBAPICommand req) {
+ return req.handleAsync(this);
+ }
+
@Override
public T put(Arena arena,
long transactionOrUpdateId,
long columnId,
@NotNull MemorySegment @NotNull [] keys,
@NotNull MemorySegment value,
- PutCallback super MemorySegment, T> callback) throws RocksDBException {
- return db.put(arena, transactionOrUpdateId, columnId, keys, value, callback);
+ RequestPut super MemorySegment, T> requestType) throws RocksDBException {
+ return db.put(arena, transactionOrUpdateId, columnId, keys, value, requestType);
}
@Override
@@ -82,8 +107,8 @@ public class EmbeddedConnection extends BaseConnection {
long transactionOrUpdateId,
long columnId,
MemorySegment @NotNull [] keys,
- GetCallback super MemorySegment, T> callback) throws RocksDBException {
- return db.get(arena, transactionOrUpdateId, columnId, keys, callback);
+ RequestGet super MemorySegment, T> requestType) throws RocksDBException {
+ return db.get(arena, transactionOrUpdateId, columnId, keys, requestType);
}
@Override
@@ -112,7 +137,7 @@ public class EmbeddedConnection extends BaseConnection {
long iterationId,
long skipCount,
long takeCount,
- @NotNull IteratorCallback super MemorySegment, T> callback) throws RocksDBException {
- return db.subsequent(arena, iterationId, skipCount, takeCount, callback);
+ @NotNull RequestType.RequestIterate super MemorySegment, T> requestType) throws RocksDBException {
+ return db.subsequent(arena, iterationId, skipCount, takeCount, requestType);
}
}
diff --git a/src/main/java/it/cavallium/rockserver/core/client/RocksDBConnection.java b/src/main/java/it/cavallium/rockserver/core/client/RocksDBConnection.java
index 768b0f5..8554b22 100644
--- a/src/main/java/it/cavallium/rockserver/core/client/RocksDBConnection.java
+++ b/src/main/java/it/cavallium/rockserver/core/client/RocksDBConnection.java
@@ -1,10 +1,11 @@
package it.cavallium.rockserver.core.client;
-import it.cavallium.rockserver.core.common.RocksDBAPI;
+import it.cavallium.rockserver.core.common.RocksDBAsyncAPI;
+import it.cavallium.rockserver.core.common.RocksDBSyncAPI;
import java.io.Closeable;
import java.net.URI;
-public interface RocksDBConnection extends Closeable, RocksDBAPI {
+public interface RocksDBConnection extends Closeable {
/**
* Get connection url
@@ -12,4 +13,8 @@ public interface RocksDBConnection extends Closeable, RocksDBAPI {
* @return connection url
*/
URI getUrl();
+
+ RocksDBSyncAPI getSyncApi();
+
+ RocksDBAsyncAPI getAsyncApi();
}
diff --git a/src/main/java/it/cavallium/rockserver/core/client/SocketConnection.java b/src/main/java/it/cavallium/rockserver/core/client/SocketConnection.java
deleted file mode 100644
index b274f99..0000000
--- a/src/main/java/it/cavallium/rockserver/core/client/SocketConnection.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package it.cavallium.rockserver.core.client;
-
-import it.cavallium.rockserver.core.common.Callback.GetCallback;
-import it.cavallium.rockserver.core.common.Callback.IteratorCallback;
-import it.cavallium.rockserver.core.common.Callback.PutCallback;
-import it.cavallium.rockserver.core.common.ColumnSchema;
-import it.cavallium.rockserver.core.common.RocksDBException;
-import java.io.IOException;
-import java.lang.foreign.Arena;
-import java.lang.foreign.MemorySegment;
-import java.net.SocketAddress;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-public abstract class SocketConnection extends BaseConnection {
-
- private final SocketAddress address;
-
- public SocketConnection(SocketAddress address, String name) {
- super(name);
- this.address = address;
- }
-
- public SocketAddress getAddress() {
- return address;
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- }
- @Override
- public long openTransaction(long timeoutMs) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean closeTransaction(long transactionId, boolean commit) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void closeFailedUpdate(long updateId) throws RocksDBException {
- throw new UnsupportedOperationException();
- }
- @Override
- public long createColumn(String name, @NotNull ColumnSchema schema) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void deleteColumn(long columnId) throws RocksDBException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long getColumnId(@NotNull String name) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public T put(Arena arena,
- long transactionOrUpdateId,
- long columnId,
- MemorySegment @NotNull [] keys,
- @NotNull MemorySegment value,
- PutCallback super MemorySegment, T> callback) throws RocksDBException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public T get(Arena arena,
- long transactionOrUpdateId,
- long columnId,
- MemorySegment @NotNull [] keys,
- GetCallback super MemorySegment, T> callback) throws RocksDBException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public long openIterator(Arena arena,
- long transactionId,
- long columnId,
- MemorySegment @NotNull [] startKeysInclusive,
- @Nullable MemorySegment[] endKeysExclusive,
- boolean reverse,
- long timeoutMs) throws RocksDBException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void closeIterator(long iteratorId) throws RocksDBException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void seekTo(Arena arena, long iterationId, MemorySegment @NotNull [] keys) throws RocksDBException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public T subsequent(Arena arena,
- long iterationId,
- long skipCount,
- long takeCount,
- @NotNull IteratorCallback super MemorySegment, T> callback) throws RocksDBException {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionInet.java b/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionInet.java
deleted file mode 100644
index 21eab70..0000000
--- a/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionInet.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package it.cavallium.rockserver.core.client;
-
-import java.net.InetSocketAddress;
-import java.net.URI;
-
-public class SocketConnectionInet extends SocketConnection {
-
- public SocketConnectionInet(InetSocketAddress address, String name) {
- super(address, name);
- }
-
- @Override
- public InetSocketAddress getAddress() {
- return (InetSocketAddress) super.getAddress();
- }
-
- @Override
- public URI getUrl() {
- return URI.create("rocksdb://" + getAddress().getHostString());
- }
-
-}
diff --git a/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionUnix.java b/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionUnix.java
deleted file mode 100644
index 9f92c4f..0000000
--- a/src/main/java/it/cavallium/rockserver/core/client/SocketConnectionUnix.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package it.cavallium.rockserver.core.client;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.UnixDomainSocketAddress;
-import java.nio.file.Files;
-
-public class SocketConnectionUnix extends SocketConnection {
-
- public SocketConnectionUnix(UnixDomainSocketAddress address, String name) {
- super(address, name);
- }
-
- @Override
- public UnixDomainSocketAddress getAddress() {
- return (UnixDomainSocketAddress) super.getAddress();
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- Files.deleteIfExists(getAddress().getPath());
- }
-
- @Override
- public URI getUrl() {
- return URI.create("unix://" + getAddress().getPath());
- }
-}
diff --git a/src/main/java/it/cavallium/rockserver/core/common/Callback.java b/src/main/java/it/cavallium/rockserver/core/common/Callback.java
deleted file mode 100644
index 9c1540b..0000000
--- a/src/main/java/it/cavallium/rockserver/core/common/Callback.java
+++ /dev/null
@@ -1,128 +0,0 @@
-package it.cavallium.rockserver.core.common;
-
-import it.cavallium.rockserver.core.common.Callback.CallbackForUpdate;
-import it.cavallium.rockserver.core.common.Callback.CallbackPreviousPresence;
-import java.util.List;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-public sealed interface Callback {
-
- static boolean requiresGettingPreviousValue(PutCallback, ?> callback) {
- return callback instanceof CallbackPrevious>
- || callback instanceof CallbackDelta>
- || callback instanceof CallbackChanged;
- }
-
- static boolean requiresGettingPreviousPresence(PutCallback, ?> callback) {
- return callback instanceof Callback.CallbackPreviousPresence>;
- }
-
- static boolean requiresGettingCurrentValue(GetCallback, ?> callback) {
- return callback instanceof CallbackCurrent>
- || callback instanceof Callback.CallbackForUpdate>;
- }
-
- static U safeCast(Object previousValue) {
- //noinspection unchecked
- return (U) previousValue;
- }
-
- @SuppressWarnings("unchecked")
- static CallbackPrevious previous() {
- return (CallbackPrevious) CallbackPrevious.INSTANCE;
- }
-
- @SuppressWarnings("unchecked")
- static CallbackCurrent current() {
- return (CallbackCurrent) CallbackCurrent.INSTANCE;
- }
-
- @SuppressWarnings("unchecked")
- static CallbackForUpdate forUpdate() {
- return (CallbackForUpdate) CallbackForUpdate.INSTANCE;
- }
-
- @SuppressWarnings("unchecked")
- static CallbackDelta delta() {
- return (CallbackDelta) CallbackDelta.INSTANCE;
- }
-
- @SuppressWarnings("unchecked")
- static CallbackExists exists() {
- return (CallbackExists) CallbackExists.INSTANCE;
- }
-
- @SuppressWarnings("unchecked")
- static CallbackMulti multi() {
- return (CallbackMulti) CallbackMulti.INSTANCE;
- }
-
- @SuppressWarnings("unchecked")
- static CallbackChanged changed() {
- return (CallbackChanged) CallbackChanged.INSTANCE;
- }
-
- @SuppressWarnings("unchecked")
- static CallbackPreviousPresence previousPresence() {
- return (CallbackPreviousPresence) CallbackPreviousPresence.INSTANCE;
- }
-
- @SuppressWarnings("unchecked")
- static CallbackVoid none() {
- return (CallbackVoid) CallbackVoid.INSTANCE;
- }
-
- sealed interface PutCallback extends Callback {}
-
- sealed interface PatchCallback extends Callback {}
-
- sealed interface GetCallback extends Callback {}
-
- sealed interface IteratorCallback extends Callback {}
-
- record CallbackVoid() implements PutCallback, PatchCallback, IteratorCallback, GetCallback {
-
- private static final CallbackVoid