Clean rocksdb code
This commit is contained in:
parent
2a47e6f3fd
commit
c85dcfb54e
@ -38,6 +38,7 @@ import org.rocksdb.DBOptions;
|
|||||||
import org.rocksdb.DbPath;
|
import org.rocksdb.DbPath;
|
||||||
import org.rocksdb.FlushOptions;
|
import org.rocksdb.FlushOptions;
|
||||||
import org.rocksdb.LRUCache;
|
import org.rocksdb.LRUCache;
|
||||||
|
import org.rocksdb.MemoryUtil;
|
||||||
import org.rocksdb.Options;
|
import org.rocksdb.Options;
|
||||||
import org.rocksdb.RateLimiter;
|
import org.rocksdb.RateLimiter;
|
||||||
import org.rocksdb.RocksDB;
|
import org.rocksdb.RocksDB;
|
||||||
@ -121,9 +122,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
|||||||
|
|
||||||
createIfNotExists(descriptors, rocksdbOptions, databaseOptions, dbPath, dbPathString);
|
createIfNotExists(descriptors, rocksdbOptions, databaseOptions, dbPath, dbPathString);
|
||||||
|
|
||||||
// Create all column families that don't exist
|
|
||||||
createAllColumns(descriptors, rocksdbOptions, databaseOptions, dbPathString);
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
// a factory method that returns a RocksDB instance
|
// a factory method that returns a RocksDB instance
|
||||||
@ -181,9 +179,12 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
|||||||
flushDb(db, handles);
|
flushDb(db, handles);
|
||||||
|
|
||||||
for (ColumnFamilyHandle handle : handles) {
|
for (ColumnFamilyHandle handle : handles) {
|
||||||
handle.close();
|
try {
|
||||||
|
handle.close();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
logger.error("Can't close column family", ex);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
db.closeE();
|
db.closeE();
|
||||||
} catch (RocksDBException ex) {
|
} catch (RocksDBException ex) {
|
||||||
@ -204,11 +205,14 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
|||||||
|
|
||||||
private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
|
private void flushDb(RocksDB db, List<ColumnFamilyHandle> handles) throws RocksDBException {
|
||||||
// force flush the database
|
// force flush the database
|
||||||
for (int i = 0; i < 2; i++) {
|
try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
|
||||||
db.flush(new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true), handles);
|
db.flush(flushOptions);
|
||||||
db.flushWal(true);
|
|
||||||
db.syncWal();
|
|
||||||
}
|
}
|
||||||
|
try (var flushOptions = new FlushOptions().setWaitForFlush(true).setAllowWriteStall(true)) {
|
||||||
|
db.flush(flushOptions, handles);
|
||||||
|
}
|
||||||
|
db.flushWal(true);
|
||||||
|
db.syncWal();
|
||||||
// end force flush
|
// end force flush
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -254,6 +258,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
|||||||
// that determines the behaviour of the database.
|
// that determines the behaviour of the database.
|
||||||
var options = new Options();
|
var options = new Options();
|
||||||
options.setCreateIfMissing(true);
|
options.setCreateIfMissing(true);
|
||||||
|
options.setCreateMissingColumnFamilies(true);
|
||||||
options.setCompactionStyle(CompactionStyle.LEVEL);
|
options.setCompactionStyle(CompactionStyle.LEVEL);
|
||||||
options.setTargetFileSizeBase(64 * 1024 * 1024); // 64MiB sst file
|
options.setTargetFileSizeBase(64 * 1024 * 1024); // 64MiB sst file
|
||||||
options.setTargetFileSizeMultiplier(2); // Each level is 2 times the previous level
|
options.setTargetFileSizeMultiplier(2); // Each level is 2 times the previous level
|
||||||
@ -362,52 +367,6 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase {
|
|||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createAllColumns(List<ColumnFamilyDescriptor> totalDescriptors,
|
|
||||||
Options options,
|
|
||||||
DatabaseOptions databaseOptions,
|
|
||||||
String dbPathString) throws RocksDBException {
|
|
||||||
if (databaseOptions.inMemory()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
List<byte[]> columnFamiliesToCreate = new LinkedList<>();
|
|
||||||
|
|
||||||
for (ColumnFamilyDescriptor descriptor : totalDescriptors) {
|
|
||||||
columnFamiliesToCreate.add(descriptor.getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
List<byte[]> existingColumnFamilies = RocksDB.listColumnFamilies(options, dbPathString);
|
|
||||||
|
|
||||||
columnFamiliesToCreate.removeIf((columnFamilyName) -> {
|
|
||||||
for (byte[] cfn : existingColumnFamilies) {
|
|
||||||
if (Arrays.equals(cfn, columnFamilyName)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
});
|
|
||||||
|
|
||||||
List<ColumnFamilyDescriptor> descriptors = new LinkedList<>();
|
|
||||||
descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
|
|
||||||
for (byte[] existingColumnFamily : existingColumnFamilies) {
|
|
||||||
descriptors.add(new ColumnFamilyDescriptor(existingColumnFamily));
|
|
||||||
}
|
|
||||||
|
|
||||||
var handles = new LinkedList<ColumnFamilyHandle>();
|
|
||||||
|
|
||||||
/*
|
|
||||||
SkipStatsUpdateOnDbOpen = true because this RocksDB.open session is used only to add just some columns
|
|
||||||
*/
|
|
||||||
//var dbOptionsFastLoadSlowEdit = new DBOptions(options.setSkipStatsUpdateOnDbOpen(true));
|
|
||||||
|
|
||||||
this.db = RocksDB.open(new DBOptions(options), dbPathString, descriptors, handles);
|
|
||||||
|
|
||||||
for (byte[] name : columnFamiliesToCreate) {
|
|
||||||
db.createColumnFamily(new ColumnFamilyDescriptor(name)).close();
|
|
||||||
}
|
|
||||||
|
|
||||||
flushAndCloseDb(db, handles);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void createInMemoryColumns(List<ColumnFamilyDescriptor> totalDescriptors,
|
private void createInMemoryColumns(List<ColumnFamilyDescriptor> totalDescriptors,
|
||||||
DatabaseOptions databaseOptions,
|
DatabaseOptions databaseOptions,
|
||||||
List<ColumnFamilyHandle> handles)
|
List<ColumnFamilyHandle> handles)
|
||||||
|
Loading…
Reference in New Issue
Block a user