Implement closeRequested

This commit is contained in:
Andrea Cavalli 2022-06-08 18:52:15 +02:00
parent 3be0d3710c
commit 563defb2ff

View File

@ -130,6 +130,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
private final ConcurrentHashMap<Long, Snapshot> snapshotsHandles = new ConcurrentHashMap<>();
private final AtomicLong nextSnapshotNumbers = new AtomicLong(1);
private final StampedLock closeLock = new StampedLock();
private volatile boolean closeRequested = false;
private volatile boolean closed = false;
@SuppressWarnings("SwitchStatementWithTooFewBranches")
@ -189,6 +190,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
}
var rocksLogger = new RocksLog4jLogger(rocksdbOptions, logger);
this.persistentCaches = new HashMap<>();
for (Column column : columns) {
@ -361,7 +363,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
rocksdbOptions,
databaseOptions.persistentCaches(),
columnOptions.persistentCacheId(),
refs
refs,
rocksLogger
));
columnFamilyOptions.setTableFormatConfig(tableOptions);
@ -560,7 +563,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
DBOptions rocksdbOptions,
List<it.cavallium.dbengine.rpc.current.data.PersistentCache> persistentCaches,
NullableString persistentCacheId,
RocksDBRefs refs) throws RocksDBException {
RocksDBRefs refs,
RocksLog4jLogger rocksLogger) throws RocksDBException {
if (persistentCacheId.isEmpty()) {
return null;
}
@ -581,7 +585,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
var persistentCache = new PersistentCache(Env.getDefault(),
foundCache.path(),
foundCache.size(),
new RocksLog4jLogger(rocksdbOptions, logger),
rocksLogger,
foundCache.optimizeForNvm()
);
refs.track(persistentCache);
@ -1057,14 +1061,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
List.of(Tag.of("db.name", dbName), Tag.of("db.statistics.name", tickerType.name())),
stats,
statistics -> {
if (closed) {
return 0d;
}
var closeReadLock = closeLock.readLock();
if (closeRequested || closed) return 0d;
long closeReadLock = 0;
try {
if (closed) {
return 0d;
}
closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {}
try {
if (closeRequested || closed || closeReadLock == 0) return 0d;
return statistics.getTickerCount(tickerType);
} finally {
closeLock.unlockRead(closeReadLock);
@ -1247,21 +1250,22 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
public Mono<MemoryStats> getMemoryStats() {
return Mono
.fromCallable(() -> {
if (closed) return null;
var closeReadLock = closeLock.readLock();
if (closeRequested || closed) return null;
long closeReadLock = 0;
try {
if (!closed) {
ensureOpen();
return new MemoryStats(db.getAggregatedLongProperty("rocksdb.estimate-table-readers-mem"),
db.getAggregatedLongProperty("rocksdb.size-all-mem-tables"),
db.getAggregatedLongProperty("rocksdb.cur-size-all-mem-tables"),
db.getAggregatedLongProperty("rocksdb.estimate-num-keys"),
db.getAggregatedLongProperty("rocksdb.block-cache-usage") / this.handles.size(),
db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") / this.handles.size()
);
} else {
return null;
}
//noinspection BlockingMethodInNonBlockingContext
closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {}
try {
if (closeRequested || closed || closeReadLock == 0) return null;
ensureOpen();
return new MemoryStats(db.getAggregatedLongProperty("rocksdb.estimate-table-readers-mem"),
db.getAggregatedLongProperty("rocksdb.size-all-mem-tables"),
db.getAggregatedLongProperty("rocksdb.cur-size-all-mem-tables"),
db.getAggregatedLongProperty("rocksdb.estimate-num-keys"),
db.getAggregatedLongProperty("rocksdb.block-cache-usage") / this.handles.size(),
db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") / this.handles.size()
);
} finally {
closeLock.unlockRead(closeReadLock);
}
@ -1394,23 +1398,24 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
public Mono<String> getRocksDBStats() {
return Mono
.fromCallable(() -> {
if (closed) return null;
var closeReadLock = closeLock.readLock();
if (closeRequested || closed) return null;
long closeReadLock = 0;
try {
if (!closed) {
ensureOpen();
StringBuilder aggregatedStats = new StringBuilder();
for (var entry : this.handles.entrySet()) {
aggregatedStats
.append(entry.getKey().name())
.append("\n")
.append(db.getProperty(entry.getValue(), "rocksdb.stats"))
.append("\n");
}
return aggregatedStats.toString();
} else {
return null;
//noinspection BlockingMethodInNonBlockingContext
closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {}
try {
if (closeRequested || closed || closeReadLock == 0) return null;
ensureOpen();
StringBuilder aggregatedStats = new StringBuilder();
for (var entry : this.handles.entrySet()) {
aggregatedStats
.append(entry.getKey().name())
.append("\n")
.append(db.getProperty(entry.getValue(), "rocksdb.stats"))
.append("\n");
}
return aggregatedStats.toString();
} finally {
closeLock.unlockRead(closeReadLock);
}
@ -1425,10 +1430,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.fromIterable(handles.entrySet())
.flatMapSequential(handle -> Mono
.fromCallable(() -> {
if (closed) return null;
var closeReadLock = closeLock.readLock();
if (closeRequested || closed) return null;
long closeReadLock = 0;
try {
if (closed) return null;
//noinspection BlockingMethodInNonBlockingContext
closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {}
try {
if (closeRequested || closed || closeReadLock == 0) return null;
ensureOpen();
return db.getPropertiesOfAllTables(handle.getValue());
} finally {
@ -1531,6 +1540,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
public Mono<Void> close() {
return Mono
.<Void>fromCallable(() -> {
closeRequested = true;
if (statistics != null) {
statistics.close();
statistics = null;