Compare commits
14 Commits
1fa2c40c3f
...
ecdb0b05b8
Author | SHA1 | Date | |
---|---|---|---|
|
ecdb0b05b8 | ||
|
7a8e3d6158 | ||
|
9729704bb9 | ||
|
bb9e9ad33c | ||
|
0c98f6ef29 | ||
|
ea6808948b | ||
|
59b8f2ddac | ||
|
82b2848222 | ||
|
e038effde9 | ||
|
9e66b17e58 | ||
|
c57a3d2a45 | ||
|
e89728e0fd | ||
|
78a30a71f4 | ||
|
e7b5718fdc |
7
pom.xml
7
pom.xml
@ -34,6 +34,11 @@
|
||||
</repositories>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.mongodb</groupId>
|
||||
<artifactId>bson</artifactId>
|
||||
<version>5.0.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.rocksdb</groupId>
|
||||
<artifactId>rocksdbjni</artifactId>
|
||||
@ -236,7 +241,7 @@
|
||||
<buildArg>-H:IncludeResources=it/cavallium/rockserver/core/resources/build.properties</buildArg>
|
||||
<buildArg>-H:DynamicProxyConfigurationFiles=proxy-config.json</buildArg>
|
||||
<buildArg>-H:ReflectionConfigurationFiles=reflect-config.json</buildArg>
|
||||
<buildArg>--initialize-at-run-time=org.rocksdb.RocksDB,org.rocksdb.RocksObject</buildArg>
|
||||
<buildArg>--initialize-at-run-time=org.rocksdb.RocksDB,org.rocksdb.RocksObject,org.rocksdb.RocksDBException</buildArg>
|
||||
<!--<buildArg>_-no-fallback</buildArg>-->
|
||||
<buildArg>--gc=G1</buildArg>
|
||||
<buildArg>--link-at-build-time</buildArg>
|
||||
|
@ -47,5 +47,17 @@
|
||||
{ "name" : "<init>", "parameterTypes" : ["java.lang.String"] },
|
||||
{ "name" : "of", "parameterTypes" : ["java.lang.String"] }
|
||||
]
|
||||
},
|
||||
{
|
||||
"name" : "org.rocksdb.RocksDBException",
|
||||
"methods": [
|
||||
{ "name" : "<init>", "parameterTypes" : ["java.lang.String"] },
|
||||
{ "name" : "<init>", "parameterTypes" : ["java.lang.String", "org.rocksdb.Status"] },
|
||||
{ "name" : "<init>", "parameterTypes" : ["org.rocksdb.Status"] }
|
||||
],
|
||||
"allPublicConstructors": true,
|
||||
"allPublicClasses": true,
|
||||
"queryAllDeclaredConstructors": true,
|
||||
"queryAllPublicConstructors": true
|
||||
}
|
||||
]
|
147
src/main/java/it/cavallium/rockserver/core/Migrate.java
Normal file
147
src/main/java/it/cavallium/rockserver/core/Migrate.java
Normal file
@ -0,0 +1,147 @@
|
||||
package it.cavallium.rockserver.core;
|
||||
|
||||
import it.cavallium.rockserver.core.common.api.ColumnSchema;
|
||||
import it.cavallium.rockserver.core.common.api.RocksDB;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HexFormat;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.apache.thrift.transport.TSocket;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.thrift.transport.layered.TFramedTransport;
|
||||
import org.bson.BSONObject;
|
||||
import org.bson.BasicBSONDecoder;
|
||||
import org.bson.BasicBSONEncoder;
|
||||
import org.bson.BasicBSONObject;
|
||||
import org.bson.Document;
|
||||
|
||||
public class Migrate {
|
||||
|
||||
public static void main(String[] args) throws TException, IOException, InterruptedException {
|
||||
|
||||
// Tunables
|
||||
var columnName = args[0];
|
||||
var password = args[1];
|
||||
//
|
||||
|
||||
System.out.println("Column: " + columnName);
|
||||
var temp = Files.createTempFile("temp-out-" + columnName + "-", ".json");
|
||||
|
||||
boolean peerMode = columnName.startsWith("peers_");
|
||||
var columnSchema = peerMode ? new ColumnSchema(List.of(Long.BYTES), List.of(), true) : new ColumnSchema(List.of(Byte.BYTES), List.of(), true);
|
||||
var result = Runtime
|
||||
.getRuntime()
|
||||
.exec(new String[]{"psql", "--host", "home.cavallium.it", "-d", "sessions", "-U", "postgres", "-c",
|
||||
"SELECT json_agg(t) FROM (SELECT * FROM \"" + columnName + "\") t", "-qAtX",
|
||||
"--output=" + temp}, new String[] {"PGPASSWORD=" + password});
|
||||
result.waitFor();
|
||||
|
||||
var jo = Document.parse("{\"data\": " + Files.readString(temp) + "}");
|
||||
Files.delete(temp);
|
||||
|
||||
List<Document> documents = (List<Document>) jo.get("data");
|
||||
jo = null;
|
||||
System.gc();
|
||||
System.out.println("Read json file");
|
||||
var it = documents.iterator();
|
||||
List<Map.Entry<Object, BasicBSONObject>> documentMap = new ArrayList<>();
|
||||
while (it.hasNext()) {
|
||||
var document = it.next();
|
||||
var obj = new BasicBSONObject();
|
||||
if (peerMode) {
|
||||
var id = ((Number) document.get("id")).longValue();
|
||||
var accessHash = ((Number) document.get("access_hash")).longValue();
|
||||
var peerType = ((String) document.get("type"));
|
||||
var username = ((String) document.get("username"));
|
||||
var phoneNumber = ((String) document.get("phone_number"));
|
||||
var lastUpdateOn = ((Number) document.get("last_update_on")).longValue();
|
||||
obj.put("access_hash", accessHash);
|
||||
obj.put("peer_type", peerType);
|
||||
obj.put("username", username);
|
||||
obj.put("phone_number", phoneNumber);
|
||||
obj.put("last_update_on", lastUpdateOn);
|
||||
if (!peerType.equals("user")) {
|
||||
documentMap.add(Map.entry(id, obj));
|
||||
}
|
||||
} else {
|
||||
byte id = 0;
|
||||
Long dcId = ((Number) document.get("dc_id")).longValue();
|
||||
Long apiId = document.get("api_id") != null ? ((Number) document.get("api_id")).longValue() : null;
|
||||
var testMode = ((Boolean) document.get("test_mode"));
|
||||
var authKey = HexFormat.of().parseHex(((String) document.get("auth_key")).substring(2));
|
||||
var date = ((Number) document.get("date")).longValue();
|
||||
var userId = ((Number) document.get("user_id")).longValue();
|
||||
var isBot = ((Boolean) document.get("is_bot"));
|
||||
var phone = ((String) document.get("phone"));
|
||||
|
||||
obj.put("dc_id", dcId);
|
||||
obj.put("api_id", apiId);
|
||||
obj.put("test_mode", testMode);
|
||||
obj.put("auth_key", authKey);
|
||||
obj.put("date", date);
|
||||
obj.put("user_id", userId);
|
||||
obj.put("is_bot", isBot);
|
||||
obj.put("phone", phone);
|
||||
documentMap.add(Map.entry(id, obj));
|
||||
}
|
||||
}
|
||||
documents = null;
|
||||
System.gc();
|
||||
System.out.println("parsed documents");
|
||||
var protocol = new TBinaryProtocol.Factory();
|
||||
var clients = ThreadLocal.withInitial(() -> {
|
||||
try {
|
||||
var socket = new TSocket("10.0.0.9", 5332);
|
||||
var transport = new TFramedTransport(socket);
|
||||
transport.open();
|
||||
|
||||
return new RocksDB.Client(new TBinaryProtocol(transport));
|
||||
} catch (TTransportException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
var columnId = clients.get().createColumn(columnName, columnSchema);
|
||||
var encoder = ThreadLocal.withInitial(BasicBSONEncoder::new);
|
||||
var keyBBLocal = ThreadLocal.withInitial(() -> peerMode ? ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN) : ByteBuffer.allocate(1).order(ByteOrder.BIG_ENDIAN));
|
||||
AtomicLong next = new AtomicLong();
|
||||
long total = documentMap.size();
|
||||
var initTime = Instant.now();
|
||||
documentMap.stream().parallel().forEach(longDocumentEntry -> {
|
||||
ByteBuffer bb = keyBBLocal.get();
|
||||
if (peerMode) {
|
||||
bb.asLongBuffer().put((Long) longDocumentEntry.getKey());
|
||||
} else {
|
||||
bb.put((Byte) longDocumentEntry.getKey()).flip();
|
||||
}
|
||||
var valueArray = encoder.get().encode(longDocumentEntry.getValue());
|
||||
var valueBuf = ByteBuffer.wrap(valueArray);
|
||||
try {
|
||||
clients.get().putFast(0, columnId, List.of(bb), valueBuf);
|
||||
} catch (TException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
var nn = next.incrementAndGet();
|
||||
if (nn > 0 && nn % 10_000 == 0) {
|
||||
var endTime = Instant.now();
|
||||
var dur = Duration.between(initTime, endTime);
|
||||
System.out.printf("Written %d/%d elements... %.2f, speed: %.2fHz%n", nn, total, ((nn * 100d) / total), nn / (dur.toMillis() / 1000d));
|
||||
}
|
||||
});
|
||||
if (!peerMode) {
|
||||
System.out.println("Schema: " + new BasicBSONDecoder().readObject(clients.get().get(0, columnId, List.of(ByteBuffer.wrap(new byte[] {0}))).getValue()));
|
||||
}
|
||||
var endTime = Instant.now();
|
||||
var dur = Duration.between(initTime, endTime);
|
||||
if (total > 0) System.out.printf("Took %s, speed: %.2fHz%n", dur, total / (dur.toMillis() / 1000d));
|
||||
}
|
||||
}
|
@ -31,7 +31,7 @@ public class EmbeddedConnection extends BaseConnection implements RocksDBAPI {
|
||||
public static final URI PRIVATE_MEMORY_URL = URI.create("memory://private");
|
||||
private final ExecutorService exeuctor;
|
||||
|
||||
public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) {
|
||||
public EmbeddedConnection(@Nullable Path path, String name, @Nullable Path embeddedConfig) throws IOException {
|
||||
super(name);
|
||||
this.db = new EmbeddedDB(path, name, embeddedConfig);
|
||||
this.exeuctor = Executors.newVirtualThreadPerTaskExecutor();
|
||||
|
@ -5,6 +5,7 @@ import static it.cavallium.rockserver.core.impl.ColumnInstance.BIG_ENDIAN_BYTES;
|
||||
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithValue;
|
||||
import static org.rocksdb.KeyMayExist.KeyMayExistEnum.kExistsWithoutValue;
|
||||
|
||||
import it.cavallium.rockserver.core.common.ColumnHashType;
|
||||
import it.cavallium.rockserver.core.common.RequestType;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestGet;
|
||||
import it.cavallium.rockserver.core.common.RequestType.RequestNothing;
|
||||
@ -25,7 +26,13 @@ import it.cavallium.rockserver.core.impl.rocksdb.RocksDBObjects;
|
||||
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB;
|
||||
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.TransactionalOptions;
|
||||
import it.cavallium.rockserver.core.impl.rocksdb.Tx;
|
||||
import it.unimi.dsi.fastutil.ints.IntArrayList;
|
||||
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.Closeable;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
import java.lang.foreign.MemorySegment;
|
||||
@ -33,6 +40,7 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.Objects;
|
||||
@ -58,9 +66,11 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
private static final int INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES = 4096;
|
||||
public static final long MAX_TRANSACTION_DURATION_MS = 10_000L;
|
||||
private static final boolean USE_FAST_GET = true;
|
||||
private static final byte[] COLUMN_SCHEMAS_COLUMN = "_column_schemas_".getBytes(StandardCharsets.UTF_8);
|
||||
private final Logger logger;
|
||||
private final @Nullable Path path;
|
||||
private final TransactionalDB db;
|
||||
private final ColumnFamilyHandle columnSchemasColumnDescriptorHandle;
|
||||
private final NonBlockingHashMapLong<ColumnInstance> columns;
|
||||
private final ConcurrentMap<String, Long> columnNamesIndex;
|
||||
private final NonBlockingHashMapLong<Tx> txs;
|
||||
@ -68,7 +78,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
private final SafeShutdown ops;
|
||||
private final Object columnEditLock = new Object();
|
||||
|
||||
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) {
|
||||
public EmbeddedDB(@Nullable Path path, String name, @Nullable Path embeddedConfigPath) throws IOException {
|
||||
this.path = path;
|
||||
this.logger = Logger.getLogger("db." + name);
|
||||
this.columns = new NonBlockingHashMapLong<>();
|
||||
@ -78,11 +88,82 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
this.ops = new SafeShutdown();
|
||||
DatabaseConfig config = ConfigParser.parse(embeddedConfigPath);
|
||||
this.db = RocksDBLoader.load(path, config, logger);
|
||||
var existingColumnSchemasColumnDescriptorOptional = db
|
||||
.getStartupColumns()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(e -> Arrays.equals(e.getKey().getName(), COLUMN_SCHEMAS_COLUMN))
|
||||
.findAny();
|
||||
if (existingColumnSchemasColumnDescriptorOptional.isEmpty()) {
|
||||
var columnSchemasColumnDescriptor = new ColumnFamilyDescriptor(COLUMN_SCHEMAS_COLUMN);
|
||||
try {
|
||||
columnSchemasColumnDescriptorHandle = db.get().createColumnFamily(columnSchemasColumnDescriptor);
|
||||
} catch (RocksDBException e) {
|
||||
throw new IOException("Cannot create system column", e);
|
||||
}
|
||||
} else {
|
||||
this.columnSchemasColumnDescriptorHandle = existingColumnSchemasColumnDescriptorOptional.get().getValue();
|
||||
}
|
||||
try (var it = this.db.get().newIterator(columnSchemasColumnDescriptorHandle)) {
|
||||
it.seekToFirst();
|
||||
while (it.isValid()) {
|
||||
var key = it.key();
|
||||
ColumnSchema value = decodeColumnSchema(it.value());
|
||||
this.db
|
||||
.getStartupColumns()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> Arrays.equals(entry.getKey().getName(), key))
|
||||
.findAny()
|
||||
.ifPresent(entry -> registerColumn(new ColumnInstance(entry.getValue(), value)));
|
||||
it.next();
|
||||
}
|
||||
}
|
||||
if (Boolean.parseBoolean(System.getProperty("rockserver.core.print-config", "true"))) {
|
||||
logger.log(Level.INFO, "Database configuration: {0}", ConfigPrinter.stringify(config));
|
||||
}
|
||||
}
|
||||
|
||||
private ColumnSchema decodeColumnSchema(byte[] value) {
|
||||
try (var is = new ByteArrayInputStream(value); var dis = new DataInputStream(is)) {
|
||||
var check = dis.readByte();
|
||||
assert check == 2;
|
||||
var size = dis.readInt();
|
||||
var keys = new IntArrayList(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
keys.add(dis.readInt());
|
||||
}
|
||||
size = dis.readInt();
|
||||
var colHashTypes = new ObjectArrayList<ColumnHashType>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
colHashTypes.add(ColumnHashType.values()[dis.readUnsignedByte()]);
|
||||
}
|
||||
var hasValue = dis.readBoolean();
|
||||
return new ColumnSchema(keys, colHashTypes, hasValue);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private byte[] encodeColumnSchema(ColumnSchema schema) {
|
||||
try (var baos = new ByteArrayOutputStream(); var daos = new DataOutputStream(baos)) {
|
||||
daos.writeByte(2);
|
||||
daos.writeInt(schema.keys().size());
|
||||
for (int key : schema.keys()) {
|
||||
daos.writeInt(key);
|
||||
}
|
||||
daos.writeInt(schema.variableTailKeys().size());
|
||||
for (ColumnHashType variableTailKey : schema.variableTailKeys()) {
|
||||
daos.writeByte(variableTailKey.ordinal());
|
||||
}
|
||||
daos.writeBoolean(schema.hasValue());
|
||||
baos.close();
|
||||
return baos.toByteArray();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The column must be registered once!!!
|
||||
* Do not try to register a column that may already be registered
|
||||
@ -97,6 +178,7 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
this.columns.remove(id);
|
||||
throw new UnsupportedOperationException("Column already registered!");
|
||||
}
|
||||
logger.info("Registered column: " + column);
|
||||
return id;
|
||||
} catch (RocksDBException e) {
|
||||
throw new RuntimeException(e);
|
||||
@ -132,6 +214,8 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
// Wait for 10 seconds
|
||||
try {
|
||||
ops.closeAndWait(MAX_TRANSACTION_DURATION_MS);
|
||||
columnSchemasColumnDescriptorHandle.close();
|
||||
db.close();
|
||||
if (path == null) {
|
||||
Utils.deleteDirectory(db.getPath());
|
||||
}
|
||||
@ -250,7 +334,10 @@ public class EmbeddedDB implements RocksDBSyncAPI, Closeable {
|
||||
}
|
||||
} else {
|
||||
try {
|
||||
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(name.getBytes(StandardCharsets.UTF_8)));
|
||||
byte[] key = name.getBytes(StandardCharsets.UTF_8);
|
||||
var cf = db.get().createColumnFamily(new ColumnFamilyDescriptor(key));
|
||||
byte[] value = encodeColumnSchema(schema);
|
||||
db.get().put(columnSchemasColumnDescriptorHandle, key, value);
|
||||
return registerColumn(new ColumnInstance(cf, schema));
|
||||
} catch (RocksDBException e) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(RocksDBErrorType.COLUMN_CREATE_FAIL, e);
|
||||
|
@ -576,7 +576,7 @@ public class RocksDBLoader {
|
||||
|
||||
var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions);
|
||||
var dbTasks = new DatabaseTasks(db, inMemory, delayWalFlushConfig);
|
||||
return TransactionalDB.create(definitiveDbPath.toString(), db, dbTasks);
|
||||
return TransactionalDB.create(definitiveDbPath.toString(), db, descriptors, handles, dbTasks);
|
||||
} catch (IOException | RocksDBException ex) {
|
||||
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
|
||||
} catch (GestaltException e) {
|
||||
|
@ -1,10 +1,14 @@
|
||||
package it.cavallium.rockserver.core.impl.rocksdb;
|
||||
|
||||
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.BaseTransactionalDB;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.rocksdb.ColumnFamilyDescriptor;
|
||||
import org.rocksdb.ColumnFamilyHandle;
|
||||
import org.rocksdb.FlushOptions;
|
||||
import org.rocksdb.OptimisticTransactionDB;
|
||||
import org.rocksdb.OptimisticTransactionOptions;
|
||||
import org.rocksdb.RocksDB;
|
||||
@ -16,10 +20,13 @@ import org.rocksdb.WriteOptions;
|
||||
|
||||
public sealed interface TransactionalDB extends Closeable {
|
||||
|
||||
static TransactionalDB create(String path, RocksDB db, DatabaseTasks databaseTasks) {
|
||||
static TransactionalDB create(String path, RocksDB db,
|
||||
List<ColumnFamilyDescriptor> descriptors,
|
||||
ArrayList<ColumnFamilyHandle> handles,
|
||||
DatabaseTasks databaseTasks) {
|
||||
return switch (db) {
|
||||
case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB, databaseTasks);
|
||||
case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB, databaseTasks);
|
||||
case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB, descriptors, handles, databaseTasks);
|
||||
case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB, descriptors, handles, databaseTasks);
|
||||
default -> throw new UnsupportedOperationException("This database is not transactional");
|
||||
};
|
||||
}
|
||||
@ -29,6 +36,7 @@ public sealed interface TransactionalDB extends Closeable {
|
||||
String getPath();
|
||||
|
||||
RocksDB get();
|
||||
Map<ColumnFamilyDescriptor, ColumnFamilyHandle> getStartupColumns();
|
||||
/**
|
||||
* Starts a new Transaction.
|
||||
* <p>
|
||||
@ -97,10 +105,17 @@ public sealed interface TransactionalDB extends Closeable {
|
||||
private final String path;
|
||||
protected final RDB db;
|
||||
private final DatabaseTasks databaseTasks;
|
||||
private final List<ColumnFamilyDescriptor> descriptors;
|
||||
private final ArrayList<ColumnFamilyHandle> handles;
|
||||
|
||||
public BaseTransactionalDB(String path, RDB db, DatabaseTasks databaseTasks) {
|
||||
public BaseTransactionalDB(String path, RDB db,
|
||||
List<ColumnFamilyDescriptor> descriptors,
|
||||
ArrayList<ColumnFamilyHandle> handles,
|
||||
DatabaseTasks databaseTasks) {
|
||||
this.path = path;
|
||||
this.db = db;
|
||||
this.descriptors = descriptors;
|
||||
this.handles = handles;
|
||||
this.databaseTasks = databaseTasks;
|
||||
|
||||
databaseTasks.start();
|
||||
@ -116,6 +131,16 @@ public sealed interface TransactionalDB extends Closeable {
|
||||
return db;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<ColumnFamilyDescriptor, ColumnFamilyHandle> getStartupColumns() {
|
||||
var cols = new HashMap<ColumnFamilyDescriptor, ColumnFamilyHandle>();
|
||||
assert this.descriptors.size() == this.handles.size();
|
||||
for (int i = 0; i < descriptors.size(); i++) {
|
||||
cols.put(this.descriptors.get(i), this.handles.get(i));
|
||||
}
|
||||
return cols;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
List<Exception> exceptions = new ArrayList<>();
|
||||
@ -124,6 +149,23 @@ public sealed interface TransactionalDB extends Closeable {
|
||||
} catch (Exception ex) {
|
||||
exceptions.add(ex);
|
||||
}
|
||||
for (ColumnFamilyHandle handle : handles) {
|
||||
try {
|
||||
handle.close();
|
||||
} catch (Exception ex) {
|
||||
exceptions.add(ex);
|
||||
}
|
||||
}
|
||||
try {
|
||||
db.flushWal(true);
|
||||
} catch (RocksDBException e) {
|
||||
exceptions.add(e);
|
||||
}
|
||||
try (var options = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
|
||||
db.flush(options);
|
||||
} catch (RocksDBException e) {
|
||||
exceptions.add(e);
|
||||
}
|
||||
try {
|
||||
db.closeE();
|
||||
} catch (RocksDBException e) {
|
||||
@ -144,8 +186,11 @@ public sealed interface TransactionalDB extends Closeable {
|
||||
|
||||
final class PessimisticTransactionalDB extends BaseTransactionalDB<TransactionDB> {
|
||||
|
||||
public PessimisticTransactionalDB(String path, TransactionDB db, DatabaseTasks databaseTasks) {
|
||||
super(path, db, databaseTasks);
|
||||
public PessimisticTransactionalDB(String path, TransactionDB db,
|
||||
List<ColumnFamilyDescriptor> descriptors,
|
||||
ArrayList<ColumnFamilyHandle> handles,
|
||||
DatabaseTasks databaseTasks) {
|
||||
super(path, db, descriptors, handles, databaseTasks);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -192,8 +237,11 @@ public sealed interface TransactionalDB extends Closeable {
|
||||
|
||||
final class OptimisticTransactionalDB extends BaseTransactionalDB<OptimisticTransactionDB> {
|
||||
|
||||
public OptimisticTransactionalDB(String path, OptimisticTransactionDB db, DatabaseTasks databaseTasks) {
|
||||
super(path, db, databaseTasks);
|
||||
public OptimisticTransactionalDB(String path, OptimisticTransactionDB db,
|
||||
List<ColumnFamilyDescriptor> descriptors,
|
||||
ArrayList<ColumnFamilyHandle> handles,
|
||||
DatabaseTasks databaseTasks) {
|
||||
super(path, db, descriptors, handles, databaseTasks);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -14,6 +14,6 @@ public class Server implements Closeable {
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
@ -9,6 +9,7 @@ module rockserver.core {
|
||||
requires it.unimi.dsi.fastutil;
|
||||
requires org.apache.thrift;
|
||||
requires org.slf4j;
|
||||
requires org.mongodb.bson;
|
||||
|
||||
exports it.cavallium.rockserver.core.client;
|
||||
exports it.cavallium.rockserver.core.common;
|
||||
|
Loading…
Reference in New Issue
Block a user