diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index 3363368..d4e1efa 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -82,7 +82,8 @@ public class LLLocalDictionary implements LLDictionary { static final WriteOptions BATCH_WRITE_OPTIONS = new UnmodifiableWriteOptions(); static final boolean PREFER_SEEK_TO_FIRST = false; /** - * It used to be false, now it's true to avoid crashes during iterations on completely corrupted files + * It used to be false, + * now it's true to avoid crashes during iterations on completely corrupted files */ static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = true; public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true; @@ -180,7 +181,8 @@ public class LLLocalDictionary implements LLDictionary { } /** - * Please don't modify the returned ReadOptions! If you want to modify it, wrap it into a new ReadOptions! + * Please don't modify the returned ReadOptions! + * If you want to modify it, wrap it into a new ReadOptions! */ private ReadOptions resolveSnapshot(LLSnapshot snapshot) { if (snapshot != null) { @@ -191,7 +193,8 @@ public class LLLocalDictionary implements LLDictionary { } /** - * Please don't modify the returned ReadOptions! If you want to modify it, wrap it into a new ReadOptions! + * Please don't modify the returned ReadOptions! + * If you want to modify it, wrap it into a new ReadOptions! */ private ReadOptions getReadOptions(Snapshot snapshot) { if (snapshot != null) { @@ -239,7 +242,9 @@ public class LLLocalDictionary implements LLDictionary { } @Override - public Mono get(@Nullable LLSnapshot snapshot, Mono keyMono, boolean existsAlmostCertainly) { + public Mono get(@Nullable LLSnapshot snapshot, + Mono keyMono, + boolean existsAlmostCertainly) { return Mono.usingWhen(keyMono, key -> runOnDb(() -> { StampedLock lock; @@ -262,7 +267,8 @@ public class LLLocalDictionary implements LLDictionary { lock.unlockRead(stamp); } } - }).onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)), + }).onErrorMap(cause -> new IOException("Failed to read " + + LLUtils.toStringSafe(key), cause)), key -> Mono.fromRunnable(key::release) ); } @@ -284,7 +290,7 @@ public class LLLocalDictionary implements LLDictionary { ByteBuffer keyNioBuffer = LLUtils.toDirect(key); assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || keyNioBuffer.isDirect(); // Create a direct result buffer because RocksDB works only with direct buffers - ByteBuf resultBuf = alloc.directBuffer(LLLocalDictionary.INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); + ByteBuf resultBuf = alloc.directBuffer(INITIAL_DIRECT_READ_BYTE_BUF_SIZE_BYTES); try { int valueSize; int assertionReadData = -1; @@ -307,13 +313,15 @@ public class LLLocalDictionary implements LLDictionary { // todo: check if limit is equal to value size or data that have been read assert valueSize <= 0 || resultNioBuf.limit() > 0; - // If the locking is enabled the data is safe, so since we are appending data to the end, - // we need to check if it has been appended correctly or it it has been overwritten. - // We must not do this check otherwise because if there is no locking the data can be - // overwritten with a smaller value the next time. + // If the locking is enabled the data is safe, so since we are appending data + // to the end, we need to check if it has been appended correctly or it + // has been overwritten. + // We must not do this check otherwise because if there is no locking the data + // can be overwritten with a smaller value the next time. if (updateMode == UpdateMode.ALLOW) { // Check if read data is larger than previously read data. - // If it's smaller or equals it means that RocksDB is overwriting the beginning of the result buffer. + // If it's smaller or equals it means that RocksDB is overwriting + // the beginning of the result buffer. assert resultNioBuf.limit() > assertionReadData; if (ASSERTIONS_ENABLED) { assertionReadData = resultNioBuf.limit(); @@ -321,8 +329,8 @@ public class LLLocalDictionary implements LLDictionary { } // Check if read data is not bigger than the total value size. - // If it's bigger it means that RocksDB is writing the start of the result into the result - // buffer more than once. + // If it's bigger it means that RocksDB is writing the start + // of the result into the result buffer more than once. assert resultNioBuf.limit() <= valueSize; } @@ -342,7 +350,8 @@ public class LLLocalDictionary implements LLDictionary { // Rewind the keyNioBuf position, making it readable again for the next loop iteration keyNioBuffer.rewind(); if (resultBuf.capacity() < valueSize) { - // Expand the resultBuf size if the result is bigger than the current result buffer size + // Expand the resultBuf size if the result is bigger than the current result + // buffer size resultBuf.capacity(valueSize); } } @@ -354,18 +363,19 @@ public class LLLocalDictionary implements LLDictionary { resultBuf.release(); } } else { + ReadOptions validReadOptions = Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS); byte[] keyArray = LLUtils.toArray(key); Objects.requireNonNull(keyArray); Holder data = existsAlmostCertainly ? null : new Holder<>(); if (existsAlmostCertainly || db.keyMayExist(cfh, - Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS), + validReadOptions, keyArray, data )) { if (!existsAlmostCertainly && data.getValue() != null) { return wrappedBuffer(data.getValue()); } else { - byte[] result = db.get(cfh, Objects.requireNonNullElse(readOptions, EMPTY_READ_OPTIONS), keyArray); + byte[] result = db.get(cfh, validReadOptions, keyArray); if (result == null) { return null; } else { @@ -382,9 +392,12 @@ public class LLLocalDictionary implements LLDictionary { } @SuppressWarnings("SameParameterValue") - private void dbPut(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key, ByteBuf value) - throws RocksDBException { + private void dbPut(ColumnFamilyHandle cfh, + @Nullable WriteOptions writeOptions, + ByteBuf key, + ByteBuf value) throws RocksDBException { try { + WriteOptions validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS); if (databaseOptions.allowNettyDirect() && key.isDirect() && value.isDirect()) { if (!key.isDirect()) { throw new RocksDBException("Key buffer must be direct"); @@ -398,9 +411,9 @@ public class LLLocalDictionary implements LLDictionary { var valueNioBuffer = LLUtils.toDirect(value); assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || valueNioBuffer.isDirect(); - db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer, valueNioBuffer); + db.put(cfh, validWriteOptions, keyNioBuffer, valueNioBuffer); } else { - db.put(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), LLUtils.toArray(key), LLUtils.toArray(value)); + db.put(cfh, validWriteOptions, LLUtils.toArray(key), LLUtils.toArray(value)); } } finally { key.release(); @@ -430,16 +443,17 @@ public class LLLocalDictionary implements LLDictionary { readOpts.setFillCache(false); if (range.hasMin()) { if (databaseOptions.allowNettyDirect() && range.getMin().isDirect()) { - readOpts.setIterateLowerBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMin()), - "This range must use direct buffers" - ))); + readOpts.setIterateLowerBound(new DirectSlice(Objects + .requireNonNull(LLUtils.toDirect(range.getMin()), + "This range must use direct buffers"))); } else { readOpts.setIterateLowerBound(new Slice(LLUtils.toArray(range.getMin()))); } } if (range.hasMax()) { if (databaseOptions.allowNettyDirect() && range.getMax().isDirect()) { - readOpts.setIterateUpperBound(new DirectSlice(Objects.requireNonNull(LLUtils.toDirect(range.getMax()), + readOpts.setIterateUpperBound(new DirectSlice(Objects + .requireNonNull(LLUtils.toDirect(range.getMax()), "This range must use direct buffers" ))); } else { @@ -498,13 +512,16 @@ public class LLLocalDictionary implements LLDictionary { lock.unlockRead(stamp); } } - }).onErrorMap(cause -> new IOException("Failed to read " + LLUtils.toStringSafe(key), cause)), + }).onErrorMap(cause -> new IOException("Failed to read " + + LLUtils.toStringSafe(key), cause)), key -> Mono.fromRunnable(key::release) ); } @Override - public Mono put(Mono keyMono, Mono valueMono, LLDictionaryResultType resultType) { + public Mono put(Mono keyMono, + Mono valueMono, + LLDictionaryResultType resultType) { return Mono.usingWhen(keyMono, key -> this .getPreviousData(Mono.just(key).map(ByteBuf::retain), resultType) @@ -522,7 +539,8 @@ public class LLLocalDictionary implements LLDictionary { } try { if (logger.isTraceEnabled()) { - logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(value)); + logger.trace("Writing {}: {}", + LLUtils.toStringSafe(key), LLUtils.toStringSafe(value)); } dbPut(cfh, null, key.retain(), value.retain()); return null; @@ -533,7 +551,8 @@ public class LLLocalDictionary implements LLDictionary { } }), value -> Mono.fromRunnable(value::release) - ).onErrorMap(cause -> new IOException("Failed to write " + LLUtils.toStringSafe(key), cause))) + ).onErrorMap(cause -> new IOException("Failed to write " + + LLUtils.toStringSafe(key), cause))) .singleOrEmpty(), key -> Mono.fromRunnable(key::release) ); @@ -573,7 +592,8 @@ public class LLLocalDictionary implements LLDictionary { while (true) { @Nullable ByteBuf prevData; var prevDataHolder = existsAlmostCertainly ? null : new Holder(); - if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { + if (existsAlmostCertainly + || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { if (!existsAlmostCertainly && prevDataHolder.getValue() != null) { byte @Nullable [] prevDataBytes = prevDataHolder.getValue(); if (prevDataBytes != null) { @@ -589,9 +609,13 @@ public class LLLocalDictionary implements LLDictionary { } try { @Nullable ByteBuf newData; - ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice(); + ByteBuf prevDataToSendToUpdater = prevData == null + ? null + : prevData.retainedSlice(); try { - newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain()); + newData = updater.apply(prevDataToSendToUpdater == null + ? null + : prevDataToSendToUpdater.retain()); if (!(prevDataToSendToUpdater == null || prevDataToSendToUpdater.readerIndex() == 0 || !prevDataToSendToUpdater.isReadable())) { @@ -638,7 +662,8 @@ public class LLLocalDictionary implements LLDictionary { } } if (logger.isTraceEnabled()) { - logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); + logger.trace("Writing {}: {}", + LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); } dbPut(cfh, null, key.retain(), newData.retain()); } @@ -665,7 +690,8 @@ public class LLLocalDictionary implements LLDictionary { lock.unlock(stamp); } } - }).onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)), + }).onErrorMap(cause -> new IOException("Failed to read or write " + + LLUtils.toStringSafe(key), cause)), key -> Mono.fromRunnable(key::release) ); } @@ -678,7 +704,9 @@ public class LLLocalDictionary implements LLDictionary { boolean existsAlmostCertainly) { return Mono.usingWhen(keyMono, key -> this.runOnDb(() -> { - if (updateMode == UpdateMode.DISALLOW) throw new UnsupportedOperationException("update() is disallowed"); + if (updateMode == UpdateMode.DISALLOW) { + throw new UnsupportedOperationException("update() is disallowed"); + } StampedLock lock; long stamp; if (updateMode == UpdateMode.ALLOW) { @@ -696,7 +724,8 @@ public class LLLocalDictionary implements LLDictionary { while (true) { @Nullable ByteBuf prevData; var prevDataHolder = existsAlmostCertainly ? null : new Holder(); - if (existsAlmostCertainly || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { + if (existsAlmostCertainly + || db.keyMayExist(cfh, LLUtils.toArray(key), prevDataHolder)) { if (!existsAlmostCertainly && prevDataHolder.getValue() != null) { byte @Nullable [] prevDataBytes = prevDataHolder.getValue(); if (prevDataBytes != null) { @@ -712,9 +741,13 @@ public class LLLocalDictionary implements LLDictionary { } try { @Nullable ByteBuf newData; - ByteBuf prevDataToSendToUpdater = prevData == null ? null : prevData.retainedSlice(); + ByteBuf prevDataToSendToUpdater = prevData == null + ? null + : prevData.retainedSlice(); try { - newData = updater.apply(prevDataToSendToUpdater == null ? null : prevDataToSendToUpdater.retain()); + newData = updater.apply(prevDataToSendToUpdater == null + ? null + : prevDataToSendToUpdater.retain()); assert !databaseOptions.enableDbAssertionsWhenUsingAssertions() || prevDataToSendToUpdater == null || prevDataToSendToUpdater.readerIndex() == 0 @@ -757,7 +790,8 @@ public class LLLocalDictionary implements LLDictionary { } } if (logger.isTraceEnabled()) { - logger.trace("Writing {}: {}", LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); + logger.trace("Writing {}: {}", + LLUtils.toStringSafe(key), LLUtils.toStringSafe(newData)); } dbPut(cfh, null, key.retain(), newData.retain()); } @@ -781,7 +815,8 @@ public class LLLocalDictionary implements LLDictionary { lock.unlock(stamp); } } - }).onErrorMap(cause -> new IOException("Failed to read or write " + LLUtils.toStringSafe(key), cause)), + }).onErrorMap(cause -> new IOException("Failed to read or write " + + LLUtils.toStringSafe(key), cause)), key -> Mono.fromRunnable(key::release) ); } @@ -789,14 +824,15 @@ public class LLLocalDictionary implements LLDictionary { private void dbDelete(ColumnFamilyHandle cfh, @Nullable WriteOptions writeOptions, ByteBuf key) throws RocksDBException { try { + var validWriteOptions = Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS); if (databaseOptions.allowNettyDirect() && key.isDirect()) { if (!key.isDirect()) { throw new IllegalArgumentException("Key must be a direct buffer"); } var keyNioBuffer = LLUtils.toDirect(key); - db.delete(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), keyNioBuffer); + db.delete(cfh, validWriteOptions, keyNioBuffer); } else { - db.delete(cfh, Objects.requireNonNullElse(writeOptions, EMPTY_WRITE_OPTIONS), LLUtils.toArray(key)); + db.delete(cfh, validWriteOptions, LLUtils.toArray(key)); } } finally { key.release(); @@ -832,7 +868,8 @@ public class LLLocalDictionary implements LLDictionary { } } }) - .onErrorMap(cause -> new IOException("Failed to delete " + LLUtils.toStringSafe(key), cause)) + .onErrorMap(cause -> new IOException("Failed to delete " + + LLUtils.toStringSafe(key), cause)) ) .singleOrEmpty(), key -> Mono.fromCallable(key::release));