Code cleanup

This commit is contained in:
Andrea Cavalli 2021-08-16 10:36:54 +02:00
parent 435e7d4886
commit 6ed31ab2a6

View File

@ -82,7 +82,8 @@ public class LLLocalDictionary implements LLDictionary {
static final WriteOptions BATCH_WRITE_OPTIONS = new UnmodifiableWriteOptions(); static final WriteOptions BATCH_WRITE_OPTIONS = new UnmodifiableWriteOptions();
static final boolean PREFER_SEEK_TO_FIRST = false; static final boolean PREFER_SEEK_TO_FIRST = false;
/** /**
* It used to be false, now it's true to avoid crashes during iterations on completely corrupted files * It used to be false,
* now it's true to avoid crashes during iterations on completely corrupted files
*/ */
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = true; static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = true;
public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true; public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true;
@ -180,7 +181,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
/** /**
* Please don't modify the returned ReadOptions! If you want to modify it, wrap it into a new ReadOptions! * Please don't modify the returned ReadOptions!
* If you want to modify it, wrap it into a new ReadOptions!
*/ */
private ReadOptions resolveSnapshot(LLSnapshot snapshot) { private ReadOptions resolveSnapshot(LLSnapshot snapshot) {
if (snapshot != null) { if (snapshot != null) {
@ -191,7 +193,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
/** /**
* Please don't modify the returned ReadOptions! If you want to modify it, wrap it into a new ReadOptions! * Please don't modify the returned ReadOptions!
* If you want to modify it, wrap it into a new ReadOptions!
*/ */
private ReadOptions getReadOptions(Snapshot snapshot) { private ReadOptions getReadOptions(Snapshot snapshot) {
if (snapshot != null) { if (snapshot != null) {
@ -239,7 +242,9 @@ public class LLLocalDictionary implements LLDictionary {
} }
@Override @Override
public Mono<ByteBuf> get(@Nullable LLSnapshot snapshot, Mono<ByteBuf> keyMono, boolean existsAlmostCertainly) { public Mono<ByteBuf> get(@Nullable LLSnapshot snapshot,
Mono<ByteBuf> keyMono,
boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono, return Mono.usingWhen(keyMono,
key -> runOnDb(() -> { key -> runOnDb(() -> {
StampedLock lock; StampedLock lock;
@ -262,7 +267,8 @@ public class LLLocalDictionary implements LLDictionary {
lock.unlockRead(stamp); lock.unlockRead(stamp);
} }
} }
}).onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)), }).onErrorMap(cause -> new IOException("Failed to read "
+ LLUtils.toStringSafe(key), cause)),
key -> Mono.fromRunnable(key::release) key -> Mono.fromRunnable(key::release)
); );
} }
@ -284,7 +290,7 @@ public class LLLocalDictionary implements LLDictionary {
ByteBuffer keyNioBuffer = LLUtils.toDirect(key); ByteBuffer keyNioBuffer = LLUtils.toDirect(key);
assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || keyNioBuffer.isDirect(); assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || keyNioBuffer.isDirect();
// Create a direct result buffer because RocksDB works only with direct buffers // Create a direct result buffer because RocksDB works only with direct buffers
ByteBuf resultBuf = alloc.directBuffer(LLLocalDictionary.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); ByteBuf resultBuf = alloc.directBuffer(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES);
try { try {
int valueSize; int valueSize;
int assertionReadData = -1; int assertionReadData = -1;
@ -307,13 +313,15 @@ public class LLLocalDictionary implements LLDictionary {
// todo: check if limit is equal to value size or data that have been read // todo: check if limit is equal to value size or data that have been read
assert valueSize <= 0 || resultNioBuf.limit() > 0; assert valueSize <= 0 || resultNioBuf.limit() > 0;
// If the locking is enabled the data is safe, so since we are appending data to the end, // If the locking is enabled the data is safe, so since we are appending data
// we need to check if it has been appended correctly or it it has been overwritten. // to the end, we need to check if it has been appended correctly or it
// We must not do this check otherwise because if there is no locking the data can be // has been overwritten.
// overwritten with a smaller value the next time. // We must not do this check otherwise because if there is no locking the data
// can be overwritten with a smaller value the next time.
if (updateMode == UpdateMode.ALLOW) { if (updateMode == UpdateMode.ALLOW) {
// Check if read data is larger than previously read data. // Check if read data is larger than previously read data.
// If it's smaller or equals it means that RocksDB is overwriting the beginning of the result buffer. // If it's smaller or equals it means that RocksDB is overwriting
// the beginning of the result buffer.
assert resultNioBuf.limit() > assertionReadData; assert resultNioBuf.limit() > assertionReadData;
if (ASSERTIONS_ENABLED) { if (ASSERTIONS_ENABLED) {
assertionReadData = resultNioBuf.limit(); assertionReadData = resultNioBuf.limit();
@ -321,8 +329,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
// Check if read data is not bigger than the total value size. // Check if read data is not bigger than the total value size.
// If it's bigger it means that RocksDB is writing the start of the result into the result // If it's bigger it means that RocksDB is writing the start
// buffer more than once. // of the result into the result buffer more than once.
assert resultNioBuf.limit() <= valueSize; assert resultNioBuf.limit() <= valueSize;
} }
@ -342,7 +350,8 @@ public class LLLocalDictionary implements LLDictionary {
// Rewind the keyNioBuf position, making it readable again for the next loop iteration // Rewind the keyNioBuf position, making it readable again for the next loop iteration
keyNioBuffer.rewind(); keyNioBuffer.rewind();
if (resultBuf.capacity() < valueSize) { if (resultBuf.capacity() < valueSize) {
// Expand the resultBuf size if the result is bigger than the current result buffer size // Expand the resultBuf size if the result is bigger than the current result
// buffer size
resultBuf.capacity(valueSize); resultBuf.capacity(valueSize);
} }
} }
@ -354,18 +363,19 @@ public class LLLocalDictionary implements LLDictionary {
resultBuf.release(); resultBuf.release();
} }
} else { } else {
ReadOptions validReadOptions = Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS);
byte[] keyArray = LLUtils.toArray(key); byte[] keyArray = LLUtils.toArray(key);
Objects.requireNonNull(keyArray); Objects.requireNonNull(keyArray);
Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>(); Holder<byte[]> data = existsAlmostCertainly ? null : new Holder<>();
if (existsAlmostCertainly || db.keyMayExist(cfh, if (existsAlmostCertainly || db.keyMayExist(cfh,
Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS), validReadOptions,
keyArray, keyArray,
data data
)) { )) {
if (!existsAlmostCertainly && data.getValue() != null) { if (!existsAlmostCertainly && data.getValue() != null) {
return wrappedBuffer(data.getValue()); return wrappedBuffer(data.getValue());
} else { } else {
byte[] result = db.get(cfh, Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS), keyArray); byte[] result = db.get(cfh, validReadOptions, keyArray);
if (result == null) { if (result == null) {
return null; return null;
} else { } else {
@ -382,9 +392,12 @@ public class LLLocalDictionary implements LLDictionary {
} }
@SuppressWarnings("SameParameterValue") @SuppressWarnings("SameParameterValue")
private void dbPut(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key, ByteBuf value) private void dbPut(ColumnFamilyHandle cfh,
throws RocksDBException { @Nullable WriteOptions writeOptions,
ByteBuf key,
ByteBuf value) throws RocksDBException {
try { try {
WriteOptions validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS);
if (databaseOptions.allowNettyDirect() && key.isDirect() && value.isDirect()) { if (databaseOptions.allowNettyDirect() && key.isDirect() && value.isDirect()) {
if (!key.isDirect()) { if (!key.isDirect()) {
throw new RocksDBException("Key buffer must be direct"); throw new RocksDBException("Key buffer must be direct");
@ -398,9 +411,9 @@ public class LLLocalDictionary implements LLDictionary {
var valueNioBuffer = LLUtils.toDirect(value); var valueNioBuffer = LLUtils.toDirect(value);
assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || valueNioBuffer.isDirect(); assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || valueNioBuffer.isDirect();
db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer, valueNioBuffer); db.put(cfh, validWriteOptions, keyNioBuffer, valueNioBuffer);
} else { } else {
db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), LLUtils.toArray(key), LLUtils.toArray(value)); db.put(cfh, validWriteOptions, LLUtils.toArray(key), LLUtils.toArray(value));
} }
} finally { } finally {
key.release(); key.release();
@ -430,16 +443,17 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setFillCache(false); readOpts.setFillCache(false);
if (range.hasMin()) { if (range.hasMin()) {
if (databaseOptions.allowNettyDirect() && range.getMin().isDirect()) { if (databaseOptions.allowNettyDirect() && range.getMin().isDirect()) {
readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), readOpts.setIterateLowerBound(new DirectSlice(Objects
"This range must use direct buffers" .requireNonNull(LLUtils.toDirect(range.getMin()),
))); "This range must use direct buffers")));
} else { } else {
readOpts.setIterateLowerBound(new Slice(LLUtils.toArray(range.getMin()))); readOpts.setIterateLowerBound(new Slice(LLUtils.toArray(range.getMin())));
} }
} }
if (range.hasMax()) { if (range.hasMax()) {
if (databaseOptions.allowNettyDirect() && range.getMax().isDirect()) { if (databaseOptions.allowNettyDirect() && range.getMax().isDirect()) {
readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMax()), readOpts.setIterateUpperBound(new DirectSlice(Objects
.requireNonNull(LLUtils.toDirect(range.getMax()),
"This range must use direct buffers" "This range must use direct buffers"
))); )));
} else { } else {
@ -498,13 +512,16 @@ public class LLLocalDictionary implements LLDictionary {
lock.unlockRead(stamp); lock.unlockRead(stamp);
} }
} }
}).onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)), }).onErrorMap(cause -> new IOException("Failed to read "
+ LLUtils.toStringSafe(key), cause)),
key -> Mono.fromRunnable(key::release) key -> Mono.fromRunnable(key::release)
); );
} }
@Override @Override
public Mono<ByteBuf> put(Mono<ByteBuf> keyMono, Mono<ByteBuf> valueMono, LLDictionaryResultType resultType) { public Mono<ByteBuf> put(Mono<ByteBuf> keyMono,
Mono<ByteBuf> valueMono,
LLDictionaryResultType resultType) {
return Mono.usingWhen(keyMono, return Mono.usingWhen(keyMono,
key -> this key -> this
.getPreviousData(Mono.just(key).map(ByteBuf::retain), resultType) .getPreviousData(Mono.just(key).map(ByteBuf::retain), resultType)
@ -522,7 +539,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
try { try {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(value)); logger.trace("Writing {}: {}",
LLUtils.toStringSafe(key), LLUtils.toStringSafe(value));
} }
dbPut(cfh, null, key.retain(), value.retain()); dbPut(cfh, null, key.retain(), value.retain());
return null; return null;
@ -533,7 +551,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
}), }),
value -> Mono.fromRunnable(value::release) value -> Mono.fromRunnable(value::release)
).onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toStringSafe(key), cause))) ).onErrorMap(cause -> new IOException("Failed to write "
+ LLUtils.toStringSafe(key), cause)))
.singleOrEmpty(), .singleOrEmpty(),
key -> Mono.fromRunnable(key::release) key -> Mono.fromRunnable(key::release)
); );
@ -573,7 +592,8 @@ public class LLLocalDictionary implements LLDictionary {
while (true) { while (true) {
@Nullable ByteBuf prevData; @Nullable ByteBuf prevData;
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>(); var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { if (existsAlmostCertainly
|| db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) { if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
byte @Nullable [] prevDataBytes = prevDataHolder.getValue(); byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
if (prevDataBytes != null) { if (prevDataBytes != null) {
@ -589,9 +609,13 @@ public class LLLocalDictionary implements LLDictionary {
} }
try { try {
@Nullable ByteBuf newData; @Nullable ByteBuf newData;
ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice(); ByteBuf prevDataToSendToUpdater = prevData == null
? null
: prevData.retainedSlice();
try { try {
newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain()); newData = updater.apply(prevDataToSendToUpdater == null
? null
: prevDataToSendToUpdater.retain());
if (!(prevDataToSendToUpdater == null if (!(prevDataToSendToUpdater == null
|| prevDataToSendToUpdater.readerIndex() == 0 || prevDataToSendToUpdater.readerIndex() == 0
|| !prevDataToSendToUpdater.isReadable())) { || !prevDataToSendToUpdater.isReadable())) {
@ -638,7 +662,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); logger.trace("Writing {}: {}",
LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData));
} }
dbPut(cfh, null, key.retain(), newData.retain()); dbPut(cfh, null, key.retain(), newData.retain());
} }
@ -665,7 +690,8 @@ public class LLLocalDictionary implements LLDictionary {
lock.unlock(stamp); lock.unlock(stamp);
} }
} }
}).onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)), }).onErrorMap(cause -> new IOException("Failed to read or write "
+ LLUtils.toStringSafe(key), cause)),
key -> Mono.fromRunnable(key::release) key -> Mono.fromRunnable(key::release)
); );
} }
@ -678,7 +704,9 @@ public class LLLocalDictionary implements LLDictionary {
boolean existsAlmostCertainly) { boolean existsAlmostCertainly) {
return Mono.usingWhen(keyMono, return Mono.usingWhen(keyMono,
key -> this.runOnDb(() -> { key -> this.runOnDb(() -> {
if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed"); if (updateMode == UpdateMode.DISALLOW) {
throw new UnsupportedOperationException("update() is disallowed");
}
StampedLock lock; StampedLock lock;
long stamp; long stamp;
if (updateMode == UpdateMode.ALLOW) { if (updateMode == UpdateMode.ALLOW) {
@ -696,7 +724,8 @@ public class LLLocalDictionary implements LLDictionary {
while (true) { while (true) {
@Nullable ByteBuf prevData; @Nullable ByteBuf prevData;
var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>(); var prevDataHolder = existsAlmostCertainly ? null : new Holder<byte[]>();
if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { if (existsAlmostCertainly
|| db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) {
if (!existsAlmostCertainly && prevDataHolder.getValue() != null) { if (!existsAlmostCertainly && prevDataHolder.getValue() != null) {
byte @Nullable [] prevDataBytes = prevDataHolder.getValue(); byte @Nullable [] prevDataBytes = prevDataHolder.getValue();
if (prevDataBytes != null) { if (prevDataBytes != null) {
@ -712,9 +741,13 @@ public class LLLocalDictionary implements LLDictionary {
} }
try { try {
@Nullable ByteBuf newData; @Nullable ByteBuf newData;
ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice(); ByteBuf prevDataToSendToUpdater = prevData == null
? null
: prevData.retainedSlice();
try { try {
newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain()); newData = updater.apply(prevDataToSendToUpdater == null
? null
: prevDataToSendToUpdater.retain());
assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() assert !databaseOptions.enableDbAssertionsWhenUsingAssertions()
|| prevDataToSendToUpdater == null || prevDataToSendToUpdater == null
|| prevDataToSendToUpdater.readerIndex() == 0 || prevDataToSendToUpdater.readerIndex() == 0
@ -757,7 +790,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); logger.trace("Writing {}: {}",
LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData));
} }
dbPut(cfh, null, key.retain(), newData.retain()); dbPut(cfh, null, key.retain(), newData.retain());
} }
@ -781,7 +815,8 @@ public class LLLocalDictionary implements LLDictionary {
lock.unlock(stamp); lock.unlock(stamp);
} }
} }
}).onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)), }).onErrorMap(cause -> new IOException("Failed to read or write "
+ LLUtils.toStringSafe(key), cause)),
key -> Mono.fromRunnable(key::release) key -> Mono.fromRunnable(key::release)
); );
} }
@ -789,14 +824,15 @@ public class LLLocalDictionary implements LLDictionary {
private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key) private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key)
throws RocksDBException { throws RocksDBException {
try { try {
var validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS);
if (databaseOptions.allowNettyDirect() && key.isDirect()) { if (databaseOptions.allowNettyDirect() && key.isDirect()) {
if (!key.isDirect()) { if (!key.isDirect()) {
throw new IllegalArgumentException("Key must be a direct buffer"); throw new IllegalArgumentException("Key must be a direct buffer");
} }
var keyNioBuffer = LLUtils.toDirect(key); var keyNioBuffer = LLUtils.toDirect(key);
db.delete(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer); db.delete(cfh, validWriteOptions, keyNioBuffer);
} else { } else {
db.delete(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), LLUtils.toArray(key)); db.delete(cfh, validWriteOptions, LLUtils.toArray(key));
} }
} finally { } finally {
key.release(); key.release();
@ -832,7 +868,8 @@ public class LLLocalDictionary implements LLDictionary {
} }
} }
}) })
.onErrorMap(cause -> new IOException("Failed to delete " + LLUtils.toStringSafe(key), cause)) .onErrorMap(cause -> new IOException("Failed to delete "
+ LLUtils.toStringSafe(key), cause))
) )
.singleOrEmpty(), .singleOrEmpty(),
key -> Mono.fromCallable(key::release)); key -> Mono.fromCallable(key::release));