From 457c78eb897ff746dac15483844c61eedd331113 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 27 Feb 2014 12:13:48 -0800 Subject: [PATCH] [CF] db_stress for column families Summary: I had this diff for a while to test column families implementation. Last night, I ran it successfully for 10 hours with the command: time ./db_stress --threads=30 --ops_per_thread=200000000 --max_key=5000 --column_families=20 --clear_column_family_one_in=3000000 --verify_before_write=1 --reopen=50 --max_background_compactions=10 --max_background_flushes=10 --db=/tmp/db_stress It is ready to be committed :) Test Plan: Ran it for 10 hours Reviewers: dhruba, haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16797 --- db/column_family_test.cc | 1 - db/db_impl.cc | 37 --- db/db_impl.h | 3 - include/rocksdb/db.h | 2 +- tools/db_stress.cc | 552 +++++++++++++++++++++++---------------- utilities/ttl/db_ttl.cc | 4 - utilities/ttl/db_ttl.h | 3 - 7 files changed, 330 insertions(+), 272 deletions(-) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 4657e9e38..a3b075242 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -322,7 +322,6 @@ TEST(ColumnFamilyTest, AddDrop) { ASSERT_OK(Put(1, "fodor", "mirko")); ASSERT_EQ("mirko", Get(1, "fodor")); ASSERT_EQ("NOT_FOUND", Get(3, "fodor")); - Close(); ASSERT_TRUE(TryOpen({"default"}).IsInvalidArgument()); Open({"default", "one", "three", "four"}); diff --git a/db/db_impl.cc b/db/db_impl.cc index b71e2ae01..1632d0493 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -347,43 +347,6 @@ DBImpl::~DBImpl() { LogFlush(options_.info_log); } -// Do not flush and close database elegantly. Simulate a crash. -void DBImpl::TEST_Destroy_DBImpl() { - // ensure that no new memtable flushes can occur - flush_on_destroy_ = false; - - // wait till all background compactions are done. 
- mutex_.Lock(); - while (bg_compaction_scheduled_ || - bg_flush_scheduled_ || - bg_logstats_scheduled_) { - bg_cv_.Wait(); - } - - // Prevent new compactions from occuring. - bg_work_gate_closed_ = true; - const int LargeNumber = 10000000; - bg_compaction_scheduled_ += LargeNumber; - - mutex_.Unlock(); - if (default_cf_handle_ != nullptr) { - // we need to delete handle outside of lock because it does its own locking - delete default_cf_handle_; - } - LogFlush(options_.info_log); - - // force release the lock file. - if (db_lock_ != nullptr) { - env_->UnlockFile(db_lock_); - } - - log_.reset(); - mutex_.Lock(); - versions_.reset(); - mutex_.Unlock(); - table_cache_.reset(); -} - uint64_t DBImpl::TEST_Current_Manifest_FileNo() { return versions_->ManifestFileNumber(); } diff --git a/db/db_impl.h b/db/db_impl.h index 1ca799f3b..60836bac0 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -162,9 +162,6 @@ class DBImpl : public DB { int64_t TEST_MaxNextLevelOverlappingBytes(ColumnFamilyHandle* column_family = nullptr); - // Simulate a db crash, no elegant closing of database. - void TEST_Destroy_DBImpl(); - // Return the current manifest file no. 
uint64_t TEST_Current_Manifest_FileNo(); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 1cb0b2283..431f3bf96 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -43,7 +43,7 @@ struct ColumnFamilyDescriptor { }; // Update Makefile if you change these -static const int kMajorVersion = 2; +static const int kMajorVersion = 3; static const int kMinorVersion = 0; struct Options; diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 4d02bcdc5..7fc9bb339 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -60,12 +60,14 @@ static bool ValidateUint32Range(const char* flagname, uint64_t value) { return true; } DEFINE_uint64(seed, 2341234, "Seed for PRNG"); -static const bool FLAGS_seed_dummy = - google::RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range); +static const bool FLAGS_seed_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_seed, &ValidateUint32Range); -DEFINE_int64(max_key, 1 * KB * KB * KB, +DEFINE_int64(max_key, 1 * KB* KB, "Max number of key/values to place in database"); +DEFINE_int32(column_families, 10, "Number of column families"); + DEFINE_bool(test_batches_snapshots, false, "If set, the test uses MultiGet(), MultiPut() and MultiDelete()" " which read/write/delete multiple keys in a batch. 
In this mode," @@ -146,6 +148,10 @@ DEFINE_int32(max_background_compactions, "The maximum number of concurrent background compactions " "that can occur in parallel."); +DEFINE_int32(max_background_flushes, rocksdb::Options().max_background_flushes, + "The maximum number of concurrent background flushes " + "that can occur in parallel."); + DEFINE_int32(universal_size_ratio, 0, "The ratio of file sizes that trigger" " compaction in universal style"); @@ -158,6 +164,11 @@ DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact" DEFINE_int32(universal_max_size_amplification_percent, 0, "The max size amplification for universal style compaction"); +DEFINE_int32(clear_column_family_one_in, 1000000, + "With a chance of 1/N, delete a column family and then recreate " + "it again. If N == 0, never drop/create column families. " + "When test_batches_snapshots is true, this flag has no effect"); + DEFINE_int64(cache_size, 2 * KB * KB * KB, "Number of bytes to use as a cache of uncompressed data."); @@ -170,8 +181,8 @@ static bool ValidateInt32Positive(const char* flagname, int32_t value) { return true; } DEFINE_int32(reopen, 10, "Number of times database reopens"); -static const bool FLAGS_reopen_dummy = - google::RegisterFlagValidator(&FLAGS_reopen, &ValidateInt32Positive); +static const bool FLAGS_reopen_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_reopen, &ValidateInt32Positive); DEFINE_int32(bloom_bits, 10, "Bloom filter bits per key. 
" "Negative means use default settings."); @@ -198,9 +209,9 @@ DEFINE_bool(use_fsync, false, "If true, issue fsync instead of fdatasync"); DEFINE_int32(kill_random_test, 0, "If non-zero, kill at various points in source code with " "probability 1/this"); -static const bool FLAGS_kill_random_test_dummy = - google::RegisterFlagValidator(&FLAGS_kill_random_test, - &ValidateInt32Positive); +static const bool FLAGS_kill_random_test_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_kill_random_test, + &ValidateInt32Positive); extern int rocksdb_kill_odds; DEFINE_bool(disable_wal, false, "If true, do not write WAL for write."); @@ -226,42 +237,37 @@ static bool ValidateInt32Percent(const char* flagname, int32_t value) { } DEFINE_int32(readpercent, 10, "Ratio of reads to total workload (expressed as a percentage)"); -static const bool FLAGS_readpercent_dummy = - google::RegisterFlagValidator(&FLAGS_readpercent, &ValidateInt32Percent); +static const bool FLAGS_readpercent_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_readpercent, &ValidateInt32Percent); DEFINE_int32(prefixpercent, 20, "Ratio of prefix iterators to total workload (expressed as a" " percentage)"); -static const bool FLAGS_prefixpercent_dummy = - google::RegisterFlagValidator(&FLAGS_prefixpercent, &ValidateInt32Percent); +static const bool FLAGS_prefixpercent_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_prefixpercent, &ValidateInt32Percent); DEFINE_int32(writepercent, 45, " Ratio of deletes to total workload (expressed as a percentage)"); -static const bool FLAGS_writepercent_dummy = - google::RegisterFlagValidator(&FLAGS_writepercent, &ValidateInt32Percent); +static const bool FLAGS_writepercent_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_writepercent, &ValidateInt32Percent); DEFINE_int32(delpercent, 15, "Ratio of deletes to total workload (expressed as a percentage)"); -static const bool FLAGS_delpercent_dummy 
= - google::RegisterFlagValidator(&FLAGS_delpercent, &ValidateInt32Percent); +static const bool FLAGS_delpercent_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_delpercent, &ValidateInt32Percent); DEFINE_int32(iterpercent, 10, "Ratio of iterations to total workload" " (expressed as a percentage)"); -static const bool FLAGS_iterpercent_dummy = - google::RegisterFlagValidator(&FLAGS_iterpercent, &ValidateInt32Percent); +static const bool FLAGS_iterpercent_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_iterpercent, &ValidateInt32Percent); DEFINE_uint64(num_iterations, 10, "Number of iterations per MultiIterate run"); -static const bool FLAGS_num_iterations_dummy = - google::RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range); +static const bool FLAGS_num_iterations_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range); DEFINE_bool(disable_seek_compaction, false, "Option to disable compation triggered by read."); -DEFINE_uint64(delete_obsolete_files_period_micros, 0, - "Option to delete obsolete files periodically" - "0 means that obsolete files are " - " deleted after every compaction run."); - enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { assert(ctype); @@ -290,21 +296,21 @@ DEFINE_string(hdfs, "", "Name of hdfs environment"); // posix or hdfs environment static rocksdb::Env* FLAGS_env = rocksdb::Env::Default(); -DEFINE_uint64(ops_per_thread, 600000, "Number of operations per thread."); -static const bool FLAGS_ops_per_thread_dummy = - google::RegisterFlagValidator(&FLAGS_ops_per_thread, &ValidateUint32Range); +DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread."); +static const bool FLAGS_ops_per_thread_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_ops_per_thread, &ValidateUint32Range); DEFINE_uint64(log2_keys_per_lock, 2, "Log2 of number of keys per lock"); -static const bool 
FLAGS_log2_keys_per_lock_dummy = - google::RegisterFlagValidator(&FLAGS_log2_keys_per_lock, - &ValidateUint32Range); +static const bool FLAGS_log2_keys_per_lock_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_log2_keys_per_lock, + &ValidateUint32Range); DEFINE_int32(purge_redundant_percent, 50, "Percentage of times we want to purge redundant keys in memory " "before flushing"); -static const bool FLAGS_purge_redundant_percent_dummy = - google::RegisterFlagValidator(&FLAGS_purge_redundant_percent, - &ValidateInt32Percent); +static const bool FLAGS_purge_redundant_percent_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_purge_redundant_percent, + &ValidateInt32Percent); DEFINE_bool(filter_deletes, false, "On true, deletes use KeyMayExist to drop" " the delete if key not present"); @@ -339,8 +345,8 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) { return true; } DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipListRep"); -static const bool FLAGS_prefix_size_dummy = - google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); +static const bool FLAGS_prefix_size_dummy __attribute__((unused)) = + google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); DEFINE_bool(use_merge, false, "On true, replaces all writes with a Merge " "that behaves like a Put"); @@ -531,28 +537,27 @@ class SharedState { start_verify_(false), stress_test_(stress_test) { if (FLAGS_test_batches_snapshots) { - key_locks_ = nullptr; - values_ = nullptr; fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); return; } - values_ = new uint32_t[max_key_]; - for (long i = 0; i < max_key_; i++) { - values_[i] = SENTINEL; + values_.resize(FLAGS_column_families); + + for (int i = 0; i < FLAGS_column_families; ++i) { + values_[i] = std::vector(max_key_, SENTINEL); } long num_locks = (max_key_ >> log2_keys_per_lock_); if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) { - num_locks 
++; + num_locks++; + } + fprintf(stdout, "Creating %ld locks\n", num_locks * FLAGS_column_families); + key_locks_.resize(FLAGS_column_families); + for (int i = 0; i < FLAGS_column_families; ++i) { + key_locks_[i] = std::vector(num_locks); } - fprintf(stdout, "Creating %ld locks\n", num_locks); - key_locks_ = new port::Mutex[num_locks]; } - ~SharedState() { - delete[] values_; - delete[] key_locks_; - } + ~SharedState() {} port::Mutex* GetMutex() { return &mu_; @@ -622,26 +627,36 @@ class SharedState { return start_verify_; } - port::Mutex* GetMutexForKey(long key) { - return &key_locks_[key >> log2_keys_per_lock_]; + port::Mutex* GetMutexForKey(int cf, long key) { + return &key_locks_[cf][key >> log2_keys_per_lock_]; } - void Put(long key, uint32_t value_base) { - values_[key] = value_base; + void LockColumnFamily(int cf) { + for (auto& mutex : key_locks_[cf]) { + mutex.Lock(); + } } - uint32_t Get(long key) const { - return values_[key]; + void UnlockColumnFamily(int cf) { + for (auto& mutex : key_locks_[cf]) { + mutex.Unlock(); + } } - void Delete(long key) const { - values_[key] = SENTINEL; + void ClearColumnFamily(int cf) { + std::fill(values_[cf].begin(), values_[cf].end(), SENTINEL); } - uint32_t GetSeed() const { - return seed_; + void Put(int cf, long key, uint32_t value_base) { + values_[cf][key] = value_base; } + uint32_t Get(int cf, long key) const { return values_[cf][key]; } + + void Delete(int cf, long key) { values_[cf][key] = SENTINEL; } + + uint32_t GetSeed() const { return seed_; } + private: port::Mutex mu_; port::CondVar cv_; @@ -657,9 +672,8 @@ class SharedState { bool start_verify_; StressTest* stress_test_; - uint32_t *values_; - port::Mutex *key_locks_; - + std::vector> values_; + std::vector> key_locks_; }; // Per-thread state for concurrent executions of the same benchmark. @@ -682,13 +696,14 @@ class StressTest { public: StressTest() : cache_(NewLRUCache(FLAGS_cache_size)), - compressed_cache_(FLAGS_compressed_cache_size >= 0 ? 
- NewLRUCache(FLAGS_compressed_cache_size) : - nullptr), + compressed_cache_(FLAGS_compressed_cache_size >= 0 + ? NewLRUCache(FLAGS_compressed_cache_size) + : nullptr), filter_policy_(FLAGS_bloom_bits >= 0 - ? NewBloomFilterPolicy(FLAGS_bloom_bits) - : nullptr), + ? NewBloomFilterPolicy(FLAGS_bloom_bits) + : nullptr), db_(nullptr), + new_column_family_name_(0), num_times_reopened_(0) { if (FLAGS_destroy_db_initially) { std::vector files; @@ -703,6 +718,10 @@ class StressTest { } ~StressTest() { + for (auto cf : column_families_) { + delete cf; + } + column_families_.clear(); delete db_; delete filter_policy_; } @@ -817,9 +836,9 @@ class StressTest { // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ... // ("9"+K, "9"+V) in DB atomically i.e in a single batch. // Also refer MultiGet. - Status MultiPut(ThreadState* thread, - const WriteOptions& writeoptions, - const Slice& key, const Slice& value, size_t sz) { + Status MultiPut(ThreadState* thread, const WriteOptions& writeoptions, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value, size_t sz) { std::string keys[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"}; std::string values[10] = {"9", "8", "7", "6", "5", @@ -832,9 +851,9 @@ class StressTest { values[i] += value.ToString(); value_slices[i] = values[i]; if (FLAGS_use_merge) { - batch.Merge(keys[i], value_slices[i]); + batch.Merge(column_family->GetID(), keys[i], value_slices[i]); } else { - batch.Put(keys[i], value_slices[i]); + batch.Put(column_family->GetID(), keys[i], value_slices[i]); } } @@ -852,9 +871,8 @@ class StressTest { // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K) // in DB atomically i.e in a single batch. Also refer MultiGet. 
- Status MultiDelete(ThreadState* thread, - const WriteOptions& writeoptions, - const Slice& key) { + Status MultiDelete(ThreadState* thread, const WriteOptions& writeoptions, + ColumnFamilyHandle* column_family, const Slice& key) { std::string keys[10] = {"9", "7", "5", "3", "1", "8", "6", "4", "2", "0"}; @@ -862,7 +880,7 @@ class StressTest { Status s; for (int i = 0; i < 10; i++) { keys[i] += key.ToString(); - batch.Delete(keys[i]); + batch.Delete(column_family->GetID(), keys[i]); } s = db_->Write(writeoptions, &batch); @@ -880,9 +898,9 @@ class StressTest { // in the same snapshot, and verifies that all the values are of the form // "0"+V, "1"+V,..."9"+V. // ASSUMES that MultiPut was used to put (K, V) into the DB. - Status MultiGet(ThreadState* thread, - const ReadOptions& readoptions, - const Slice& key, std::string* value) { + Status MultiGet(ThreadState* thread, const ReadOptions& readoptions, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) { std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; Slice key_slices[10]; std::string values[10]; @@ -892,7 +910,7 @@ class StressTest { for (int i = 0; i < 10; i++) { keys[i] += key.ToString(); key_slices[i] = keys[i]; - s = db_->Get(readoptionscopy, key_slices[i], value); + s = db_->Get(readoptionscopy, column_family, key_slices[i], value); if (!s.ok() && !s.IsNotFound()) { fprintf(stderr, "get error: %s\n", s.ToString().c_str()); values[i] = ""; @@ -937,8 +955,8 @@ class StressTest { // for each index i that all the i'th values are of the form "0"+V, // "1"+V,..."9"+V. 
// ASSUMES that MultiPut was used to put (K, V) - Status MultiPrefixScan(ThreadState* thread, - const ReadOptions& readoptions, + Status MultiPrefixScan(ThreadState* thread, const ReadOptions& readoptions, + ColumnFamilyHandle* column_family, const Slice& prefix) { std::string prefixes[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; @@ -953,7 +971,7 @@ class StressTest { readoptionscopy[i] = readoptions; readoptionscopy[i].prefix = &prefix_slices[i]; readoptionscopy[i].snapshot = snapshot; - iters[i] = db_->NewIterator(readoptionscopy[i]); + iters[i] = db_->NewIterator(readoptionscopy[i], column_family); iters[i]->SeekToFirst(); } @@ -1009,14 +1027,13 @@ class StressTest { // Given a key K, this creates an iterator which scans to K and then // does a random sequence of Next/Prev operations. - Status MultiIterate(ThreadState* thread, - const ReadOptions& readoptions, - const Slice& key) { + Status MultiIterate(ThreadState* thread, const ReadOptions& readoptions, + ColumnFamilyHandle* column_family, const Slice& key) { Status s; const Snapshot* snapshot = db_->GetSnapshot(); ReadOptions readoptionscopy = readoptions; readoptionscopy.snapshot = snapshot; - unique_ptr iter(db_->NewIterator(readoptionscopy)); + unique_ptr iter(db_->NewIterator(readoptionscopy, column_family)); iter->Seek(key); for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) { @@ -1071,15 +1088,50 @@ class StressTest { } } + if (!FLAGS_test_batches_snapshots && + FLAGS_clear_column_family_one_in != 0) { + if (thread->rand.OneIn(FLAGS_clear_column_family_one_in)) { + // drop column family and then create it again (can't drop default) + int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1; + std::string new_name = + std::to_string(new_column_family_name_.fetch_add(1)); + { + MutexLock l(thread->shared->GetMutex()); + fprintf( + stdout, + "[CF %d] Dropping and recreating column family. 
new name: %s\n", + cf, new_name.c_str()); + } + thread->shared->LockColumnFamily(cf); + Status s __attribute__((unused)); + s = db_->DropColumnFamily(column_families_[cf]); + delete column_families_[cf]; + assert(s.ok()); + s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name, + &column_families_[cf]); + column_family_names_[cf] = new_name; + thread->shared->ClearColumnFamily(cf); + assert(s.ok()); + thread->shared->UnlockColumnFamily(cf); + } + } + long rand_key = thread->rand.Next() % max_key; + int rand_column_family = thread->rand.Next() % FLAGS_column_families; std::string keystr = Key(rand_key); Slice key = keystr; int prob_op = thread->rand.Uniform(100); + std::unique_ptr l; + if (!FLAGS_test_batches_snapshots) { + l.reset(new MutexLock( + thread->shared->GetMutexForKey(rand_column_family, rand_key))); + } + auto column_family = column_families_[rand_column_family]; if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) { // OPERATION read if (!FLAGS_test_batches_snapshots) { - Status s = db_->Get(read_opts, key, &from_db); + Status s = db_->Get(read_opts, column_family, key, &from_db); if (s.ok()) { // found case thread->stats.AddGets(1, 1); @@ -1091,7 +1143,7 @@ class StressTest { thread->stats.AddErrors(1); } } else { - MultiGet(thread, read_opts, key, &from_db); + MultiGet(thread, read_opts, column_family, key, &from_db); } } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) { // OPERATION prefix scan @@ -1101,7 +1153,7 @@ class StressTest { Slice prefix = Slice(key.data(), key.size() - 1); if (!FLAGS_test_batches_snapshots) { read_opts.prefix = &prefix; - Iterator* iter = db_->NewIterator(read_opts); + Iterator* iter = db_->NewIterator(read_opts, column_family); int count = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { assert(iter->key().starts_with(prefix)); @@ -1115,7 +1167,7 @@ class StressTest { } delete iter; } else { - MultiPrefixScan(thread, read_opts, prefix); + MultiPrefixScan(thread, read_opts, 
column_family, prefix); } read_opts.prefix = nullptr; } else if (prefixBound <= prob_op && prob_op < writeBound) { @@ -1124,42 +1176,36 @@ class StressTest { size_t sz = GenerateValue(value_base, value, sizeof(value)); Slice v(value, sz); if (!FLAGS_test_batches_snapshots) { - MutexLock l(thread->shared->GetMutexForKey(rand_key)); if (FLAGS_verify_before_write) { std::string keystr2 = Key(rand_key); Slice k = keystr2; - Status s = db_->Get(read_opts, k, &from_db); - VerifyValue(rand_key, - read_opts, - *(thread->shared), - from_db, - s, - true); + Status s = db_->Get(read_opts, column_family, k, &from_db); + VerifyValue(rand_column_family, rand_key, read_opts, + *(thread->shared), from_db, s, true); } - thread->shared->Put(rand_key, value_base); + thread->shared->Put(rand_column_family, rand_key, value_base); if (FLAGS_use_merge) { - db_->Merge(write_opts, key, v); + db_->Merge(write_opts, column_family, key, v); } else { - db_->Put(write_opts, key, v); + db_->Put(write_opts, column_family, key, v); } thread->stats.AddBytesForWrites(1, sz); } else { - MultiPut(thread, write_opts, key, v, sz); + MultiPut(thread, write_opts, column_family, key, v, sz); } - PrintKeyValue(rand_key, value, sz); + PrintKeyValue(rand_column_family, rand_key, value, sz); } else if (writeBound <= prob_op && prob_op < delBound) { // OPERATION delete if (!FLAGS_test_batches_snapshots) { - MutexLock l(thread->shared->GetMutexForKey(rand_key)); - thread->shared->Delete(rand_key); - db_->Delete(write_opts, key); + thread->shared->Delete(rand_column_family, rand_key); + db_->Delete(write_opts, column_family, key); thread->stats.AddDeletes(1); } else { - MultiDelete(thread, write_opts, key); + MultiDelete(thread, write_opts, column_family, key); } } else { // OPERATION iterate - MultiIterate(thread, read_opts, key); + MultiIterate(thread, read_opts, column_family, key); } thread->stats.FinishedSingleOp(); } @@ -1177,91 +1223,93 @@ class StressTest { if (thread->tid == shared.GetNumThreads() - 1) { 
end = max_key; } - if (!thread->rand.OneIn(2)) { - // Use iterator to verify this range - unique_ptr iter(db_->NewIterator(options)); - iter->Seek(Key(start)); - for (long i = start; i < end; i++) { - std::string from_db; - std::string keystr = Key(i); - Slice k = keystr; - Status s = iter->status(); - if (iter->Valid()) { - if (iter->key().compare(k) > 0) { + for (size_t cf = 0; cf < column_families_.size(); ++cf) { + if (!thread->rand.OneIn(2)) { + // Use iterator to verify this range + unique_ptr iter( + db_->NewIterator(options, column_families_[cf])); + iter->Seek(Key(start)); + for (long i = start; i < end; i++) { + std::string from_db; + std::string keystr = Key(i); + Slice k = keystr; + Status s = iter->status(); + if (iter->Valid()) { + if (iter->key().compare(k) > 0) { + s = Status::NotFound(Slice()); + } else if (iter->key().compare(k) == 0) { + from_db = iter->value().ToString(); + iter->Next(); + } else if (iter->key().compare(k) < 0) { + VerificationAbort("An out of range key was found", cf, i); + } + } else { + // The iterator found no value for the key in question, so do not + // move to the next item in the iterator s = Status::NotFound(Slice()); - } else if (iter->key().compare(k) == 0) { - from_db = iter->value().ToString(); - iter->Next(); - } else if (iter->key().compare(k) < 0) { - VerificationAbort("An out of range key was found", i); } - } else { - // The iterator found no value for the key in question, so do not - // move to the next item in the iterator - s = Status::NotFound(Slice()); + VerifyValue(cf, i, options, shared, from_db, s, true); + if (from_db.length()) { + PrintKeyValue(cf, i, from_db.data(), from_db.length()); + } } - VerifyValue(i, options, shared, from_db, s, true); - if (from_db.length()) { - PrintKeyValue(i, from_db.data(), from_db.length()); - } - } - } - else { - // Use Get to verify this range - for (long i = start; i < end; i++) { - std::string from_db; - std::string keystr = Key(i); - Slice k = keystr; - Status s = 
db_->Get(options, k, &from_db); - VerifyValue(i, options, shared, from_db, s, true); - if (from_db.length()) { - PrintKeyValue(i, from_db.data(), from_db.length()); + } else { + // Use Get to verify this range + for (long i = start; i < end; i++) { + std::string from_db; + std::string keystr = Key(i); + Slice k = keystr; + Status s = db_->Get(options, column_families_[cf], k, &from_db); + if (from_db.length()) { + PrintKeyValue(cf, i, from_db.data(), from_db.length()); + } + VerifyValue(cf, i, options, shared, from_db, s, true); } } } } - void VerificationAbort(std::string msg, long key) const { - fprintf(stderr, "Verification failed for key %ld: %s\n", - key, msg.c_str()); + void VerificationAbort(std::string msg, int cf, long key) const { + fprintf(stderr, "Verification failed for column family %d key %ld: %s\n", + cf, key, msg.c_str()); exit(1); } - void VerifyValue(long key, - const ReadOptions &opts, - const SharedState &shared, - const std::string &value_from_db, - Status s, - bool strict=false) const { + void VerifyValue(int cf, long key, const ReadOptions& opts, + const SharedState& shared, const std::string& value_from_db, + Status s, bool strict = false) const { // compare value_from_db with the value in the shared state char value[100]; - uint32_t value_base = shared.Get(key); + uint32_t value_base = shared.Get(cf, key); if (value_base == SharedState::SENTINEL && !strict) { return; } if (s.ok()) { if (value_base == SharedState::SENTINEL) { - VerificationAbort("Unexpected value found", key); + VerificationAbort("Unexpected value found", cf, key); } size_t sz = GenerateValue(value_base, value, sizeof(value)); if (value_from_db.length() != sz) { - VerificationAbort("Length of value read is not equal", key); + VerificationAbort("Length of value read is not equal", cf, key); } if (memcmp(value_from_db.data(), value, sz) != 0) { - VerificationAbort("Contents of value read don't match", key); + VerificationAbort("Contents of value read don't match", cf, key); } 
} else { if (value_base != SharedState::SENTINEL) { - VerificationAbort("Value not found", key); + VerificationAbort("Value not found", cf, key); } } } - static void PrintKeyValue(uint32_t key, const char *value, size_t sz) { - if (!FLAGS_verbose) return; - fprintf(stdout, "%u ==> (%u) ", key, (unsigned int)sz); - for (size_t i=0; i (%u) ", cf, key, (unsigned int)sz); + for (size_t i = 0; i < sz; i++) { fprintf(stdout, "%X", value[i]); } fprintf(stdout, "\n"); @@ -1279,8 +1327,13 @@ class StressTest { } void PrintEnv() const { - fprintf(stdout, "LevelDB version : %d.%d\n", - kMajorVersion, kMinorVersion); + fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion, + kMinorVersion); + fprintf(stdout, "Column families : %d\n", FLAGS_column_families); + if (!FLAGS_test_batches_snapshots) { + fprintf(stdout, "Clear CFs one in : %d\n", + FLAGS_clear_column_family_one_in); + } fprintf(stdout, "Number of threads : %d\n", FLAGS_threads); fprintf(stdout, "Ops per thread : %lu\n", @@ -1357,43 +1410,41 @@ class StressTest { void Open() { assert(db_ == nullptr); - Options options; - options.block_cache = cache_; - options.block_cache_compressed = compressed_cache_; - options.write_buffer_size = FLAGS_write_buffer_size; - options.max_write_buffer_number = FLAGS_max_write_buffer_number; - options.min_write_buffer_number_to_merge = - FLAGS_min_write_buffer_number_to_merge; - options.max_background_compactions = FLAGS_max_background_compactions; - options.compaction_style = - static_cast(FLAGS_compaction_style); - options.block_size = FLAGS_block_size; - options.filter_policy = filter_policy_; - options.prefix_extractor.reset(NewFixedPrefixTransform(FLAGS_prefix_size)); - options.max_open_files = FLAGS_open_files; - options.statistics = dbstats; - options.env = FLAGS_env; - options.disableDataSync = FLAGS_disable_data_sync; - options.use_fsync = FLAGS_use_fsync; - options.allow_mmap_reads = FLAGS_mmap_read; + options_.block_cache = cache_; + options_.block_cache_compressed = 
compressed_cache_; + options_.write_buffer_size = FLAGS_write_buffer_size; + options_.max_write_buffer_number = FLAGS_max_write_buffer_number; + options_.min_write_buffer_number_to_merge = + FLAGS_min_write_buffer_number_to_merge; + options_.max_background_compactions = FLAGS_max_background_compactions; + options_.max_background_flushes = FLAGS_max_background_flushes; + options_.compaction_style = + static_cast(FLAGS_compaction_style); + options_.block_size = FLAGS_block_size; + options_.filter_policy = filter_policy_; + options_.prefix_extractor.reset(NewFixedPrefixTransform(FLAGS_prefix_size)); + options_.max_open_files = FLAGS_open_files; + options_.statistics = dbstats; + options_.env = FLAGS_env; + options_.disableDataSync = FLAGS_disable_data_sync; + options_.use_fsync = FLAGS_use_fsync; + options_.allow_mmap_reads = FLAGS_mmap_read; rocksdb_kill_odds = FLAGS_kill_random_test; - options.target_file_size_base = FLAGS_target_file_size_base; - options.target_file_size_multiplier = FLAGS_target_file_size_multiplier; - options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base; - options.max_bytes_for_level_multiplier = + options_.target_file_size_base = FLAGS_target_file_size_base; + options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier; + options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base; + options_.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier; - options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger; - options.level0_slowdown_writes_trigger = - FLAGS_level0_slowdown_writes_trigger; - options.level0_file_num_compaction_trigger = - FLAGS_level0_file_num_compaction_trigger; - options.compression = FLAGS_compression_type_e; - options.create_if_missing = true; - options.disable_seek_compaction = FLAGS_disable_seek_compaction; - options.delete_obsolete_files_period_micros = - FLAGS_delete_obsolete_files_period_micros; - options.max_manifest_file_size = 1024; - options.filter_deletes = 
FLAGS_filter_deletes; + options_.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger; + options_.level0_slowdown_writes_trigger = + FLAGS_level0_slowdown_writes_trigger; + options_.level0_file_num_compaction_trigger = + FLAGS_level0_file_num_compaction_trigger; + options_.compression = FLAGS_compression_type_e; + options_.create_if_missing = true; + options_.disable_seek_compaction = FLAGS_disable_seek_compaction; + options_.max_manifest_file_size = 10 * 1024; + options_.filter_deletes = FLAGS_filter_deletes; if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) { fprintf(stderr, "prefix_size should be non-zero iff memtablerep == prefix_hash\n"); @@ -1401,51 +1452,107 @@ class StressTest { } switch (FLAGS_rep_factory) { case kHashSkipList: - options.memtable_factory.reset(NewHashSkipListRepFactory()); + options_.memtable_factory.reset(NewHashSkipListRepFactory()); break; case kSkipList: // no need to do anything break; case kVectorRep: - options.memtable_factory.reset(new VectorRepFactory()); + options_.memtable_factory.reset(new VectorRepFactory()); break; } static Random purge_percent(1000); // no benefit from non-determinism here if (static_cast(purge_percent.Uniform(100)) < FLAGS_purge_redundant_percent - 1) { - options.purge_redundant_kvs_while_flush = false; + options_.purge_redundant_kvs_while_flush = false; } if (FLAGS_use_merge) { - options.merge_operator = MergeOperators::CreatePutOperator(); + options_.merge_operator = MergeOperators::CreatePutOperator(); } // set universal style compaction configurations, if applicable if (FLAGS_universal_size_ratio != 0) { - options.compaction_options_universal.size_ratio = - FLAGS_universal_size_ratio; + options_.compaction_options_universal.size_ratio = + FLAGS_universal_size_ratio; } if (FLAGS_universal_min_merge_width != 0) { - options.compaction_options_universal.min_merge_width = - FLAGS_universal_min_merge_width; + options_.compaction_options_universal.min_merge_width = + 
FLAGS_universal_min_merge_width; } if (FLAGS_universal_max_merge_width != 0) { - options.compaction_options_universal.max_merge_width = - FLAGS_universal_max_merge_width; + options_.compaction_options_universal.max_merge_width = + FLAGS_universal_max_merge_width; } if (FLAGS_universal_max_size_amplification_percent != 0) { - options.compaction_options_universal.max_size_amplification_percent = - FLAGS_universal_max_size_amplification_percent; + options_.compaction_options_universal.max_size_amplification_percent = + FLAGS_universal_max_size_amplification_percent; } fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); Status s; if (FLAGS_ttl == -1) { - s = DB::Open(options, FLAGS_db, &db_); + std::vector existing_column_families; + s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db, + &existing_column_families); // ignore errors + if (!s.ok()) { + // DB doesn't exist + assert(existing_column_families.empty()); + assert(column_family_names_.empty()); + column_family_names_.push_back(default_column_family_name); + } else if (column_family_names_.empty()) { + // this is the first call to the function Open() + column_family_names_ = existing_column_families; + } else { + // this is a reopen. 
just assert that existing column_family_names are + // equivalent to what we remember + auto sorted_cfn = column_family_names_; + sort(sorted_cfn.begin(), sorted_cfn.end()); + sort(existing_column_families.begin(), existing_column_families.end()); + if (sorted_cfn != existing_column_families) { + fprintf(stderr, + "Expected column families differ from the existing:\n"); + printf("Expected: {"); + for (auto cf : sorted_cfn) { + printf("%s ", cf.c_str()); + } + printf("}\n"); + printf("Existing: {"); + for (auto cf : existing_column_families) { + printf("%s ", cf.c_str()); + } + printf("}\n"); + } + assert(sorted_cfn == existing_column_families); + } + std::vector cf_descriptors; + for (auto name : column_family_names_) { + if (name != default_column_family_name) { + new_column_family_name_ = + std::max(new_column_family_name_.load(), std::stoi(name) + 1); + } + cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_)); + } + s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, + &column_families_, &db_); + if (s.ok()) { + while (s.ok() && + column_families_.size() < (size_t)FLAGS_column_families) { + ColumnFamilyHandle* cf = nullptr; + std::string name = std::to_string(new_column_family_name_.load()); + new_column_family_name_++; + s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), name, &cf); + column_families_.push_back(cf); + column_family_names_.push_back(name); + } + } + assert(!s.ok() || column_families_.size() == + static_cast(FLAGS_column_families)); } else { - s = UtilityDB::OpenTtlDB(options, FLAGS_db, &sdb_, FLAGS_ttl); - db_ = sdb_; + StackableDB* sdb; + s = UtilityDB::OpenTtlDB(options_, FLAGS_db, &sdb, FLAGS_ttl); + db_ = sdb; } if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); @@ -1454,13 +1561,11 @@ class StressTest { } void Reopen() { - // do not close the db. Just delete the lock file. This - // simulates a crash-recovery kind of situation. 
- if (FLAGS_ttl != -1) { - ((DBWithTTL*) db_)->TEST_Destroy_DBWithTtl(); - } else { - ((DBImpl*) db_)->TEST_Destroy_DBImpl(); + for (auto cf : column_families_) { + delete cf; } + column_families_.clear(); + delete db_; db_ = nullptr; num_times_reopened_++; @@ -1482,14 +1587,15 @@ class StressTest { shared_ptr compressed_cache_; const FilterPolicy* filter_policy_; DB* db_; - StackableDB* sdb_; + Options options_; + std::vector column_families_; + std::vector column_family_names_; + std::atomic new_column_family_name_; int num_times_reopened_; }; } // namespace rocksdb - - int main(int argc, char** argv) { google::SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + " [OPTIONS]..."); diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 589148f48..21626bec2 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -214,8 +214,4 @@ Iterator* DBWithTTL::NewIterator(const ReadOptions& opts, return new TtlIterator(db_->NewIterator(opts, column_family)); } -void DBWithTTL::TEST_Destroy_DBWithTtl() { - ((DBImpl*) db_)->TEST_Destroy_DBImpl(); -} - } // namespace rocksdb diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 9a1d96a93..97852256a 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -54,9 +54,6 @@ class DBWithTTL : public StackableDB { virtual Iterator* NewIterator(const ReadOptions& opts, ColumnFamilyHandle* column_family) override; - // Simulate a db crash, no elegant closing of database. - void TEST_Destroy_DBWithTtl(); - virtual DB* GetBaseDB() { return db_; }