diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java index afe0ece..55b94b1 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java @@ -44,6 +44,7 @@ import it.cavallium.dbengine.utils.DBException; import it.cavallium.dbengine.utils.StreamUtils; import java.io.IOException; import java.math.BigInteger; +import java.nio.file.LinkOption; import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; @@ -57,6 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.logging.log4j.LogManager; @@ -642,6 +644,11 @@ public class LLLocalDictionary implements LLDictionary { Set brokenFiles = new ConcurrentHashMap().keySet(true); LongAdder totalScanned = new LongAdder(); AtomicLong totalEstimate = new AtomicLong(); + Set filesToSkip = Stream + .of(System.getProperty("filenames.skip.list", "").split(",")) + .filter(x -> !x.isBlank()) + .map(String::trim) + .collect(Collectors.toUnmodifiableSet()); try { totalEstimate.set(db.getNumEntries()); @@ -651,20 +658,26 @@ public class LLLocalDictionary implements LLDictionary { Column column = ColumnUtils.special(columnName); try { - record FileRange(String path, @Nullable LLRange range, long countEstimate) {} + record FileRange(String filePath, String filename, @Nullable LLRange range, long countEstimate) {} return db.getAllLiveFiles() .map(metadata -> new FileRange(Path.of(metadata.path()).resolve("./" + metadata.fileName()).normalize().toString(), + metadata.fileName().replace("/", ""), LLRange.intersect(metadata.keysRange(), rangeFull), metadata.numEntries() )) .filter(fr -> fr.range != null) + // Skip some files + .filter(fr -> !filesToSkip.contains(fr.filename)) .parallel() .flatMap(fr -> { - String path = fr.path; + String filename = fr.filename; + String path = fr.filePath; LLRange rangePartial = fr.range; AtomicLong fileScanned = new AtomicLong(); final long fileEstimate = fr.countEstimate; + AtomicBoolean streamStarted = new AtomicBoolean(false); + AtomicBoolean streamStarted2 = new AtomicBoolean(false); AtomicBoolean streamEnded = new AtomicBoolean(false); totalScanned.add(fileEstimate); try { @@ -679,12 +692,10 @@ public class LLLocalDictionary implements LLDictionary { } ro.setVerifyChecksums(true); return resourceStream(() -> db.newRocksIterator(ro, rangePartial, false), rocksIterator -> { - if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { - logger.info("Seeking to {}->{}->first on file {}", databaseName, column.name(), path); - } - rocksIterator.seekToFirst(); return StreamUtils.>streamWhileNonNull(() -> { - if (!rocksIterator.isValid()) { + boolean first = streamStarted.compareAndSet(false, true); + boolean second = !first && streamStarted.compareAndSet(false, true); + if (!first && !rocksIterator.isValid()) { if (streamEnded.compareAndSet(false, true)) { totalScanned.add(fileScanned.get() - fileEstimate); return Optional.of(new FileOk(databaseName, column, path)); @@ -696,12 +707,22 @@ public class LLLocalDictionary implements LLDictionary { boolean shouldSendStatus; Buf rawKey = null; try { - rawKey = rocksIterator.keyBuf().copy(); - shouldSendStatus = fileScanned.incrementAndGet() % 1_000_000 == 0; - if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { - logger.info("Checking {}->{}->{} on file {}", databaseName, column.name(), rawKey, path); + if (second) { + if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { + logger.info("Seeking to {}->{}->first on file {}", databaseName, column.name(), filename); + } + rocksIterator.seekToFirst(); + } + if (first) { + shouldSendStatus = true; + } else { + rawKey = rocksIterator.keyBuf().copy(); + shouldSendStatus = fileScanned.incrementAndGet() % 1_000_000 == 0; + if (PRINT_ALL_CHECKSUM_VERIFICATION_STEPS) { + logger.info("Checking {}->{}->{} on file {}", databaseName, column.name(), rawKey.toString(), filename); + } + rocksIterator.next(); } - rocksIterator.next(); } catch (RocksDBException ex) { return Optional.of(new BlockBad(databaseName, column, rawKey, path, ex)); } diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java index 06428f7..34c428d 100644 --- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java +++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalKeyValueDatabase.java @@ -116,6 +116,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.forcecolumnfamilyconsistencychecks", "true")); static final boolean PRINT_ALL_CHECKSUM_VERIFICATION_STEPS = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.checks.verification.print", "false")); + private static final InfoLogLevel LOG_LEVEL = InfoLogLevel.getInfoLogLevel(Byte.parseByte(System.getProperty("it.cavallium.dbengine.log.levelcode", "" + InfoLogLevel.WARN_LEVEL.getValue()))); private static final CacheFactory CACHE_FACTORY = USE_CLOCK_CACHE ? new ClockCacheFactory() : new LRUCacheFactory(); private static final boolean ALLOW_SNAPSHOTS = Boolean.parseBoolean(System.getProperty("it.cavallium.dbengine.snapshots.allow", "true")); @@ -257,6 +258,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa if (dynamicLevelBytes) { columnFamilyOptions.setLevelCompactionDynamicLevelBytes(true); } else { + columnFamilyOptions.setLevelCompactionDynamicLevelBytes(false); // https://www.arangodb.com/docs/stable/programs-arangod-rocksdb.html // https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.html columnFamilyOptions.setMaxBytesForLevelBase(256 * SizeUnit.MB); @@ -451,11 +453,11 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa } catch (IOException e) { throw new RocksDBException("Failed to create secondary exception: " + e); } - this.db = RocksDB.openAsSecondary(rocksdbOptions, + this.db = RocksDB.openReadOnly(rocksdbOptions, dbPathString, - secondaryPath.toString(), descriptors, - handles + handles, + false ); } else if (databaseOptions.optimistic()) { this.db = OptimisticTransactionDB.open(rocksdbOptions, dbPathString, descriptors, handles); @@ -942,7 +944,7 @@ public class LLLocalKeyValueDatabase extends Backuppable implements LLKeyValueDa options.setCreateIfMissing(true); options.setSkipStatsUpdateOnDbOpen(true); options.setCreateMissingColumnFamilies(true); - options.setInfoLogLevel(InfoLogLevel.WARN_LEVEL); + options.setInfoLogLevel(LOG_LEVEL); // todo: automatically flush every x seconds? options.setManualWalFlush(true); diff --git a/src/repair/java/it/cavallium/dbengine/repair/Repair.java b/src/repair/java/it/cavallium/dbengine/repair/Repair.java index d190b86..a64caa5 100644 --- a/src/repair/java/it/cavallium/dbengine/repair/Repair.java +++ b/src/repair/java/it/cavallium/dbengine/repair/Repair.java @@ -10,6 +10,7 @@ import it.cavallium.datagen.nativedata.NullableString; import it.cavallium.datagen.nativedata.Nullableboolean; import it.cavallium.datagen.nativedata.Nullableint; import it.cavallium.datagen.nativedata.Nullablelong; +import it.cavallium.dbengine.client.Compression; import it.cavallium.dbengine.client.VerificationProgress.BlockBad; import it.cavallium.dbengine.client.VerificationProgress.FileOk; import it.cavallium.dbengine.client.VerificationProgress.Progress; @@ -37,6 +38,7 @@ import java.util.Locale; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; +import org.rocksdb.InfoLogLevel; import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; @@ -51,6 +53,7 @@ public class Repair { System.setProperty("it.cavallium.dbengine.checks.filesize", "true"); System.setProperty("it.cavallium.dbengine.checks.paranoidfilechecks", "true"); System.setProperty("it.cavallium.dbengine.checks.forcecolumnfamilyconsistencychecks", "true"); + System.setProperty("it.cavallium.dbengine.log.levelcode", String.valueOf(InfoLogLevel.DEBUG_LEVEL.getValue())); ObjectList initialArgs = ObjectArrayList.wrap(argsArray), args = initialArgs; if (args.isEmpty() || args.contains("--help")) { printHelp(initialArgs); @@ -98,7 +101,7 @@ public class Repair { System.err.println("File is ok: " + block.databaseName() + (block.column() != null ? "->" + block.column().name() : "") + "->" + block.file()); } case Progress progress -> { - System.err.printf("Progress: [%d/%d] file: [%d/%d] %s->%s->%s%n", + System.err.printf("Progress: [%d/%d] file: [%d/%d] %s%s->%s%n", progress.scanned(), progress.total(), progress.fileScanned(), @@ -216,10 +219,10 @@ public class Repair { Nullableint.empty(), NullableString.empty(), Nullablelong.empty(), - false, + true, + Nullablelong.of(8096 * 1024), Nullablelong.empty(), - Nullablelong.empty(), - NullableCompression.empty() + NullableCompression.of(Compression.LZ4_HC) )) .columnOptions(columnNames.stream() .map(columnName -> NamedColumnOptions.of(columnName, @@ -231,10 +234,10 @@ public class Repair { Nullableint.empty(), NullableString.empty(), Nullablelong.empty(), - false, + true, + Nullablelong.of(8096 * 1024), Nullablelong.empty(), - Nullablelong.empty(), - NullableCompression.empty())) + NullableCompression.of(Compression.LZ4_HC))) .toList()) .writeBufferManager(Nullablelong.empty()) .build()); diff --git a/src/test/java/it/cavallium/dbengine/tests/LLRangeTest.java b/src/test/java/it/cavallium/dbengine/tests/LLRangeTest.java index 94e9f35..06910e1 100644 --- a/src/test/java/it/cavallium/dbengine/tests/LLRangeTest.java +++ b/src/test/java/it/cavallium/dbengine/tests/LLRangeTest.java @@ -33,15 +33,15 @@ class LLRangeTest { private String toStringRange(LLRange r) { if (r.isSingle()) { - return LLUtils.HEX_FORMAT.formatHex(r.getSingle().toByteArray()); + return r.getSingle().toString(); } else if (r.hasMin() && r.hasMax()) { - return LLUtils.HEX_FORMAT.formatHex(r.getMin().toByteArray()) + return r.getMin().toString() + "-" - + LLUtils.HEX_FORMAT.formatHex(r.getMax().toByteArray()); + + r.getMax().toString(); } else if (r.hasMin()) { - return LLUtils.HEX_FORMAT.formatHex(r.getMin().toByteArray()) + "-MAX"; + return r.getMin().toString() + "-MAX"; } else if (r.hasMax()) { - return "MIN-" + LLUtils.HEX_FORMAT.formatHex(r.getMax().toByteArray()); + return "MIN-" + r.getMax().toString(); } else { return "MIN-MAX"; } diff --git a/src/test/java/module-info.java b/src/test/java/module-info.java index eb79b9f..3f8ad69 100644 --- a/src/test/java/module-info.java +++ b/src/test/java/module-info.java @@ -14,5 +14,4 @@ module dbengine.tests { requires org.apache.commons.lang3; requires rocksdbjni; opens it.cavallium.dbengine.tests; - opens it.cavallium.dbengine.database.disk.test; } \ No newline at end of file