From 8c9aca21b3ee0001390ca4ae5347e67c9f9e89b4 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Tue, 21 Jun 2022 22:52:42 +0200 Subject: [PATCH] Implement preClose --- .../dbengine/database/LLKeyValueDatabase.java | 1 + .../disk/LLLocalKeyValueDatabase.java | 39 +++++++++++++++---- .../memory/LLMemoryKeyValueDatabase.java | 22 ++++++----- .../database/remote/LLQuicConnection.java | 5 +++ 4 files changed, 51 insertions(+), 16 deletions(-) diff --git a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java index fd3e720..8b8623f 100644 --- a/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/LLKeyValueDatabase.java @@ -64,5 +64,6 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS MeterRegistry getMeterRegistry(); + Mono preClose(); Mono close(); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index ee6d634..5bdc9c5 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -75,6 +75,8 @@ import org.rocksdb.IndexType; import org.rocksdb.InfoLogLevel; import org.rocksdb.IngestExternalFileOptions; import org.rocksdb.LRUCache; +import org.rocksdb.LogFile; +import org.rocksdb.MutableDBOptions; import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.PersistentCache; import org.rocksdb.RocksDB; @@ -317,9 +319,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { // tableOptions.setOptimizeFiltersForMemory(true); columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB); } - if (columnOptions.writeBufferSize().isPresent()) { - columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get()); - } + } + if (columnOptions.writeBufferSize().isPresent()) { + columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get()); } tableOptions.setVerifyCompression(false); if (columnOptions.filter().isPresent()) { @@ -660,12 +662,30 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { try { ensureOpen(); ensureOwned(flushOptions); - db.flush(flushOptions); + db.flush(flushOptions, List.copyOf(getAllColumnFamilyHandles().values())); + db.flushWal(true); } finally { closeLock.unlockRead(closeReadLock); } } + @Override + public Mono preClose() { + return Mono.fromCallable(() -> { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + try (var fo = new FlushOptions().setWaitForFlush(true)) { + flush(fo); + } + db.cancelAllBackgroundWork(true); + return null; + } finally { + closeLock.unlockRead(closeReadLock); + } + }).subscribeOn(dbWScheduler); + } + private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {} private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions, RocksDBRefs refs) { var compressionType = levelOptions.compression().getType(); @@ -955,7 +975,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setWalBytesPerSync(0) // default .setIncreaseParallelism(1) .setDbWriteBufferSize(8 * SizeUnit.MB) - .setWalTtlSeconds(0) + .setWalTtlSeconds(60) .setWalSizeLimitMB(0) .setMaxTotalWalSize(0) // automatic ; @@ -991,7 +1011,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .setBytesPerSync(64 * SizeUnit.MB) .setWalBytesPerSync(64 * SizeUnit.MB) - .setWalTtlSeconds(0) // Auto + .setWalTtlSeconds(60) // Auto .setWalSizeLimitMB(0) // Auto .setMaxTotalWalSize(0) // Auto ; @@ -1507,8 +1527,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return Mono.fromCallable(() -> { try (var fo = new FlushOptions().setWaitForFlush(true)) { this.flush(fo); - return null; + } catch (RocksDBException ex) { + if (!"ShutdownInProgress".equals(ex.getMessage())) { + throw ex; + } + logger.warn("Shutdown in progress. Flush cancelled", ex); } + return null; }).subscribeOn(dbWScheduler); } diff --git a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java index c840758..8425baf 100644 --- a/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/memory/LLMemoryKeyValueDatabase.java @@ -170,17 +170,21 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase { return meterRegistry; } + @Override + public Mono preClose() { + return null; + } + @Override public Mono close() { - return Mono - .fromRunnable(() -> { - snapshots.forEach((snapshot, dbs) -> dbs.forEach((columnName, db) -> { - db.clear(); - })); - mainDb.forEach((columnName, db) -> { - db.clear(); - }); - }); + return Mono.fromRunnable(() -> { + snapshots.forEach((snapshot, dbs) -> dbs.forEach((columnName, db) -> { + db.clear(); + })); + mainDb.forEach((columnName, db) -> { + db.clear(); + }); + }); } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java index b564ffd..8a8a869 100644 --- a/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java +++ b/src/main/java/it/cavallium/dbengine/database/remote/LLQuicConnection.java @@ -398,6 +398,11 @@ public class LLQuicConnection implements LLDatabaseConnection { return meterRegistry; } + @Override + public Mono preClose() { + return null; + } + @Override public Mono close() { return sendRequest(new CloseDatabase(id)).then();