diff --git a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java index 31a83c8..7fc3c48 100644 --- a/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java +++ b/src/main/java/it/cavallium/rockserver/core/client/EmbeddedConnection.java @@ -31,7 +31,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI { public static final URI PRIVATE_MEMORY_URL = URI.create("memory://private"); private final ExecutorService exeuctor; - public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) { + public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) throws IOException { super(name); this.db = new EmbeddedDB(path, name, embeddedConfig); this.exeuctor = Executors.newVirtualThreadPerTaskExecutor(); diff --git a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java index 71c1857..d5ad765 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/EmbeddedDB.java @@ -5,6 +5,7 @@ import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue; import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue; +import it.cavallium.rockserver.core.common.ColumnHashType; import it.cavallium.rockserver.core.common.RequestType; import it.cavallium.rockserver.core.common.RequestType.RequestGet; import it.cavallium.rockserver.core.common.RequestType.RequestNothing; @@ -25,7 +26,13 @@ import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB; import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions; import it.cavallium.rockserver.core.impl.rocksdb.Tx; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.lang.foreign.Arena; import java.lang.foreign.MemorySegment; @@ -33,6 +40,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; @@ -58,9 +66,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096; public static final long MAX_TRANSACTION_DURATION_MS = 10_000L; private static final boolean USE_FAST_GET = true; + private static final byte[] COLUMN_SCHEMAS_COLUMN = "_column_schemas_".getBytes(StandardCharsets.UTF_8); private final Logger logger; private final @Nullable Path path; private final TransactionalDB db; + private final ColumnFamilyHandle columnSchemasColumnDescriptorHandle; private final NonBlockingHashMapLong columns; private final ConcurrentMap columnNamesIndex; private final NonBlockingHashMapLong txs; @@ -68,7 +78,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { private final SafeShutdown ops; private final Object columnEditLock = new Object(); - public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) { + public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException { this.path = path; this.logger = Logger.getLogger("db." + name); this.columns = new NonBlockingHashMapLong<>(); @@ -78,11 +88,82 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { this.ops = new SafeShutdown(); DatabaseConfig config = ConfigParser.parse(embeddedConfigPath); this.db = RocksDBLoader.load(path, config, logger); + var existingColumnSchemasColumnDescriptorOptional = db + .getStartupColumns() + .entrySet() + .stream() + .filter(e -> Arrays.equals(e.getKey().getName(), COLUMN_SCHEMAS_COLUMN)) + .findAny(); + if (existingColumnSchemasColumnDescriptorOptional.isEmpty()) { + var columnSchemasColumnDescriptor = new ColumnFamilyDescriptor(COLUMN_SCHEMAS_COLUMN); + try { + columnSchemasColumnDescriptorHandle = db.get().createColumnFamily(columnSchemasColumnDescriptor); + } catch (RocksDBException e) { + throw new IOException("Cannot create system column", e); + } + } else { + this.columnSchemasColumnDescriptorHandle = existingColumnSchemasColumnDescriptorOptional.get().getValue(); + } + try (var it = this.db.get().newIterator(columnSchemasColumnDescriptorHandle)) { + it.seekToFirst(); + while (it.isValid()) { + var key = it.key(); + ColumnSchema value = decodeColumnSchema(it.value()); + this.db + .getStartupColumns() + .entrySet() + .stream() + .filter(entry -> Arrays.equals(entry.getKey().getName(), key)) + .findAny() + .ifPresent(entry -> registerColumn(new ColumnInstance(entry.getValue(), value))); + it.next(); + } + } if (Boolean.parseBoolean(System.getProperty("rockserver.core.print-config", "true"))) { logger.log(Level.INFO, "Database configuration: {0}", ConfigPrinter.stringify(config)); } } + private ColumnSchema decodeColumnSchema(byte[] value) { + try (var is = new ByteArrayInputStream(value); var dis = new DataInputStream(is)) { + var check = dis.readByte(); + assert check == 2; + var size = dis.readInt(); + var keys = new IntArrayList(size); + for (int i = 0; i < size; i++) { + keys.add(dis.readInt()); + } + size = dis.readInt(); + var colHashTypes = new ObjectArrayList(size); + for (int i = 0; i < size; i++) { + colHashTypes.add(ColumnHashType.values()[dis.readUnsignedByte()]); + } + var hasValue = dis.readBoolean(); + return new ColumnSchema(keys, colHashTypes, hasValue); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private byte[] encodeColumnSchema(ColumnSchema schema) { + try (var baos = new ByteArrayOutputStream(); var daos = new DataOutputStream(baos)) { + daos.writeByte(2); + daos.writeInt(schema.keys().size()); + for (int key : schema.keys()) { + daos.writeInt(key); + } + daos.writeInt(schema.variableTailKeys().size()); + for (ColumnHashType variableTailKey : schema.variableTailKeys()) { + daos.writeByte(variableTailKey.ordinal()); + } + daos.writeBoolean(schema.hasValue()); + baos.close(); + return baos.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** * The column must be registered once!!! * Do not try to register a column that may already be registered @@ -97,6 +178,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { this.columns.remove(id); throw new UnsupportedOperationException("Column already registered!"); } + logger.info("Registered column: " + column); return id; } catch (RocksDBException e) { throw new RuntimeException(e); @@ -132,6 +214,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { // Wait for 10 seconds try { ops.closeAndWait(MAX_TRANSACTION_DURATION_MS); + columnSchemasColumnDescriptorHandle.close(); db.close(); if (path == null) { Utils.deleteDirectory(db.getPath()); @@ -251,7 +334,10 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable { } } else { try { - var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8))); + byte[] key = name.getBytes(StandardCharsets.UTF_8); + var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(key)); + byte[] value = encodeColumnSchema(schema); + db.get().put(columnSchemasColumnDescriptorHandle, key, value); return registerColumn(new ColumnInstance(cf, schema)); } catch (RocksDBException e) { throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_CREATE_FAIL, e); diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java index d36e7b8..4648a9d 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java @@ -576,7 +576,7 @@ public class RocksDBLoader { var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions); var dbTasks = new DatabaseTasks(db, inMemory, delayWalFlushConfig); - return TransactionalDB.create(definitiveDbPath.toString(), db, dbTasks); + return TransactionalDB.create(definitiveDbPath.toString(), db, descriptors, handles, dbTasks); } catch (IOException | RocksDBException ex) { throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex); } catch (GestaltException e) { diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java index 193cf31..82ed537 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java @@ -1,10 +1,13 @@ package it.cavallium.rockserver.core.impl.rocksdb; -import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.BaseTransactionalDB; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.OptimisticTransactionOptions; import org.rocksdb.RocksDB; @@ -16,10 +19,13 @@ import org.rocksdb.WriteOptions; public sealed interface TransactionalDB extends Closeable { - static TransactionalDB create(String path, RocksDB db, DatabaseTasks databaseTasks) { + static TransactionalDB create(String path, RocksDB db, + List descriptors, + ArrayList handles, + DatabaseTasks databaseTasks) { return switch (db) { - case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB, databaseTasks); - case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB, databaseTasks); + case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB, descriptors, handles, databaseTasks); + case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB, descriptors, handles, databaseTasks); default -> throw new UnsupportedOperationException("This database is not transactional"); }; } @@ -29,6 +35,7 @@ public sealed interface TransactionalDB extends Closeable { String getPath(); RocksDB get(); + Map getStartupColumns(); /** * Starts a new Transaction. *

@@ -97,10 +104,17 @@ public sealed interface TransactionalDB extends Closeable { private final String path; protected final RDB db; private final DatabaseTasks databaseTasks; + private final List descriptors; + private final ArrayList handles; - public BaseTransactionalDB(String path, RDB db, DatabaseTasks databaseTasks) { + public BaseTransactionalDB(String path, RDB db, + List descriptors, + ArrayList handles, + DatabaseTasks databaseTasks) { this.path = path; this.db = db; + this.descriptors = descriptors; + this.handles = handles; this.databaseTasks = databaseTasks; databaseTasks.start(); @@ -116,6 +130,16 @@ public sealed interface TransactionalDB extends Closeable { return db; } + @Override + public Map getStartupColumns() { + var cols = new HashMap(); + assert this.descriptors.size() == this.handles.size(); + for (int i = 0; i < descriptors.size(); i++) { + cols.put(this.descriptors.get(i), this.handles.get(i)); + } + return cols; + } + @Override public void close() throws IOException { List exceptions = new ArrayList<>(); @@ -124,6 +148,13 @@ public sealed interface TransactionalDB extends Closeable { } catch (Exception ex) { exceptions.add(ex); } + for (ColumnFamilyHandle handle : handles) { + try { + handle.close(); + } catch (Exception ex) { + exceptions.add(ex); + } + } try { db.closeE(); } catch (RocksDBException e) { @@ -144,8 +175,11 @@ public sealed interface TransactionalDB extends Closeable { final class PessimisticTransactionalDB extends BaseTransactionalDB { - public PessimisticTransactionalDB(String path, TransactionDB db, DatabaseTasks databaseTasks) { - super(path, db, databaseTasks); + public PessimisticTransactionalDB(String path, TransactionDB db, + List descriptors, + ArrayList handles, + DatabaseTasks databaseTasks) { + super(path, db, descriptors, handles, databaseTasks); } @Override @@ -192,8 +226,11 @@ public sealed interface TransactionalDB extends Closeable { final class OptimisticTransactionalDB extends BaseTransactionalDB { - public OptimisticTransactionalDB(String path, OptimisticTransactionDB db, DatabaseTasks databaseTasks) { - super(path, db, databaseTasks); + public OptimisticTransactionalDB(String path, OptimisticTransactionDB db, + List descriptors, + ArrayList handles, + DatabaseTasks databaseTasks) { + super(path, db, descriptors, handles, databaseTasks); } @Override