diff --git a/src/main/java/it/cavallium/dbengine/database/LLUtils.java b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
index eab880b..a6122d8 100644
--- a/src/main/java/it/cavallium/dbengine/database/LLUtils.java
+++ b/src/main/java/it/cavallium/dbengine/database/LLUtils.java
@@ -27,6 +27,14 @@ public class LLUtils {
 
 	private static final byte[] RESPONSE_TRUE = new byte[]{1};
 	private static final byte[] RESPONSE_FALSE = new byte[]{0};
+	public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1];
+
+	static {
+		for (int i1 = 0; i1 < 256; i1++) {
+			var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1];
+			b[0] = (byte) i1;
+		}
+	}
 
 	public static boolean responseToBoolean(byte[] response) {
 		return response[0] == 1;
diff --git a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
index fc354bf..1e81794 100644
--- a/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
+++ b/src/main/java/it/cavallium/dbengine/database/disk/LLLocalDictionary.java
@@ -16,8 +16,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.locks.StampedLock;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.commons.lang3.tuple.Pair;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.rocksdb.ColumnFamilyHandle;
@@ -42,7 +48,7 @@ import reactor.core.scheduler.Scheduler;
 public class LLLocalDictionary implements LLDictionary {
 
 	protected static final Logger logger = LoggerFactory.getLogger(LLLocalDictionary.class);
-	private static final boolean USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS = true;
+	private static final boolean USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS = false;
 	static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB
 	static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
 	static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
@@ -51,6 +57,7 @@ public class LLLocalDictionary implements LLDictionary {
 	static final boolean PREFER_SEEK_TO_FIRST = false;
 	static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false;
 	public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true;
+	static final boolean PARALLEL_EXACT_SIZE = true;
 	private static final int STRIPES = 512;
 	private static final byte[] FIRST_KEY = new byte[]{};
 
@@ -178,9 +185,13 @@ public class LLLocalDictionary implements LLDictionary {
 					if (range.hasMax()) {
 						readOpts.setIterateUpperBound(new Slice(range.getMax()));
 					}
-					try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
-						iter.seekToFirst();
-						return iter.isValid();
+					try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
+						if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
+							rocksIterator.seek(range.getMin());
+						} else {
+							rocksIterator.seekToFirst();
+						}
+						return rocksIterator.isValid();
 					}
 				})
 				.onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause))
@@ -774,11 +785,15 @@ public class LLLocalDictionary implements LLDictionary {
 			readOpts.setIgnoreRangeDeletions(true);
 		}
 
-		try (var iter = db.newIterator(cfh, readOpts)) {
-			iter.seekToFirst();
+		try (var rocksIterator = db.newIterator(cfh, readOpts)) {
+			if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
+				rocksIterator.seek(range.getMin());
+			} else {
+				rocksIterator.seekToFirst();
+			}
 			long i = 0;
-			while (iter.isValid()) {
-				iter.next();
+			while (rocksIterator.isValid()) {
+				rocksIterator.next();
 				i++;
 			}
 			return i;
@@ -802,7 +817,11 @@ public class LLLocalDictionary implements LLDictionary {
 			readOpts.setIterateUpperBound(new Slice(range.getMax()));
 		}
 		try (var rocksIterator = db.newIterator(cfh, readOpts)) {
-			rocksIterator.seekToFirst();
+			if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
+				rocksIterator.seek(range.getMin());
+			} else {
+				rocksIterator.seekToFirst();
+			}
 			byte[] key;
 			if (rocksIterator.isValid()) {
 				key = rocksIterator.key();
@@ -827,7 +846,11 @@ public class LLLocalDictionary implements LLDictionary {
 			readOpts.setIterateUpperBound(new Slice(range.getMax()));
 		}
 		try (var rocksIterator = db.newIterator(cfh, readOpts)) {
-			rocksIterator.seekToFirst();
+			if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
+				rocksIterator.seek(range.getMin());
+			} else {
+				rocksIterator.seekToFirst();
+			}
 			byte[] key;
 			if (rocksIterator.isValid()) {
 				key = rocksIterator.key();
@@ -849,15 +872,17 @@ public class LLLocalDictionary implements LLDictionary {
 				e.printStackTrace();
 				return 0;
 			}
+		} else if (PARALLEL_EXACT_SIZE) {
+			return exactSizeAll(snapshot);
 		} else {
 			rocksdbSnapshot.setFillCache(false);
 			rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
-			rocksdbSnapshot.setIgnoreRangeDeletions(false);
+			rocksdbSnapshot.setIgnoreRangeDeletions(true);
 			long count = 0;
 			try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) {
 				iter.seekToFirst();
-				// If it's a fast size of a snapshot, count only up to 1'000'000 elements
-				while (iter.isValid() && count < 1_000_000) {
+				// If it's a fast size of a snapshot, count only up to 100'000 elements
+				while (iter.isValid() && count < 100_000) {
 					count++;
 					iter.next();
 				}
@@ -869,16 +894,73 @@ private long exactSizeAll(@Nullable LLSnapshot snapshot) {
 		var readOpts = resolveSnapshot(snapshot);
 		readOpts.setFillCache(false);
+		readOpts.setReadaheadSize(2 * 1024 * 1024);
 		readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
-		long count = 0;
-		try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
-			iter.seekToFirst();
-			while (iter.isValid()) {
-				count++;
-				iter.next();
+		if (PARALLEL_EXACT_SIZE) {
+			var commonPool = ForkJoinPool.commonPool();
+			var futures = IntStream
+					.range(-1, LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length)
+					.mapToObj(idx -> Pair.of(idx == -1 ? null : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx],
+							idx + 1 >= LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length ? null
+									: LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx + 1]
+					))
+					.map(range -> (Callable<Long>) () -> {
+						long partialCount = 0;
+						var rangeReadOpts = new ReadOptions(readOpts);
+						Slice sliceBegin;
+						if (range.getKey() != null) {
+							sliceBegin = new Slice(range.getKey());
+						} else {
+							sliceBegin = null;
+						}
+						Slice sliceEnd;
+						if (range.getValue() != null) {
+							sliceEnd = new Slice(range.getValue());
+						} else {
+							sliceEnd = null;
+						}
+						try {
+							if (sliceBegin != null) {
+								rangeReadOpts.setIterateLowerBound(sliceBegin);
+							}
+							if (sliceEnd != null) {
+								rangeReadOpts.setIterateUpperBound(sliceEnd);
+							}
+							try (RocksIterator iter = db.newIterator(cfh, rangeReadOpts)) {
+								iter.seekToFirst();
+								while (iter.isValid()) {
+									partialCount++;
+									iter.next();
+								}
+								return partialCount;
+							}
+						} finally {
+							if (sliceBegin != null) {
+								sliceBegin.close();
+							}
+							if (sliceEnd != null) {
+								sliceEnd.close();
+							}
+						}
+					})
+					.map(commonPool::submit)
+					.collect(Collectors.toList());
+			long count = 0;
+			for (ForkJoinTask<Long> future : futures) {
+				count += future.join();
 			}
 			return count;
+		} else {
+			long count = 0;
+			try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
+				iter.seekToFirst();
+				while (iter.isValid()) {
+					count++;
+					iter.next();
+				}
+				return count;
+			}
 		}
 	}
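
Note: below is a minimal, self-contained sketch of the counting scheme introduced by the new exactSizeAll above. The 256 single-byte seek keys in LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS split the key space into 257 lexicographic ranges whose sizes are counted independently and summed on the common ForkJoinPool. The RangeCounter interface and parallelExactSize method are hypothetical stand-ins for the bounded RocksDB iteration in LLLocalDictionary; they are not part of the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class ParallelRangeCountSketch {

	// One single-byte seek key per possible first byte, as in LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.
	static final byte[][] SEEKS = new byte[256][1];

	static {
		for (int i = 0; i < 256; i++) {
			SEEKS[i][0] = (byte) i;
		}
	}

	// Hypothetical single-range counter; in LLLocalDictionary this role is played by a RocksDB
	// iterator bounded with setIterateLowerBound/setIterateUpperBound (a null bound is an open end).
	interface RangeCounter {
		long count(byte[] lowerInclusive, byte[] upperExclusive) throws Exception;
	}

	// Counts the whole key space by summing the 257 per-range counts computed in parallel.
	static long parallelExactSize(RangeCounter counter) {
		ForkJoinPool pool = ForkJoinPool.commonPool();
		List<ForkJoinTask<Long>> tasks = new ArrayList<>();
		for (int idx = -1; idx < SEEKS.length; idx++) {
			byte[] lower = idx == -1 ? null : SEEKS[idx];
			byte[] upper = idx + 1 >= SEEKS.length ? null : SEEKS[idx + 1];
			tasks.add(pool.submit((Callable<Long>) () -> counter.count(lower, upper)));
		}
		long total = 0;
		for (ForkJoinTask<Long> task : tasks) {
			total += task.join();
		}
		return total;
	}
}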