From e962ae63369672b519424ebfef501c0215fbcb76 Mon Sep 17 00:00:00 2001 From: Andrea Cavalli Date: Sat, 30 Apr 2022 01:49:44 +0200 Subject: [PATCH] Secure database shutdown, deduplicate compaction script --- .../database/disk/AbstractRocksDBColumn.java | 922 +++++++++--------- .../disk/LLLocalKeyValueDatabase.java | 113 +-- .../disk/OptimisticRocksDBColumn.java | 5 +- .../disk/PessimisticRocksDBColumn.java | 5 +- .../dbengine/database/disk/RocksDBColumn.java | 3 +- .../dbengine/database/disk/RocksDBUtils.java | 102 ++ .../database/disk/StandardRocksDBColumn.java | 5 +- 7 files changed, 596 insertions(+), 559 deletions(-) create mode 100644 src/main/java/it/cavallium/dbengine/database/disk/RocksDBUtils.java diff --git a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java index 5c006aa..60ebc25 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/AbstractRocksDBColumn.java @@ -23,14 +23,18 @@ import it.cavallium.dbengine.database.SafeCloseable; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithRelease; import it.cavallium.dbengine.database.disk.LLLocalDictionary.ReleasableSliceImplWithoutRelease; import it.cavallium.dbengine.database.serialization.SerializationException; +import it.cavallium.dbengine.rpc.current.data.Column; +import it.cavallium.dbengine.rpc.current.data.NamedColumnOptions; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.StampedLock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; @@ -72,7 +76,7 @@ public sealed abstract class AbstractRocksDBColumn implements private final ColumnFamilyHandle cfh; protected final MeterRegistry meterRegistry; - protected final Lock accessibilityLock; + protected final StampedLock closeLock; protected final String columnName; protected final DistributionSummary keyBufferSize; @@ -106,7 +110,7 @@ public sealed abstract class AbstractRocksDBColumn implements String databaseName, ColumnFamilyHandle cfh, MeterRegistry meterRegistry, - Lock accessibilityLock) { + StampedLock closeLock) { this.db = db; this.nettyDirect = nettyDirect && alloc.getAllocationType() == OFF_HEAP; this.alloc = alloc; @@ -119,7 +123,7 @@ public sealed abstract class AbstractRocksDBColumn implements } this.columnName = columnName; this.meterRegistry = meterRegistry; - this.accessibilityLock = accessibilityLock; + this.closeLock = closeLock; this.keyBufferSize = DistributionSummary .builder("buffer.size.distribution") @@ -327,24 +331,261 @@ public sealed abstract class AbstractRocksDBColumn implements return cfh; } - @Override - public @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key) - throws RocksDBException { + protected void ensureOpen() { if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called dbGet in a nonblocking thread"); + throw new UnsupportedOperationException("Called in a nonblocking thread"); } - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); + ensureOwned(db); + ensureOwned(cfh); + } + + protected void 
ensureOwned(org.rocksdb.RocksObject rocksObject) { + if (!rocksObject.isOwningHandle()) { + throw new IllegalStateException("Not owning handle"); } - if (!readOptions.isOwningHandle()) { - throw new IllegalStateException("ReadOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - keyBufferSize.record(key.readableBytes()); - int readAttemptsCount = 0; + } + + @Override + public @Nullable Buffer get(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { + var closeReadLock = closeLock.readLock(); try { + ensureOpen(); + ensureOwned(readOptions); + keyBufferSize.record(key.readableBytes()); + int readAttemptsCount = 0; + try { + if (nettyDirect) { + // Get the key nio buffer to pass to RocksDB + ByteBuffer keyNioBuffer; + boolean mustCloseKey; + { + if (!LLUtils.isReadOnlyDirect(key)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseKey = true; + var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); + key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); + key = directKey; + } else { + mustCloseKey = false; + } + keyNioBuffer = ((ReadableComponent) key).readableBuffer(); + assert keyNioBuffer.isDirect(); + assert keyNioBuffer.limit() == key.readableBytes(); + } + + try { + // Create a direct result buffer because RocksDB works only with direct buffers + var resultBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); + try { + assert resultBuffer.readerOffset() == 0; + assert resultBuffer.writerOffset() == 0; + var resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); + + var keyMayExist = db.keyMayExist(cfh, readOptions, keyNioBuffer.rewind(), + resultWritable.clear()); + KeyMayExistEnum keyMayExistState = keyMayExist.exists; + int keyMayExistValueLength = keyMayExist.valueLength; + // At the beginning, size reflects the expected size, then it becomes the real data size + int size = keyMayExistState == kExistsWithValue ? 
keyMayExistValueLength : -1; + boolean isKExistsWithoutValue = false; + switch (keyMayExistState) { + case kNotExist: { + readValueNotFoundWithBloomBufferSize.record(0); + resultBuffer.close(); + return null; + } + // todo: kExistsWithValue is not reliable (read below), + // in some cases it should be treated as kExistsWithoutValue + case kExistsWithValue: + case kExistsWithoutValue: { + if (keyMayExistState == kExistsWithoutValue) { + isKExistsWithoutValue = true; + } else { + // todo: "size == 0 || resultWritable.limit() == 0" is checked because keyMayExist is broken, + // and sometimes it returns an empty array, as if it exists + if (size == 0 || resultWritable.limit() == 0) { + isKExistsWithoutValue = true; + } + } + if (isKExistsWithoutValue) { + assert keyMayExistValueLength == 0; + resultWritable.clear(); + readAttemptsCount++; + // real data size + size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); + if (size == RocksDB.NOT_FOUND) { + resultBuffer.close(); + readValueNotFoundWithMayExistBloomBufferSize.record(0); + return null; + } + } + } + default: { + // real data size + assert size >= 0; + if (size <= resultWritable.limit()) { + if (isKExistsWithoutValue) { + readValueFoundWithBloomUncachedBufferSize.record(size); + } else { + readValueFoundWithBloomCacheBufferSize.record(size); + } + assert size == resultWritable.limit(); + return resultBuffer.writerOffset(resultWritable.limit()); + } else { + resultBuffer.ensureWritable(size); + resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); + assert resultBuffer.readerOffset() == 0; + assert resultBuffer.writerOffset() == 0; + + readAttemptsCount++; + size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); + if (size == RocksDB.NOT_FOUND) { + readValueNotFoundWithMayExistBloomBufferSize.record(0); + resultBuffer.close(); + return null; + } + assert size == resultWritable.limit(); + if (isKExistsWithoutValue) { + readValueFoundWithBloomUncachedBufferSize.record(size); + } else { + readValueFoundWithBloomCacheBufferSize.record(size); + } + return resultBuffer.writerOffset(resultWritable.limit()); + } + } + } + } catch (Throwable t) { + resultBuffer.close(); + throw t; + } + } finally { + if (mustCloseKey) { + key.close(); + } + } + } else { + try { + byte[] keyArray = LLUtils.toArray(key); + requireNonNull(keyArray); + Holder data = new Holder<>(); + if (db.keyMayExist(cfh, readOptions, keyArray, data)) { + // todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it + // returns an empty array, as if it exists + if (data.getValue() != null && data.getValue().length > 0) { + readValueFoundWithBloomCacheBufferSize.record(data.getValue().length); + return LLUtils.fromByteArray(alloc, data.getValue()); + } else { + readAttemptsCount++; + byte[] result = db.get(cfh, readOptions, keyArray); + if (result == null) { + if (data.getValue() != null) { + readValueNotFoundWithBloomBufferSize.record(0); + } else { + readValueNotFoundWithMayExistBloomBufferSize.record(0); + } + return null; + } else { + readValueFoundWithBloomUncachedBufferSize.record(0); + return LLUtils.fromByteArray(alloc, result); + } + } + } else { + readValueNotFoundWithBloomBufferSize.record(0); + return null; + } + } finally { + if (!(readOptions instanceof UnreleasableReadOptions)) { + readOptions.close(); + } + } + } + } finally { + readAttempts.record(readAttemptsCount); + } + } finally { + closeLock.unlockRead(closeReadLock); + } + } + + @Override + public void 
put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException { + var closeReadLock = closeLock.readLock(); + try { + try { + ensureOpen(); + ensureOwned(writeOptions); + assert key.isAccessible(); + assert value.isAccessible(); + this.keyBufferSize.record(key.readableBytes()); + this.writeValueBufferSize.record(value.readableBytes()); + if (nettyDirect) { + // Get the key nio buffer to pass to RocksDB + ByteBuffer keyNioBuffer; + boolean mustCloseKey; + { + if (!LLUtils.isReadOnlyDirect(key)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseKey = true; + var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); + key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); + key = directKey; + } else { + mustCloseKey = false; + } + keyNioBuffer = ((ReadableComponent) key).readableBuffer(); + assert keyNioBuffer.isDirect(); + assert keyNioBuffer.limit() == key.readableBytes(); + } + try { + // Get the value nio buffer to pass to RocksDB + ByteBuffer valueNioBuffer; + boolean mustCloseValue; + { + if (!LLUtils.isReadOnlyDirect(value)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseValue = true; + var directValue = DefaultBufferAllocators.offHeapAllocator().allocate(value.readableBytes()); + value.copyInto(value.readerOffset(), directValue, 0, value.readableBytes()); + value = directValue; + } else { + mustCloseValue = false; + } + valueNioBuffer = ((ReadableComponent) value).readableBuffer(); + assert valueNioBuffer.isDirect(); + assert valueNioBuffer.limit() == value.readableBytes(); + } + + try { + db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer); + } finally { + if (mustCloseValue) { + value.close(); + } + } + } finally { + if (mustCloseKey) { + key.close(); + } + } + } else { + db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value)); + } + } finally { + if (!(writeOptions instanceof UnreleasableWriteOptions)) { + writeOptions.close(); + } + } + } finally { + closeLock.unlockRead(closeReadLock); + } + } + + @Override + public boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + ensureOwned(readOptions); if (nettyDirect) { // Get the key nio buffer to pass to RocksDB ByteBuffer keyNioBuffer; @@ -363,90 +604,20 @@ public sealed abstract class AbstractRocksDBColumn implements assert keyNioBuffer.isDirect(); assert keyNioBuffer.limit() == key.readableBytes(); } - try { - // Create a direct result buffer because RocksDB works only with direct buffers - var resultBuffer = alloc.allocate(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); - try { - assert resultBuffer.readerOffset() == 0; - assert resultBuffer.writerOffset() == 0; - var resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); - - var keyMayExist = db.keyMayExist(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); - KeyMayExistEnum keyMayExistState = keyMayExist.exists; - int keyMayExistValueLength = keyMayExist.valueLength; - // At the beginning, size reflects the expected size, then it becomes the real data size - int size = keyMayExistState == kExistsWithValue ? 
keyMayExistValueLength : -1; - boolean isKExistsWithoutValue = false; - switch (keyMayExistState) { - case kNotExist: { - readValueNotFoundWithBloomBufferSize.record(0); - resultBuffer.close(); - return null; - } - // todo: kExistsWithValue is not reliable (read below), - // in some cases it should be treated as kExistsWithoutValue - case kExistsWithValue: - case kExistsWithoutValue: { - if (keyMayExistState == kExistsWithoutValue) { - isKExistsWithoutValue = true; - } else { - // todo: "size == 0 || resultWritable.limit() == 0" is checked because keyMayExist is broken, - // and sometimes it returns an empty array, as if it exists - if (size == 0 || resultWritable.limit() == 0) { - isKExistsWithoutValue = true; - } - } - if (isKExistsWithoutValue) { - assert keyMayExistValueLength == 0; - resultWritable.clear(); - readAttemptsCount++; - // real data size - size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); - if (size == RocksDB.NOT_FOUND) { - resultBuffer.close(); - readValueNotFoundWithMayExistBloomBufferSize.record(0); - return null; - } - } - } - default: { - // real data size - assert size >= 0; - if (size <= resultWritable.limit()) { - if (isKExistsWithoutValue) { - readValueFoundWithBloomUncachedBufferSize.record(size); - } else { - readValueFoundWithBloomCacheBufferSize.record(size); - } - assert size == resultWritable.limit(); - return resultBuffer.writerOffset(resultWritable.limit()); - } else { - resultBuffer.ensureWritable(size); - resultWritable = ((WritableComponent) resultBuffer).writableBuffer(); - assert resultBuffer.readerOffset() == 0; - assert resultBuffer.writerOffset() == 0; - - readAttemptsCount++; - size = db.get(cfh, readOptions, keyNioBuffer.rewind(), resultWritable.clear()); - if (size == RocksDB.NOT_FOUND) { - readValueNotFoundWithMayExistBloomBufferSize.record(0); - resultBuffer.close(); - return null; - } - assert size == resultWritable.limit(); - if (isKExistsWithoutValue) { - readValueFoundWithBloomUncachedBufferSize.record(size); - } else { - readValueFoundWithBloomCacheBufferSize.record(size); - } - return resultBuffer.writerOffset(resultWritable.limit()); - } - } + if (db.keyMayExist(cfh, keyNioBuffer)) { + int size = db.get(cfh, readOptions, keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER); + boolean found = size != RocksDB.NOT_FOUND; + if (found) { + readValueFoundWithBloomSimpleBufferSize.record(size); + return true; + } else { + readValueNotFoundWithMayExistBloomBufferSize.record(0); + return false; } - } catch (Throwable t) { - resultBuffer.close(); - throw t; + } else { + readValueNotFoundWithBloomBufferSize.record(0); + return false; } } finally { if (mustCloseKey) { @@ -454,35 +625,76 @@ public sealed abstract class AbstractRocksDBColumn implements } } } else { + int size = RocksDB.NOT_FOUND; + byte[] keyBytes = LLUtils.toArray(key); + Holder data = new Holder<>(); + boolean mayExistHit = false; try { - byte[] keyArray = LLUtils.toArray(key); - requireNonNull(keyArray); - Holder data = new Holder<>(); - if (db.keyMayExist(cfh, readOptions, keyArray, data)) { - // todo: "data.getValue().length > 0" is checked because keyMayExist is broken, and sometimes it - // returns an empty array, as if it exists - if (data.getValue() != null && data.getValue().length > 0) { - readValueFoundWithBloomCacheBufferSize.record(data.getValue().length); - return LLUtils.fromByteArray(alloc, data.getValue()); + if (db.keyMayExist(cfh, readOptions, keyBytes, data)) { + mayExistHit = true; + if (data.getValue() != null) { + size = 
data.getValue().length; } else { - readAttemptsCount++; - byte[] result = db.get(cfh, readOptions, keyArray); - if (result == null) { - if (data.getValue() != null) { - readValueNotFoundWithBloomBufferSize.record(0); - } else { - readValueNotFoundWithMayExistBloomBufferSize.record(0); - } - return null; - } else { - readValueFoundWithBloomUncachedBufferSize.record(0); - return LLUtils.fromByteArray(alloc, result); - } + size = db.get(cfh, readOptions, keyBytes, NO_DATA); } + } + } finally { + if (!(readOptions instanceof UnreleasableReadOptions)) { + readOptions.close(); + } + } + boolean found = size != RocksDB.NOT_FOUND; + if (found) { + readValueFoundWithBloomSimpleBufferSize.record(size); + } else { + if (mayExistHit) { + readValueNotFoundWithMayExistBloomBufferSize.record(0); } else { readValueNotFoundWithBloomBufferSize.record(0); - return null; } + } + return found; + } + } finally { + closeLock.unlockRead(closeReadLock); + } + } + + @Override + public boolean mayExists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + ensureOwned(readOptions); + if (nettyDirect) { + // Get the key nio buffer to pass to RocksDB + ByteBuffer keyNioBuffer; + boolean mustCloseKey; + { + if (!LLUtils.isReadOnlyDirect(key)) { + // If the nio buffer is not available, copy the netty buffer into a new direct buffer + mustCloseKey = true; + var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); + key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); + key = directKey; + } else { + mustCloseKey = false; + } + keyNioBuffer = ((ReadableComponent) key).readableBuffer(); + assert keyNioBuffer.isDirect(); + assert keyNioBuffer.limit() == key.readableBytes(); + } + try { + return db.keyMayExist(cfh, keyNioBuffer); + } finally { + if (mustCloseKey) { + key.close(); + } + } + } else { + byte[] keyBytes = LLUtils.toArray(key); + try { + return db.keyMayExist(cfh, readOptions, keyBytes, null); } finally { if (!(readOptions instanceof UnreleasableReadOptions)) { readOptions.close(); @@ -490,29 +702,17 @@ public sealed abstract class AbstractRocksDBColumn implements } } } finally { - readAttempts.record(readAttemptsCount); + closeLock.unlockRead(closeReadLock); } } @Override - public void put(@NotNull WriteOptions writeOptions, Buffer key, Buffer value) throws RocksDBException { + public void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException { + var closeReadLock = closeLock.readLock(); try { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called dbPut in a nonblocking thread"); - } - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); - } - if (!writeOptions.isOwningHandle()) { - throw new IllegalStateException("WriteOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - assert key.isAccessible(); - assert value.isAccessible(); - this.keyBufferSize.record(key.readableBytes()); - this.writeValueBufferSize.record(value.readableBytes()); + ensureOpen(); + ensureOwned(writeOptions); + keyBufferSize.record(key.readableBytes()); if (nettyDirect) { // Get the key nio buffer to pass to RocksDB ByteBuffer keyNioBuffer; @@ -532,331 +732,117 @@ public sealed abstract class AbstractRocksDBColumn implements assert keyNioBuffer.limit() == key.readableBytes(); } try { - // Get the value nio buffer to pass to RocksDB - ByteBuffer 
valueNioBuffer; - boolean mustCloseValue; - { - if (!LLUtils.isReadOnlyDirect(value)) { - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - mustCloseValue = true; - var directValue = DefaultBufferAllocators.offHeapAllocator().allocate(value.readableBytes()); - value.copyInto(value.readerOffset(), directValue, 0, value.readableBytes()); - value = directValue; - } else { - mustCloseValue = false; - } - valueNioBuffer = ((ReadableComponent) value).readableBuffer(); - assert valueNioBuffer.isDirect(); - assert valueNioBuffer.limit() == value.readableBytes(); - } - - try { - db.put(cfh, writeOptions, keyNioBuffer, valueNioBuffer); - } finally { - if (mustCloseValue) { - value.close(); - } - } + db.delete(cfh, writeOptions, keyNioBuffer); } finally { if (mustCloseKey) { key.close(); } } } else { - db.put(cfh, writeOptions, LLUtils.toArray(key), LLUtils.toArray(value)); + db.delete(cfh, writeOptions, LLUtils.toArray(key)); } } finally { - if (!(writeOptions instanceof UnreleasableWriteOptions)) { - writeOptions.close(); - } - } - } - - @Override - public boolean exists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called containsKey in a nonblocking thread"); - } - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); - } - if (!readOptions.isOwningHandle()) { - throw new IllegalStateException("ReadOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - if (nettyDirect) { - // Get the key nio buffer to pass to RocksDB - ByteBuffer keyNioBuffer; - boolean mustCloseKey; - { - if (!LLUtils.isReadOnlyDirect(key)) { - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - mustCloseKey = true; - var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); - key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); - key = directKey; - } else { - mustCloseKey = false; - } - keyNioBuffer = ((ReadableComponent) key).readableBuffer(); - assert keyNioBuffer.isDirect(); - assert keyNioBuffer.limit() == key.readableBytes(); - } - try { - if (db.keyMayExist(cfh, keyNioBuffer)) { - int size = db.get(cfh, readOptions, keyNioBuffer.position(0), LLUtils.EMPTY_BYTE_BUFFER); - boolean found = size != RocksDB.NOT_FOUND; - if (found) { - readValueFoundWithBloomSimpleBufferSize.record(size); - return true; - } else { - readValueNotFoundWithMayExistBloomBufferSize.record(0); - return false; - } - } else { - readValueNotFoundWithBloomBufferSize.record(0); - return false; - } - } finally { - if (mustCloseKey) { - key.close(); - } - } - } else { - int size = RocksDB.NOT_FOUND; - byte[] keyBytes = LLUtils.toArray(key); - Holder data = new Holder<>(); - boolean mayExistHit = false; - try { - if (db.keyMayExist(cfh, readOptions, keyBytes, data)) { - mayExistHit = true; - if (data.getValue() != null) { - size = data.getValue().length; - } else { - size = db.get(cfh, readOptions, keyBytes, NO_DATA); - } - } - } finally { - if (!(readOptions instanceof UnreleasableReadOptions)) { - readOptions.close(); - } - } - boolean found = size != RocksDB.NOT_FOUND; - if (found) { - readValueFoundWithBloomSimpleBufferSize.record(size); - } else { - if (mayExistHit) { - readValueNotFoundWithMayExistBloomBufferSize.record(0); - } else { - readValueNotFoundWithBloomBufferSize.record(0); - } - } - return found; - } - } - - 
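// The change repeated across these column methods follows one pattern: each operation body
// runs while holding a read stamp of the shared StampedLock (closeLock), and the database
// shutdown path takes the write stamp, so no get/put/iterator call can race the close.
// Below is a minimal self-contained sketch of that pattern, assuming a simplified column
// wrapper; SafeColumn and doGet are illustrative names, not types from this repository.

import java.util.concurrent.locks.StampedLock;

final class SafeColumn implements AutoCloseable {

	private final StampedLock closeLock = new StampedLock();
	private volatile boolean closed = false;

	byte[] doGet(byte[] key) {
		long stamp = closeLock.readLock();
		try {
			if (closed) {
				throw new IllegalStateException("Database is closed");
			}
			// ... the RocksDB read would happen here ...
			return new byte[0];
		} finally {
			closeLock.unlockRead(stamp);
		}
	}

	@Override
	public void close() {
		long stamp = closeLock.writeLock();
		try {
			if (!closed) {
				closed = true;
				// ... flush and close the RocksDB handles here ...
			}
		} finally {
			closeLock.unlockWrite(stamp);
		}
	}
}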
@Override - public boolean mayExists(@NotNull ReadOptions readOptions, Buffer key) throws RocksDBException { - if (Schedulers.isInNonBlockingThread()) { - throw new UnsupportedOperationException("Called containsKey in a nonblocking thread"); - } - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); - } - if (!readOptions.isOwningHandle()) { - throw new IllegalStateException("ReadOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - if (nettyDirect) { - // Get the key nio buffer to pass to RocksDB - ByteBuffer keyNioBuffer; - boolean mustCloseKey; - { - if (!LLUtils.isReadOnlyDirect(key)) { - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - mustCloseKey = true; - var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); - key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); - key = directKey; - } else { - mustCloseKey = false; - } - keyNioBuffer = ((ReadableComponent) key).readableBuffer(); - assert keyNioBuffer.isDirect(); - assert keyNioBuffer.limit() == key.readableBytes(); - } - try { - return db.keyMayExist(cfh, keyNioBuffer); - } finally { - if (mustCloseKey) { - key.close(); - } - } - } else { - byte[] keyBytes = LLUtils.toArray(key); - try { - return db.keyMayExist(cfh, readOptions, keyBytes, null); - } finally { - if (!(readOptions instanceof UnreleasableReadOptions)) { - readOptions.close(); - } - } - } - } - - @Override - public void delete(WriteOptions writeOptions, Buffer key) throws RocksDBException { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); - } - if (!writeOptions.isOwningHandle()) { - throw new IllegalStateException("WriteOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - keyBufferSize.record(key.readableBytes()); - if (nettyDirect) { - // Get the key nio buffer to pass to RocksDB - ByteBuffer keyNioBuffer; - boolean mustCloseKey; - { - if (!LLUtils.isReadOnlyDirect(key)) { - // If the nio buffer is not available, copy the netty buffer into a new direct buffer - mustCloseKey = true; - var directKey = DefaultBufferAllocators.offHeapAllocator().allocate(key.readableBytes()); - key.copyInto(key.readerOffset(), directKey, 0, key.readableBytes()); - key = directKey; - } else { - mustCloseKey = false; - } - keyNioBuffer = ((ReadableComponent) key).readableBuffer(); - assert keyNioBuffer.isDirect(); - assert keyNioBuffer.limit() == key.readableBytes(); - } - try { - db.delete(cfh, writeOptions, keyNioBuffer); - } finally { - if (mustCloseKey) { - key.close(); - } - } - } else { - db.delete(cfh, writeOptions, LLUtils.toArray(key)); + closeLock.unlockRead(closeReadLock); } } @Override public void delete(WriteOptions writeOptions, byte[] key) throws RocksDBException { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + ensureOwned(writeOptions); + keyBufferSize.record(key.length); + db.delete(cfh, writeOptions, key); + } finally { + closeLock.unlockRead(closeReadLock); } - if (!writeOptions.isOwningHandle()) { - throw new IllegalStateException("WriteOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - keyBufferSize.record(key.length); - db.delete(cfh, writeOptions, key); } @Override public List 
multiGetAsList(ReadOptions readOptions, List keys) throws RocksDBException { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + ensureOwned(readOptions); + for (byte[] key : keys) { + keyBufferSize.record(key.length); + } + var columnFamilyHandles = new RepeatedElementList<>(cfh, keys.size()); + return db.multiGetAsList(readOptions, columnFamilyHandles, keys); + } finally { + closeLock.unlockRead(closeReadLock); } - if (!readOptions.isOwningHandle()) { - throw new IllegalStateException("ReadOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - for (byte[] key : keys) { - keyBufferSize.record(key.length); - } - var columnFamilyHandles = new RepeatedElementList<>(cfh, keys.size()); - return db.multiGetAsList(readOptions, columnFamilyHandles, keys); } @Override public void suggestCompactRange() throws RocksDBException { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + db.suggestCompactRange(cfh); + } finally { + closeLock.unlockRead(closeReadLock); } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - db.suggestCompactRange(cfh); } @Override - public void compactRange(byte[] begin, byte[] end, CompactRangeOptions options) - throws RocksDBException { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); + public void compactRange(byte[] begin, byte[] end, CompactRangeOptions options) throws RocksDBException { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + ensureOwned(options); + db.compactRange(cfh, begin, end, options); + } finally { + closeLock.unlockRead(closeReadLock); } - if (!options.isOwningHandle()) { - throw new IllegalStateException("CompactRangeOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - db.compactRange(cfh, begin, end, options); } @Override public void flush(FlushOptions options) throws RocksDBException { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + ensureOwned(options); + db.flush(options, cfh); + } finally { + closeLock.unlockRead(closeReadLock); } - if (!options.isOwningHandle()) { - throw new IllegalStateException("FlushOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - db.flush(options, cfh); } @Override public void flushWal(boolean sync) throws RocksDBException { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + db.flushWal(sync); + } finally { + closeLock.unlockRead(closeReadLock); } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - db.flushWal(sync); } @Override public long getLongProperty(String property) throws RocksDBException { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + return db.getLongProperty(cfh, property); + } finally { + closeLock.unlockRead(closeReadLock); } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - 
return db.getLongProperty(cfh, property); } @Override public void write(WriteOptions writeOptions, WriteBatch writeBatch) throws RocksDBException { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + ensureOwned(writeOptions); + ensureOwned(writeBatch); + db.write(writeOptions, writeBatch); + } finally { + closeLock.unlockRead(closeReadLock); } - if (!writeOptions.isOwningHandle()) { - throw new IllegalStateException("WriteOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - db.write(writeOptions, writeBatch); } /** @@ -872,27 +858,23 @@ public sealed abstract class AbstractRocksDBColumn implements Buffer key, BinarySerializationFunction updater, UpdateAtomicResultMode returnMode) throws IOException { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); - } - if (!readOptions.isOwningHandle()) { - throw new IllegalStateException("ReadOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } + var closeReadLock = closeLock.readLock(); try { - keyBufferSize.record(key.readableBytes()); - startedUpdate.increment(); - accessibilityLock.lock(); - return updateAtomicImpl(readOptions, writeOptions, key, updater, returnMode); - } catch (IOException e) { - throw e; - } catch (Exception e) { - throw new IOException(e); + ensureOpen(); + ensureOwned(readOptions); + try { + keyBufferSize.record(key.readableBytes()); + startedUpdate.increment(); + return updateAtomicImpl(readOptions, writeOptions, key, updater, returnMode); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new IOException(e); + } finally { + endedUpdate.increment(); + } } finally { - accessibilityLock.unlock(); - endedUpdate.increment(); + closeLock.unlockRead(closeReadLock); } } @@ -923,24 +905,23 @@ public sealed abstract class AbstractRocksDBColumn implements @Override @NotNull public RocksDBIterator newIterator(@NotNull ReadOptions readOptions) { - if (!db.isOwningHandle()) { - throw new IllegalStateException("Database is closed"); + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + ensureOwned(readOptions); + var it = db.newIterator(cfh, readOptions); + return new RocksDBIterator(it, + nettyDirect, + this.startedIterSeek, + this.endedIterSeek, + this.iterSeekTime, + this.startedIterNext, + this.endedIterNext, + this.iterNextTime + ); + } finally { + closeLock.unlockRead(closeReadLock); } - if (!readOptions.isOwningHandle()) { - throw new IllegalStateException("ReadOptions is closed"); - } - if (!cfh.isOwningHandle()) { - throw new IllegalStateException("Column family is closed"); - } - return new RocksDBIterator(db.newIterator(cfh, readOptions), - nettyDirect, - this.startedIterSeek, - this.endedIterSeek, - this.iterSeekTime, - this.startedIterNext, - this.endedIterNext, - this.iterNextTime - ); } protected final Buffer applyUpdateAndCloseIfNecessary(BinarySerializationFunction updater, @@ -957,31 +938,18 @@ public sealed abstract class AbstractRocksDBColumn implements return newData; } + protected int getLastLevel() { + return RocksDBUtils.getLastLevel(db, cfh); + } + @Override - public final void forceCompaction(int volumeId) throws RocksDBException { - List files = new ArrayList<>(); - var meta = db.getColumnFamilyMetaData(cfh); - int bottommostLevel = -1; - for (LevelMetaData level : meta.levels()) { - bottommostLevel 
= Math.max(bottommostLevel, level.level()); - } - int count = 0; - x: for (LevelMetaData level : meta.levels()) { - for (SstFileMetaData file : level.files()) { - if (file.fileName().endsWith(".sst")) { - files.add(file.fileName()); - count++; - if (count >= 4) { - break x; - } - } - } - } - try (var co = new CompactionOptions()) { - if (!files.isEmpty() && bottommostLevel != -1) { - db.compactFiles(co, cfh, files, bottommostLevel, volumeId, null); - } - db.compactRange(cfh); + public final void forceCompaction(int volumeId) { + var closeReadLock = closeLock.readLock(); + try { + ensureOpen(); + RocksDBUtils.forceCompaction(db, db.getName(), cfh, volumeId, logger); + } finally { + closeLock.unlockRead(closeReadLock); } } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 9d6687c..f4fcb3e 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -40,6 +40,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -47,8 +48,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.StampedLock; import java.util.stream.Collectors; -import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.lang3.time.StopWatch; import org.apache.logging.log4j.LogManager; @@ -62,7 +63,6 @@ import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactRangeOptions; -import org.rocksdb.CompactRangeOptions.BottommostLevelCompaction; import org.rocksdb.CompactionJobInfo; import org.rocksdb.CompactionOptions; import org.rocksdb.CompactionPriority; @@ -132,7 +132,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private final HashMap persistentCaches; private final ConcurrentHashMap snapshotsHandles = new ConcurrentHashMap<>(); private final AtomicLong nextSnapshotNumbers = new AtomicLong(1); - private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock(); + private final StampedLock closeLock = new StampedLock(); private volatile boolean closed = false; @SuppressWarnings("SwitchStatementWithTooFewBranches") @@ -253,6 +253,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } if (!columnOptions.levels().isEmpty()) { + columnFamilyOptions.setNumLevels(columnOptions.levels().size()); var firstLevelOptions = getRocksLevelOptions(columnOptions.levels().get(0)); columnFamilyOptions.setCompressionType(firstLevelOptions.compressionType); columnFamilyOptions.setCompressionOptions(firstLevelOptions.compressionOptions); @@ -269,9 +270,9 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { .map(v -> v.compression().getType()) .toList()); } else { - columnFamilyOptions.setNumLevels(7); - List compressionTypes = new ArrayList<>(7); - for (int i = 0; i < 7; i++) { + columnFamilyOptions.setNumLevels(6); + List compressionTypes = new ArrayList<>(6); + for (int i = 0; i < 6; i++) { if (i < 2) { 
compressionTypes.add(CompressionType.NO_COMPRESSION); } else { @@ -560,60 +561,27 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return paths.size() - 1; } - public void forceCompaction(int volumeId) throws RocksDBException { - try (var co = new CompactionOptions() - .setCompression(CompressionType.LZ4_COMPRESSION) - .setMaxSubcompactions(0) - .setOutputFileSizeLimit(2 * SizeUnit.GB)) { - for (ColumnFamilyHandle cfh : this.handles.values()) { - List files = new ArrayList<>(); - var meta = db.getColumnFamilyMetaData(cfh); - int bottommostLevel = -1; - for (LevelMetaData level : meta.levels()) { - bottommostLevel = Math.max(bottommostLevel, level.level()); - } - for (LevelMetaData level : meta.levels()) { - if (level.level() < bottommostLevel) { - for (SstFileMetaData file : level.files()) { - if (file.fileName().endsWith(".sst")) { - files.add(file.fileName()); - } - } - } - } - bottommostLevel = Math.max(bottommostLevel, databaseOptions.defaultColumnOptions().levels().size() - 1); + public int getLastLevel(Column column) { + return databaseOptions + .columnOptions() + .stream() + .filter(namedColumnOptions -> namedColumnOptions.columnName().equals(column.name())) + .findFirst() + .map(NamedColumnOptions::levels) + .filter(levels -> !levels.isEmpty()) + .or(() -> Optional.of(databaseOptions.defaultColumnOptions().levels()).filter(levels -> !levels.isEmpty())) + .map(List::size) + .orElse(6); + } - if (!files.isEmpty() && bottommostLevel != -1) { - var partitionSize = files.size() / Runtime.getRuntime().availableProcessors(); - List> partitions; - if (partitionSize > 0) { - partitions = partition(files, files.size() / Runtime.getRuntime().availableProcessors()); - } else { - partitions = List.of(files); - } - int finalBottommostLevel = bottommostLevel; - Mono.when(partitions.stream().map(partition -> Mono.fromCallable(() -> { - logger.info("Compacting {} files in database {} in column family {} to level {}", - partition.size(), - name, - new String(cfh.getName(), StandardCharsets.UTF_8), - finalBottommostLevel - ); - if (!partition.isEmpty()) { - var coi = new CompactionJobInfo(); - db.compactFiles(co, cfh, partition, finalBottommostLevel, volumeId, coi); - logger.info("Compacted {} files in database {} in column family {} to level {}: {}", - partition.size(), - name, - new String(cfh.getName(), StandardCharsets.UTF_8), - finalBottommostLevel, - coi.status().getCodeString() - ); - } - return null; - }).subscribeOn(Schedulers.boundedElastic())).toList()).block(); - } - } + public List getColumnFiles(Column column, boolean excludeLastLevel) { + var cfh = handles.get(column); + return RocksDBUtils.getColumnFiles(db, cfh, excludeLastLevel); + } + + public void forceCompaction(int volumeId) throws RocksDBException { + for (var cfh : this.handles.values()) { + RocksDBUtils.forceCompaction(db, name, cfh, volumeId, logger); } } @@ -660,14 +628,13 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { return name; } - public Lock getAccessibilityLock() { - return shutdownLock.readLock(); + public StampedLock getCloseLock() { + return closeLock; } private void flushAndCloseDb(RocksDB db, Cache standardCache, Cache compressedCache, List handles) throws RocksDBException { - var shutdownWriteLock = shutdownLock.writeLock(); - shutdownWriteLock.lock(); + var closeWriteLock = closeLock.writeLock(); try { if (closed) { return; @@ -715,7 +682,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { } } } finally { - shutdownWriteLock.unlock(); + 
closeLock.unlockWrite(closeWriteLock); } } @@ -1037,7 +1004,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { private RocksDBColumn getRocksDBColumn(RocksDB db, ColumnFamilyHandle cfh) { var nettyDirect = databaseOptions.allowNettyDirect(); - var accessibilityLock = getAccessibilityLock(); + var closeLock = getCloseLock(); if (db instanceof OptimisticTransactionDB optimisticTransactionDB) { return new OptimisticRocksDBColumn(optimisticTransactionDB, nettyDirect, @@ -1045,7 +1012,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { name, cfh, meterRegistry, - accessibilityLock + closeLock ); } else if (db instanceof TransactionDB transactionDB) { return new PessimisticRocksDBColumn(transactionDB, @@ -1054,10 +1021,10 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { name, cfh, meterRegistry, - accessibilityLock + closeLock ); } else { - return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry, accessibilityLock); + return new StandardRocksDBColumn(db, nettyDirect, allocator, name, cfh, meterRegistry, closeLock); } } @@ -1225,8 +1192,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono takeSnapshot() { return Mono .fromCallable(() -> snapshotTime.recordCallable(() -> { - var shutdownReadLock = shutdownLock.readLock(); - shutdownReadLock.lock(); + var closeReadLock = closeLock.readLock(); try { if (closed) { throw new IllegalStateException("Database closed"); @@ -1236,7 +1202,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { this.snapshotsHandles.put(currentSnapshotSequenceNumber, snapshot); return new LLSnapshot(currentSnapshotSequenceNumber); } finally { - shutdownReadLock.unlock(); + closeLock.unlockRead(closeReadLock); } })) .subscribeOn(dbRScheduler); @@ -1246,8 +1212,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { public Mono releaseSnapshot(LLSnapshot snapshot) { return Mono .fromCallable(() -> { - var shutdownReadLock = shutdownLock.readLock(); - shutdownReadLock.lock(); + var closeReadLock = closeLock.readLock(); try { if (closed) { throw new IllegalStateException("Database closed"); @@ -1262,7 +1227,7 @@ public class LLLocalKeyValueDatabase implements LLKeyValueDatabase { db.releaseSnapshot(dbSnapshot); return null; } finally { - shutdownReadLock.unlock(); + closeLock.unlockRead(closeReadLock); } }) .subscribeOn(dbRScheduler); diff --git a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java index a3679f9..533f5e1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/OptimisticRocksDBColumn.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.StampedLock; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; @@ -39,8 +40,8 @@ public final class OptimisticRocksDBColumn extends AbstractRocksDBColumn getColumnFiles(RocksDB db, ColumnFamilyHandle cfh, boolean excludeLastLevel) { + List files = new ArrayList<>(); + var meta = db.getColumnFamilyMetaData(cfh); + var lastLevel = excludeLastLevel ? 
getLastLevel(db, cfh) : -1; + for (LevelMetaData level : meta.levels()) { + if (!excludeLastLevel || level.level() < lastLevel) { + for (SstFileMetaData file : level.files()) { + if (file.fileName().endsWith(".sst")) { + files.add(file.fileName()); + } + } + } + } + return files; + } + + public static void forceCompaction(RocksDB db, + String logDbName, + ColumnFamilyHandle cfh, + int volumeId, + Logger logger) { + try (var co = new CompactionOptions() + .setCompression(CompressionType.LZ4_COMPRESSION) + .setMaxSubcompactions(0) + .setOutputFileSizeLimit(2 * SizeUnit.GB)) { + List filesToCompact = getColumnFiles(db, cfh, true); + + if (!filesToCompact.isEmpty()) { + var partitionSize = filesToCompact.size() / Runtime.getRuntime().availableProcessors(); + List> partitions; + if (partitionSize > 0) { + partitions = partition(filesToCompact, partitionSize); + } else { + partitions = List.of(filesToCompact); + } + int finalBottommostLevel = getLastLevel(db, cfh); + Mono.whenDelayError(partitions.stream().map(partition -> Mono.fromCallable(() -> { + logger.info("Compacting {} files in database {} in column family {} to level {}", + partition.size(), + logDbName, + new String(cfh.getName(), StandardCharsets.UTF_8), + finalBottommostLevel + ); + if (!partition.isEmpty()) { + var coi = new CompactionJobInfo(); + try { + db.compactFiles(co, cfh, partition, finalBottommostLevel, volumeId, coi); + logger.info("Compacted {} files in database {} in column family {} to level {}: {}", + partition.size(), + logDbName, + new String(cfh.getName(), StandardCharsets.UTF_8), + finalBottommostLevel, + coi.status().getCodeString() + ); + } catch (Throwable ex) { + logger.error("Failed to compact {} files in database {} in column family {} to level {}", + partition.size(), + logDbName, + new String(cfh.getName(), StandardCharsets.UTF_8), + finalBottommostLevel, + ex + ); + } + } + return null; + }).subscribeOn(Schedulers.boundedElastic())).toList()).block(); + } + } + } +} diff --git a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java index 5352aa3..c455c23 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/StandardRocksDBColumn.java @@ -9,6 +9,7 @@ import it.cavallium.dbengine.database.LLDelta; import it.cavallium.dbengine.database.LLUtils; import java.io.IOException; import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.StampedLock; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.rocksdb.ColumnFamilyHandle; @@ -23,8 +24,8 @@ public final class StandardRocksDBColumn extends AbstractRocksDBColumn boolean nettyDirect, BufferAllocator alloc, String dbName, - ColumnFamilyHandle cfh, MeterRegistry meterRegistry, Lock accessibilityLock) { - super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, accessibilityLock); + ColumnFamilyHandle cfh, MeterRegistry meterRegistry, StampedLock closeLock) { + super(db, nettyDirect, alloc, dbName, cfh, meterRegistry, closeLock); } @Override
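// The compaction logic deduplicated into RocksDBUtils boils down to: read the column family
// metadata, find the bottommost populated level, collect the .sst files above it, split them
// into roughly one partition per available processor, and compact each partition down to that
// level. A rough sketch of the file-selection side follows, using only the org.rocksdb metadata
// API already present in this patch; CompactionFilePicker and its partition() helper are
// illustrative stand-ins, not repository code.

import java.util.ArrayList;
import java.util.List;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.LevelMetaData;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileMetaData;

final class CompactionFilePicker {

	// Bottommost populated level, mirroring how the pre-patch code computed it.
	static int getLastLevel(RocksDB db, ColumnFamilyHandle cfh) {
		int lastLevel = -1;
		for (LevelMetaData level : db.getColumnFamilyMetaData(cfh).levels()) {
			lastLevel = Math.max(lastLevel, level.level());
		}
		return lastLevel;
	}

	// Collect .sst file names, optionally skipping the last level (already fully compacted).
	static List<String> getColumnFiles(RocksDB db, ColumnFamilyHandle cfh, boolean excludeLastLevel) {
		List<String> files = new ArrayList<>();
		int lastLevel = excludeLastLevel ? getLastLevel(db, cfh) : -1;
		for (LevelMetaData level : db.getColumnFamilyMetaData(cfh).levels()) {
			if (!excludeLastLevel || level.level() < lastLevel) {
				for (SstFileMetaData file : level.files()) {
					if (file.fileName().endsWith(".sst")) {
						files.add(file.fileName());
					}
				}
			}
		}
		return files;
	}

	// Split the selected files into chunks so each compactFiles call handles one chunk.
	static List<List<String>> partition(List<String> files, int chunkSize) {
		int step = Math.max(1, chunkSize);
		List<List<String>> chunks = new ArrayList<>();
		for (int i = 0; i < files.size(); i += step) {
			chunks.add(files.subList(i, Math.min(i + step, files.size())));
		}
		return chunks;
	}
}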