Restore registered columns

This commit is contained in:
Andrea Cavalli 2024-03-30 03:25:21 +01:00
parent 59b8f2ddac
commit ea6808948b
4 changed files with 136 additions and 13 deletions

View File

@ -31,7 +31,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
public static final URI PRIVATE_MEMORY_URL = URI.create("memory://private");
private final ExecutorService exeuctor;
public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) {
public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) throws IOException {
super(name);
this.db = new EmbeddedDB(path, name, embeddedConfig);
this.exeuctor = Executors.newVirtualThreadPerTaskExecutor();

View File

@ -5,6 +5,7 @@ import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
import it.cavallium.rockserver.core.common.ColumnHashType;
import it.cavallium.rockserver.core.common.RequestType;
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
@ -25,7 +26,13 @@ import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
import it.cavallium.rockserver.core.impl.rocksdb.Tx;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
@ -33,6 +40,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
@ -58,9 +66,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
public static final long MAX_TRANSACTION_DURATION_MS = 10_000L;
private static final boolean USE_FAST_GET = true;
private static final byte[] COLUMN_SCHEMAS_COLUMN = "_column_schemas_".getBytes(StandardCharsets.UTF_8);
private final Logger logger;
private final @Nullable Path path;
private final TransactionalDB db;
private final ColumnFamilyHandle columnSchemasColumnDescriptorHandle;
private final NonBlockingHashMapLong<ColumnInstance> columns;
private final ConcurrentMap<String, Long> columnNamesIndex;
private final NonBlockingHashMapLong<Tx> txs;
@ -68,7 +78,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
private final SafeShutdown ops;
private final Object columnEditLock = new Object();
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) {
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException {
this.path = path;
this.logger = Logger.getLogger("db." + name);
this.columns = new NonBlockingHashMapLong<>();
@ -78,11 +88,82 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
this.ops = new SafeShutdown();
DatabaseConfig config = ConfigParser.parse(embeddedConfigPath);
this.db = RocksDBLoader.load(path, config, logger);
var existingColumnSchemasColumnDescriptorOptional = db
.getStartupColumns()
.entrySet()
.stream()
.filter(e -> Arrays.equals(e.getKey().getName(), COLUMN_SCHEMAS_COLUMN))
.findAny();
if (existingColumnSchemasColumnDescriptorOptional.isEmpty()) {
var columnSchemasColumnDescriptor = new ColumnFamilyDescriptor(COLUMN_SCHEMAS_COLUMN);
try {
columnSchemasColumnDescriptorHandle = db.get().createColumnFamily(columnSchemasColumnDescriptor);
} catch (RocksDBException e) {
throw new IOException("Cannot create system column", e);
}
} else {
this.columnSchemasColumnDescriptorHandle = existingColumnSchemasColumnDescriptorOptional.get().getValue();
}
try (var it = this.db.get().newIterator(columnSchemasColumnDescriptorHandle)) {
it.seekToFirst();
while (it.isValid()) {
var key = it.key();
ColumnSchema value = decodeColumnSchema(it.value());
this.db
.getStartupColumns()
.entrySet()
.stream()
.filter(entry -> Arrays.equals(entry.getKey().getName(), key))
.findAny()
.ifPresent(entry -> registerColumn(new ColumnInstance(entry.getValue(), value)));
it.next();
}
}
if (Boolean.parseBoolean(System.getProperty("rockserver.core.print-config", "true"))) {
logger.log(Level.INFO, "Database configuration: {0}", ConfigPrinter.stringify(config));
}
}
private ColumnSchema decodeColumnSchema(byte[] value) {
try (var is = new ByteArrayInputStream(value); var dis = new DataInputStream(is)) {
var check = dis.readByte();
assert check == 2;
var size = dis.readInt();
var keys = new IntArrayList(size);
for (int i = 0; i < size; i++) {
keys.add(dis.readInt());
}
size = dis.readInt();
var colHashTypes = new ObjectArrayList<ColumnHashType>(size);
for (int i = 0; i < size; i++) {
colHashTypes.add(ColumnHashType.values()[dis.readUnsignedByte()]);
}
var hasValue = dis.readBoolean();
return new ColumnSchema(keys, colHashTypes, hasValue);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private byte[] encodeColumnSchema(ColumnSchema schema) {
try (var baos = new ByteArrayOutputStream(); var daos = new DataOutputStream(baos)) {
daos.writeByte(2);
daos.writeInt(schema.keys().size());
for (int key : schema.keys()) {
daos.writeInt(key);
}
daos.writeInt(schema.variableTailKeys().size());
for (ColumnHashType variableTailKey : schema.variableTailKeys()) {
daos.writeByte(variableTailKey.ordinal());
}
daos.writeBoolean(schema.hasValue());
baos.close();
return baos.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* The column must be registered once!!!
* Do not try to register a column that may already be registered
@ -97,6 +178,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
this.columns.remove(id);
throw new UnsupportedOperationException("Column already registered!");
}
logger.info("Registered column: " + column);
return id;
} catch (RocksDBException e) {
throw new RuntimeException(e);
@ -132,6 +214,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
// Wait for 10 seconds
try {
ops.closeAndWait(MAX_TRANSACTION_DURATION_MS);
columnSchemasColumnDescriptorHandle.close();
db.close();
if (path == null) {
Utils.deleteDirectory(db.getPath());
@ -251,7 +334,10 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
}
} else {
try {
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8)));
byte[] key = name.getBytes(StandardCharsets.UTF_8);
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(key));
byte[] value = encodeColumnSchema(schema);
db.get().put(columnSchemasColumnDescriptorHandle, key, value);
return registerColumn(new ColumnInstance(cf, schema));
} catch (RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_CREATE_FAIL, e);

View File

@ -576,7 +576,7 @@ public class RocksDBLoader {
var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions);
var dbTasks = new DatabaseTasks(db, inMemory, delayWalFlushConfig);
return TransactionalDB.create(definitiveDbPath.toString(), db, dbTasks);
return TransactionalDB.create(definitiveDbPath.toString(), db, descriptors, handles, dbTasks);
} catch (IOException | RocksDBException ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
} catch (GestaltException e) {

View File

@ -1,10 +1,13 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.BaseTransactionalDB;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.OptimisticTransactionOptions;
import org.rocksdb.RocksDB;
@ -16,10 +19,13 @@ import org.rocksdb.WriteOptions;
public sealed interface TransactionalDB extends Closeable {
static TransactionalDB create(String path, RocksDB db, DatabaseTasks databaseTasks) {
static TransactionalDB create(String path, RocksDB db,
List<ColumnFamilyDescriptor> descriptors,
ArrayList<ColumnFamilyHandle> handles,
DatabaseTasks databaseTasks) {
return switch (db) {
case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB, databaseTasks);
case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB, databaseTasks);
case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB, descriptors, handles, databaseTasks);
case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB, descriptors, handles, databaseTasks);
default -> throw new UnsupportedOperationException("This database is not transactional");
};
}
@ -29,6 +35,7 @@ public sealed interface TransactionalDB extends Closeable {
String getPath();
RocksDB get();
Map<ColumnFamilyDescriptor, ColumnFamilyHandle> getStartupColumns();
/**
* Starts a new Transaction.
* <p>
@ -97,10 +104,17 @@ public sealed interface TransactionalDB extends Closeable {
private final String path;
protected final RDB db;
private final DatabaseTasks databaseTasks;
private final List<ColumnFamilyDescriptor> descriptors;
private final ArrayList<ColumnFamilyHandle> handles;
public BaseTransactionalDB(String path, RDB db, DatabaseTasks databaseTasks) {
public BaseTransactionalDB(String path, RDB db,
List<ColumnFamilyDescriptor> descriptors,
ArrayList<ColumnFamilyHandle> handles,
DatabaseTasks databaseTasks) {
this.path = path;
this.db = db;
this.descriptors = descriptors;
this.handles = handles;
this.databaseTasks = databaseTasks;
databaseTasks.start();
@ -116,6 +130,16 @@ public sealed interface TransactionalDB extends Closeable {
return db;
}
@Override
public Map<ColumnFamilyDescriptor, ColumnFamilyHandle> getStartupColumns() {
var cols = new HashMap<ColumnFamilyDescriptor, ColumnFamilyHandle>();
assert this.descriptors.size() == this.handles.size();
for (int i = 0; i < descriptors.size(); i++) {
cols.put(this.descriptors.get(i), this.handles.get(i));
}
return cols;
}
@Override
public void close() throws IOException {
List<Exception> exceptions = new ArrayList<>();
@ -124,6 +148,13 @@ public sealed interface TransactionalDB extends Closeable {
} catch (Exception ex) {
exceptions.add(ex);
}
for (ColumnFamilyHandle handle : handles) {
try {
handle.close();
} catch (Exception ex) {
exceptions.add(ex);
}
}
try {
db.closeE();
} catch (RocksDBException e) {
@ -144,8 +175,11 @@ public sealed interface TransactionalDB extends Closeable {
final class PessimisticTransactionalDB extends BaseTransactionalDB<TransactionDB> {
public PessimisticTransactionalDB(String path, TransactionDB db, DatabaseTasks databaseTasks) {
super(path, db, databaseTasks);
public PessimisticTransactionalDB(String path, TransactionDB db,
List<ColumnFamilyDescriptor> descriptors,
ArrayList<ColumnFamilyHandle> handles,
DatabaseTasks databaseTasks) {
super(path, db, descriptors, handles, databaseTasks);
}
@Override
@ -192,8 +226,11 @@ public sealed interface TransactionalDB extends Closeable {
final class OptimisticTransactionalDB extends BaseTransactionalDB<OptimisticTransactionDB> {
public OptimisticTransactionalDB(String path, OptimisticTransactionDB db, DatabaseTasks databaseTasks) {
super(path, db, databaseTasks);
public OptimisticTransactionalDB(String path, OptimisticTransactionDB db,
List<ColumnFamilyDescriptor> descriptors,
ArrayList<ColumnFamilyHandle> handles,
DatabaseTasks databaseTasks) {
super(path, db, descriptors, handles, databaseTasks);
}
@Override