Andrea Cavalli 2022-04-10 20:15:05 +02:00
parent fb44c182fa
commit eb5792bbe0
3 changed files with 26 additions and 18 deletions

TableWithProperties.java

@@ -2,4 +2,4 @@ package it.cavallium.dbengine.database;
import org.rocksdb.TableProperties;
-public record TableWithProperties(String name, TableProperties properties) {}
+public record TableWithProperties(String column, String table, TableProperties properties) {}

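Note: TableWithProperties now records both the column family name ("column") and the per-SST table name ("table") alongside the RocksDB TableProperties. A minimal usage sketch, assuming the RocksJava API; the helper name and parameters below are illustrative and not part of this commit:

import java.util.Map;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.TableProperties;

class TableWithPropertiesUsageSketch {
	// Hypothetical helper: wrap every SST table of one column family,
	// keeping track of which column family it belongs to.
	static void printTables(RocksDB db, String columnName, ColumnFamilyHandle handle) throws RocksDBException {
		Map<String, TableProperties> byTable = db.getPropertiesOfAllTables(handle);
		for (var entry : byTable.entrySet()) {
			var t = new TableWithProperties(columnName, entry.getKey(), entry.getValue());
			System.out.println(t.column() + " / " + t.table() + " -> " + t.properties().getNumEntries() + " entries");
		}
	}
}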
LLLocalKeyValueDatabase.java

@@ -37,7 +37,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
@@ -69,7 +68,6 @@ import org.rocksdb.OptimisticTransactionDB;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
-import org.rocksdb.TableProperties;
import org.rocksdb.TransactionDB;
import org.rocksdb.TransactionDBOptions;
import org.rocksdb.TxnDBWritePolicy;
@@ -81,7 +79,6 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
@@ -850,7 +847,15 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
		return Mono
				.fromCallable(() -> {
					if (!closed) {
-						return db.getProperty("rocksdb.stats");
+						StringBuilder aggregatedStats = new StringBuilder();
+						for (var entry : this.handles.entrySet()) {
+							aggregatedStats
+									.append(entry.getKey().name())
+									.append("\n")
+									.append(db.getProperty(entry.getValue(), "rocksdb.stats"))
+									.append("\n");
+						}
+						return aggregatedStats.toString();
					} else {
						return null;
					}
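Note: the "rocksdb.stats" dump above is now aggregated per column family instead of being read once for the whole database. A standalone sketch of the same loop, assuming a handles map keyed by the plain column name (the real class keys it by a Column descriptor and also checks the closed flag):

import java.util.Map;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class StatsAggregationSketch {
	// Hypothetical helper mirroring the loop added above: one "rocksdb.stats"
	// dump per column family, prefixed with the column family name.
	static String aggregateStats(RocksDB db, Map<String, ColumnFamilyHandle> handles) throws RocksDBException {
		StringBuilder aggregated = new StringBuilder();
		for (var entry : handles.entrySet()) {
			aggregated.append(entry.getKey())
					.append("\n")
					.append(db.getProperty(entry.getValue(), "rocksdb.stats"))
					.append("\n");
		}
		return aggregated.toString();
	}
}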
@@ -861,18 +866,21 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
	@Override
	public Flux<TableWithProperties> getTableProperties() {
-		return Mono
-				.fromCallable(() -> {
-					if (!closed) {
-						return db.getPropertiesOfAllTables();
-					} else {
-						return null;
-					}
-				})
-				.flatMapIterable(Map::entrySet)
-				.map(entry -> new TableWithProperties(entry.getKey(), entry.getValue()))
-				.onErrorMap(cause -> new IOException("Failed to read stats", cause))
-				.subscribeOn(dbRScheduler);
+		return Flux
+				.fromIterable(handles.entrySet())
+				.flatMapSequential(handle -> Mono
+						.fromCallable(() -> {
+							if (!closed) {
+								return db.getPropertiesOfAllTables(handle.getValue());
+							} else {
+								return null;
+							}
+						})
+						.subscribeOn(dbRScheduler)
+						.flatMapIterable(Map::entrySet)
+						.map(entry -> new TableWithProperties(handle.getKey().name(), entry.getKey(), entry.getValue()))
+				)
+				.onErrorMap(cause -> new IOException("Failed to read stats", cause));
	}
	@Override

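Note: getTableProperties() now iterates the column family handles and reads each one's SST table properties on the database scheduler before flattening the results. A simplified, self-contained sketch of that pipeline, assuming Project Reactor and a handles map keyed by the plain column name; the real code keys the map by a Column descriptor, guards against a closed database, and uses its own dbRScheduler:

import java.io.IOException;
import java.util.Map;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class TablePropertiesFluxSketch {
	// Hypothetical standalone variant of the reactive pipeline above.
	static Flux<TableWithProperties> tableProperties(RocksDB db, Map<String, ColumnFamilyHandle> handles) {
		return Flux
				.fromIterable(handles.entrySet())
				.flatMapSequential(handle -> Mono
						.fromCallable(() -> db.getPropertiesOfAllTables(handle.getValue()))
						// Blocking RocksDB call: keep it off the reactive threads.
						.subscribeOn(Schedulers.boundedElastic())
						.flatMapIterable(Map::entrySet)
						.map(entry -> new TableWithProperties(handle.getKey(), entry.getKey(), entry.getValue())))
				.onErrorMap(cause -> new IOException("Failed to read table properties", cause));
	}
}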
LLLocalLuceneIndex.java

@@ -81,7 +81,7 @@ public class LLLocalLuceneIndex implements LLLuceneIndex {
	protected static final Logger logger = LogManager.getLogger(LLLocalLuceneIndex.class);
-	private static final ReentrantLock shutdownLock = new ReentrantLock();
+	private final ReentrantLock shutdownLock = new ReentrantLock();
	/**
	 * Global lucene index scheduler.
	 * There is only a single thread globally to not overwhelm the disk with