diff --git a/HISTORY.md b/HISTORY.md index 158a2d03e..599c3f94a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ * By default, checksums are verified on every read from database * Added is_manual_compaction to CompactionFilter::Context * Added "virtual void WaitForJoin() = 0" in class Env +* Removed BackupEngine::DeleteBackupsNewerThan() function ### New Features * If we find one truncated record at the end of the MANIFEST or WAL files, diff --git a/db/compaction.cc b/db/compaction.cc index 79470268d..48866a799 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -191,41 +191,72 @@ void Compaction::ResetNextCompactionIndex() { input_version_->ResetNextCompactionIndex(level_); } -static void InputSummary(std::vector& files, char* output, +/* +for sizes >=10TB, print "XXTB" +for sizes >=10GB, print "XXGB" +etc. +*/ +static void FileSizeSummary(unsigned long long sz, char* output, int len) { + const unsigned long long ull10 = 10; + if (sz >= ull10<<40) { + snprintf(output, len, "%lluTB", sz>>40); + } else if (sz >= ull10<<30) { + snprintf(output, len, "%lluGB", sz>>30); + } else if (sz >= ull10<<20) { + snprintf(output, len, "%lluMB", sz>>20); + } else if (sz >= ull10<<10) { + snprintf(output, len, "%lluKB", sz>>10); + } else { + snprintf(output, len, "%lluB", sz); + } +} + +static int InputSummary(std::vector& files, char* output, int len) { int write = 0; for (unsigned int i = 0; i < files.size(); i++) { int sz = len - write; - int ret = snprintf(output + write, sz, "%lu(%lu) ", - (unsigned long)files.at(i)->number, - (unsigned long)files.at(i)->file_size); + int ret; + char sztxt[16]; + FileSizeSummary((unsigned long long)files.at(i)->file_size, sztxt, 16); + ret = snprintf(output + write, sz, "%lu(%s) ", + (unsigned long)files.at(i)->number, + sztxt); if (ret < 0 || ret >= sz) break; write += ret; } + return write; } void Compaction::Summary(char* output, int len) { int write = snprintf(output, len, - "Base version %lu Base level %d, seek 
compaction:%d, inputs:", + "Base version %lu Base level %d, seek compaction:%d, inputs: [", (unsigned long)input_version_->GetVersionNumber(), level_, seek_compaction_); - if (write < 0 || write > len) { + if (write < 0 || write >= len) { return; } - char level_low_summary[100]; - InputSummary(inputs_[0], level_low_summary, sizeof(level_low_summary)); - char level_up_summary[100]; - if (inputs_[1].size()) { - InputSummary(inputs_[1], level_up_summary, sizeof(level_up_summary)); - } else { - level_up_summary[0] = '\0'; + write += InputSummary(inputs_[0], output+write, len-write); + if (write < 0 || write >= len) { + return; } - snprintf(output + write, len - write, "[%s],[%s]", - level_low_summary, level_up_summary); + write += snprintf(output+write, len-write, "],["); + if (write < 0 || write >= len) { + return; + } + + if (inputs_[1].size()) { + write += InputSummary(inputs_[1], output+write, len-write); + } + if (write < 0 || write >= len) { + return; + } + + snprintf(output+write, len-write, "]"); } } // namespace rocksdb diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 4a384ffd0..dca1e7e35 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -555,22 +555,27 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version) { version->files_[level].size(), version->LevelFileSummary(&tmp, 0)); // Check for size amplification first. - Compaction* c = PickCompactionUniversalSizeAmp(version, score); - if (c == nullptr) { + Compaction* c; + if ((c = PickCompactionUniversalSizeAmp(version, score)) != nullptr) { + Log(options_->info_log, "Universal: compacting for size amp\n"); + } else { // Size amplification is within limits. Try reducing read // amplification while maintaining file size ratios. unsigned int ratio = options_->compaction_options_universal.size_ratio; - c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX); - // Size amplification and file size ratios are within configured limits. 
- // If max read amplification is exceeding configured limits, then force - // compaction without looking at filesize ratios and try to reduce - // the number of files to fewer than level0_file_num_compaction_trigger. - if (c == nullptr) { + if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX)) != nullptr) { + Log(options_->info_log, "Universal: compacting for size ratio\n"); + } else { + // Size amplification and file size ratios are within configured limits. + // If max read amplification is exceeding configured limits, then force + // compaction without looking at filesize ratios and try to reduce + // the number of files to fewer than level0_file_num_compaction_trigger. unsigned int num_files = version->files_[level].size() - options_->level0_file_num_compaction_trigger; - c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files); + if ((c = PickCompactionUniversalReadAmp(version, score, UINT_MAX, num_files)) != nullptr) { + Log(options_->info_log, "Universal: compacting for file num\n"); + } } } if (c == nullptr) { @@ -675,14 +680,32 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( if (f->being_compacted) { break; } - // pick files if the total candidate file size (increased by the + // Pick files if the total/last candidate file size (increased by the // specified ratio) is still larger than the next candidate file. + // candidate_size is the total size of files picked so far with the + // default kCompactionStopStyleTotalSize; with + // kCompactionStopStyleSimilarSize, it's simply the size of the last + // picked file. uint64_t sz = (candidate_size * (100L + ratio)) /100; if (sz < f->file_size) { break; + } + if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) { + // Similar-size stopping rule: also check the last picked file isn't + // far larger than the next candidate file. 
+ sz = (f->file_size * (100L + ratio)) / 100; + if (sz < candidate_size) { + // If the small file we've encountered begins a run of similar-size + // files, we'll pick them up on a future iteration of the outer + // loop. If it's some lonely straggler, it'll eventually get picked + // by the last-resort read amp strategy which disregards size ratios. + break; + } + candidate_size = f->file_size; + } else { // default kCompactionStopStyleTotalSize + candidate_size += f->file_size; } candidate_count++; - candidate_size += f->file_size; } // Found a series of consecutive files that need compaction. diff --git a/db/db_bench.cc b/db/db_bench.cc index 291a0ce8c..efb6f210f 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -7,6 +7,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#define __STDC_FORMAT_MACROS +#include #include #include #include @@ -187,6 +189,11 @@ DEFINE_int32(max_background_compactions, "The maximum number of concurrent background compactions" " that can occur in parallel."); +DEFINE_int32(max_background_flushes, + rocksdb::Options().max_background_flushes, + "The maximum number of concurrent background flushes" + " that can occur in parallel."); + static rocksdb::CompactionStyle FLAGS_compaction_style_e; DEFINE_int32(compaction_style, (int32_t) rocksdb::Options().compaction_style, "style of compaction: level-based vs universal"); @@ -223,6 +230,8 @@ DEFINE_int32(open_files, rocksdb::Options().max_open_files, DEFINE_int32(bloom_bits, -1, "Bloom filter bits per key. Negative means" " use default settings."); +DEFINE_int32(memtable_bloom_bits, 0, "Bloom filter bits per key for memtable. " + "Negative means no bloom filter."); DEFINE_bool(use_existing_db, false, "If true, do not destroy the existing" " database. 
If you set this flag and also specify a benchmark that" @@ -487,11 +496,15 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) { } DEFINE_int32(prefix_size, 0, "control the prefix size for HashSkipList and " "plain table"); +DEFINE_int64(keys_per_prefix, 0, "control average number of keys generated " + "per prefix, 0 means no special handling of the prefix, " + "i.e. use the prefix comes with the generated random number."); enum RepFactory { kSkipList, kPrefixHash, - kVectorRep + kVectorRep, + kHashLinkedList }; enum RepFactory StringToRepFactory(const char* ctype) { assert(ctype); @@ -502,12 +515,15 @@ enum RepFactory StringToRepFactory(const char* ctype) { return kPrefixHash; else if (!strcasecmp(ctype, "vector")) return kVectorRep; + else if (!strcasecmp(ctype, "hash_linkedlist")) + return kHashLinkedList; fprintf(stdout, "Cannot parse memreptable %s\n", ctype); return kSkipList; } static enum RepFactory FLAGS_rep_factory; DEFINE_string(memtablerep, "skip_list", ""); +DEFINE_int64(hash_bucket_count, 1024 * 1024, "hash bucket count"); DEFINE_bool(use_plain_table, false, "if use plain table " "instead of block-based table format"); @@ -593,9 +609,9 @@ class Stats { double start_; double finish_; double seconds_; - long long done_; - long long last_report_done_; - long long next_report_; + int64_t done_; + int64_t last_report_done_; + int64_t next_report_; int64_t bytes_; double last_op_finish_; double last_report_finish_; @@ -672,12 +688,12 @@ class Stats { else if (next_report_ < 100000) next_report_ += 10000; else if (next_report_ < 500000) next_report_ += 50000; else next_report_ += 100000; - fprintf(stderr, "... finished %lld ops%30s\r", done_, ""); + fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, ""); fflush(stderr); } else { double now = FLAGS_env->NowMicros(); fprintf(stderr, - "%s ... thread %d: (%lld,%lld) ops and " + "%s ... 
thread %d: (%" PRIu64 ",%" PRIu64 ") ops and " "(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n", FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(), id_, @@ -773,7 +789,7 @@ struct ThreadState { class Duration { public: - Duration(int max_seconds, long long max_ops) { + Duration(int max_seconds, int64_t max_ops) { max_seconds_ = max_seconds; max_ops_= max_ops; ops_ = 0; @@ -799,8 +815,8 @@ class Duration { private: int max_seconds_; - long long max_ops_; - long long ops_; + int64_t max_ops_; + int64_t ops_; double start_at_; }; @@ -811,24 +827,27 @@ class Benchmark { const FilterPolicy* filter_policy_; const SliceTransform* prefix_extractor_; DB* db_; - long long num_; + int64_t num_; int value_size_; int key_size_; + int prefix_size_; + int64_t keys_per_prefix_; int entries_per_batch_; WriteOptions write_options_; - long long reads_; - long long writes_; - long long readwrites_; - long long merge_keys_; + int64_t reads_; + int64_t writes_; + int64_t readwrites_; + int64_t merge_keys_; int heap_counter_; - char keyFormat_[100]; // will contain the format of key. 
e.g "%016d" void PrintHeader() { PrintEnvironment(); fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size); fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n", FLAGS_value_size, static_cast(FLAGS_value_size * FLAGS_compression_ratio + 0.5)); - fprintf(stdout, "Entries: %lld\n", num_); + fprintf(stdout, "Entries: %" PRIu64 "\n", num_); + fprintf(stdout, "Prefix: %d bytes\n", FLAGS_prefix_size); + fprintf(stdout, "Keys per prefix: %" PRIu64 "\n", keys_per_prefix_); fprintf(stdout, "RawSize: %.1f MB (estimated)\n", ((static_cast(FLAGS_key_size + FLAGS_value_size) * num_) / 1048576.0)); @@ -856,7 +875,7 @@ class Benchmark { case rocksdb::kLZ4HCCompression: fprintf(stdout, "Compression: lz4hc\n"); break; - } + } switch (FLAGS_rep_factory) { case kPrefixHash: @@ -868,6 +887,9 @@ class Benchmark { case kVectorRep: fprintf(stdout, "Memtablerep: vector\n"); break; + case kHashLinkedList: + fprintf(stdout, "Memtablerep: hash_linkedlist\n"); + break; } fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level); @@ -1000,12 +1022,13 @@ class Benchmark { filter_policy_(FLAGS_bloom_bits >= 0 ? NewBloomFilterPolicy(FLAGS_bloom_bits) : nullptr), - prefix_extractor_(NewFixedPrefixTransform(FLAGS_use_plain_table ? - FLAGS_prefix_size : FLAGS_key_size-1)), + prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)), db_(nullptr), num_(FLAGS_num), value_size_(FLAGS_value_size), key_size_(FLAGS_key_size), + prefix_size_(FLAGS_prefix_size), + keys_per_prefix_(FLAGS_keys_per_prefix), entries_per_batch_(1), reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes), @@ -1014,6 +1037,11 @@ class Benchmark { ), merge_keys_(FLAGS_merge_keys < 0 ? 
FLAGS_num : FLAGS_merge_keys), heap_counter_(0) { + if (FLAGS_prefix_size > FLAGS_key_size) { + fprintf(stderr, "prefix size is larger than key size"); + exit(1); + } + std::vector files; FLAGS_env->GetChildren(FLAGS_db, &files); for (unsigned int i = 0; i < files.size(); i++) { @@ -1032,17 +1060,55 @@ class Benchmark { delete prefix_extractor_; } - //this function will construct string format for key. e.g "%016lld" - void ConstructStrFormatForKey(char* str, int keySize) { - str[0] = '%'; - str[1] = '0'; - sprintf(str+2, "%dlld%s", keySize, "%s"); - } + // Generate key according to the given specification and random number. + // The resulting key will have the following format (if keys_per_prefix_ + // is positive), extra trailing bytes are either cut off or padded with '0'. + // The prefix value is derived from key value. + // ---------------------------- + // | prefix 00000 | key 00000 | + // ---------------------------- + // If keys_per_prefix_ is 0, the key is simply a binary representation of + // random number followed by trailing '0's + // ---------------------------- + // | key 00000 | + // ---------------------------- + std::string GenerateKeyFromInt(uint64_t v, int64_t num_keys) { + std::string key; + key.resize(key_size_); + char* start = &(key[0]); + char* pos = start; + if (keys_per_prefix_ > 0) { + int64_t num_prefix = num_keys / keys_per_prefix_; + int64_t prefix = v % num_prefix; + int bytes_to_fill = std::min(prefix_size_, 8); + if (port::kLittleEndian) { + for (int i = 0; i < bytes_to_fill; ++i) { + pos[i] = (prefix >> ((bytes_to_fill - i - 1) << 3)) & 0xFF; + } + } else { + memcpy(pos, static_cast(&prefix), bytes_to_fill); + } + if (prefix_size_ > 8) { + // fill the rest with 0s + memset(pos + 8, '0', prefix_size_ - 8); + } + pos += prefix_size_; + } - unique_ptr GenerateKeyFromInt(long long v, const char* suffix = "") { - unique_ptr keyInStr(new char[kMaxKeySize + 1]); - snprintf(keyInStr.get(), kMaxKeySize + 1, keyFormat_, v, suffix); - return
keyInStr; + int bytes_to_fill = std::min(key_size_ - static_cast(pos - start), 8); + if (port::kLittleEndian) { + for (int i = 0; i < bytes_to_fill; ++i) { + pos[i] = (v >> ((bytes_to_fill - i - 1) << 3)) & 0xFF; + } + } else { + memcpy(pos, static_cast(&v), bytes_to_fill); + } + pos += bytes_to_fill; + if (key_size_ > pos - start) { + memset(pos, '0', key_size_ - (pos - start)); + } + + return key; } void Run() { @@ -1066,7 +1132,6 @@ class Benchmark { writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes); value_size_ = FLAGS_value_size; key_size_ = FLAGS_key_size; - ConstructStrFormatForKey(keyFormat_, key_size_); entries_per_batch_ = 1; write_options_ = WriteOptions(); if (FLAGS_sync) { @@ -1469,12 +1534,14 @@ class Benchmark { options.min_write_buffer_number_to_merge = FLAGS_min_write_buffer_number_to_merge; options.max_background_compactions = FLAGS_max_background_compactions; + options.max_background_flushes = FLAGS_max_background_flushes; options.compaction_style = FLAGS_compaction_style_e; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.prefix_extractor = (FLAGS_use_plain_table || FLAGS_use_prefix_blooms) ? 
prefix_extractor_ : nullptr; + options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits; options.max_open_files = FLAGS_open_files; options.statistics = dbstats; options.env = FLAGS_env; @@ -1488,19 +1555,26 @@ class Benchmark { options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier; options.filter_deletes = FLAGS_filter_deletes; - if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) { - fprintf(stderr, "prefix_size should be non-zero iff memtablerep " - "== prefix_hash\n"); + if ((FLAGS_prefix_size == 0) && (FLAGS_rep_factory == kPrefixHash || + FLAGS_rep_factory == kHashLinkedList)) { + fprintf(stderr, "prefix_size should be non-zero if PrefixHash or " + "HashLinkedList memtablerep is used\n"); exit(1); } switch (FLAGS_rep_factory) { case kPrefixHash: options.memtable_factory.reset(NewHashSkipListRepFactory( - NewFixedPrefixTransform(FLAGS_prefix_size))); + prefix_extractor_, + FLAGS_hash_bucket_count)); break; case kSkipList: // no need to do anything break; + case kHashLinkedList: + options.memtable_factory.reset(NewHashLinkListRepFactory( + prefix_extractor_, + FLAGS_hash_bucket_count)); + break; case kVectorRep: options.memtable_factory.reset( new VectorRepFactory @@ -1508,7 +1582,8 @@ class Benchmark { break; } if (FLAGS_use_plain_table) { - if (FLAGS_rep_factory != kPrefixHash) { + if (FLAGS_rep_factory != kPrefixHash && + FLAGS_rep_factory != kHashLinkedList) { fprintf(stderr, "Waring: plain table is used with skipList\n"); } if (!FLAGS_mmap_read && !FLAGS_mmap_write) { @@ -1688,7 +1763,7 @@ class Benchmark { void DoWrite(ThreadState* thread, WriteMode write_mode) { const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0; - const int num_ops = writes_ == 0 ? num_ : writes_ ; + const int64_t num_ops = writes_ == 0 ? 
num_ : writes_; Duration duration(test_duration, num_ops); unique_ptr bit_set; @@ -1698,7 +1773,7 @@ class Benchmark { if (num_ != FLAGS_num) { char msg[100]; - snprintf(msg, sizeof(msg), "(%lld ops)", num_); + snprintf(msg, sizeof(msg), "(%" PRIu64 " ops)", num_); thread->stats.AddMessage(msg); } @@ -1710,7 +1785,7 @@ class Benchmark { while (!duration.Done(entries_per_batch_)) { batch.Clear(); for (int j = 0; j < entries_per_batch_; j++) { - long long k = 0; + int64_t k = 0; switch(write_mode) { case SEQUENTIAL: k = i +j; @@ -1720,7 +1795,7 @@ class Benchmark { break; case UNIQUE_RANDOM: { - const long long t = thread->rand.Next() % FLAGS_num; + const int64_t t = thread->rand.Next() % FLAGS_num; if (!bit_set->test(t)) { // best case k = t; @@ -1748,9 +1823,9 @@ class Benchmark { break; } }; - unique_ptr key = GenerateKeyFromInt(k); - batch.Put(key.get(), gen.Generate(value_size_)); - bytes += value_size_ + strlen(key.get()); + std::string key = GenerateKeyFromInt(k, FLAGS_num); + batch.Put(key, gen.Generate(value_size_)); + bytes += value_size_ + key.size(); thread->stats.FinishedSingleOp(db_); } s = db_->Write(write_options_, &batch); @@ -1765,7 +1840,7 @@ class Benchmark { void ReadSequential(ThreadState* thread) { Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true)); - long long i = 0; + int64_t i = 0; int64_t bytes = 0; for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { bytes += iter->key().size() + iter->value().size(); @@ -1778,7 +1853,7 @@ class Benchmark { void ReadReverse(ThreadState* thread) { Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true)); - long long i = 0; + int64_t i = 0; int64_t bytes = 0; for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { bytes += iter->key().size() + iter->value().size(); @@ -1792,20 +1867,20 @@ class Benchmark { // Calls MultiGet over a list of keys from a random distribution. // Returns the total number of keys found. 
long MultiGetRandom(ReadOptions& options, int num_keys, - Random64& rand, long long range, const char* suffix) { + Random64* rand, int64_t range, const char* suffix) { assert(num_keys > 0); std::vector keys(num_keys); std::vector values(num_keys); - std::vector > gen_keys(num_keys); + std::vector gen_keys(num_keys); int i; - long long k; + int64_t k; // Fill the keys vector for(i=0; iNext() % range; + gen_keys[i] = GenerateKeyFromInt(k, range) + suffix; + keys[i] = gen_keys[i]; } if (FLAGS_use_snapshot) { @@ -1841,7 +1916,7 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); Duration duration(FLAGS_duration, reads_); - long long found = 0; + int64_t found = 0; if (FLAGS_use_multiget) { // MultiGet const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group @@ -1850,7 +1925,8 @@ class Benchmark { // Recalculate number of keys per group, and call MultiGet until done long num_keys; while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { - found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, ""); + found += + MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, ""); thread->stats.FinishedSingleOp(db_); keys_left -= num_keys; } @@ -1858,11 +1934,11 @@ class Benchmark { options.tailing = true; Iterator* iter = db_->NewIterator(options); while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); - iter->Seek(key.get()); - if (iter->Valid() && iter->key().compare(Slice(key.get())) == 0) { + iter->Seek(key); + if (iter->Valid() && iter->key().compare(Slice(key)) == 0) { ++found; } @@ -1870,33 +1946,34 @@ class Benchmark { } delete iter; } else { // Regular case. 
Do one "get" at a time Get + options.tailing = true; + options.prefix_seek = (FLAGS_prefix_size == 0); Iterator* iter = db_->NewIterator(options); std::string value; while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); if (FLAGS_use_snapshot) { options.snapshot = db_->GetSnapshot(); } if (FLAGS_read_range < 2) { - if (db_->Get(options, key.get(), &value).ok()) { + if (db_->Get(options, key, &value).ok()) { found++; } } else { - Slice skey(key.get()); int count = 1; if (FLAGS_get_approx) { - unique_ptr key2 = - GenerateKeyFromInt(k + (int) FLAGS_read_range); - Slice skey2(key2.get()); - Range range(skey, skey2); + std::string key2 = + GenerateKeyFromInt(k + static_cast(FLAGS_read_range), + FLAGS_num + FLAGS_read_range); + Range range(key, key2); uint64_t sizes; db_->GetApproximateSizes(&range, 1, &sizes); } - for (iter->Seek(skey); + for (iter->Seek(key); iter->Valid() && count <= FLAGS_read_range; ++count, iter->Next()) { found++; @@ -1915,8 +1992,14 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_); + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + found, reads_); + thread->stats.AddMessage(msg); + + if (FLAGS_perf_level > 0) { + thread->stats.AddMessage(perf_context.ToString()); + } } void PrefixScanRandom(ThreadState* thread) { @@ -1928,13 +2011,13 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); Duration duration(FLAGS_duration, reads_); - long long found = 0; + int64_t found = 0; while (!duration.Done(1)) { std::string value; const int k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); - Slice skey(key.get()); + std::string key = GenerateKeyFromInt(k, FLAGS_num); + Slice skey(key); Slice prefix = prefix_extractor_->Transform(skey); options.prefix = 
FLAGS_use_prefix_api ? &prefix : nullptr; @@ -1950,7 +2033,8 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_); + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + found, reads_); thread->stats.AddMessage(msg); } @@ -1968,7 +2052,8 @@ class Benchmark { long num_keys; long found; while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { - found = MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, "."); + found = + MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, "."); // We should not find any key since the key we try to get has a // different suffix @@ -1983,9 +2068,9 @@ class Benchmark { std::string value; Status s; while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k, "."); - s = db_->Get(options, key.get(), &value); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num) + "."; + s = db_->Get(options, key, &value); assert(!s.ok() && s.IsNotFound()); thread->stats.FinishedSingleOp(db_); } @@ -1995,26 +2080,26 @@ class Benchmark { void ReadHot(ThreadState* thread) { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true); - const long long range = (FLAGS_num + 99) / 100; - long long found = 0; + const int64_t range = (FLAGS_num + 99) / 100; + int64_t found = 0; if (FLAGS_use_multiget) { - const long long kpg = FLAGS_keys_per_multiget; // keys per multiget group - long long keys_left = reads_; + const int64_t kpg = FLAGS_keys_per_multiget; // keys per multiget group + int64_t keys_left = reads_; // Recalculate number of keys per group, and call MultiGet until done long num_keys; while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { - found += MultiGetRandom(options, num_keys, thread->rand, range, ""); + found += MultiGetRandom(options, num_keys, &thread->rand, range, ""); 
thread->stats.FinishedSingleOp(db_); keys_left -= num_keys; } } else { std::string value; while (!duration.Done(1)) { - const long long k = thread->rand.Next() % range; - unique_ptr key = GenerateKeyFromInt(k); - if (db_->Get(options, key.get(), &value).ok()){ + const int64_t k = thread->rand.Next() % range; + std::string key = GenerateKeyFromInt(k, range); + if (db_->Get(options, key, &value).ok()) { ++found; } thread->stats.FinishedSingleOp(db_); @@ -2022,7 +2107,8 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, reads_); + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + found, reads_); thread->stats.AddMessage(msg); } @@ -2040,18 +2126,19 @@ class Benchmark { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true); std::string value; - long long found = 0; + int64_t found = 0; while (!duration.Done(1)) { Iterator* iter = db_->NewIterator(options); - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); - iter->Seek(key.get()); - if (iter->Valid() && iter->key() == key.get()) found++; + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); + iter->Seek(key); + if (iter->Valid() && iter->key() == Slice(key)) found++; delete iter; thread->stats.FinishedSingleOp(db_); } char msg[100]; - snprintf(msg, sizeof(msg), "(%lld of %lld found)", found, num_); + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + found, num_); thread->stats.AddMessage(msg); } @@ -2063,9 +2150,9 @@ class Benchmark { while (!duration.Done(entries_per_batch_)) { batch.Clear(); for (int j = 0; j < entries_per_batch_; j++) { - const long long k = seq ? i+j : (thread->rand.Next() % FLAGS_num); - unique_ptr key = GenerateKeyFromInt(k); - batch.Delete(key.get()); + const int64_t k = seq ? 
i+j : (thread->rand.Next() % FLAGS_num); + std::string key = GenerateKeyFromInt(k, FLAGS_num); + batch.Delete(key); thread->stats.FinishedSingleOp(db_); } s = db_->Write(write_options_, &batch); @@ -2113,10 +2200,9 @@ class Benchmark { } } - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); - Status s = db_->Put(write_options_, key.get(), - gen.Generate(value_size_)); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); + Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -2228,18 +2314,18 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; - long long found = 0; + int64_t found = 0; int get_weight = 0; int put_weight = 0; int delete_weight = 0; - long long gets_done = 0; - long long puts_done = 0; - long long deletes_done = 0; + int64_t gets_done = 0; + int64_t puts_done = 0; + int64_t deletes_done = 0; // the number of iterations is the larger of read_ or write_ - for (long long i = 0; i < readwrites_; i++) { - const long long k = thread->rand.Next() % (FLAGS_numdistinct); - unique_ptr key = GenerateKeyFromInt(k); + for (int64_t i = 0; i < readwrites_; i++) { + const int64_t k = thread->rand.Next() % (FLAGS_numdistinct); + std::string key = GenerateKeyFromInt(k, FLAGS_numdistinct); if (get_weight == 0 && put_weight == 0 && delete_weight == 0) { // one batch completed, reinitialize for next batch get_weight = FLAGS_readwritepercent; @@ -2248,7 +2334,7 @@ class Benchmark { } if (get_weight > 0) { // do all the gets first - Status s = GetMany(options, key.get(), &value); + Status s = GetMany(options, key, &value); if (!s.ok() && !s.IsNotFound()) { fprintf(stderr, "getmany error: %s\n", s.ToString().c_str()); // we continue after error rather than exiting so that we can @@ -2261,8 +2347,7 @@ class Benchmark { } 
else if (put_weight > 0) { // then do all the corresponding number of puts // for all the gets we have done earlier - Status s = PutMany(write_options_, key.get(), - gen.Generate(value_size_)); + Status s = PutMany(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "putmany error: %s\n", s.ToString().c_str()); exit(1); @@ -2270,7 +2355,7 @@ class Benchmark { put_weight--; puts_done++; } else if (delete_weight > 0) { - Status s = DeleteMany(write_options_, key.get()); + Status s = DeleteMany(write_options_, key); if (!s.ok()) { fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str()); exit(1); @@ -2283,7 +2368,8 @@ class Benchmark { } char msg[100]; snprintf(msg, sizeof(msg), - "( get:%lld put:%lld del:%lld total:%lld found:%lld)", + "( get:%" PRIu64 " put:%" PRIu64 " del:%" PRIu64 " total:%" \ + PRIu64 " found:%" PRIu64 ")", gets_done, puts_done, deletes_done, readwrites_, found); thread->stats.AddMessage(msg); } @@ -2300,17 +2386,17 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; - long long found = 0; + int64_t found = 0; int get_weight = 0; int put_weight = 0; - long long reads_done = 0; - long long writes_done = 0; + int64_t reads_done = 0; + int64_t writes_done = 0; Duration duration(FLAGS_duration, readwrites_); // the number of iterations is the larger of read_ or write_ while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); if (get_weight == 0 && put_weight == 0) { // one batch completed, reinitialize for next batch get_weight = FLAGS_readwritepercent; @@ -2323,17 +2409,14 @@ class Benchmark { } if (FLAGS_get_approx) { - char key2[100]; - snprintf(key2, sizeof(key2), "%016lld", k + 1); - Slice skey2(key2); - Slice skey(key2); - Range range(skey, skey2); + std::string key2 = 
GenerateKeyFromInt(k + 1, FLAGS_num + 1); + Range range(key, key2); uint64_t sizes; db_->GetApproximateSizes(&range, 1, &sizes); } // do all the gets first - Status s = db_->Get(options, key.get(), &value); + Status s = db_->Get(options, key, &value); if (!s.ok() && !s.IsNotFound()) { fprintf(stderr, "get error: %s\n", s.ToString().c_str()); // we continue after error rather than exiting so that we can @@ -2352,8 +2435,7 @@ class Benchmark { } else if (put_weight > 0) { // then do all the corresponding number of puts // for all the gets we have done earlier - Status s = db_->Put(write_options_, key.get(), - gen.Generate(value_size_)); + Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -2364,8 +2446,8 @@ class Benchmark { thread->stats.FinishedSingleOp(db_); } char msg[100]; - snprintf(msg, sizeof(msg), - "( reads:%lld writes:%lld total:%lld found:%lld)", + snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \ + " total:%" PRIu64 " found:%" PRIu64 ")", reads_done, writes_done, readwrites_, found); thread->stats.AddMessage(msg); } @@ -2388,10 +2470,10 @@ class Benchmark { long num_keys; // number of keys to read in current group long num_put_keys; // number of keys to put in current group - long found = 0; - long reads_done = 0; - long writes_done = 0; - long multigets_done = 0; + int64_t found = 0; + int64_t reads_done = 0; + int64_t writes_done = 0; + int64_t multigets_done = 0; // the number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); @@ -2415,18 +2497,18 @@ class Benchmark { assert(num_keys + num_put_keys <= keys_left); // Apply the MultiGet operations - found += MultiGetRandom(options, num_keys, thread->rand, FLAGS_num, ""); + found += MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, ""); ++multigets_done; reads_done+=num_keys; thread->stats.FinishedSingleOp(db_); // Now do the puts 
int i; - long long k; + int64_t k; for(i=0; irand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); - Status s = db_->Put(write_options_, key.get(), + std::string key = GenerateKeyFromInt(k, FLAGS_num); + Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); @@ -2440,7 +2522,8 @@ class Benchmark { } char msg[100]; snprintf(msg, sizeof(msg), - "( reads:%ld writes:%ld total:%lld multiget_ops:%ld found:%ld)", + "( reads:%" PRIu64 " writes:%" PRIu64 " total:%" PRIu64 \ + " multiget_ops:%" PRIu64 " found:%" PRIu64 ")", reads_done, writes_done, readwrites_, multigets_done, found); thread->stats.AddMessage(msg); } @@ -2451,29 +2534,26 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; - long long found = 0; + int64_t found = 0; Duration duration(FLAGS_duration, readwrites_); // the number of iterations is the larger of read_ or write_ while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); if (FLAGS_use_snapshot) { options.snapshot = db_->GetSnapshot(); } if (FLAGS_get_approx) { - char key2[100]; - snprintf(key2, sizeof(key2), "%016lld", k + 1); - Slice skey2(key2); - Slice skey(key2); - Range range(skey, skey2); + std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1); + Range range(key, key2); uint64_t sizes; db_->GetApproximateSizes(&range, 1, &sizes); } - if (db_->Get(options, key.get(), &value).ok()) { + if (db_->Get(options, key, &value).ok()) { found++; } @@ -2481,7 +2561,7 @@ class Benchmark { db_->ReleaseSnapshot(options.snapshot); } - Status s = db_->Put(write_options_, key.get(), gen.Generate(value_size_)); + Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", 
s.ToString().c_str()); exit(1); @@ -2490,7 +2570,7 @@ class Benchmark { } char msg[100]; snprintf(msg, sizeof(msg), - "( updates:%lld found:%lld)", readwrites_, found); + "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found); thread->stats.AddMessage(msg); } @@ -2501,30 +2581,27 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; - long found = 0; + int64_t found = 0; // The number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { - const long long k = thread->rand.Next() % FLAGS_num; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % FLAGS_num; + std::string key = GenerateKeyFromInt(k, FLAGS_num); if (FLAGS_use_snapshot) { options.snapshot = db_->GetSnapshot(); } if (FLAGS_get_approx) { - char key2[100]; - snprintf(key2, sizeof(key2), "%016lld", k + 1); - Slice skey2(key2); - Slice skey(key2); - Range range(skey, skey2); + std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1); + Range range(key, key2); uint64_t sizes; db_->GetApproximateSizes(&range, 1, &sizes); } // Get the existing value - if (db_->Get(options, key.get(), &value).ok()) { + if (db_->Get(options, key, &value).ok()) { found++; } else { // If not existing, then just assume an empty string of data @@ -2544,7 +2621,7 @@ class Benchmark { value.append(operand.data(), operand.size()); // Write back to the database - Status s = db_->Put(write_options_, key.get(), value); + Status s = db_->Put(write_options_, key, value); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -2552,7 +2629,8 @@ class Benchmark { thread->stats.FinishedSingleOp(db_); } char msg[100]; - snprintf(msg, sizeof(msg), "( updates:%lld found:%ld)", readwrites_, found); + snprintf(msg, sizeof(msg), "( updates:%" PRIu64 " found:%" PRIu64 ")", + readwrites_, found); thread->stats.AddMessage(msg); } @@ -2572,11 +2650,10 @@ class 
Benchmark { // The number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { - const long long k = thread->rand.Next() % merge_keys_; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % merge_keys_; + std::string key = GenerateKeyFromInt(k, merge_keys_); - Status s = db_->Merge(write_options_, key.get(), - gen.Generate(value_size_)); + Status s = db_->Merge(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "merge error: %s\n", s.ToString().c_str()); @@ -2587,7 +2664,7 @@ class Benchmark { // Print some statistics char msg[100]; - snprintf(msg, sizeof(msg), "( updates:%lld)", readwrites_); + snprintf(msg, sizeof(msg), "( updates:%" PRIu64 ")", readwrites_); thread->stats.AddMessage(msg); } @@ -2602,23 +2679,22 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); RandomGenerator gen; std::string value; - long long num_hits = 0; - long long num_gets = 0; - long long num_merges = 0; + int64_t num_hits = 0; + int64_t num_gets = 0; + int64_t num_merges = 0; size_t max_length = 0; // the number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { - const long long k = thread->rand.Next() % merge_keys_; - unique_ptr key = GenerateKeyFromInt(k); + const int64_t k = thread->rand.Next() % merge_keys_; + std::string key = GenerateKeyFromInt(k, merge_keys_); bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent; if (do_merge) { - Status s = db_->Merge(write_options_, key.get(), - gen.Generate(value_size_)); + Status s = db_->Merge(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "merge error: %s\n", s.ToString().c_str()); exit(1); @@ -2627,7 +2703,7 @@ class Benchmark { num_merges++; } else { - Status s = db_->Get(options, key.get(), &value); + Status s = db_->Get(options, key, &value); if (value.length() > 
max_length) max_length = value.length(); @@ -2647,7 +2723,8 @@ class Benchmark { } char msg[100]; snprintf(msg, sizeof(msg), - "(reads:%lld merges:%lld total:%lld hits:%lld maxlength:%zu)", + "(reads:%" PRIu64 " merges:%" PRIu64 " total:%" PRIu64 " hits:%" \ + PRIu64 " maxlength:%zu)", num_gets, num_merges, readwrites_, num_hits, max_length); thread->stats.AddMessage(msg); } diff --git a/db/db_impl.cc b/db/db_impl.cc index d0ae32bcc..49e3e2830 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2330,7 +2330,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, compact->compaction->level(), compact->compaction->num_input_files(1), compact->compaction->output_level(), compact->compaction->score(), options_.max_background_compactions - bg_compaction_scheduled_); - char scratch[256]; + char scratch[2345]; compact->compaction->Summary(scratch, sizeof(scratch)); Log(options_.info_log, "Compaction start summary: %s\n", scratch); @@ -2727,10 +2727,11 @@ struct IterState { static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); - DBImpl::DeletionState deletion_state; bool need_cleanup = state->super_version->Unref(); if (need_cleanup) { + DBImpl::DeletionState deletion_state; + state->mu->Lock(); state->super_version->Cleanup(); state->db->FindObsoleteFiles(deletion_state, false, true); diff --git a/db/db_test.cc b/db/db_test.cc index ca5ea4c29..80622c94d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1470,6 +1470,23 @@ TEST(DBTest, FilterDeletes) { } while (ChangeCompactOptions()); } + +TEST(DBTest, IterSeekBeforePrev) { + ASSERT_OK(Put("a", "b")); + ASSERT_OK(Put("c", "d")); + dbfull()->Flush(FlushOptions()); + ASSERT_OK(Put("0", "f")); + ASSERT_OK(Put("1", "h")); + dbfull()->Flush(FlushOptions()); + ASSERT_OK(Put("2", "j")); + auto iter = db_->NewIterator(ReadOptions()); + iter->Seek(Slice("c")); + iter->Prev(); + iter->Seek(Slice("a")); + iter->Prev(); + delete iter; +} + TEST(DBTest, IterEmpty) { do { 
CreateAndReopenWithCF({"pikachu"}); @@ -2552,6 +2569,89 @@ TEST(DBTest, UniversalCompactionOptions) { } } +TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 100<<10; //100KB + // trigger compaction if there are >= 4 files + options.level0_file_num_compaction_trigger = 4; + options.compaction_options_universal.size_ratio = 10; + options.compaction_options_universal.stop_style = kCompactionStopStyleSimilarSize; + options.num_levels=1; + Reopen(&options); + + Random rnd(301); + int key_idx = 0; + + // Stage 1: + // Generate a set of files at level 0, but don't trigger level-0 + // compaction. + for (int num = 0; + num < options.level0_file_num_compaction_trigger-1; + num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(0), num + 1); + } + + // Generate one more file at level-0, which should trigger level-0 + // compaction. + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForCompact(); + // Suppose each file flushed from mem table has size 1. Now we compact + // (level0_file_num_compaction_trigger+1)=4 files and should have a big + // file of size 4. + ASSERT_EQ(NumTableFilesAtLevel(0), 1); + + // Stage 2: + // Now we have one file at level 0, with size 4. We also have some data in + // mem table. Let's continue generating new files at level 0, but don't + // trigger level-0 compaction. + // First, clean up memtable before inserting new data. This will generate + // a level-0 file, with size around 0.4 (according to previously written + // data amount). 
+ dbfull()->Flush(FlushOptions()); + for (int num = 0; + num < options.level0_file_num_compaction_trigger-3; + num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(0), num + 3); + } + + // Generate one more file at level-0, which should trigger level-0 + // compaction. + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForCompact(); + // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1. + // After compaction, we should have 3 files, with size 4, 0.4, 2. + ASSERT_EQ(NumTableFilesAtLevel(0), 3); + // Stage 3: + // Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one + // more file at level-0, which should trigger level-0 compaction. + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForCompact(); + // Level-0 compaction is triggered, but no file will be picked up. 
+ ASSERT_EQ(NumTableFilesAtLevel(0), 4); +} + #if defined(SNAPPY) && defined(ZLIB) && defined(BZIP2) TEST(DBTest, CompressedCache) { int num_iter = 80; diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index 551ca8fe6..61adad6b7 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -7,6 +7,7 @@ #define STORAGE_ROCKSDB_INCLUDE_PERF_CONTEXT_H #include +#include namespace rocksdb { @@ -26,6 +27,8 @@ struct PerfContext { void Reset(); // reset all performance counters to zero + std::string ToString() const; + uint64_t user_key_comparison_count; // total number of user key comparisons uint64_t block_cache_hit_count; // total number of block cache hits uint64_t block_read_count; // total number of block reads (with IO) diff --git a/include/utilities/backupable_db.h b/include/utilities/backupable_db.h index ab3a1ed80..abf05978c 100644 --- a/include/utilities/backupable_db.h +++ b/include/utilities/backupable_db.h @@ -58,14 +58,13 @@ struct BackupableDBOptions { explicit BackupableDBOptions(const std::string& _backup_dir, Env* _backup_env = nullptr, bool _share_table_files = true, - Logger* _info_log = nullptr, - bool _sync = true, - bool _destroy_old_data = false) : - backup_dir(_backup_dir), - backup_env(_backup_env), - info_log(_info_log), - sync(_sync), - destroy_old_data(_destroy_old_data) { } + Logger* _info_log = nullptr, bool _sync = true, + bool _destroy_old_data = false) + : backup_dir(_backup_dir), + backup_env(_backup_env), + info_log(_info_log), + sync(_sync), + destroy_old_data(_destroy_old_data) {} }; typedef uint32_t BackupID; @@ -99,8 +98,6 @@ class BackupEngine { const std::string& wal_dir) = 0; virtual Status RestoreDBFromLatestBackup(const std::string& db_dir, const std::string& wal_dir) = 0; - - virtual void DeleteBackupsNewerThan(uint64_t sequence_number) = 0; }; // Stack your DB with BackupableDB to be able to backup the DB @@ -138,32 +135,33 @@ class BackupableDB : public StackableDB { // Use 
this class to access information about backups and restore from them class RestoreBackupableDB { - public: - RestoreBackupableDB(Env* db_env, const BackupableDBOptions& options); - ~RestoreBackupableDB(); + public: + RestoreBackupableDB(Env* db_env, const BackupableDBOptions& options); + ~RestoreBackupableDB(); - // Returns info about backups in backup_info - void GetBackupInfo(std::vector* backup_info); + // Returns info about backups in backup_info + void GetBackupInfo(std::vector* backup_info); - // restore from backup with backup_id - // IMPORTANT -- if options_.share_table_files == true and you restore DB - // from some backup that is not the latest, and you start creating new - // backups from the new DB, all the backups that were newer than the - // backup you restored from will be deleted - // - // Example: Let's say you have backups 1, 2, 3, 4, 5 and you restore 3. - // If you try creating a new backup now, old backups 4 and 5 will be deleted - // and new backup with ID 4 will be created. - Status RestoreDBFromBackup(BackupID backup_id, const std::string& db_dir, - const std::string& wal_dir); + // restore from backup with backup_id + // IMPORTANT -- if options_.share_table_files == true and you restore DB + // from some backup that is not the latest, and you start creating new + // backups from the new DB, they will probably fail + // + // Example: Let's say you have backups 1, 2, 3, 4, 5 and you restore 3. + // If you add new data to the DB and try creating a new backup now, the + // database will diverge from backups 4 and 5 and the new backup will fail. + // If you want to create new backup, you will first have to delete backups 4 + // and 5. 
+ Status RestoreDBFromBackup(BackupID backup_id, const std::string& db_dir, + const std::string& wal_dir); - // restore from the latest backup - Status RestoreDBFromLatestBackup(const std::string& db_dir, - const std::string& wal_dir); - // deletes old backups, keeping latest num_backups_to_keep alive - Status PurgeOldBackups(uint32_t num_backups_to_keep); - // deletes a specific backup - Status DeleteBackup(BackupID backup_id); + // restore from the latest backup + Status RestoreDBFromLatestBackup(const std::string& db_dir, + const std::string& wal_dir); + // deletes old backups, keeping latest num_backups_to_keep alive + Status PurgeOldBackups(uint32_t num_backups_to_keep); + // deletes a specific backup + Status DeleteBackup(BackupID backup_id); private: BackupEngine* backup_engine_; diff --git a/table/filter_block.cc b/table/filter_block.cc index d7be78e1c..7d1bfccaa 100644 --- a/table/filter_block.cc +++ b/table/filter_block.cc @@ -82,7 +82,6 @@ void FilterBlockBuilder::AddKey(const Slice& key) { Slice prefix = prefix_extractor_->Transform(user_key); InternalKey internal_prefix_tmp(prefix, 0, kTypeValue); Slice internal_prefix = internal_prefix_tmp.Encode(); - assert(comparator_->Compare(internal_prefix, key) <= 0); start_.push_back(entries_.size()); entries_.append(internal_prefix.data(), internal_prefix.size()); } diff --git a/util/perf_context.cc b/util/perf_context.cc index 6833f6836..650abebca 100644 --- a/util/perf_context.cc +++ b/util/perf_context.cc @@ -3,6 +3,8 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. 
// + +#include #include "util/perf_context_imp.h" namespace rocksdb { @@ -38,6 +40,35 @@ void PerfContext::Reset() { write_memtable_time = 0; } +#define OUTPUT(counter) #counter << " = " << counter << ", " + +std::string PerfContext::ToString() const { + std::ostringstream ss; + ss << OUTPUT(user_key_comparison_count) + << OUTPUT(block_cache_hit_count) + << OUTPUT(block_read_count) + << OUTPUT(block_read_byte) + << OUTPUT(block_read_time) + << OUTPUT(block_checksum_time) + << OUTPUT(block_decompress_time) + << OUTPUT(internal_key_skipped_count) + << OUTPUT(internal_delete_skipped_count) + << OUTPUT(write_wal_time) + << OUTPUT(get_snapshot_time) + << OUTPUT(get_from_memtable_time) + << OUTPUT(get_from_memtable_count) + << OUTPUT(get_post_process_time) + << OUTPUT(get_from_output_files_time) + << OUTPUT(seek_child_seek_time) + << OUTPUT(seek_child_seek_count) + << OUTPUT(seek_min_heap_time) + << OUTPUT(seek_internal_seek_time) + << OUTPUT(find_next_user_entry_time) + << OUTPUT(write_pre_and_post_process_time) + << OUTPUT(write_memtable_time); + return ss.str(); +} + __thread PerfContext perf_context; } diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 833140209..99bf1a2d9 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -46,8 +46,6 @@ class BackupEngineImpl : public BackupEngine { return RestoreDBFromBackup(latest_backup_id_, db_dir, wal_dir); } - void DeleteBackupsNewerThan(uint64_t sequence_number); - private: struct FileInfo { FileInfo(const std::string& fname, uint64_t sz, uint32_t checksum) @@ -185,6 +183,12 @@ class BackupEngineImpl : public BackupEngine { Env* db_env_; Env* backup_env_; + // directories + unique_ptr backup_directory_; + unique_ptr shared_directory_; + unique_ptr meta_directory_; + unique_ptr private_directory_; + static const size_t copy_file_buffer_size_ = 5 * 1024 * 1024LL; // 5MB }; @@ -203,11 +207,17 @@ BackupEngineImpl::BackupEngineImpl(Env* 
db_env, // create all the dirs we need backup_env_->CreateDirIfMissing(GetAbsolutePath()); + backup_env_->NewDirectory(GetAbsolutePath(), &backup_directory_); if (options_.share_table_files) { backup_env_->CreateDirIfMissing(GetAbsolutePath(GetSharedFileRel())); + backup_env_->NewDirectory(GetAbsolutePath(GetSharedFileRel()), + &shared_directory_); } backup_env_->CreateDirIfMissing(GetAbsolutePath(GetPrivateDirRel())); + backup_env_->NewDirectory(GetAbsolutePath(GetPrivateDirRel()), + &private_directory_); backup_env_->CreateDirIfMissing(GetBackupMetaDir()); + backup_env_->NewDirectory(GetBackupMetaDir(), &meta_directory_); std::vector backup_meta_files; backup_env_->GetChildren(GetBackupMetaDir(), &backup_meta_files); @@ -279,26 +289,6 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, BackupEngineImpl::~BackupEngineImpl() { LogFlush(options_.info_log); } -void BackupEngineImpl::DeleteBackupsNewerThan(uint64_t sequence_number) { - for (auto backup : backups_) { - if (backup.second.GetSequenceNumber() > sequence_number) { - Log(options_.info_log, - "Deleting backup %u because sequence number (%" PRIu64 - ") is newer than %" PRIu64 "", - backup.first, backup.second.GetSequenceNumber(), sequence_number); - backup.second.Delete(); - obsolete_backups_.push_back(backup.first); - } - } - for (auto ob : obsolete_backups_) { - backups_.erase(backups_.find(ob)); - } - auto itr = backups_.end(); - latest_backup_id_ = (itr == backups_.begin()) ? 0 : (--itr)->first; - PutLatestBackupFileContents(latest_backup_id_); // Ignore errors - GarbageCollection(false); -} - Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { Status s; std::vector live_files; @@ -347,9 +337,8 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { return Status::Corruption("Can't parse file name. 
This is very bad"); } // we should only get sst, manifest and current files here - assert(type == kTableFile || - type == kDescriptorFile || - type == kCurrentFile); + assert(type == kTableFile || type == kDescriptorFile || + type == kCurrentFile); // rules: // * if it's kTableFile, than it's shared @@ -393,6 +382,28 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { // install the newly created backup meta! (atomic) s = PutLatestBackupFileContents(new_backup_id); } + if (s.ok() && options_.sync) { + unique_ptr backup_private_directory; + backup_env_->NewDirectory( + GetAbsolutePath(GetPrivateFileRel(new_backup_id, false)), + &backup_private_directory); + if (backup_private_directory != nullptr) { + backup_private_directory->Fsync(); + } + if (private_directory_ != nullptr) { + private_directory_->Fsync(); + } + if (meta_directory_ != nullptr) { + meta_directory_->Fsync(); + } + if (shared_directory_ != nullptr) { + shared_directory_->Fsync(); + } + if (backup_directory_ != nullptr) { + backup_directory_->Fsync(); + } + } + if (!s.ok()) { // clean all the files we might have created Log(options_.info_log, "Backup failed -- %s", s.ToString().c_str()); @@ -590,6 +601,7 @@ Status BackupEngineImpl::CopyFile(const std::string& src, unique_ptr src_file; EnvOptions env_options; env_options.use_mmap_writes = false; + env_options.use_os_buffer = false; if (size != nullptr) { *size = 0; } @@ -705,6 +717,7 @@ Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env, EnvOptions env_options; env_options.use_mmap_writes = false; + env_options.use_os_buffer = false; std::unique_ptr src_file; Status s = src_env->NewSequentialFile(src, &src_file, env_options); @@ -892,6 +905,9 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile( uint64_t size; s = env_->GetFileSize(backup_dir + "/" + filename, &size); + if (!s.ok()) { + return s; + } if (line.empty()) { return Status::Corruption("File checksum is missing"); @@ -912,6 +928,11 @@ 
Status BackupEngineImpl::BackupMeta::LoadFromFile( files.emplace_back(filename, size, checksum_value); } + if (s.ok() && data.size() > 0) { + // file has to be read completely. if not, we count it as corruption + s = Status::Corruption("Tailing data in backup meta file"); + } + if (s.ok()) { for (const auto& file_info : files) { s = AddFile(file_info); @@ -967,11 +988,7 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) { BackupableDB::BackupableDB(DB* db, const BackupableDBOptions& options) : StackableDB(db), - backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) { - if (options.share_table_files) { - backup_engine_->DeleteBackupsNewerThan(GetLatestSequenceNumber()); - } -} + backup_engine_(new BackupEngineImpl(db->GetEnv(), options)) {} BackupableDB::~BackupableDB() { delete backup_engine_; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 75853e179..5c20579b4 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -715,27 +715,38 @@ TEST(BackupableDBTest, OnlineIntegrationTest) { CloseRestoreDB(); } -TEST(BackupableDBTest, DeleteNewerBackups) { +TEST(BackupableDBTest, FailOverwritingBackups) { + options_.write_buffer_size = 1024 * 1024 * 1024; // 1GB // create backups 1, 2, 3, 4, 5 OpenBackupableDB(true); for (int i = 0; i < 5; ++i) { FillDB(db_.get(), 100 * i, 100 * (i + 1)); - ASSERT_OK(db_->CreateNewBackup(!!(i % 2))); + ASSERT_OK(db_->CreateNewBackup(true)); + CloseBackupableDB(); + OpenBackupableDB(false); } CloseBackupableDB(); - // backup 3 is fine - AssertBackupConsistency(3, 0, 300, 500); - // this should delete backups 4 and 5 - OpenBackupableDB(); - CloseBackupableDB(); - // backups 4 and 5 don't exist + // restore 3 OpenRestoreDB(); - Status s = restore_db_->RestoreDBFromBackup(4, dbname_, dbname_); - ASSERT_TRUE(s.IsNotFound()); - s = restore_db_->RestoreDBFromBackup(5, dbname_, dbname_); - ASSERT_TRUE(s.IsNotFound()); + 
ASSERT_OK(restore_db_->RestoreDBFromBackup(3, dbname_, dbname_)); CloseRestoreDB(); + + OpenBackupableDB(false); + FillDB(db_.get(), 0, 300); + Status s = db_->CreateNewBackup(true); + // the new backup fails because new table files + // clash with old table files from backups 4 and 5 + // (since write_buffer_size is huge, we can be sure that + // each backup will generate only one sst file and that + // a file generated by a new backup is the same as + // sst file generated by backup 4) + ASSERT_TRUE(s.IsCorruption()); + ASSERT_OK(db_->DeleteBackup(4)); + ASSERT_OK(db_->DeleteBackup(5)); + // now, the backup can succeed + ASSERT_OK(db_->CreateNewBackup(true)); + CloseBackupableDB(); } TEST(BackupableDBTest, NoShareTableFiles) {