Implement preClose

This commit is contained in:
Andrea Cavalli 2022-06-21 22:52:42 +02:00
parent 8083364ebf
commit 8c9aca21b3
4 changed files with 51 additions and 16 deletions

View File

@ -64,5 +64,6 @@ public interface LLKeyValueDatabase extends LLSnapshottable, LLKeyValueDatabaseS
MeterRegistry getMeterRegistry(); MeterRegistry getMeterRegistry();
Mono<Void> preClose();
Mono<Void> close(); Mono<Void> close();
} }

View File

@ -75,6 +75,8 @@ import org.rocksdb.IndexType;
import org.rocksdb.InfoLogLevel; import org.rocksdb.InfoLogLevel;
import org.rocksdb.IngestExternalFileOptions; import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.LRUCache; import org.rocksdb.LRUCache;
import org.rocksdb.LogFile;
import org.rocksdb.MutableDBOptions;
import org.rocksdb.OptimisticTransactionDB; import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.PersistentCache; import org.rocksdb.PersistentCache;
import org.rocksdb.RocksDB; import org.rocksdb.RocksDB;
@ -317,9 +319,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
// tableOptions.setOptimizeFiltersForMemory(true); // tableOptions.setOptimizeFiltersForMemory(true);
columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB); columnFamilyOptions.setWriteBufferSize(256 * SizeUnit.MB);
} }
if (columnOptions.writeBufferSize().isPresent()) { }
columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get()); if (columnOptions.writeBufferSize().isPresent()) {
} columnFamilyOptions.setWriteBufferSize(columnOptions.writeBufferSize().get());
} }
tableOptions.setVerifyCompression(false); tableOptions.setVerifyCompression(false);
if (columnOptions.filter().isPresent()) { if (columnOptions.filter().isPresent()) {
@ -660,12 +662,30 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
try { try {
ensureOpen(); ensureOpen();
ensureOwned(flushOptions); ensureOwned(flushOptions);
db.flush(flushOptions); db.flush(flushOptions, List.copyOf(getAllColumnFamilyHandles().values()));
db.flushWal(true);
} finally { } finally {
closeLock.unlockRead(closeReadLock); closeLock.unlockRead(closeReadLock);
} }
} }
@Override
public Mono<Void> preClose() {
return Mono.<Void>fromCallable(() -> {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
try (var fo = new FlushOptions().setWaitForFlush(true)) {
flush(fo);
}
db.cancelAllBackgroundWork(true);
return null;
} finally {
closeLock.unlockRead(closeReadLock);
}
}).subscribeOn(dbWScheduler);
}
private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {} private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {}
private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions, RocksDBRefs refs) { private RocksLevelOptions getRocksLevelOptions(DatabaseLevel levelOptions, RocksDBRefs refs) {
var compressionType = levelOptions.compression().getType(); var compressionType = levelOptions.compression().getType();
@ -955,7 +975,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setWalBytesPerSync(0) // default .setWalBytesPerSync(0) // default
.setIncreaseParallelism(1) .setIncreaseParallelism(1)
.setDbWriteBufferSize(8 * SizeUnit.MB) .setDbWriteBufferSize(8 * SizeUnit.MB)
.setWalTtlSeconds(0) .setWalTtlSeconds(60)
.setWalSizeLimitMB(0) .setWalSizeLimitMB(0)
.setMaxTotalWalSize(0) // automatic .setMaxTotalWalSize(0) // automatic
; ;
@ -991,7 +1011,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.setBytesPerSync(64 * SizeUnit.MB) .setBytesPerSync(64 * SizeUnit.MB)
.setWalBytesPerSync(64 * SizeUnit.MB) .setWalBytesPerSync(64 * SizeUnit.MB)
.setWalTtlSeconds(0) // Auto .setWalTtlSeconds(60) // Auto
.setWalSizeLimitMB(0) // Auto .setWalSizeLimitMB(0) // Auto
.setMaxTotalWalSize(0) // Auto .setMaxTotalWalSize(0) // Auto
; ;
@ -1507,8 +1527,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return Mono.<Void>fromCallable(() -> { return Mono.<Void>fromCallable(() -> {
try (var fo = new FlushOptions().setWaitForFlush(true)) { try (var fo = new FlushOptions().setWaitForFlush(true)) {
this.flush(fo); this.flush(fo);
return null; } catch (RocksDBException ex) {
if (!"ShutdownInProgress".equals(ex.getMessage())) {
throw ex;
}
logger.warn("Shutdown in progress. Flush cancelled", ex);
} }
return null;
}).subscribeOn(dbWScheduler); }).subscribeOn(dbWScheduler);
} }

View File

@ -170,17 +170,21 @@ public class LLMemoryKeyValueDatabase implements LLKeyValueDatabase {
return meterRegistry; return meterRegistry;
} }
@Override
public Mono<Void> preClose() {
return null;
}
@Override @Override
public Mono<Void> close() { public Mono<Void> close() {
return Mono return Mono.fromRunnable(() -> {
.fromRunnable(() -> { snapshots.forEach((snapshot, dbs) -> dbs.forEach((columnName, db) -> {
snapshots.forEach((snapshot, dbs) -> dbs.forEach((columnName, db) -> { db.clear();
db.clear(); }));
})); mainDb.forEach((columnName, db) -> {
mainDb.forEach((columnName, db) -> { db.clear();
db.clear(); });
}); });
});
} }
@Override @Override

View File

@ -398,6 +398,11 @@ public class LLQuicConnection implements LLDatabaseConnection {
return meterRegistry; return meterRegistry;
} }
@Override
public Mono<Void> preClose() {
return null;
}
@Override @Override
public Mono<Void> close() { public Mono<Void> close() {
return sendRequest(new CloseDatabase(id)).then(); return sendRequest(new CloseDatabase(id)).then();