Safer access to database elements

This commit is contained in:
Andrea Cavalli 2022-04-30 14:21:20 +02:00
parent 88a1add102
commit e03afafcee
3 changed files with 223 additions and 87 deletions

View File

@ -332,17 +332,11 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
}
protected void ensureOpen() {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called in a nonblocking thread");
}
ensureOwned(db);
ensureOwned(cfh);
RocksDBUtils.ensureOpen(db, cfh);
}
protected void ensureOwned(org.rocksdb.RocksObject rocksObject) {
if (!rocksObject.isOwningHandle()) {
throw new IllegalStateException("Not owning handle");
}
RocksDBUtils.ensureOwned(rocksObject);
}
@Override
@ -939,7 +933,13 @@ public sealed abstract class AbstractRocksDBColumn<T extends RocksDB> implements
}
protected int getLevels() {
return RocksDBUtils.getLevels(db, cfh);
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
return RocksDBUtils.getLevels(db, cfh);
} finally {
closeLock.unlockRead(closeReadLock);
}
}
@Override

View File

@ -516,6 +516,17 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|| Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.disableslowdown", "false"));
}
protected void ensureOpen() {
if (closed) {
throw new IllegalStateException("Database closed");
}
RocksDBUtils.ensureOpen(db, null);
}
protected void ensureOwned(org.rocksdb.RocksObject rocksObject) {
RocksDBUtils.ensureOwned(rocksObject);
}
private synchronized PersistentCache resolvePersistentCache(HashMap<String, PersistentCache> caches,
DBOptions rocksdbOptions,
List<it.cavallium.dbengine.rpc.current.data.PersistentCache> persistentCaches,
@ -562,22 +573,51 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
public int getLevels(Column column) {
return RocksDBUtils.getLevels(db, handles.get(column));
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
var cfh = handles.get(column);
ensureOwned(cfh);
return RocksDBUtils.getLevels(db, cfh);
} finally {
closeLock.unlockRead(closeReadLock);
}
}
public List<String> getColumnFiles(Column column, boolean excludeLastLevel) {
var cfh = handles.get(column);
return RocksDBUtils.getColumnFiles(db, cfh, excludeLastLevel);
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
var cfh = handles.get(column);
ensureOwned(cfh);
return RocksDBUtils.getColumnFiles(db, cfh, excludeLastLevel);
} finally {
closeLock.unlockRead(closeReadLock);
}
}
public void forceCompaction(int volumeId) throws RocksDBException {
for (var cfh : this.handles.values()) {
RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger);
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
for (var cfh : this.handles.values()) {
ensureOwned(cfh);
RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger);
}
} finally {
closeLock.unlockRead(closeReadLock);
}
}
public void flush(FlushOptions flushOptions) throws RocksDBException {
db.flush(flushOptions);
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
ensureOwned(flushOptions);
db.flush(flushOptions);
} finally {
closeLock.unlockRead(closeReadLock);
}
}
private record RocksLevelOptions(CompressionType compressionType, CompressionOptions compressionOptions) {}
@ -601,7 +641,11 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
if (closed) {
return 0d;
}
var closeReadLock = closeLock.readLock();
try {
if (closed) {
return 0d;
}
return database.getAggregatedLongProperty(propertyName)
/ (divideByAllColumns ? getAllColumnFamilyHandles().size() : 1d);
} catch (RocksDBException e) {
@ -609,6 +653,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return 0d;
}
throw new RuntimeException(e);
} finally {
closeLock.unlockRead(closeReadLock);
}
}
);
@ -949,19 +995,41 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
}
}
private Snapshot getSnapshotLambda(LLSnapshot snapshot) {
var closeReadSnapLock = closeLock.readLock();
try {
ensureOpen();
var snapshotHandle = snapshotsHandles.get(snapshot.getSequenceNumber());
ensureOwned(snapshotHandle);
return snapshotHandle;
} finally {
closeLock.unlockRead(closeReadSnapLock);
}
}
@Override
public Mono<LLLocalSingleton> getSingleton(byte[] singletonListColumnName,
byte[] name,
byte @Nullable[] defaultValue) {
return Mono
.fromCallable(() -> new LLLocalSingleton(
getRocksDBColumn(db, getCfh(singletonListColumnName)),
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
LLLocalKeyValueDatabase.this.name,
name,
ColumnUtils.toString(singletonListColumnName),
dbWScheduler, dbRScheduler, defaultValue
))
.fromCallable(() -> {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
var cfh = getCfh(singletonListColumnName);
ensureOwned(cfh);
return new LLLocalSingleton(
getRocksDBColumn(db, cfh),
this::getSnapshotLambda,
LLLocalKeyValueDatabase.this.name,
name,
ColumnUtils.toString(singletonListColumnName),
dbWScheduler, dbRScheduler, defaultValue
);
} finally {
closeLock.unlockRead(closeReadLock);
}
})
.onErrorMap(cause -> new IOException("Failed to read " + Arrays.toString(name), cause))
.subscribeOn(dbRScheduler);
}
@ -969,28 +1037,45 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@Override
public Mono<LLLocalDictionary> getDictionary(byte[] columnName, UpdateMode updateMode) {
return Mono
.fromCallable(() -> new LLLocalDictionary(
allocator,
getRocksDBColumn(db, getCfh(columnName)),
name,
ColumnUtils.toString(columnName),
dbWScheduler,
dbRScheduler,
(snapshot) -> snapshotsHandles.get(snapshot.getSequenceNumber()),
updateMode,
databaseOptions
))
.fromCallable(() -> {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
var cfh = getCfh(columnName);
ensureOwned(cfh);
return new LLLocalDictionary(
allocator,
getRocksDBColumn(db, cfh),
name,
ColumnUtils.toString(columnName),
dbWScheduler,
dbRScheduler,
this::getSnapshotLambda,
updateMode,
databaseOptions
);
} finally {
closeLock.unlockRead(closeReadLock);
}
})
.subscribeOn(dbRScheduler);
}
public RocksDBColumn getRocksDBColumn(byte[] columnName) {
ColumnFamilyHandle cfh;
var closeReadLock = closeLock.readLock();
try {
cfh = getCfh(columnName);
} catch (RocksDBException e) {
throw new UnsupportedOperationException("Column family doesn't exist: " + Arrays.toString(columnName), e);
ensureOpen();
ColumnFamilyHandle cfh;
try {
cfh = getCfh(columnName);
ensureOwned(cfh);
} catch (RocksDBException e) {
throw new UnsupportedOperationException("Column family doesn't exist: " + Arrays.toString(columnName), e);
}
return getRocksDBColumn(db, cfh);
} finally {
closeLock.unlockRead(closeReadLock);
}
return getRocksDBColumn(db, cfh);
}
private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) {
@ -1031,15 +1116,29 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@Override
public Mono<Long> getProperty(String propertyName) {
return Mono.fromCallable(() -> db.getAggregatedLongProperty(propertyName))
.onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause))
.subscribeOn(dbRScheduler);
return Mono.fromCallable(() -> {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
return db.getAggregatedLongProperty(propertyName);
} finally {
closeLock.unlockRead(closeReadLock);
}
}).onErrorMap(cause -> new IOException("Failed to read " + propertyName, cause)).subscribeOn(dbRScheduler);
}
public Flux<Path> getSSTS() {
var paths = convertPaths(dbPath.toAbsolutePath().getParent(), dbPath.getFileName(), databaseOptions.volumes());
return Mono
.fromCallable(() -> db.getLiveFiles())
.fromCallable(() -> {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
return db.getLiveFiles();
} finally {
closeLock.unlockRead(closeReadLock);
}
})
.flatMapIterable(liveFiles -> liveFiles.files)
.filter(file -> file.endsWith(".sst"))
.map(file -> file.substring(1))
@ -1064,6 +1163,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
return sstsFlux
.map(path -> path.toAbsolutePath().toString())
.flatMap(sst -> Mono.fromCallable(() -> {
var closeReadLock = closeLock.readLock();
try (var opts = new IngestExternalFileOptions()) {
try {
logger.info("Ingesting SST \"{}\"...", sst);
@ -1072,6 +1172,8 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
} catch (RocksDBException e) {
logger.error("Can't ingest SST \"{}\"", sst, e);
}
} finally {
closeLock.unlockRead(closeReadLock);
}
return null;
}).subscribeOn(Schedulers.boundedElastic()))
@ -1082,16 +1184,23 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
public Mono<MemoryStats> getMemoryStats() {
return Mono
.fromCallable(() -> {
if (!closed) {
return new MemoryStats(db.getAggregatedLongProperty("rocksdb.estimate-table-readers-mem"),
db.getAggregatedLongProperty("rocksdb.size-all-mem-tables"),
db.getAggregatedLongProperty("rocksdb.cur-size-all-mem-tables"),
db.getAggregatedLongProperty("rocksdb.estimate-num-keys"),
db.getAggregatedLongProperty("rocksdb.block-cache-usage") / this.handles.size(),
db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") / this.handles.size()
);
} else {
return null;
if (closed) return null;
var closeReadLock = closeLock.readLock();
try {
if (!closed) {
ensureOpen();
return new MemoryStats(db.getAggregatedLongProperty("rocksdb.estimate-table-readers-mem"),
db.getAggregatedLongProperty("rocksdb.size-all-mem-tables"),
db.getAggregatedLongProperty("rocksdb.cur-size-all-mem-tables"),
db.getAggregatedLongProperty("rocksdb.estimate-num-keys"),
db.getAggregatedLongProperty("rocksdb.block-cache-usage") / this.handles.size(),
db.getAggregatedLongProperty("rocksdb.block-cache-pinned-usage") / this.handles.size()
);
} else {
return null;
}
} finally {
closeLock.unlockRead(closeReadLock);
}
})
.onErrorMap(cause -> new IOException("Failed to read memory stats", cause))
@ -1102,18 +1211,25 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
public Mono<String> getRocksDBStats() {
return Mono
.fromCallable(() -> {
if (!closed) {
StringBuilder aggregatedStats = new StringBuilder();
for (var entry : this.handles.entrySet()) {
aggregatedStats
.append(entry.getKey().name())
.append("\n")
.append(db.getProperty(entry.getValue(), "rocksdb.stats"))
.append("\n");
if (closed) return null;
var closeReadLock = closeLock.readLock();
try {
if (!closed) {
ensureOpen();
StringBuilder aggregatedStats = new StringBuilder();
for (var entry : this.handles.entrySet()) {
aggregatedStats
.append(entry.getKey().name())
.append("\n")
.append(db.getProperty(entry.getValue(), "rocksdb.stats"))
.append("\n");
}
return aggregatedStats.toString();
} else {
return null;
}
return aggregatedStats.toString();
} else {
return null;
} finally {
closeLock.unlockRead(closeReadLock);
}
})
.onErrorMap(cause -> new IOException("Failed to read stats", cause))
@ -1126,10 +1242,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.fromIterable(handles.entrySet())
.flatMapSequential(handle -> Mono
.fromCallable(() -> {
if (!closed) {
if (closed) return null;
var closeReadLock = closeLock.readLock();
try {
if (closed) return null;
ensureOpen();
return db.getPropertiesOfAllTables(handle.getValue());
} else {
return null;
} finally {
closeLock.unlockRead(closeReadLock);
}
})
.subscribeOn(dbRScheduler)
@ -1143,7 +1263,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
public Mono<Void> verifyChecksum() {
return Mono
.<Void>fromCallable(() -> {
db.verifyChecksum();
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
db.verifyChecksum();
} finally {
closeLock.unlockRead(closeReadLock);
}
return null;
})
.onErrorMap(cause -> new IOException("Failed to verify checksum of database \""
@ -1181,22 +1307,20 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@Override
public Mono<LLSnapshot> takeSnapshot() {
return Mono
.fromCallable(() -> snapshotTime.recordCallable(() -> {
var closeReadLock = closeLock.readLock();
try {
if (closed) {
throw new IllegalStateException("Database closed");
}
var snapshot = db.getSnapshot();
long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
return new LLSnapshot(currentSnapshotSequenceNumber);
} finally {
closeLock.unlockRead(closeReadLock);
}
}))
.subscribeOn(dbRScheduler);
return Mono.fromCallable(() -> {
var closeReadLock = closeLock.readLock();
try {
ensureOpen();
return snapshotTime.recordCallable(() -> {
var snapshot = db.getSnapshot();
long currentSnapshotSequenceNumber = nextSnapshotNumbers.getAndIncrement();
this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot);
return new LLSnapshot(currentSnapshotSequenceNumber);
});
} finally {
closeLock.unlockRead(closeReadLock);
}
}).subscribeOn(dbRScheduler);
}
@Override
@ -1205,9 +1329,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
.<Void>fromCallable(() -> {
var closeReadLock = closeLock.readLock();
try {
if (closed) {
throw new IllegalStateException("Database closed");
}
Snapshot dbSnapshot = this.snapshotsHandles.remove(snapshot.getSequenceNumber());
if (dbSnapshot == null) {
throw new IOException("Snapshot " + snapshot.getSequenceNumber() + " not found!");

View File

@ -7,6 +7,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.CompactionJobInfo;
import org.rocksdb.CompactionOptions;
@ -94,4 +95,18 @@ public class RocksDBUtils {
}
}
}
public static void ensureOpen(RocksDB db, @Nullable ColumnFamilyHandle cfh) {
if (Schedulers.isInNonBlockingThread()) {
throw new UnsupportedOperationException("Called in a nonblocking thread");
}
ensureOwned(db);
ensureOwned(cfh);
}
public static void ensureOwned(@Nullable org.rocksdb.RocksObject rocksObject) {
if (rocksObject != null && !rocksObject.isOwningHandle()) {
throw new IllegalStateException("Not owning handle");
}
}
}