diff --git a/table/filter_block.cc b/table/filter_block.cc
index 38f807cef..5f687ebc7 100644
--- a/table/filter_block.cc
+++ b/table/filter_block.cc
@@ -64,9 +64,9 @@ void FilterBlockBuilder::AddKey(const Slice& key) {
   // prefix(last entry) to get the prefix of the last key.
   if (prev.size() == 0 || ! SamePrefix(key, prev)) {
     Slice prefix = prefix_extractor_->Transform(key);
-    assert(comparator_->Compare(prefix, key) <= 0);
     InternalKey internal_prefix_tmp(prefix, 0, kTypeValue);
     Slice internal_prefix = internal_prefix_tmp.Encode();
+    assert(comparator_->Compare(internal_prefix, key) <= 0);
     start_.push_back(entries_.size());
     entries_.append(internal_prefix.data(), internal_prefix.size());
   }
diff --git a/tools/db_stress.cc b/tools/db_stress.cc
index cce60b16e..349d22ec5 100644
--- a/tools/db_stress.cc
+++ b/tools/db_stress.cc
@@ -4,7 +4,7 @@
 //
 //The test uses an array to compare against values written to the database.
 //Keys written to the array are in 1:1 correspondence to the actual values in
-//the database according to the formula in the functino GenerateValue
+//the database according to the formula in the function GenerateValue
 //Space is reserved in the array from 0 to FLAGS_max_key and values are randomly
 //written/deleted/read from those positions. During verification we compare all
@@ -26,6 +26,7 @@
 #include "leveldb/write_batch.h"
 #include "leveldb/statistics.h"
 #include "port/port.h"
+#include "util/coding.h"
 #include "util/crc32c.h"
 #include "util/histogram.h"
 #include "util/mutexlock.h"
@@ -43,12 +44,13 @@ static uint32_t FLAGS_seed = 2341234;
 
 // Max number of key/values to place in database
 static long FLAGS_max_key = 2 * KB * KB * KB;
-// If set, the test uses MultiGet, MultiPut and MultiDelete that
-// do a different kind of validation during the test itself,
-// rather than at the end. This is meant to solve the following
-// problems at the expense of doing less degree of validation.
-// (a) No need to acquire mutexes during writes (less cache flushes
-// in multi-core leading to speed up)
+// If set, the test uses MultiGet, MultiPrefixScan, MultiPut and
+// MultiDelete that do a different kind of validation during the test
+// itself, rather than at the end. This is meant to solve the
+// following problems at the expense of doing less degree of
+// validation.
+// (a) No need to acquire mutexes during writes (less cache flushes in
+// multi-core leading to speed up)
 // (b) No long validation at the end (more speed up)
 // (c) Also test snapshot and atomicity of batch writes
 static bool FLAGS_test_batches_snapshots = false;
@@ -156,8 +158,14 @@ static int FLAGS_level0_slowdown_writes_trigger = 8;
 
 // Ratio of reads to total workload (expressed as a percentage)
 static unsigned int FLAGS_readpercent = 10;
+// Ratio of prefix iterators to total workload (expressed as a percentage)
+static unsigned int FLAGS_prefixpercent = 25;
+
-// Ratio of deletes to total workload (expressed as a percentage)
-static unsigned int FLAGS_delpercent = 30;
+// Ratio of writes to total workload (expressed as a percentage)
+static unsigned int FLAGS_writepercent = 50;
+
+// Ratio of deletes to total workload (expressed as a percentage)
+static unsigned int FLAGS_delpercent = 15;
 
 // Option to disable compaction triggered by read.
 static int FLAGS_disable_seek_compaction = false;
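
[Editor's note, not part of the patch] The four workload flags above are consumed later in this patch by OperateDb, which draws prob_op = rand.Uniform(100) once per operation and classifies it by subtracting each percentage in turn; that is also why main() now requires the four values to sum to exactly 100. A minimal standalone sketch of that dispatch pattern (names are illustrative, not from the patch):

    #include <cstdio>

    // One uniform draw in [0, 100) is walked through four buckets by
    // successive subtraction, mirroring the prob_op logic in OperateDb.
    static const char* ClassifyOp(int prob_op, int readpct, int prefixpct,
                                  int writepct, int delpct) {
      if (prob_op < readpct) return "read";
      prob_op -= readpct;
      if (prob_op < prefixpct) return "prefix scan";
      prob_op -= prefixpct;
      if (prob_op < writepct) return "write";
      prob_op -= writepct;
      if (prob_op < delpct) return "delete";
      return "unreachable when the percents sum to 100";
    }

    int main() {
      // With the new defaults 10/25/50/15, a draw of 40 lands in the
      // write bucket (40 - 10 - 25 = 5 < 50).
      std::printf("%s\n", ClassifyOp(40, 10, 25, 50, 15));
      return 0;
    }

[End of editor's note]
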
@@ -191,6 +199,19 @@ static int FLAGS_level0_file_num_compaction_trigger = 0;
 
 namespace leveldb {
 
+// convert long to a big-endian slice key
+static std::string Key(long val) {
+  std::string little_endian_key;
+  std::string big_endian_key;
+  PutFixed64(&little_endian_key, val);
+  assert(little_endian_key.size() == sizeof(val));
+  big_endian_key.resize(sizeof(val));
+  for (int i=0; i<(int)sizeof(val); i++) {
+    big_endian_key[i] = little_endian_key[sizeof(val) - 1 - i];
+  }
+  return big_endian_key;
+}
+
 class StressTest;
 namespace {
@@ -200,9 +221,11 @@ class Stats {
   double finish_;
   double seconds_;
   long done_;
+  long gets_;
+  long prefixes_;
   long writes_;
   long deletes_;
-  long gets_;
+  long iterator_size_sums_;
   long founds_;
   long errors_;
   int next_report_;
@@ -217,9 +240,11 @@ class Stats {
     next_report_ = 100;
     hist_.Clear();
     done_ = 0;
+    gets_ = 0;
+    prefixes_ = 0;
     writes_ = 0;
     deletes_ = 0;
-    gets_ = 0;
+    iterator_size_sums_ = 0;
     founds_ = 0;
     errors_ = 0;
     bytes_ = 0;
@@ -232,9 +257,11 @@ class Stats {
   void Merge(const Stats& other) {
     hist_.Merge(other.hist_);
     done_ += other.done_;
+    gets_ += other.gets_;
+    prefixes_ += other.prefixes_;
     writes_ += other.writes_;
     deletes_ += other.deletes_;
-    gets_ += other.gets_;
+    iterator_size_sums_ += other.iterator_size_sums_;
     founds_ += other.founds_;
     errors_ += other.errors_;
     bytes_ += other.bytes_;
@@ -277,15 +304,20 @@ class Stats {
     bytes_ += nbytes;
   }
 
-  void AddDeletes(int n) {
-    deletes_ += n;
-  }
-
   void AddGets(int ngets, int nfounds) {
     founds_ += nfounds;
     gets_ += ngets;
   }
 
+  void AddPrefixes(int nprefixes, int count) {
+    prefixes_ += nprefixes;
+    iterator_size_sums_ += count;
+  }
+
+  void AddDeletes(int n) {
+    deletes_ += n;
+  }
+
   void AddErrors(int n) {
     errors_ += n;
   }
@@ -310,6 +342,9 @@ class Stats {
     fprintf(stdout, "%-12s: Wrote %ld times\n", "", writes_);
     fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_);
     fprintf(stdout, "%-12s: %ld/%ld gets found the key\n", "", founds_, gets_);
+    fprintf(stdout, "%-12s: Prefix scanned %ld times\n", "", prefixes_);
+    fprintf(stdout, "%-12s: Iterator size sum is %ld\n", "",
+            iterator_size_sums_);
     fprintf(stdout, "%-12s: Got errors %ld times\n", "", errors_);
 
     if (FLAGS_histogram) {
@@ -492,6 +527,9 @@ class StressTest {
       filter_policy_(FLAGS_bloom_bits >= 0 ?
                      NewBloomFilterPolicy(FLAGS_bloom_bits) :
                      nullptr),
+      prefix_extractor_(NewFixedPrefixTransform(
+          FLAGS_test_batches_snapshots ?
+          sizeof(long) : sizeof(long)-1)),
      db_(nullptr),
      num_times_reopened_(0) {
    if (FLAGS_destroy_db_initially) {
@@ -509,6 +547,7 @@ class StressTest {
   ~StressTest() {
     delete db_;
     delete filter_policy_;
+    delete prefix_extractor_;
   }
 
   void Run() {
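
[Editor's note, not part of the patch] The Key() helper added above reverses PutFixed64's little-endian encoding so that the most significant byte comes first. That ordering is what makes fixed-length prefixes meaningful: numerically adjacent longs then share their leading 7 bytes except when a 256-key boundary is crossed, which gives the 2^8 = 256 keys-per-prefix buckets the scan code below relies on. A self-contained sketch of the same byte order (no RocksDB dependency; BigEndianKey is illustrative, not the patch's helper):

    #include <cassert>
    #include <cstddef>
    #include <string>

    // Most-significant byte first, like the patch's Key() helper.
    static std::string BigEndianKey(unsigned long val) {
      std::string key(sizeof(val), '\0');
      for (size_t i = 0; i < sizeof(val); i++) {
        key[i] = static_cast<char>((val >> (8 * (sizeof(val) - 1 - i))) & 0xff);
      }
      return key;
    }

    int main() {
      // Consecutive keys share the 7-byte prefix the stress test scans on...
      assert(BigEndianKey(512).compare(0, 7, BigEndianKey(513), 0, 7) == 0);
      // ...until a 256-key boundary is crossed.
      assert(BigEndianKey(511).compare(0, 7, BigEndianKey(512), 0, 7) != 0);
      return 0;
    }

[End of editor's note]
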
@@ -733,6 +772,82 @@ class StressTest {
     return s;
   }
 
+  // Given a prefix P, this does prefix scans for "0"+P, "1"+P,..."9"+P
+  // in the same snapshot. Each of these 10 scans returns a series of
+  // values; each series should be the same length, and it is verified
+  // for each index i that all the i'th values are of the form "0"+V,
+  // "1"+V,..."9"+V.
+  // ASSUMES that MultiPut was used to put (K, V)
+  Status MultiPrefixScan(ThreadState* thread,
+                         const ReadOptions& readoptions,
+                         const Slice& prefix) {
+    std::string prefixes[10] = {"0", "1", "2", "3", "4",
+                                "5", "6", "7", "8", "9"};
+    Slice prefix_slices[10];
+    ReadOptions readoptionscopy[10];
+    const Snapshot* snapshot = db_->GetSnapshot();
+    Iterator* iters[10];
+    Status s = Status::OK();
+    for (int i = 0; i < 10; i++) {
+      prefixes[i] += prefix.ToString();
+      prefix_slices[i] = prefixes[i];
+      readoptionscopy[i] = readoptions;
+      readoptionscopy[i].prefix = &prefix_slices[i];
+      readoptionscopy[i].snapshot = snapshot;
+      iters[i] = db_->NewIterator(readoptionscopy[i]);
+      iters[i]->SeekToFirst();
+    }
+
+    int count = 0;
+    while (iters[0]->Valid()) {
+      count++;
+      std::string values[10];
+      // get list of all values for this iteration
+      for (int i = 0; i < 10; i++) {
+        // no iterator should finish before the first one
+        assert(iters[i]->Valid());
+        values[i] = iters[i]->value().ToString();
+
+        char expected_first = (prefixes[i])[0];
+        char actual_first = (values[i])[0];
+
+        if (actual_first != expected_first) {
+          fprintf(stderr, "error expected first = %c actual = %c\n",
+                  expected_first, actual_first);
+        }
+        (values[i])[0] = ' '; // blank out the differing character
+      }
+      // make sure all values are equivalent
+      for (int i = 0; i < 10; i++) {
+        if (values[i] != values[0]) {
+          fprintf(stderr, "error : inconsistent values for prefix %s: %s, %s\n",
+                  prefix.ToString().c_str(), values[0].c_str(),
+                  values[i].c_str());
+          // we continue after error rather than exiting so that we can
+          // find more errors if any
+        }
+        iters[i]->Next();
+      }
+    }
+
+    // cleanup iterators and snapshot
+    for (int i = 0; i < 10; i++) {
+      // if the first iterator finished, they should have all finished
+      assert(!iters[i]->Valid());
+      assert(iters[i]->status().ok());
+      delete iters[i];
+    }
+    db_->ReleaseSnapshot(snapshot);
+
+    if (s.ok()) {
+      thread->stats.AddPrefixes(1, count);
+    } else {
+      thread->stats.AddErrors(1);
+    }
+
+    return s;
+  }
+
   void OperateDb(ThreadState* thread) {
     ReadOptions read_opts(FLAGS_verify_checksum, true);
     WriteOptions write_opts;
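
[Editor's note, not part of the patch] MultiPrefixScan leans on two properties: MultiPut writes the ten entries ("0"+K, "0"+V) through ("9"+K, "9"+V) in one atomic batch, and all ten iterators above share one snapshot. So the ten scans must return equal-length series whose i'th values agree once the leading digit is blanked out; a mismatch means a batch was observed partially. The core comparison, isolated (EquivalentAfterDigit is illustrative, not from the patch):

    #include <cassert>
    #include <string>

    // Two values from parallel scans must match once the digit that
    // MultiPut prepends is blanked out, as MultiPrefixScan does.
    static bool EquivalentAfterDigit(std::string a, std::string b) {
      if (a.empty() || b.empty()) return false;
      a[0] = ' ';
      b[0] = ' ';
      return a == b;
    }

    int main() {
      assert(EquivalentAfterDigit("3valueXYZ", "7valueXYZ"));   // consistent
      assert(!EquivalentAfterDigit("3valueXYZ", "7valueABC"));  // torn batch
      return 0;
    }

[End of editor's note]
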
@@ -764,11 +879,12 @@ class StressTest {
       }
 
       long rand_key = thread->rand.Next() % max_key;
-      Slice key((char*)&rand_key, sizeof(rand_key));
-      //Read:10%;Delete:30%;Write:60%
-      unsigned int probability_operation = thread->rand.Uniform(100);
-      if (probability_operation < FLAGS_readpercent) {
-        // read load
+      std::string keystr = Key(rand_key);
+      Slice key = keystr;
+      int prob_op = thread->rand.Uniform(100);
+
+      // OPERATION read?
+      if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
         if (!FLAGS_test_batches_snapshots) {
           Status s = db_->Get(read_opts, key, &from_db);
           if (s.ok()) {
@@ -784,19 +900,38 @@ class StressTest {
         } else {
           MultiGet(thread, read_opts, key, &from_db);
         }
-      } else if (probability_operation < FLAGS_delpercent + FLAGS_readpercent) {
-        //introduce delete load
-        if (!FLAGS_test_batches_snapshots) {
-          MutexLock l(thread->shared->GetMutexForKey(rand_key));
-          thread->shared->Delete(rand_key);
-          db_->Delete(write_opts, key);
-          thread->stats.AddDeletes(1);
-        } else {
-          MultiDelete(thread, write_opts, key);
-        }
+      }
+      prob_op -= FLAGS_readpercent;
 
-      } else {
-        // write load
+      // OPERATION prefix scan?
+      if (prob_op >= 0 && prob_op < (int)FLAGS_prefixpercent) {
+        // keys are longs (e.g., 8 bytes), so we let prefixes be
+        // everything except the last byte. So there will be 2^8=256
+        // keys per prefix.
+        Slice prefix = Slice(key.data(), key.size() - 1);
+        if (!FLAGS_test_batches_snapshots) {
+          read_opts.prefix = &prefix;
+          Iterator* iter = db_->NewIterator(read_opts);
+          int count = 0;
+          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+            assert(iter->key().starts_with(prefix));
+            count++;
+          }
+          assert(count <= 256);
+          if (iter->status().ok()) {
+            thread->stats.AddPrefixes(1, count);
+          } else {
+            thread->stats.AddErrors(1);
+          }
+          delete iter;
+        } else {
+          MultiPrefixScan(thread, read_opts, prefix);
+        }
+      }
+      prob_op -= FLAGS_prefixpercent;
+
+      // OPERATION write?
+      if (prob_op >= 0 && prob_op < (int)FLAGS_writepercent) {
         uint32_t value_base = thread->rand.Next();
         size_t sz = GenerateValue(value_base, value, sizeof(value));
         Slice v(value, sz);
@@ -811,9 +946,23 @@ class StressTest {
         } else {
           MultiPut(thread, write_opts, key, v, sz);
         }
 
-        PrintKeyValue(rand_key, value, sz);
       }
+      prob_op -= FLAGS_writepercent;
+
+      // OPERATION delete?
+      if (prob_op >= 0 && prob_op < (int)FLAGS_delpercent) {
+        if (!FLAGS_test_batches_snapshots) {
+          MutexLock l(thread->shared->GetMutexForKey(rand_key));
+          thread->shared->Delete(rand_key);
+          db_->Delete(write_opts, key);
+          thread->stats.AddDeletes(1);
+        } else {
+          MultiDelete(thread, write_opts, key);
+        }
+      }
+      prob_op -= FLAGS_delpercent;
+
       thread->stats.FinishedSingleOp();
     }
     thread->stats.Stop();
@@ -840,7 +989,8 @@ class StressTest {
   void VerifyValue(long key, const ReadOptions &opts, const SharedState &shared,
                    std::string *value_from_db, bool strict=false) const {
-    Slice k((char*)&key, sizeof(key));
+    std::string keystr = Key(key);
+    Slice k = keystr;
     char value[100];
     uint32_t value_base = shared.Get(key);
     if (value_base == SharedState::SENTINEL && !strict) {
@@ -896,6 +1046,9 @@ class StressTest {
     }
     fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
     fprintf(stdout, "Read percentage : %d\n", FLAGS_readpercent);
+    fprintf(stdout, "Prefix percentage : %d\n", FLAGS_prefixpercent);
+    fprintf(stdout, "Write percentage : %d\n", FLAGS_writepercent);
+    fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent);
     fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size);
     fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent);
     fprintf(stdout, "Max key : %ld\n", FLAGS_max_key);
@@ -941,6 +1094,7 @@ class StressTest {
     options.compaction_style = FLAGS_compaction_style;
     options.block_size = FLAGS_block_size;
     options.filter_policy = filter_policy_;
+    options.prefix_extractor = prefix_extractor_;
     options.max_open_files = FLAGS_open_files;
     options.statistics = dbstats;
     options.env = FLAGS_env;
@@ -1012,6 +1166,7 @@ class StressTest {
  private:
   shared_ptr<Cache> cache_;
   const FilterPolicy* filter_policy_;
+  const SliceTransform* prefix_extractor_;
   DB* db_;
   StackableDB* sdb_;
   int num_times_reopened_;
@@ -1110,6 +1265,12 @@ int main(int argc, char** argv) {
     } else if (sscanf(argv[i], "--readpercent=%d%c", &n, &junk) == 1 &&
                (n >= 0 && n <= 100)) {
       FLAGS_readpercent = n;
+    } else if (sscanf(argv[i], "--prefixpercent=%d%c", &n, &junk) == 1 &&
+               (n >= 0 && n <= 100)) {
+      FLAGS_prefixpercent = n;
+    } else if (sscanf(argv[i], "--writepercent=%d%c", &n, &junk) == 1 &&
+               (n >= 0 && n <= 100)) {
+      FLAGS_writepercent = n;
     } else if (sscanf(argv[i], "--delpercent=%d%c", &n, &junk) == 1 &&
                (n >= 0 && n <= 100)) {
       FLAGS_delpercent = n;
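
[Editor's note, not part of the patch] On the extractor lengths chosen in the StressTest constructor: in plain mode keys are sizeof(long) = 8 bytes and the prefix is everything but the last byte, so each prefix bucket holds exactly 2^8 = 256 keys, which is what the assert(count <= 256) in the scan branch checks. In batched mode MultiPut prepends one digit byte to every key, so the 8-byte fixed prefix there covers the digit plus the same 7 key bytes. The arithmetic, spelled out:

    #include <cassert>
    #include <cstddef>

    // Why the scan branch can assert(count <= 256): fixing all but the
    // last byte of an 8-byte key leaves 2^8 possible keys per prefix.
    int main() {
      const size_t key_len = sizeof(long);    // 8 on LP64 platforms
      const size_t prefix_len = key_len - 1;  // plain (non-batched) mode
      const size_t keys_per_prefix = 1u << (8 * (key_len - prefix_len));
      assert(keys_per_prefix == 256);
      return 0;
    }

[End of editor's note]
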
@@ -1183,8 +1344,9 @@ int main(int argc, char** argv) {
   // max number of concurrent compactions.
   FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
 
-  if ((FLAGS_readpercent + FLAGS_delpercent) > 100) {
-    fprintf(stderr, "Error: Read + Delete percents > 100!\n");
+  if ((FLAGS_readpercent + FLAGS_prefixpercent +
+       FLAGS_writepercent + FLAGS_delpercent) != 100) {
+    fprintf(stderr, "Error: Read+Prefix+Write+Delete percents != 100!\n");
     exit(1);
   }
   if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) {
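
[Editor's note, not part of the patch] This final check is deliberately stricter than the old "read + delete <= 100": there is no longer a fall-through write branch, so any probability mass not covered by the four buckets would silently perform no operation, and an overcommitted total would shortchange the later buckets. The new defaults pass (10 + 25 + 50 + 15 = 100); a run that overrides one flag has to rebalance the others, e.g. ./db_stress --readpercent=5 --prefixpercent=30 --writepercent=50 --delpercent=15.
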