Close db optionally

This commit is contained in:
Andrea Cavalli 2022-03-08 02:12:13 +01:00
parent f0533a17c9
commit 35a70efec5

View File

@ -50,6 +50,7 @@ public class RocksdbFileStore {
@SuppressWarnings("UnstableApiUsage")
private final Striped<ReadWriteLock> metaLock;
private final ReadWriteLock[] readWriteLocks;
private static final ReadOptions DEFAULT_READ_OPTS = new ReadOptions()
.setVerifyChecksums(false)
@ -66,7 +67,7 @@ public class RocksdbFileStore {
private final ColumnFamilyHandle data;
private final ConcurrentHashMap<String, Long> filenameToId = new ConcurrentHashMap<>();
private final AtomicLong nextId;
private final AtomicLong flushCounter = new AtomicLong();
private final boolean closeDbOnClose;
private volatile boolean closed;
private RocksdbFileStore(RocksDB db,
@ -75,15 +76,22 @@ public class RocksdbFileStore {
ColumnFamilyHandle size,
ColumnFamilyHandle data,
int blockSize,
Striped<ReadWriteLock> metaLock) throws IOException {
Striped<ReadWriteLock> metaLock,
boolean closeDbOnClose) throws IOException {
try {
this.db = db;
this.closeDbOnClose = closeDbOnClose;
this.blockSize = blockSize;
this.headers = headers;
this.filename = filename;
this.size = size;
this.data = data;
this.metaLock = metaLock;
ReadWriteLock[] locks = new ReadWriteLock[metaLock.size()];
for (int i = 0; i < metaLock.size(); i++) {
locks[i] = metaLock.getAt(i);
}
this.readWriteLocks = locks;
byte[] nextIdBytes = db.get(headers, NEXT_ID_KEY);
if (nextIdBytes != null) {
this.nextId = new AtomicLong(Longs.fromByteArray(nextIdBytes));
@ -110,7 +118,7 @@ public class RocksdbFileStore {
options.setWalRecoveryMode(WALRecoveryMode.PointInTimeRecovery);
options.setCreateMissingColumnFamilies(true);
options.setCreateIfMissing(true);
options.setUnorderedWrite(true);
//options.setUnorderedWrite(true);
options.setAvoidUnnecessaryBlockingIO(true);
options.setSkipCheckingSstFileSizesOnDbOpen(true);
options.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
@ -190,7 +198,8 @@ public class RocksdbFileStore {
handles.get(2),
handles.get(3),
blockSize,
metaLock
metaLock,
false
);
} catch (RocksDBException e) {
throw new IOException(e);
@ -212,7 +221,8 @@ public class RocksdbFileStore {
handles.get(2),
handles.get(3),
blockSize,
metaLock
metaLock,
true
);
} catch (RocksDBException e) {
throw new IOException("Failed to open RocksDB meta file store", e);
@ -438,6 +448,7 @@ public class RocksdbFileStore {
ByteBuffer bb = ByteBuffer.wrap(new byte[Long.BYTES + Integer.BYTES]);
bb.order(ByteOrder.LITTLE_ENDIAN);
bb.putLong(id);
bb.order(ByteOrder.BIG_ENDIAN);
bb.putInt(i);
return bb.array();
}
@ -636,12 +647,8 @@ public class RocksdbFileStore {
public void clear() throws IOException {
Lock[] locks = new Lock[metaLock.size()];
for (int i = 0; i < metaLock.size(); i++) {
locks[i] = metaLock.getAt(i).writeLock();
}
for (Lock lock : locks) {
lock.lock();
for (var lock : readWriteLocks) {
lock.writeLock().lock();
}
try {
ensureOpen();
@ -650,26 +657,23 @@ public class RocksdbFileStore {
remove(key);
}
} finally {
for (Lock lock : locks) {
lock.unlock();
for (var lock : readWriteLocks) {
lock.writeLock().unlock();
}
}
}
public List<String> listKey() {
Lock[] locks = new Lock[metaLock.size()];
for (int i = 0; i < metaLock.size(); i++) {
locks[i] = metaLock.getAt(i).readLock();
}
for (Lock lock : locks) {
lock.lock();
ensureOpen();
for (var lock : readWriteLocks) {
lock.readLock().lock();
}
try {
ensureOpen();
return listKeyInternal();
} finally {
for (Lock lock : locks) {
lock.unlock();
for (var lock : readWriteLocks) {
lock.readLock().unlock();
}
}
}
@ -790,22 +794,24 @@ public class RocksdbFileStore {
if (closed) {
return;
}
Lock[] locks = new Lock[metaLock.size()];
for (int i = 0; i < metaLock.size(); i++) {
locks[i] = metaLock.getAt(i).writeLock();
}
for (Lock lock : locks) {
lock.lock();
for (var lock : readWriteLocks) {
lock.writeLock().lock();
}
try {
if (closed) {
return;
}
db.close();
closed = true;
if (closeDbOnClose) {
try {
db.closeE();
} catch (RocksDBException e) {
throw new IOException(e);
}
}
} finally {
for (Lock lock : locks) {
lock.unlock();
for (var lock : readWriteLocks) {
lock.writeLock().unlock();
}
}
}