diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index e6ffa61..f514d72 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -332,17 +332,11 @@ public sealed abstract class AbstractRocksDBColumn implements } protected void ensureOpen() { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called in a nonblocking thread"); - } - ensureOwned(db); - ensureOwned(cfh); + RocksDBUtils.ensureOpen(db, cfh); } protected void ensureOwned(org.rocksdb.RocksObject rocksObject) { - if (!rocksObject.isOwningHandle()) { - throw new IllegalStateException("Not owning handle"); - } + RocksDBUtils.ensureOwned(rocksObject); } @Override @@ -939,7 +933,13 @@ public sealed abstract class AbstractRocksDBColumn implements } protected int getLevels() { - return RocksDBUtils.getLevels(db, cfh); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + return RocksDBUtils.getLevels(db, cfh); + } finally { + closeLock.unlockRead(closeReadLock); + } } @Override diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 88669b4..e9078bf 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -516,6 +516,17 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { || Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false")); } + protected void ensureOpen() { + if (closed) { + throw new IllegalStateException("Database closed"); + } + RocksDBUtils.ensureOpen(db, null); + } + + protected void ensureOwned(org.rocksdb.RocksObject rocksObject) { + RocksDBUtils.ensureOwned(rocksObject); + } + private synchronized PersistentCache resolvePersistentCache(HashMap caches, DBOptions rocksdbOptions, List persistentCaches, @@ -562,22 +573,51 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } public int getLevels(Column column) { - return RocksDBUtils.getLevels(db, handles.get(column)); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + var cfh = handles.get(column); + ensureOwned(cfh); + return RocksDBUtils.getLevels(db, cfh); + } finally { + closeLock.unlockRead(closeReadLock); + } } public List getColumnFiles(Column column, boolean excludeLastLevel) { - var cfh = handles.get(column); - return RocksDBUtils.getColumnFiles(db, cfh, excludeLastLevel); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + var cfh = handles.get(column); + ensureOwned(cfh); + return RocksDBUtils.getColumnFiles(db, cfh, excludeLastLevel); + } finally { + closeLock.unlockRead(closeReadLock); + } } public void forceCompaction(int volumeId) throws RocksDBException { - for (var cfh : this.handles.values()) { - RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + for (var cfh : this.handles.values()) { + ensureOwned(cfh); + RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger); + } + } finally { + closeLock.unlockRead(closeReadLock); } } public void flush(FlushOptions flushOptions) throws RocksDBException { - db.flush(flushOptions); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + ensureOwned(flushOptions); + db.flush(flushOptions); + } finally { + closeLock.unlockRead(closeReadLock); + } } private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {} @@ -601,7 +641,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { if (closed) { return 0d; } + var closeReadLock = closeLock.readLock(); try { + if (closed) { + return 0d; + } return database.getAggregatedLongProperty(propertyName) / (divideByAllColumns ? getAllColumnFamilyHandles().size() : 1d); } catch (RocksDBException e) { @@ -609,6 +653,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return 0d; } throw new RuntimeException(e); + } finally { + closeLock.unlockRead(closeReadLock); } } ); @@ -949,19 +995,41 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } } + private Snapshot getSnapshotLambda(LLSnapshot snapshot) { + var closeReadSnapLock = closeLock.readLock(); + try { + ensureOpen(); + var snapshotHandle = snapshotsHandles.get(snapshot.getSequenceNumber()); + ensureOwned(snapshotHandle); + return snapshotHandle; + } finally { + closeLock.unlockRead(closeReadSnapLock); + } + } + @Override public Mono getSingleton(byte[] singletonListColumnName, byte[] name, byte @Nullable[] defaultValue) { return Mono - .fromCallable(() -> new LLLocalSingleton( - getRocksDBColumn(db, getCfh(singletonListColumnName)), - (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), - LLLocalKeyValueDatabase.this.name, - name, - ColumnUtils.toString(singletonListColumnName), - dbWScheduler, dbRScheduler, defaultValue - )) + .fromCallable(() -> { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + var cfh = getCfh(singletonListColumnName); + ensureOwned(cfh); + return new LLLocalSingleton( + getRocksDBColumn(db, cfh), + this::getSnapshotLambda, + LLLocalKeyValueDatabase.this.name, + name, + ColumnUtils.toString(singletonListColumnName), + dbWScheduler, dbRScheduler, defaultValue + ); + } finally { + closeLock.unlockRead(closeReadLock); + } + }) .onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause)) .subscribeOn(dbRScheduler); } @@ -969,28 +1037,45 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @Override public Mono getDictionary(byte[] columnName, UpdateMode updateMode) { return Mono - .fromCallable(() -> new LLLocalDictionary( - allocator, - getRocksDBColumn(db, getCfh(columnName)), - name, - ColumnUtils.toString(columnName), - dbWScheduler, - dbRScheduler, - (snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()), - updateMode, - databaseOptions - )) + .fromCallable(() -> { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + var cfh = getCfh(columnName); + ensureOwned(cfh); + return new LLLocalDictionary( + allocator, + getRocksDBColumn(db, cfh), + name, + ColumnUtils.toString(columnName), + dbWScheduler, + dbRScheduler, + this::getSnapshotLambda, + updateMode, + databaseOptions + ); + } finally { + closeLock.unlockRead(closeReadLock); + } + }) .subscribeOn(dbRScheduler); } public RocksDBColumn getRocksDBColumn(byte[] columnName) { - ColumnFamilyHandle cfh; + var closeReadLock = closeLock.readLock(); try { - cfh = getCfh(columnName); - } catch (RocksDBException e) { - throw new UnsupportedOperationException("Column family doesn't exist: " + Arrays.toString(columnName), e); + ensureOpen(); + ColumnFamilyHandle cfh; + try { + cfh = getCfh(columnName); + ensureOwned(cfh); + } catch (RocksDBException e) { + throw new UnsupportedOperationException("Column family doesn't exist: " + Arrays.toString(columnName), e); + } + return getRocksDBColumn(db, cfh); + } finally { + closeLock.unlockRead(closeReadLock); } - return getRocksDBColumn(db, cfh); } private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) { @@ -1031,15 +1116,29 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @Override public Mono getProperty(String propertyName) { - return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName)) - .onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause)) - .subscribeOn(dbRScheduler); + return Mono.fromCallable(() -> { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + return db.getAggregatedLongProperty(propertyName); + } finally { + closeLock.unlockRead(closeReadLock); + } + }).onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause)).subscribeOn(dbRScheduler); } public Flux getSSTS() { var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes()); return Mono - .fromCallable(() -> db.getLiveFiles()) + .fromCallable(() -> { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + return db.getLiveFiles(); + } finally { + closeLock.unlockRead(closeReadLock); + } + }) .flatMapIterable(liveFiles -> liveFiles.files) .filter(file -> file.endsWith(".sst")) .map(file -> file.substring(1)) @@ -1064,6 +1163,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return sstsFlux .map(path -> path.toAbsolutePath().toString()) .flatMap(sst -> Mono.fromCallable(() -> { + var closeReadLock = closeLock.readLock(); try (var opts = new IngestExternalFileOptions()) { try { logger.info("Ingesting SST \"{}\"...", sst); @@ -1072,6 +1172,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } catch (RocksDBException e) { logger.error("Can't ingest SST \"{}\"", sst, e); } + } finally { + closeLock.unlockRead(closeReadLock); } return null; }).subscribeOn(Schedulers.boundedElastic())) @@ -1082,16 +1184,23 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono getMemoryStats() { return Mono .fromCallable(() -> { - if (!closed) { - return new MemoryStats(db.getAggregatedLongProperty("rocksdb.estimate-table-readers-mem"), - db.getAggregatedLongProperty("rocksdb.size-all-mem-tables"), - db.getAggregatedLongProperty("rocksdb.cur-size-all-mem-tables"), - db.getAggregatedLongProperty("rocksdb.estimate-num-keys"), - db.getAggregatedLongProperty("rocksdb.block-cache-usage") / this.handles.size(), - db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") / this.handles.size() - ); - } else { - return null; + if (closed) return null; + var closeReadLock = closeLock.readLock(); + try { + if (!closed) { + ensureOpen(); + return new MemoryStats(db.getAggregatedLongProperty("rocksdb.estimate-table-readers-mem"), + db.getAggregatedLongProperty("rocksdb.size-all-mem-tables"), + db.getAggregatedLongProperty("rocksdb.cur-size-all-mem-tables"), + db.getAggregatedLongProperty("rocksdb.estimate-num-keys"), + db.getAggregatedLongProperty("rocksdb.block-cache-usage") / this.handles.size(), + db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") / this.handles.size() + ); + } else { + return null; + } + } finally { + closeLock.unlockRead(closeReadLock); } }) .onErrorMap(cause -> new IOException("Failed to read memory stats", cause)) @@ -1102,18 +1211,25 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono getRocksDBStats() { return Mono .fromCallable(() -> { - if (!closed) { - StringBuilder aggregatedStats = new StringBuilder(); - for (var entry : this.handles.entrySet()) { - aggregatedStats - .append(entry.getKey().name()) - .append("\n") - .append(db.getProperty(entry.getValue(), "rocksdb.stats")) - .append("\n"); + if (closed) return null; + var closeReadLock = closeLock.readLock(); + try { + if (!closed) { + ensureOpen(); + StringBuilder aggregatedStats = new StringBuilder(); + for (var entry : this.handles.entrySet()) { + aggregatedStats + .append(entry.getKey().name()) + .append("\n") + .append(db.getProperty(entry.getValue(), "rocksdb.stats")) + .append("\n"); + } + return aggregatedStats.toString(); + } else { + return null; } - return aggregatedStats.toString(); - } else { - return null; + } finally { + closeLock.unlockRead(closeReadLock); } }) .onErrorMap(cause -> new IOException("Failed to read stats", cause)) @@ -1126,10 +1242,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .fromIterable(handles.entrySet()) .flatMapSequential(handle -> Mono .fromCallable(() -> { - if (!closed) { + if (closed) return null; + var closeReadLock = closeLock.readLock(); + try { + if (closed) return null; + ensureOpen(); return db.getPropertiesOfAllTables(handle.getValue()); - } else { - return null; + } finally { + closeLock.unlockRead(closeReadLock); } }) .subscribeOn(dbRScheduler) @@ -1143,7 +1263,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono verifyChecksum() { return Mono .fromCallable(() -> { - db.verifyChecksum(); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + db.verifyChecksum(); + } finally { + closeLock.unlockRead(closeReadLock); + } return null; }) .onErrorMap(cause -> new IOException("Failed to verify checksum of database \"" @@ -1181,22 +1307,20 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { @Override public Mono takeSnapshot() { - return Mono - .fromCallable(() -> snapshotTime.recordCallable(() -> { - var closeReadLock = closeLock.readLock(); - try { - if (closed) { - throw new IllegalStateException("Database closed"); - } - var snapshot = db.getSnapshot(); - long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement(); - this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); - return new LLSnapshot(currentSnapshotSequenceNumber); - } finally { - closeLock.unlockRead(closeReadLock); - } - })) - .subscribeOn(dbRScheduler); + return Mono.fromCallable(() -> { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + return snapshotTime.recordCallable(() -> { + var snapshot = db.getSnapshot(); + long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement(); + this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); + return new LLSnapshot(currentSnapshotSequenceNumber); + }); + } finally { + closeLock.unlockRead(closeReadLock); + } + }).subscribeOn(dbRScheduler); } @Override @@ -1205,9 +1329,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .fromCallable(() -> { var closeReadLock = closeLock.readLock(); try { - if (closed) { - throw new IllegalStateException("Database closed"); - } Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber()); if (dbSnapshot == null) { throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!"); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java index db95a02..5b20ae2 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java @@ -7,6 +7,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.CompactionJobInfo; import org.rocksdb.CompactionOptions; @@ -94,4 +95,18 @@ public class RocksDBUtils { } } } + + public static void ensureOpen(RocksDB db, @Nullable ColumnFamilyHandle cfh) { + if (Schedulers.isInNonBlockingThread()) { + throw new UnsupportedOperationException("Called in a nonblocking thread"); + } + ensureOwned(db); + ensureOwned(cfh); + } + + public static void ensureOwned(@Nullable org.rocksdb.RocksObject rocksObject) { + if (rocksObject != null && !rocksObject.isOwningHandle()) { + throw new IllegalStateException("Not owning handle"); + } + } }