Compare commits

...

14 Commits

Author SHA1 Message Date
Andrea Cavalli ecdb0b05b8 migrate 2024-03-30 23:14:05 +01:00
Andrea Cavalli 7a8e3d6158 Async migrate 2024-03-30 23:14:05 +01:00
Andrea Cavalli 9729704bb9 Restore columnId without getting it first 2024-03-30 23:14:05 +01:00
Andrea Cavalli bb9e9ad33c Migration 2024-03-30 23:14:05 +01:00
Andrea Cavalli 0c98f6ef29 Flush before exit 2024-03-30 03:40:02 +01:00
Andrea Cavalli ea6808948b Restore registered columns 2024-03-30 03:25:21 +01:00
Andrea Cavalli 59b8f2ddac Close db 2024-03-30 02:53:11 +01:00
Andrea Cavalli 82b2848222 Fix rocksexception 2024-03-30 02:51:07 +01:00
Andrea Cavalli e038effde9 Close client 2024-03-30 02:46:37 +01:00
Andrea Cavalli 9e66b17e58 Bypass error 2024-03-30 02:44:10 +01:00
Andrea Cavalli c57a3d2a45 Add RocksDBException 2024-03-30 02:35:35 +01:00
Andrea Cavalli e89728e0fd Add RocksDBException 2024-03-30 02:34:17 +01:00
Andrea Cavalli 78a30a71f4 Add RocksDBException 2024-03-30 02:29:36 +01:00
Andrea Cavalli e7b5718fdc Fix reflection errors 2024-03-30 02:20:42 +01:00
9 changed files with 315 additions and 15 deletions

View File

@ -34,6 +34,11 @@
</repositories>
<dependencies>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>bson</artifactId>
<version>5.0.0</version>
</dependency>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
@ -236,7 +241,7 @@
<buildArg>-H:IncludeResources=it/cavallium/rockserver/core/resources/build.properties</buildArg>
<buildArg>-H:DynamicProxyConfigurationFiles=proxy-config.json</buildArg>
<buildArg>-H:ReflectionConfigurationFiles=reflect-config.json</buildArg>
<buildArg>--initialize-at-run-time=org.rocksdb.RocksDB,org.rocksdb.RocksObject</buildArg>
<buildArg>--initialize-at-run-time=org.rocksdb.RocksDB,org.rocksdb.RocksObject,org.rocksdb.RocksDBException</buildArg>
<!--<buildArg>_-no-fallback</buildArg>-->
<buildArg>--gc=G1</buildArg>
<buildArg>--link-at-build-time</buildArg>

View File

@ -47,5 +47,17 @@
{ "name" : "<init>", "parameterTypes" : ["java.lang.String"] },
{ "name" : "of", "parameterTypes" : ["java.lang.String"] }
]
},
{
"name" : "org.rocksdb.RocksDBException",
"methods": [
{ "name" : "<init>", "parameterTypes" : ["java.lang.String"] },
{ "name" : "<init>", "parameterTypes" : ["java.lang.String", "org.rocksdb.Status"] },
{ "name" : "<init>", "parameterTypes" : ["org.rocksdb.Status"] }
],
"allPublicConstructors": true,
"allPublicClasses": true,
"queryAllDeclaredConstructors": true,
"queryAllPublicConstructors": true
}
]

View File

@ -0,0 +1,147 @@
package it.cavallium.rockserver.core;
import it.cavallium.rockserver.core.common.api.ColumnSchema;
import it.cavallium.rockserver.core.common.api.RocksDB;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.bson.BSONObject;
import org.bson.BasicBSONDecoder;
import org.bson.BasicBSONEncoder;
import org.bson.BasicBSONObject;
import org.bson.Document;
public class Migrate {
public static void main(String[] args) throws TException, IOException, InterruptedException {
// Tunables
var columnName = args[0];
var password = args[1];
//
System.out.println("Column: " + columnName);
var temp = Files.createTempFile("temp-out-" + columnName + "-", ".json");
boolean peerMode = columnName.startsWith("peers_");
var columnSchema = peerMode ? new ColumnSchema(List.of(Long.BYTES), List.of(), true) : new ColumnSchema(List.of(Byte.BYTES), List.of(), true);
var result = Runtime
.getRuntime()
.exec(new String[]{"psql", "--host", "home.cavallium.it", "-d", "sessions", "-U", "postgres", "-c",
"SELECT json_agg(t) FROM (SELECT * FROM \"" + columnName + "\") t", "-qAtX",
"--output=" + temp}, new String[] {"PGPASSWORD=" + password});
result.waitFor();
var jo = Document.parse("{\"data\": " + Files.readString(temp) + "}");
Files.delete(temp);
List<Document> documents = (List<Document>) jo.get("data");
jo = null;
System.gc();
System.out.println("Read json file");
var it = documents.iterator();
List<Map.Entry<Object, BasicBSONObject>> documentMap = new ArrayList<>();
while (it.hasNext()) {
var document = it.next();
var obj = new BasicBSONObject();
if (peerMode) {
var id = ((Number) document.get("id")).longValue();
var accessHash = ((Number) document.get("access_hash")).longValue();
var peerType = ((String) document.get("type"));
var username = ((String) document.get("username"));
var phoneNumber = ((String) document.get("phone_number"));
var lastUpdateOn = ((Number) document.get("last_update_on")).longValue();
obj.put("access_hash", accessHash);
obj.put("peer_type", peerType);
obj.put("username", username);
obj.put("phone_number", phoneNumber);
obj.put("last_update_on", lastUpdateOn);
if (!peerType.equals("user")) {
documentMap.add(Map.entry(id, obj));
}
} else {
byte id = 0;
Long dcId = ((Number) document.get("dc_id")).longValue();
Long apiId = document.get("api_id") != null ? ((Number) document.get("api_id")).longValue() : null;
var testMode = ((Boolean) document.get("test_mode"));
var authKey = HexFormat.of().parseHex(((String) document.get("auth_key")).substring(2));
var date = ((Number) document.get("date")).longValue();
var userId = ((Number) document.get("user_id")).longValue();
var isBot = ((Boolean) document.get("is_bot"));
var phone = ((String) document.get("phone"));
obj.put("dc_id", dcId);
obj.put("api_id", apiId);
obj.put("test_mode", testMode);
obj.put("auth_key", authKey);
obj.put("date", date);
obj.put("user_id", userId);
obj.put("is_bot", isBot);
obj.put("phone", phone);
documentMap.add(Map.entry(id, obj));
}
}
documents = null;
System.gc();
System.out.println("parsed documents");
var protocol = new TBinaryProtocol.Factory();
var clients = ThreadLocal.withInitial(() -> {
try {
var socket = new TSocket("10.0.0.9", 5332);
var transport = new TFramedTransport(socket);
transport.open();
return new RocksDB.Client(new TBinaryProtocol(transport));
} catch (TTransportException e) {
throw new RuntimeException(e);
}
});
var columnId = clients.get().createColumn(columnName, columnSchema);
var encoder = ThreadLocal.withInitial(BasicBSONEncoder::new);
var keyBBLocal = ThreadLocal.withInitial(() -> peerMode ? ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN) : ByteBuffer.allocate(1).order(ByteOrder.BIG_ENDIAN));
AtomicLong next = new AtomicLong();
long total = documentMap.size();
var initTime = Instant.now();
documentMap.stream().parallel().forEach(longDocumentEntry -> {
ByteBuffer bb = keyBBLocal.get();
if (peerMode) {
bb.asLongBuffer().put((Long) longDocumentEntry.getKey());
} else {
bb.put((Byte) longDocumentEntry.getKey()).flip();
}
var valueArray = encoder.get().encode(longDocumentEntry.getValue());
var valueBuf = ByteBuffer.wrap(valueArray);
try {
clients.get().putFast(0, columnId, List.of(bb), valueBuf);
} catch (TException e) {
throw new RuntimeException(e);
}
var nn = next.incrementAndGet();
if (nn > 0 && nn % 10_000 == 0) {
var endTime = Instant.now();
var dur = Duration.between(initTime, endTime);
System.out.printf("Written %d/%d elements... %.2f, speed: %.2fHz%n", nn, total, ((nn * 100d) / total), nn / (dur.toMillis() / 1000d));
}
});
if (!peerMode) {
System.out.println("Schema: " + new BasicBSONDecoder().readObject(clients.get().get(0, columnId, List.of(ByteBuffer.wrap(new byte[] {0}))).getValue()));
}
var endTime = Instant.now();
var dur = Duration.between(initTime, endTime);
if (total > 0) System.out.printf("Took %s, speed: %.2fHz%n", dur, total / (dur.toMillis() / 1000d));
}
}

View File

@ -31,7 +31,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
public static final URI PRIVATE_MEMORY_URL = URI.create("memory://private");
private final ExecutorService exeuctor;
public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) {
public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) throws IOException {
super(name);
this.db = new EmbeddedDB(path, name, embeddedConfig);
this.exeuctor = Executors.newVirtualThreadPerTaskExecutor();

View File

@ -5,6 +5,7 @@ import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue;
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
import it.cavallium.rockserver.core.common.ColumnHashType;
import it.cavallium.rockserver.core.common.RequestType;
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
@ -25,7 +26,13 @@ import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
import it.cavallium.rockserver.core.impl.rocksdb.Tx;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
@ -33,6 +40,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
@ -58,9 +66,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
public static final long MAX_TRANSACTION_DURATION_MS = 10_000L;
private static final boolean USE_FAST_GET = true;
private static final byte[] COLUMN_SCHEMAS_COLUMN = "_column_schemas_".getBytes(StandardCharsets.UTF_8);
private final Logger logger;
private final @Nullable Path path;
private final TransactionalDB db;
private final ColumnFamilyHandle columnSchemasColumnDescriptorHandle;
private final NonBlockingHashMapLong<ColumnInstance> columns;
private final ConcurrentMap<String, Long> columnNamesIndex;
private final NonBlockingHashMapLong<Tx> txs;
@ -68,7 +78,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
private final SafeShutdown ops;
private final Object columnEditLock = new Object();
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) {
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException {
this.path = path;
this.logger = Logger.getLogger("db." + name);
this.columns = new NonBlockingHashMapLong<>();
@ -78,11 +88,82 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
this.ops = new SafeShutdown();
DatabaseConfig config = ConfigParser.parse(embeddedConfigPath);
this.db = RocksDBLoader.load(path, config, logger);
var existingColumnSchemasColumnDescriptorOptional = db
.getStartupColumns()
.entrySet()
.stream()
.filter(e -> Arrays.equals(e.getKey().getName(), COLUMN_SCHEMAS_COLUMN))
.findAny();
if (existingColumnSchemasColumnDescriptorOptional.isEmpty()) {
var columnSchemasColumnDescriptor = new ColumnFamilyDescriptor(COLUMN_SCHEMAS_COLUMN);
try {
columnSchemasColumnDescriptorHandle = db.get().createColumnFamily(columnSchemasColumnDescriptor);
} catch (RocksDBException e) {
throw new IOException("Cannot create system column", e);
}
} else {
this.columnSchemasColumnDescriptorHandle = existingColumnSchemasColumnDescriptorOptional.get().getValue();
}
try (var it = this.db.get().newIterator(columnSchemasColumnDescriptorHandle)) {
it.seekToFirst();
while (it.isValid()) {
var key = it.key();
ColumnSchema value = decodeColumnSchema(it.value());
this.db
.getStartupColumns()
.entrySet()
.stream()
.filter(entry -> Arrays.equals(entry.getKey().getName(), key))
.findAny()
.ifPresent(entry -> registerColumn(new ColumnInstance(entry.getValue(), value)));
it.next();
}
}
if (Boolean.parseBoolean(System.getProperty("rockserver.core.print-config", "true"))) {
logger.log(Level.INFO, "Database configuration: {0}", ConfigPrinter.stringify(config));
}
}
private ColumnSchema decodeColumnSchema(byte[] value) {
try (var is = new ByteArrayInputStream(value); var dis = new DataInputStream(is)) {
var check = dis.readByte();
assert check == 2;
var size = dis.readInt();
var keys = new IntArrayList(size);
for (int i = 0; i < size; i++) {
keys.add(dis.readInt());
}
size = dis.readInt();
var colHashTypes = new ObjectArrayList<ColumnHashType>(size);
for (int i = 0; i < size; i++) {
colHashTypes.add(ColumnHashType.values()[dis.readUnsignedByte()]);
}
var hasValue = dis.readBoolean();
return new ColumnSchema(keys, colHashTypes, hasValue);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private byte[] encodeColumnSchema(ColumnSchema schema) {
try (var baos = new ByteArrayOutputStream(); var daos = new DataOutputStream(baos)) {
daos.writeByte(2);
daos.writeInt(schema.keys().size());
for (int key : schema.keys()) {
daos.writeInt(key);
}
daos.writeInt(schema.variableTailKeys().size());
for (ColumnHashType variableTailKey : schema.variableTailKeys()) {
daos.writeByte(variableTailKey.ordinal());
}
daos.writeBoolean(schema.hasValue());
baos.close();
return baos.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* The column must be registered once!!!
* Do not try to register a column that may already be registered
@ -97,6 +178,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
this.columns.remove(id);
throw new UnsupportedOperationException("Column already registered!");
}
logger.info("Registered column: " + column);
return id;
} catch (RocksDBException e) {
throw new RuntimeException(e);
@ -132,6 +214,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
// Wait for 10 seconds
try {
ops.closeAndWait(MAX_TRANSACTION_DURATION_MS);
columnSchemasColumnDescriptorHandle.close();
db.close();
if (path == null) {
Utils.deleteDirectory(db.getPath());
}
@ -250,7 +334,10 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
}
} else {
try {
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8)));
byte[] key = name.getBytes(StandardCharsets.UTF_8);
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(key));
byte[] value = encodeColumnSchema(schema);
db.get().put(columnSchemasColumnDescriptorHandle, key, value);
return registerColumn(new ColumnInstance(cf, schema));
} catch (RocksDBException e) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_CREATE_FAIL, e);

View File

@ -576,7 +576,7 @@ public class RocksDBLoader {
var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions);
var dbTasks = new DatabaseTasks(db, inMemory, delayWalFlushConfig);
return TransactionalDB.create(definitiveDbPath.toString(), db, dbTasks);
return TransactionalDB.create(definitiveDbPath.toString(), db, descriptors, handles, dbTasks);
} catch (IOException | RocksDBException ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
} catch (GestaltException e) {

View File

@ -1,10 +1,14 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.BaseTransactionalDB;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.FlushOptions;
import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.OptimisticTransactionOptions;
import org.rocksdb.RocksDB;
@ -16,10 +20,13 @@ import org.rocksdb.WriteOptions;
public sealed interface TransactionalDB extends Closeable {
static TransactionalDB create(String path, RocksDB db, DatabaseTasks databaseTasks) {
static TransactionalDB create(String path, RocksDB db,
List<ColumnFamilyDescriptor> descriptors,
ArrayList<ColumnFamilyHandle> handles,
DatabaseTasks databaseTasks) {
return switch (db) {
case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB, databaseTasks);
case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB, databaseTasks);
case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB, descriptors, handles, databaseTasks);
case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB, descriptors, handles, databaseTasks);
default -> throw new UnsupportedOperationException("This database is not transactional");
};
}
@ -29,6 +36,7 @@ public sealed interface TransactionalDB extends Closeable {
String getPath();
RocksDB get();
Map<ColumnFamilyDescriptor, ColumnFamilyHandle> getStartupColumns();
/**
* Starts a new Transaction.
* <p>
@ -97,10 +105,17 @@ public sealed interface TransactionalDB extends Closeable {
private final String path;
protected final RDB db;
private final DatabaseTasks databaseTasks;
private final List<ColumnFamilyDescriptor> descriptors;
private final ArrayList<ColumnFamilyHandle> handles;
public BaseTransactionalDB(String path, RDB db, DatabaseTasks databaseTasks) {
public BaseTransactionalDB(String path, RDB db,
List<ColumnFamilyDescriptor> descriptors,
ArrayList<ColumnFamilyHandle> handles,
DatabaseTasks databaseTasks) {
this.path = path;
this.db = db;
this.descriptors = descriptors;
this.handles = handles;
this.databaseTasks = databaseTasks;
databaseTasks.start();
@ -116,6 +131,16 @@ public sealed interface TransactionalDB extends Closeable {
return db;
}
@Override
public Map<ColumnFamilyDescriptor, ColumnFamilyHandle> getStartupColumns() {
var cols = new HashMap<ColumnFamilyDescriptor, ColumnFamilyHandle>();
assert this.descriptors.size() == this.handles.size();
for (int i = 0; i < descriptors.size(); i++) {
cols.put(this.descriptors.get(i), this.handles.get(i));
}
return cols;
}
@Override
public void close() throws IOException {
List<Exception> exceptions = new ArrayList<>();
@ -124,6 +149,23 @@ public sealed interface TransactionalDB extends Closeable {
} catch (Exception ex) {
exceptions.add(ex);
}
for (ColumnFamilyHandle handle : handles) {
try {
handle.close();
} catch (Exception ex) {
exceptions.add(ex);
}
}
try {
db.flushWal(true);
} catch (RocksDBException e) {
exceptions.add(e);
}
try (var options = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
db.flush(options);
} catch (RocksDBException e) {
exceptions.add(e);
}
try {
db.closeE();
} catch (RocksDBException e) {
@ -144,8 +186,11 @@ public sealed interface TransactionalDB extends Closeable {
final class PessimisticTransactionalDB extends BaseTransactionalDB<TransactionDB> {
public PessimisticTransactionalDB(String path, TransactionDB db, DatabaseTasks databaseTasks) {
super(path, db, databaseTasks);
public PessimisticTransactionalDB(String path, TransactionDB db,
List<ColumnFamilyDescriptor> descriptors,
ArrayList<ColumnFamilyHandle> handles,
DatabaseTasks databaseTasks) {
super(path, db, descriptors, handles, databaseTasks);
}
@Override
@ -192,8 +237,11 @@ public sealed interface TransactionalDB extends Closeable {
final class OptimisticTransactionalDB extends BaseTransactionalDB<OptimisticTransactionDB> {
public OptimisticTransactionalDB(String path, OptimisticTransactionDB db, DatabaseTasks databaseTasks) {
super(path, db, databaseTasks);
public OptimisticTransactionalDB(String path, OptimisticTransactionDB db,
List<ColumnFamilyDescriptor> descriptors,
ArrayList<ColumnFamilyHandle> handles,
DatabaseTasks databaseTasks) {
super(path, db, descriptors, handles, databaseTasks);
}
@Override

View File

@ -14,6 +14,6 @@ public class Server implements Closeable {
@Override
public void close() throws IOException {
client.close();
}
}

View File

@ -9,6 +9,7 @@ module rockserver.core {
requires it.unimi.dsi.fastutil;
requires org.apache.thrift;
requires org.slf4j;
requires org.mongodb.bson;
exports it.cavallium.rockserver.core.client;
exports it.cavallium.rockserver.core.common;