From 2546a76b9616b73283f29f8ff4166c7893d88981 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Fri, 29 Mar 2024 21:25:26 +0100 Subject: [PATCH] Flush wal every x seconds --- .../core/config/DatabaseConfig.java | 1 - .../core/config/GlobalDatabaseConfig.java | 4 + .../core/impl/rocksdb/DatabaseTasks.java | 53 +++++++++ .../core/impl/rocksdb/RocksDBLoader.java | 18 ++- .../core/impl/rocksdb/TransactionalDB.java | 109 ++++++++++-------- .../rockserver/core/resources/default.conf | 2 + 6 files changed, 131 insertions(+), 56 deletions(-) create mode 100644 src/main/java/it/cavallium/rockserver/core/impl/rocksdb/DatabaseTasks.java diff --git a/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java b/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java index 3534e36..ae21e06 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java +++ b/src/main/java/it/cavallium/rockserver/core/config/DatabaseConfig.java @@ -5,5 +5,4 @@ import org.github.gestalt.config.exceptions.GestaltException; public interface DatabaseConfig { GlobalDatabaseConfig global() throws GestaltException; - } diff --git a/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java b/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java index cad7a5e..8d2bd0a 100644 --- a/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java +++ b/src/main/java/it/cavallium/rockserver/core/config/GlobalDatabaseConfig.java @@ -1,5 +1,6 @@ package it.cavallium.rockserver.core.config; +import java.time.Duration; import org.github.gestalt.config.exceptions.GestaltException; import org.jetbrains.annotations.Nullable; @@ -32,6 +33,9 @@ public interface GlobalDatabaseConfig { @Nullable Path walPath() throws GestaltException; + @Nullable + Duration delayWalFlushDuration() throws GestaltException; + boolean absoluteConsistency() throws GestaltException; VolumeConfig[] volumes() throws GestaltException; diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/DatabaseTasks.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/DatabaseTasks.java new file mode 100644 index 0000000..c66847c --- /dev/null +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/DatabaseTasks.java @@ -0,0 +1,53 @@ +package it.cavallium.rockserver.core.impl.rocksdb; + +import java.io.Closeable; +import java.io.IOException; +import java.time.Duration; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +public class DatabaseTasks implements Closeable { + + private final RocksDB db; + private final boolean inMemory; + private final Logger logger; + private final Duration delayWalFlushConfig; + private Thread walFlushThread; + + public DatabaseTasks(RocksDB db, boolean inMemory, Duration delayWalFlushConfig) { + this.db = db; + this.inMemory = inMemory; + this.logger = Logger.getLogger("db." + db.getName() + ".tasks"); + this.delayWalFlushConfig = inMemory ? Duration.ZERO : delayWalFlushConfig; + } + + public synchronized void start() { + if (delayWalFlushConfig.toMillis() > 0) { + this.walFlushThread = Thread.ofVirtual().name("db." + db.getName() + ".tasks.wal.flush").start(() -> { + logger.info("Database delayed flush thread is enabled, it will flush the database every %.2f seconds".formatted(delayWalFlushConfig.toMillis() / 1000d)); + while (!Thread.interrupted()) { + try { + //noinspection BusyWait + Thread.sleep(delayWalFlushConfig.toMillis()); + } catch (InterruptedException _) { + return; + } + try { + db.flushWal(true); + } catch (RocksDBException e) { + logger.log(Level.SEVERE, "Failed to flush database \"%s\" wal".formatted(db.getName()), e); + } + } + }); + } + } + + @Override + public synchronized void close() throws IOException { + if (walFlushThread != null) { + walFlushThread.interrupt(); + } + } +} diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java index 19e38ba..d36e7b8 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/RocksDBLoader.java @@ -2,9 +2,9 @@ package it.cavallium.rockserver.core.impl.rocksdb; import it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType; import it.cavallium.rockserver.core.config.*; -import java.io.File; import java.io.InputStream; import java.nio.file.StandardCopyOption; +import java.time.Duration; import org.github.gestalt.config.exceptions.GestaltException; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -145,9 +145,11 @@ public class RocksDBLoader { options.setSkipStatsUpdateOnDbOpen(true); options.setCreateMissingColumnFamilies(true); options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL); - // todo: automatically flush every x seconds? - options.setManualWalFlush(true); + var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions); + if (delayWalFlushConfig.isPositive()) { + options.setManualWalFlush(true); + } options.setAvoidFlushDuringShutdown(false); // Flush all WALs during shutdown options.setAvoidFlushDuringRecovery(true); // Flush all WALs during startup @@ -257,6 +259,10 @@ public class RocksDBLoader { } } + private static Duration getWalFlushDelayConfig(DatabaseConfig databaseOptions) throws GestaltException { + return Objects.requireNonNullElse(databaseOptions.global().delayWalFlushDuration(), Duration.ZERO); + } + private static Optional getWalDir(Path definitiveDbPath, DatabaseConfig databaseOptions) throws GestaltException { return Optional.ofNullable(databaseOptions.global().walPath()) @@ -287,6 +293,7 @@ public class RocksDBLoader { private static TransactionalDB loadDb(@Nullable Path path, @NotNull Path definitiveDbPath, DatabaseConfig databaseOptions, OptionsWithCache optionsWithCache, RocksDBObjects refs, Logger logger) { + var inMemory = path == null; var rocksdbOptions = optionsWithCache.options(); try { List volumeConfigs = getVolumeConfigs(definitiveDbPath, databaseOptions); @@ -566,7 +573,10 @@ public class RocksDBLoader { } catch (RocksDBException ex) { logger.log(Level.FINE, "Failed to obtain stats", ex); } - return TransactionalDB.create(definitiveDbPath.toString(), db); + + var delayWalFlushConfig = getWalFlushDelayConfig(databaseOptions); + var dbTasks = new DatabaseTasks(db, inMemory, delayWalFlushConfig); + return TransactionalDB.create(definitiveDbPath.toString(), db, dbTasks); } catch (IOException | RocksDBException ex) { throw it.cavallium.rockserver.core.common.RocksDBException.of(it.cavallium.rockserver.core.common.RocksDBException.RocksDBErrorType.ROCKSDB_LOAD_ERROR, "Failed to load rocksdb", ex); } catch (GestaltException e) { diff --git a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java index 690e030..193cf31 100644 --- a/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java +++ b/src/main/java/it/cavallium/rockserver/core/impl/rocksdb/TransactionalDB.java @@ -1,7 +1,10 @@ package it.cavallium.rockserver.core.impl.rocksdb; +import it.cavallium.rockserver.core.impl.rocksdb.TransactionalDB.BaseTransactionalDB; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.OptimisticTransactionOptions; import org.rocksdb.RocksDB; @@ -13,10 +16,10 @@ import org.rocksdb.WriteOptions; public sealed interface TransactionalDB extends Closeable { - static TransactionalDB create(String path, RocksDB db) { + static TransactionalDB create(String path, RocksDB db, DatabaseTasks databaseTasks) { return switch (db) { - case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB); - case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB); + case OptimisticTransactionDB optimisticTransactionDB -> new OptimisticTransactionalDB(path, optimisticTransactionDB, databaseTasks); + case TransactionDB transactionDB -> new PessimisticTransactionalDB(path, transactionDB, databaseTasks); default -> throw new UnsupportedOperationException("This database is not transactional"); }; } @@ -89,14 +92,60 @@ public sealed interface TransactionalDB extends Closeable { void close(); } - final class PessimisticTransactionalDB implements TransactionalDB { + abstract sealed class BaseTransactionalDB implements TransactionalDB { private final String path; - private final TransactionDB db; + protected final RDB db; + private final DatabaseTasks databaseTasks; - public PessimisticTransactionalDB(String path, TransactionDB db) { + public BaseTransactionalDB(String path, RDB db, DatabaseTasks databaseTasks) { this.path = path; this.db = db; + this.databaseTasks = databaseTasks; + + databaseTasks.start(); + } + + @Override + public final String getPath() { + return path; + } + + @Override + public final RocksDB get() { + return db; + } + + @Override + public void close() throws IOException { + List exceptions = new ArrayList<>(); + try { + databaseTasks.close(); + } catch (Exception ex) { + exceptions.add(ex); + } + try { + db.closeE(); + } catch (RocksDBException e) { + exceptions.add(e); + } + if (!exceptions.isEmpty()) { + IOException ex; + if (exceptions.size() == 1) { + ex = new IOException("Failed to close the database", exceptions.getFirst()); + } else { + ex = new IOException("Failed to close the database"); + exceptions.forEach(ex::addSuppressed); + } + throw ex; + } + } + } + + final class PessimisticTransactionalDB extends BaseTransactionalDB { + + public PessimisticTransactionalDB(String path, TransactionDB db, DatabaseTasks databaseTasks) { + super(path, db, databaseTasks); } @Override @@ -104,16 +153,6 @@ public sealed interface TransactionalDB extends Closeable { return new TransactionalOptionsPessimistic(new TransactionOptions().setExpiration(timeoutMs)); } - @Override - public String getPath() { - return path; - } - - @Override - public RocksDB get() { - return db; - } - @Override public Transaction beginTransaction(WriteOptions writeOptions) { return db.beginTransaction(writeOptions); @@ -141,15 +180,6 @@ public sealed interface TransactionalDB extends Closeable { ); } - @Override - public void close() throws IOException { - try { - db.closeE(); - } catch (RocksDBException e) { - throw new IOException(e); - } - } - private record TransactionalOptionsPessimistic(TransactionOptions transactionOptions) implements TransactionalOptions { @@ -160,14 +190,10 @@ public sealed interface TransactionalDB extends Closeable { } } - final class OptimisticTransactionalDB implements TransactionalDB { + final class OptimisticTransactionalDB extends BaseTransactionalDB { - private final String path; - private final OptimisticTransactionDB db; - - public OptimisticTransactionalDB(String path, OptimisticTransactionDB db) { - this.path = path; - this.db = db; + public OptimisticTransactionalDB(String path, OptimisticTransactionDB db, DatabaseTasks databaseTasks) { + super(path, db, databaseTasks); } @Override @@ -175,16 +201,6 @@ public sealed interface TransactionalDB extends Closeable { return new TransactionalOptionsOptimistic(new OptimisticTransactionOptions()); } - @Override - public String getPath() { - return path; - } - - @Override - public RocksDB get() { - return db; - } - @Override public Transaction beginTransaction(WriteOptions writeOptions) { return db.beginTransaction(writeOptions); @@ -212,15 +228,6 @@ public sealed interface TransactionalDB extends Closeable { ); } - @Override - public void close() throws IOException { - try { - db.closeE(); - } catch (RocksDBException e) { - throw new IOException(e); - } - } - private record TransactionalOptionsOptimistic(OptimisticTransactionOptions transactionOptions) implements TransactionalOptions { diff --git a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf index 723130b..4682ffe 100644 --- a/src/main/resources/it/cavallium/rockserver/core/resources/default.conf +++ b/src/main/resources/it/cavallium/rockserver/core/resources/default.conf @@ -36,6 +36,8 @@ database: { log-path: ./logs # Write-Ahead-Log data path wal-path: ./wal + # If set and greater than zero, the WAL will not be flushed on every write, but every x seconds + delay-wal-flush-duration: PT5S fallback-column-options: { # RocksDB data levels # Available compression types: PLAIN, SNAPPY, LZ4, LZ4_HC, ZSTD, ZLIB, BZLIB2