diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 76e7230..2f6ac2f 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -130,6 +130,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private final ConcurrentHashMap snapshotsHandles = new ConcurrentHashMap<>(); private final AtomicLong nextSnapshotNumbers = new AtomicLong(1); private final StampedLock closeLock = new StampedLock(); + private volatile boolean closeRequested = false; private volatile boolean closed = false; @SuppressWarnings("SwitchStatementWithTooFewBranches") @@ -189,6 +190,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } } + var rocksLogger = new RocksLog4jLogger(rocksdbOptions, logger); this.persistentCaches = new HashMap<>(); for (Column column : columns) { @@ -361,7 +363,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { rocksdbOptions, databaseOptions.persistentCaches(), columnOptions.persistentCacheId(), - refs + refs, + rocksLogger )); columnFamilyOptions.setTableFormatConfig(tableOptions); @@ -560,7 +563,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { DBOptions rocksdbOptions, List persistentCaches, NullableString persistentCacheId, - RocksDBRefs refs) throws RocksDBException { + RocksDBRefs refs, + RocksLog4jLogger rocksLogger) throws RocksDBException { if (persistentCacheId.isEmpty()) { return null; } @@ -581,7 +585,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { var persistentCache = new PersistentCache(Env.getDefault(), foundCache.path(), foundCache.size(), - new RocksLog4jLogger(rocksdbOptions, logger), + rocksLogger, foundCache.optimizeForNvm() ); refs.track(persistentCache); @@ -1057,14 +1061,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { List.of(Tag.of("db.name", dbName), Tag.of("db.statistics.name", tickerType.name())), stats, statistics -> { - if (closed) { - return 0d; - } - var closeReadLock = closeLock.readLock(); + if (closeRequested || closed) return 0d; + long closeReadLock = 0; try { - if (closed) { - return 0d; - } + closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS); + } catch (InterruptedException ignored) {} + try { + if (closeRequested || closed || closeReadLock == 0) return 0d; return statistics.getTickerCount(tickerType); } finally { closeLock.unlockRead(closeReadLock); @@ -1247,21 +1250,22 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono getMemoryStats() { return Mono .fromCallable(() -> { - if (closed) return null; - var closeReadLock = closeLock.readLock(); + if (closeRequested || closed) return null; + long closeReadLock = 0; try { - if (!closed) { - ensureOpen(); - return new MemoryStats(db.getAggregatedLongProperty("rocksdb.estimate-table-readers-mem"), - db.getAggregatedLongProperty("rocksdb.size-all-mem-tables"), - db.getAggregatedLongProperty("rocksdb.cur-size-all-mem-tables"), - db.getAggregatedLongProperty("rocksdb.estimate-num-keys"), - db.getAggregatedLongProperty("rocksdb.block-cache-usage") / this.handles.size(), - db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") / this.handles.size() - ); - } else { - return null; - } + //noinspection BlockingMethodInNonBlockingContext + closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS); + } catch (InterruptedException ignored) {} + try { + if (closeRequested || closed || closeReadLock == 0) return null; + ensureOpen(); + return new MemoryStats(db.getAggregatedLongProperty("rocksdb.estimate-table-readers-mem"), + db.getAggregatedLongProperty("rocksdb.size-all-mem-tables"), + db.getAggregatedLongProperty("rocksdb.cur-size-all-mem-tables"), + db.getAggregatedLongProperty("rocksdb.estimate-num-keys"), + db.getAggregatedLongProperty("rocksdb.block-cache-usage") / this.handles.size(), + db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") / this.handles.size() + ); } finally { closeLock.unlockRead(closeReadLock); } @@ -1394,23 +1398,24 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono getRocksDBStats() { return Mono .fromCallable(() -> { - if (closed) return null; - var closeReadLock = closeLock.readLock(); + if (closeRequested || closed) return null; + long closeReadLock = 0; try { - if (!closed) { - ensureOpen(); - StringBuilder aggregatedStats = new StringBuilder(); - for (var entry : this.handles.entrySet()) { - aggregatedStats - .append(entry.getKey().name()) - .append("\n") - .append(db.getProperty(entry.getValue(), "rocksdb.stats")) - .append("\n"); - } - return aggregatedStats.toString(); - } else { - return null; + //noinspection BlockingMethodInNonBlockingContext + closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS); + } catch (InterruptedException ignored) {} + try { + if (closeRequested || closed || closeReadLock == 0) return null; + ensureOpen(); + StringBuilder aggregatedStats = new StringBuilder(); + for (var entry : this.handles.entrySet()) { + aggregatedStats + .append(entry.getKey().name()) + .append("\n") + .append(db.getProperty(entry.getValue(), "rocksdb.stats")) + .append("\n"); } + return aggregatedStats.toString(); } finally { closeLock.unlockRead(closeReadLock); } @@ -1425,10 +1430,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .fromIterable(handles.entrySet()) .flatMapSequential(handle -> Mono .fromCallable(() -> { - if (closed) return null; - var closeReadLock = closeLock.readLock(); + if (closeRequested || closed) return null; + long closeReadLock = 0; try { - if (closed) return null; + //noinspection BlockingMethodInNonBlockingContext + closeReadLock = closeLock.tryReadLock(1, TimeUnit.SECONDS); + } catch (InterruptedException ignored) {} + try { + if (closeRequested || closed || closeReadLock == 0) return null; ensureOpen(); return db.getPropertiesOfAllTables(handle.getValue()); } finally { @@ -1531,6 +1540,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono close() { return Mono .fromCallable(() -> { + closeRequested = true; if (statistics != null) { statistics.close(); statistics = null;