Flush wal every x seconds

This commit is contained in:
Andrea Cavalli 2024-03-29 21:25:26 +01:00
parent 5b87231c26
commit 2546a76b96
6 changed files with 131 additions and 56 deletions

View File

@ -5,5 +5,4 @@ import org.github.gestalt.config.exceptions.GestaltException;
public interface DatabaseConfig { public interface DatabaseConfig {
GlobalDatabaseConfig global() throws GestaltException; GlobalDatabaseConfig global() throws GestaltException;
} }

View File

@ -1,5 +1,6 @@
package it.cavallium.rockserver.core.config; package it.cavallium.rockserver.core.config;
import java.time.Duration;
import org.github.gestalt.config.exceptions.GestaltException; import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -32,6 +33,9 @@ public interface GlobalDatabaseConfig {
@Nullable @Nullable
Path walPath() throws GestaltException; Path walPath() throws GestaltException;
@Nullable
Duration delayWalFlushDuration() throws GestaltException;
boolean absoluteConsistency() throws GestaltException; boolean absoluteConsistency() throws GestaltException;
VolumeConfig[] volumes() throws GestaltException; VolumeConfig[] volumes() throws GestaltException;

View File

@ -0,0 +1,53 @@
package it.cavallium.rockserver.core.impl.rocksdb;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
public class DatabaseTasks implements Closeable {
private final RocksDB db;
private final boolean inMemory;
private final Logger logger;
private final Duration delayWalFlushConfig;
private Thread walFlushThread;
public DatabaseTasks(RocksDB db, boolean inMemory, Duration delayWalFlushConfig) {
this.db = db;
this.inMemory = inMemory;
this.logger = Logger.getLogger("db." + db.getName() + ".tasks");
this.delayWalFlushConfig = inMemory ? Duration.ZERO : delayWalFlushConfig;
}
public synchronized void start() {
if (delayWalFlushConfig.toMillis() > 0) {
this.walFlushThread = Thread.ofVirtual().name("db." + db.getName() + ".tasks.wal.flush").start(() -> {
logger.info("Database delayed flush thread is enabled, it will flush the database every %.2f seconds".formatted(delayWalFlushConfig.toMillis() / 1000d));
while (!Thread.interrupted()) {
try {
//noinspection BusyWait
Thread.sleep(delayWalFlushConfig.toMillis());
} catch (InterruptedException _) {
return;
}
try {
db.flushWal(true);
} catch (RocksDBException e) {
logger.log(Level.SEVERE, "Failed to flush database \"%s\" wal".formatted(db.getName()), e);
}
}
});
}
}
@Override
public synchronized void close() throws IOException {
if (walFlushThread != null) {
walFlushThread.interrupt();
}
}
}

View File

@ -2,9 +2,9 @@ package it.cavallium.rockserver.core.impl.rocksdb;
import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType;
import it.cavallium.rockserver.core.config.*; import it.cavallium.rockserver.core.config.*;
import java.io.File;
import java.io.InputStream; import java.io.InputStream;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.time.Duration;
import org.github.gestalt.config.exceptions.GestaltException; import org.github.gestalt.config.exceptions.GestaltException;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
@ -145,9 +145,11 @@ public class RocksDBLoader {
options.setSkipStatsUpdateOnDbOpen(true); options.setSkipStatsUpdateOnDbOpen(true);
options.setCreateMissingColumnFamilies(true); options.setCreateMissingColumnFamilies(true);
options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL); options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL);
// todo: automatically flush every x seconds?
var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions);
if (delayWalFlushConfig.isPositive()) {
options.setManualWalFlush(true); options.setManualWalFlush(true);
}
options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown
options.setAvoidFlushDuringRecovery(true); // Flush all WALs during startup options.setAvoidFlushDuringRecovery(true); // Flush all WALs during startup
@ -257,6 +259,10 @@ public class RocksDBLoader {
} }
} }
private static Duration getWalFlushDelayConfig(DatabaseConfig databaseOptions) throws GestaltException {
return Objects.requireNonNullElse(databaseOptions.global().delayWalFlushDuration(), Duration.ZERO);
}
private static Optional<Path> getWalDir(Path definitiveDbPath, DatabaseConfig databaseOptions) private static Optional<Path> getWalDir(Path definitiveDbPath, DatabaseConfig databaseOptions)
throws GestaltException { throws GestaltException {
return Optional.ofNullable(databaseOptions.global().walPath()) return Optional.ofNullable(databaseOptions.global().walPath())
@ -287,6 +293,7 @@ public class RocksDBLoader {
private static TransactionalDB loadDb(@Nullable Path path, private static TransactionalDB loadDb(@Nullable Path path,
@NotNull Path definitiveDbPath, @NotNull Path definitiveDbPath,
DatabaseConfig databaseOptions, OptionsWithCache optionsWithCache, RocksDBObjects refs, Logger logger) { DatabaseConfig databaseOptions, OptionsWithCache optionsWithCache, RocksDBObjects refs, Logger logger) {
var inMemory = path == null;
var rocksdbOptions = optionsWithCache.options(); var rocksdbOptions = optionsWithCache.options();
try { try {
List<DbPathRecord> volumeConfigs = getVolumeConfigs(definitiveDbPath, databaseOptions); List<DbPathRecord> volumeConfigs = getVolumeConfigs(definitiveDbPath, databaseOptions);
@ -566,7 +573,10 @@ public class RocksDBLoader {
} catch (RocksDBException ex) { } catch (RocksDBException ex) {
logger.log(Level.FINE, "Failed to obtain stats", ex); logger.log(Level.FINE, "Failed to obtain stats", ex);
} }
return TransactionalDB.create(definitiveDbPath.toString(), db);
var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions);
var dbTasks = new DatabaseTasks(db, inMemory, delayWalFlushConfig);
return TransactionalDB.create(definitiveDbPath.toString(), db, dbTasks);
} catch (IOException | RocksDBException ex) { } catch (IOException | RocksDBException ex) {
throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex); throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex);
} catch (GestaltException e) { } catch (GestaltException e) {

View File

@ -1,7 +1,10 @@
package it.cavallium.rockserver.core.impl.rocksdb; package it.cavallium.rockserver.core.impl.rocksdb;
import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.BaseTransactionalDB;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.OptimisticTransactionOptions; import org.rocksdb.OptimisticTransactionOptions;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
@ -13,10 +16,10 @@ import org.rocksdb.WriteOptions;
public sealed interface TransactionalDB extends Closeable { public sealed interface TransactionalDB extends Closeable {
static TransactionalDB create(String path, RocksDB db) { static TransactionalDB create(String path, RocksDB db, DatabaseTasks databaseTasks) {
return switch (db) { return switch (db) {
case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB); case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB, databaseTasks);
case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB); case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB, databaseTasks);
default -> throw new UnsupportedOperationException("This database is not transactional"); default -> throw new UnsupportedOperationException("This database is not transactional");
}; };
} }
@ -89,14 +92,60 @@ public sealed interface TransactionalDB extends Closeable {
void close(); void close();
} }
final class PessimisticTransactionalDB implements TransactionalDB { abstract sealed class BaseTransactionalDB<RDB extends RocksDB> implements TransactionalDB {
private final String path; private final String path;
private final TransactionDB db; protected final RDB db;
private final DatabaseTasks databaseTasks;
public PessimisticTransactionalDB(String path, TransactionDB db) { public BaseTransactionalDB(String path, RDB db, DatabaseTasks databaseTasks) {
this.path = path; this.path = path;
this.db = db; this.db = db;
this.databaseTasks = databaseTasks;
databaseTasks.start();
}
@Override
public final String getPath() {
return path;
}
@Override
public final RocksDB get() {
return db;
}
@Override
public void close() throws IOException {
List<Exception> exceptions = new ArrayList<>();
try {
databaseTasks.close();
} catch (Exception ex) {
exceptions.add(ex);
}
try {
db.closeE();
} catch (RocksDBException e) {
exceptions.add(e);
}
if (!exceptions.isEmpty()) {
IOException ex;
if (exceptions.size() == 1) {
ex = new IOException("Failed to close the database", exceptions.getFirst());
} else {
ex = new IOException("Failed to close the database");
exceptions.forEach(ex::addSuppressed);
}
throw ex;
}
}
}
final class PessimisticTransactionalDB extends BaseTransactionalDB<TransactionDB> {
public PessimisticTransactionalDB(String path, TransactionDB db, DatabaseTasks databaseTasks) {
super(path, db, databaseTasks);
} }
@Override @Override
@ -104,16 +153,6 @@ public sealed interface TransactionalDB extends Closeable {
return new TransactionalOptionsPessimistic(new TransactionOptions().setExpiration(timeoutMs)); return new TransactionalOptionsPessimistic(new TransactionOptions().setExpiration(timeoutMs));
} }
@Override
public String getPath() {
return path;
}
@Override
public RocksDB get() {
return db;
}
@Override @Override
public Transaction beginTransaction(WriteOptions writeOptions) { public Transaction beginTransaction(WriteOptions writeOptions) {
return db.beginTransaction(writeOptions); return db.beginTransaction(writeOptions);
@ -141,15 +180,6 @@ public sealed interface TransactionalDB extends Closeable {
); );
} }
@Override
public void close() throws IOException {
try {
db.closeE();
} catch (RocksDBException e) {
throw new IOException(e);
}
}
private record TransactionalOptionsPessimistic(TransactionOptions transactionOptions) implements private record TransactionalOptionsPessimistic(TransactionOptions transactionOptions) implements
TransactionalOptions { TransactionalOptions {
@ -160,14 +190,10 @@ public sealed interface TransactionalDB extends Closeable {
} }
} }
final class OptimisticTransactionalDB implements TransactionalDB { final class OptimisticTransactionalDB extends BaseTransactionalDB<OptimisticTransactionDB> {
private final String path; public OptimisticTransactionalDB(String path, OptimisticTransactionDB db, DatabaseTasks databaseTasks) {
private final OptimisticTransactionDB db; super(path, db, databaseTasks);
public OptimisticTransactionalDB(String path, OptimisticTransactionDB db) {
this.path = path;
this.db = db;
} }
@Override @Override
@ -175,16 +201,6 @@ public sealed interface TransactionalDB extends Closeable {
return new TransactionalOptionsOptimistic(new OptimisticTransactionOptions()); return new TransactionalOptionsOptimistic(new OptimisticTransactionOptions());
} }
@Override
public String getPath() {
return path;
}
@Override
public RocksDB get() {
return db;
}
@Override @Override
public Transaction beginTransaction(WriteOptions writeOptions) { public Transaction beginTransaction(WriteOptions writeOptions) {
return db.beginTransaction(writeOptions); return db.beginTransaction(writeOptions);
@ -212,15 +228,6 @@ public sealed interface TransactionalDB extends Closeable {
); );
} }
@Override
public void close() throws IOException {
try {
db.closeE();
} catch (RocksDBException e) {
throw new IOException(e);
}
}
private record TransactionalOptionsOptimistic(OptimisticTransactionOptions transactionOptions) implements private record TransactionalOptionsOptimistic(OptimisticTransactionOptions transactionOptions) implements
TransactionalOptions { TransactionalOptions {

View File

@ -36,6 +36,8 @@ database: {
log-path: ./logs log-path: ./logs
# Write-Ahead-Log data path # Write-Ahead-Log data path
wal-path: ./wal wal-path: ./wal
# If set and greater than zero, the WAL will not be flushed on every write, but every x seconds
delay-wal-flush-duration: PT5S
fallback-column-options: { fallback-column-options: {
# RocksDB data levels # RocksDB data levels
# Available compression types: PLAIN, SNAPPY, LZ4, LZ4_HC, ZSTD, ZLIB, BZLIB2 # Available compression types: PLAIN, SNAPPY, LZ4, LZ4_HC, ZSTD, ZLIB, BZLIB2