Parallel size

This commit is contained in:
Andrea Cavalli 2021-03-18 19:53:32 +01:00
parent aaa203f7ad
commit a80849f241
2 changed files with 109 additions and 19 deletions

View File

@ -27,6 +27,14 @@ public class LLUtils {
private static final byte[] RESPONSE_TRUE = new byte[]{1};
private static final byte[] RESPONSE_FALSE = new byte[]{0};
public static final byte[][] LEXICONOGRAPHIC_ITERATION_SEEKS = new byte[256][1];
static {
for (int i1 = 0; i1 < 256; i1++) {
var b = LEXICONOGRAPHIC_ITERATION_SEEKS[i1];
b[0] = (byte) i1;
}
}
public static boolean responseToBoolean(byte[] response) {
return response[0] == 1;

View File

@ -16,8 +16,14 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.tuple.Pair;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.rocksdb.ColumnFamilyHandle;
@ -42,7 +48,7 @@ import reactor.core.scheduler.Scheduler;
public class LLLocalDictionary implements LLDictionary {
protected static final Logger logger = LoggerFactory.getLogger(LLLocalDictionary.class);
private static final boolean USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS = true;
private static final boolean USE_CURRENT_FASTSIZE_FOR_OLD_SNAPSHOTS = false;
static final int RESERVED_WRITE_BATCH_SIZE = 2 * 1024 * 1024; // 2MiB
static final long MAX_WRITE_BATCH_SIZE = 1024L * 1024L * 1024L; // 1GiB
static final int CAPPED_WRITE_BATCH_CAP = 50000; // 50K operations
@ -51,6 +57,7 @@ public class LLLocalDictionary implements LLDictionary {
static final boolean PREFER_SEEK_TO_FIRST = false;
static final boolean VERIFY_CHECKSUMS_WHEN_NOT_NEEDED = false;
public static final boolean DEBUG_PREFIXES_WHEN_ASSERTIONS_ARE_ENABLED = true;
static final boolean PARALLEL_EXACT_SIZE = true;
private static final int STRIPES = 512;
private static final byte[] FIRST_KEY = new byte[]{};
@ -178,9 +185,13 @@ public class LLLocalDictionary implements LLDictionary {
if (range.hasMax()) {
readOpts.setIterateUpperBound(new Slice(range.getMax()));
}
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
iter.seekToFirst();
return iter.isValid();
try (RocksIterator rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
return rocksIterator.isValid();
}
})
.onErrorMap(cause -> new IOException("Failed to read range " + range.toString(), cause))
@ -774,11 +785,15 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setIgnoreRangeDeletions(true);
}
try (var iter = db.newIterator(cfh, readOpts)) {
iter.seekToFirst();
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
long i = 0;
while (iter.isValid()) {
iter.next();
while (rocksIterator.isValid()) {
rocksIterator.next();
i++;
}
return i;
@ -802,7 +817,11 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setIterateUpperBound(new Slice(range.getMax()));
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
rocksIterator.seekToFirst();
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
byte[] key;
if (rocksIterator.isValid()) {
key = rocksIterator.key();
@ -827,7 +846,11 @@ public class LLLocalDictionary implements LLDictionary {
readOpts.setIterateUpperBound(new Slice(range.getMax()));
}
try (var rocksIterator = db.newIterator(cfh, readOpts)) {
rocksIterator.seekToFirst();
if (!LLLocalDictionary.PREFER_SEEK_TO_FIRST && range.hasMin()) {
rocksIterator.seek(range.getMin());
} else {
rocksIterator.seekToFirst();
}
byte[] key;
if (rocksIterator.isValid()) {
key = rocksIterator.key();
@ -849,15 +872,17 @@ public class LLLocalDictionary implements LLDictionary {
e.printStackTrace();
return 0;
}
} else if (PARALLEL_EXACT_SIZE) {
return exactSizeAll(snapshot);
} else {
rocksdbSnapshot.setFillCache(false);
rocksdbSnapshot.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
rocksdbSnapshot.setIgnoreRangeDeletions(false);
rocksdbSnapshot.setIgnoreRangeDeletions(true);
long count = 0;
try (RocksIterator iter = db.newIterator(cfh, rocksdbSnapshot)) {
iter.seekToFirst();
// If it's a fast size of a snapshot, count only up to 1'000'000 elements
while (iter.isValid() && count < 1_000_000) {
// If it's a fast size of a snapshot, count only up to 100'000 elements
while (iter.isValid() && count < 100_000) {
count++;
iter.next();
}
@ -869,16 +894,73 @@ public class LLLocalDictionary implements LLDictionary {
private long exactSizeAll(@Nullable LLSnapshot snapshot) {
var readOpts = resolveSnapshot(snapshot);
readOpts.setFillCache(false);
readOpts.setReadaheadSize(2 * 1024 * 1024);
readOpts.setVerifyChecksums(VERIFY_CHECKSUMS_WHEN_NOT_NEEDED);
long count = 0;
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
iter.seekToFirst();
while (iter.isValid()) {
count++;
iter.next();
if (PARALLEL_EXACT_SIZE) {
var commonPool = ForkJoinPool.commonPool();
var futures = IntStream
.range(-1, LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length)
.mapToObj(idx -> Pair.of(idx == -1 ? null : LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx],
LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS.length >= idx + 1 ? null
: LLUtils.LEXICONOGRAPHIC_ITERATION_SEEKS[idx + 1]
))
.map(range -> (Callable<Long>) () -> {
long partialCount = 0;
var rangeReadOpts = new ReadOptions(readOpts);
Slice sliceBegin;
if (range.getKey() != null) {
sliceBegin = new Slice(range.getKey());
} else {
sliceBegin = null;
}
Slice sliceEnd;
if (range.getValue() != null) {
sliceEnd = new Slice(range.getValue());
} else {
sliceEnd = null;
}
try {
if (sliceBegin != null) {
rangeReadOpts.setIterateLowerBound(sliceBegin);
}
if (sliceBegin != null) {
rangeReadOpts.setIterateUpperBound(sliceEnd);
}
try (RocksIterator iter = db.newIterator(cfh, rangeReadOpts)) {
iter.seekToFirst();
while (iter.isValid()) {
partialCount++;
iter.next();
}
return partialCount;
}
} finally {
if (sliceBegin != null) {
sliceBegin.close();
}
if (sliceEnd != null) {
sliceEnd.close();
}
}
})
.map(commonPool::submit)
.collect(Collectors.toList());
long count = 0;
for (ForkJoinTask<Long> future : futures) {
count += future.join();
}
return count;
} else {
long count = 0;
try (RocksIterator iter = db.newIterator(cfh, readOpts)) {
iter.seekToFirst();
while (iter.isValid()) {
count++;
iter.next();
}
return count;
}
}
}