From 218487d8dc45c4cb03dbb80bd4d7031b131b9f25 Mon Sep 17 00:00:00 2001 From: Mike Kolupaev Date: Thu, 2 Jul 2015 14:27:00 -0700 Subject: [PATCH 01/18] [wal changes 1/3] fixed unbounded wal growth in some workloads Summary: This fixes the following scenario we've hit: - we reached max_total_wal_size, created a new wal and scheduled flushing all memtables corresponding to the old one, - before the last of these flushes started its column family was dropped; the last background flush call was a no-op; no one removed the old wal from alive_logs_, - hours have passed and no flushes happened even though lots of data was written; data is written to different column families, compactions are disabled; old column families are dropped before memtable grows big enough to trigger a flush; the old wal still sits in alive_logs_ preventing max_total_wal_size limit from kicking in, - a few more hours pass and we run out disk space because of one huge .log file. Test Plan: `make check`; backported the new test, checked that it fails without this diff Reviewers: igor Reviewed By: igor Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D40893 --- db/db_impl.cc | 39 ++++++++++++++++++++------------------- db/db_impl.h | 2 ++ db/db_impl_debug.cc | 5 +++++ db/db_test.cc | 36 ++++++++++++++++++++++++++++++++++-- db/version_set.h | 4 +++- 5 files changed, 64 insertions(+), 22 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index e7465e0f9..c599f1ef2 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -527,6 +527,24 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, versions_->GetObsoleteFiles(&job_context->sst_delete_files, job_context->min_pending_output); + uint64_t min_log_number = versions_->MinLogNumber(); + if (!alive_log_files_.empty()) { + // find newly obsoleted log files + while (alive_log_files_.begin()->number < min_log_number) { + auto& earliest = *alive_log_files_.begin(); + job_context->log_delete_files.push_back(earliest.number); + total_log_size_ -= earliest.size; + alive_log_files_.pop_front(); + // Current log should always stay alive since it can't have + // number < MinLogNumber(). + assert(alive_log_files_.size()); + } + } + + // We're just cleaning up for DB::Write(). + job_context->logs_to_free = logs_to_free_; + logs_to_free_.clear(); + // store the current filenum, lognum, etc job_context->manifest_file_number = versions_->manifest_file_number(); job_context->pending_manifest_file_number = @@ -1309,17 +1327,6 @@ Status DBImpl::FlushMemTableToOutputFile( VersionStorageInfo::LevelSummaryStorage tmp; LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(), cfd->current()->storage_info()->LevelSummary(&tmp)); - - if (disable_delete_obsolete_files_ == 0) { - // add to deletion state - while (alive_log_files_.size() && - alive_log_files_.begin()->number < versions_->MinLogNumber()) { - const auto& earliest = *alive_log_files_.begin(); - job_context->log_delete_files.push_back(earliest.number); - total_log_size_ -= earliest.size; - alive_log_files_.pop_front(); - } - } } if (!s.ok() && !s.IsShutdownInProgress() && db_options_.paranoid_checks && @@ -2145,7 +2152,9 @@ void DBImpl::RecordFlushIOStats() { void DBImpl::BGWorkFlush(void* db) { IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); + TEST_SYNC_POINT("DBImpl::BGWorkFlush"); reinterpret_cast(db)->BackgroundCallFlush(); + TEST_SYNC_POINT("DBImpl::BGWorkFlush:done"); } void DBImpl::BGWorkCompaction(void* db) { @@ -2238,10 +2247,6 @@ void DBImpl::BackgroundCallFlush() { ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); - // We're just cleaning up for DB::Write() - job_context.logs_to_free = logs_to_free_; - logs_to_free_.clear(); - // If flush failed, we want to delete all temporary files that we might have // created. Thus, we force full scan in FindObsoleteFiles() FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); @@ -2308,10 +2313,6 @@ void DBImpl::BackgroundCallCompaction() { ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); - // We're just cleaning up for DB::Write() - job_context.logs_to_free = logs_to_free_; - logs_to_free_.clear(); - // If compaction failed, we want to delete all temporary files that we might // have created (they might not be all recorded in job_context in case of a // failure). Thus, we force full scan in FindObsoleteFiles() diff --git a/db/db_impl.h b/db/db_impl.h index a649b2baa..dd37dd031 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -290,6 +290,8 @@ class DBImpl : public DB { size_t TEST_LogsToFreeSize(); + uint64_t TEST_LogfileNumber(); + #endif // ROCKSDB_LITE // Returns the list of live files in 'live' and the list diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 35703cf1a..66177ed7a 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -148,5 +148,10 @@ size_t DBImpl::TEST_LogsToFreeSize() { return logs_to_free_.size(); } +uint64_t DBImpl::TEST_LogfileNumber() { + InstrumentedMutexLock l(&mutex_); + return logfile_number_; +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/db/db_test.cc b/db/db_test.cc index c5d2a9b64..ddf7de9a4 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8540,7 +8540,6 @@ TEST_F(DBTest, TransactionLogIterator) { } while (ChangeCompactOptions()); } -#ifndef NDEBUG // sync point is not included with DNDEBUG build TEST_F(DBTest, TransactionLogIteratorRace) { static const int LOG_ITERATOR_RACE_TEST_COUNT = 2; static const char* sync_points[LOG_ITERATOR_RACE_TEST_COUNT][4] = { @@ -8595,7 +8594,6 @@ TEST_F(DBTest, TransactionLogIteratorRace) { } while (ChangeCompactOptions()); } } -#endif TEST_F(DBTest, TransactionLogIteratorStallAtLastRecord) { do { @@ -14136,6 +14134,40 @@ TEST_F(DBTest, PrevAfterMerge) { ASSERT_EQ("1", it->key().ToString()); } +TEST_F(DBTest, DeletingOldWalAfterDrop) { + rocksdb::SyncPoint::GetInstance()->LoadDependency( + { { "Test:AllowFlushes", "DBImpl::BGWorkFlush" }, + { "DBImpl::BGWorkFlush:done", "Test:WaitForFlush"} }); + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + Options options = CurrentOptions(); + options.max_total_wal_size = 8192; + options.compression = kNoCompression; + options.write_buffer_size = 1 << 20; + options.level0_file_num_compaction_trigger = (1<<30); + options.level0_slowdown_writes_trigger = (1<<30); + options.level0_stop_writes_trigger = (1<<30); + options.disable_auto_compactions = true; + DestroyAndReopen(options); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + CreateColumnFamilies({"cf1", "cf2"}, options); + ASSERT_OK(Put(0, "key1", DummyString(8192))); + ASSERT_OK(Put(0, "key2", DummyString(8192))); + // the oldest wal should now be getting_flushed + ASSERT_OK(db_->DropColumnFamily(handles_[0])); + // all flushes should now do nothing because their CF is dropped + TEST_SYNC_POINT("Test:AllowFlushes"); + TEST_SYNC_POINT("Test:WaitForFlush"); + uint64_t lognum1 = dbfull()->TEST_LogfileNumber(); + ASSERT_OK(Put(1, "key3", DummyString(8192))); + ASSERT_OK(Put(1, "key4", DummyString(8192))); + // new wal should have been created + uint64_t lognum2 = dbfull()->TEST_LogfileNumber(); + EXPECT_GT(lognum2, lognum1); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.h b/db/version_set.h index 9ee6aeaa9..778e537f5 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -612,7 +612,9 @@ class VersionSet { uint64_t MinLogNumber() const { uint64_t min_log_num = std::numeric_limits::max(); for (auto cfd : *column_family_set_) { - if (min_log_num > cfd->GetLogNumber()) { + // It's safe to ignore dropped column families here: + // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST. + if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) { min_log_num = cfd->GetLogNumber(); } } From acee2b08a2d37154b8f9e2dc74b1966202c15ec5 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 2 Jul 2015 16:10:31 -0700 Subject: [PATCH 02/18] Fixed endless loop in DBIter::FindPrevUserKey() Summary: Fixed endless loop in DBIter::FindPrevUserKey() Test Plan: ./db_stress --test_batches_snapshots=1 --threads=32 --write_buffer_size=4194304 --destroy_db_initially=0 --reopen=20 --readpercent=45 --prefixpercent=5 --writepercent=35 --delpercent=5 --iterpercent=10 --db=/tmp/rocksdb_crashtest_KdCI5F --max_key=100000000 --mmap_read=0 --block_size=16384 --cache_size=1048576 --open_files=500000 --verify_checksum=1 --sync=0 --progress_reports=0 --disable_wal=0 --disable_data_sync=1 --target_file_size_base=2097152 --target_file_size_multiplier=2 --max_write_buffer_number=3 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --filter_deletes=0 --memtablerep=prefix_hash --prefix_size=7 --ops_per_thread=200 --kill_random_test=97 Reviewers: tnovak, igor, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41085 --- table/merger.cc | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/table/merger.cc b/table/merger.cc index b418b88a4..9781d0dca 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -208,6 +208,19 @@ class MergingIterator : public Iterator { } else { // Child has no entries >= key(). Position at last entry. child.SeekToLast(); + if (child.Valid() && comparator_->Compare(child.key(), key()) > 0) { + // Prefix bloom or prefix hash may return !Valid() if one of the + // following condition happens: 1. when prefix doesn't match. + // 2. Does not exist any row larger than the key within the prefix + // while SeekToLast() may return larger keys. + // + // Temporarily remove this child to avoid Prev() to return a key + // larger than the original keys. However, this can cause missing + // rows. + // + // TODO(3.13): need to fix. + continue; + } } if (child.Valid()) { maxHeap_.push(&child); From 35cd75c379eaca3a96f51334f043992ab36dbea8 Mon Sep 17 00:00:00 2001 From: Ari Ekmekji Date: Thu, 2 Jul 2015 17:14:39 -0700 Subject: [PATCH 03/18] Introduce InfoLogLevel::HEADER_LEVEL Summary: Introduced a new category in the enum InfoLogLevel in env.h. Modifed Log() in env.cc to use the Header() when the InfoLogLevel == HEADER_LEVEL. Updated tests in auto_roll_logger_test to ensure the header is handled properly in these cases. Test Plan: Augment existing tests in auto_roll_logger_test Reviewers: igor, sdong, yhchiang Reviewed By: yhchiang Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41067 --- include/rocksdb/env.h | 1 + util/auto_roll_logger_test.cc | 76 +++++++++++++++++++++-------------- util/env.cc | 8 +++- 3 files changed, 53 insertions(+), 32 deletions(-) diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index f185f2b7f..b054f0def 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -615,6 +615,7 @@ enum InfoLogLevel : unsigned char { WARN_LEVEL, ERROR_LEVEL, FATAL_LEVEL, + HEADER_LEVEL, NUM_INFO_LOG_LEVELS, }; diff --git a/util/auto_roll_logger_test.cc b/util/auto_roll_logger_test.cc index 6733a62a4..44b3023cc 100644 --- a/util/auto_roll_logger_test.cc +++ b/util/auto_roll_logger_test.cc @@ -254,28 +254,29 @@ TEST_F(AutoRollLoggerTest, InfoLogLevel) { // becomes out of scope. { AutoRollLogger logger(Env::Default(), kTestDir, "", log_size, 0); - for (int log_level = InfoLogLevel::FATAL_LEVEL; + for (int log_level = InfoLogLevel::HEADER_LEVEL; log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) { logger.SetInfoLogLevel((InfoLogLevel)log_level); for (int log_type = InfoLogLevel::DEBUG_LEVEL; - log_type <= InfoLogLevel::FATAL_LEVEL; log_type++) { + log_type <= InfoLogLevel::HEADER_LEVEL; log_type++) { // log messages with log level smaller than log_level will not be // logged. LogMessage((InfoLogLevel)log_type, &logger, kSampleMessage.c_str()); } - log_lines += InfoLogLevel::FATAL_LEVEL - log_level + 1; + log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1; } - for (int log_level = InfoLogLevel::FATAL_LEVEL; + for (int log_level = InfoLogLevel::HEADER_LEVEL; log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) { logger.SetInfoLogLevel((InfoLogLevel)log_level); // again, messages with level smaller than log_level will not be logged. + Log(InfoLogLevel::HEADER_LEVEL, &logger, "%s", kSampleMessage.c_str()); Debug(&logger, "%s", kSampleMessage.c_str()); Info(&logger, "%s", kSampleMessage.c_str()); Warn(&logger, "%s", kSampleMessage.c_str()); Error(&logger, "%s", kSampleMessage.c_str()); Fatal(&logger, "%s", kSampleMessage.c_str()); - log_lines += InfoLogLevel::FATAL_LEVEL - log_level + 1; + log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1; } } std::ifstream inFile(AutoRollLoggerTest::kLogFile.c_str()); @@ -329,41 +330,54 @@ TEST_F(AutoRollLoggerTest, LogHeaderTest) { static const size_t LOG_MAX_SIZE = 1024 * 5; static const std::string HEADER_STR = "Log header line"; - InitTestDb(); + // test_num == 0 -> standard call to Header() + // test_num == 1 -> call to Log() with InfoLogLevel::HEADER_LEVEL + for (int test_num = 0; test_num < 2; test_num++) { - AutoRollLogger logger(Env::Default(), kTestDir, /*db_log_dir=*/ "", - LOG_MAX_SIZE, /*log_file_time_to_roll=*/ 0); + InitTestDb(); - // log some headers - for (size_t i = 0; i < MAX_HEADERS; i++) { - Header(&logger, "%s %d", HEADER_STR.c_str(), i); - } + AutoRollLogger logger(Env::Default(), kTestDir, /*db_log_dir=*/ "", + LOG_MAX_SIZE, /*log_file_time_to_roll=*/ 0); - const string& newfname = logger.TEST_log_fname().c_str(); - - // log enough data to cause a roll over - int i = 0; - for (size_t iter = 0; iter < 2; iter++) { - while (logger.GetLogFileSize() < LOG_MAX_SIZE) { - Info(&logger, (kSampleMessage + ":LogHeaderTest line %d").c_str(), i); - ++i; + if (test_num == 0) { + // Log some headers explicitly using Header() + for (size_t i = 0; i < MAX_HEADERS; i++) { + Header(&logger, "%s %d", HEADER_STR.c_str(), i); + } + } else if (test_num == 1) { + // HEADER_LEVEL should make this behave like calling Header() + for (size_t i = 0; i < MAX_HEADERS; i++) { + Log(InfoLogLevel::HEADER_LEVEL, &logger, "%s %d", + HEADER_STR.c_str(), i); + } } - Info(&logger, "Rollover"); - } + const string& newfname = logger.TEST_log_fname().c_str(); - // Flus the log for the latest file - LogFlush(&logger); + // Log enough data to cause a roll over + int i = 0; + for (size_t iter = 0; iter < 2; iter++) { + while (logger.GetLogFileSize() < LOG_MAX_SIZE) { + Info(&logger, (kSampleMessage + ":LogHeaderTest line %d").c_str(), i); + ++i; + } - const list oldfiles = GetOldFileNames(newfname); + Info(&logger, "Rollover"); + } - ASSERT_EQ(oldfiles.size(), (size_t) 2); + // Flush the log for the latest file + LogFlush(&logger); - for (auto oldfname : oldfiles) { - // verify that the files rolled over - ASSERT_NE(oldfname, newfname); - // verify that the old log contains all the header logs - ASSERT_EQ(GetLinesCount(oldfname, HEADER_STR), MAX_HEADERS); + const list oldfiles = GetOldFileNames(newfname); + + ASSERT_EQ(oldfiles.size(), (size_t) 2); + + for (auto oldfname : oldfiles) { + // verify that the files rolled over + ASSERT_NE(oldfname, newfname); + // verify that the old log contains all the header logs + ASSERT_EQ(GetLinesCount(oldfname, HEADER_STR), MAX_HEADERS); + } } } diff --git a/util/env.cc b/util/env.cc index e044024de..140d7d986 100644 --- a/util/env.cc +++ b/util/env.cc @@ -61,7 +61,13 @@ void Log(const InfoLogLevel log_level, Logger* info_log, const char* format, if (info_log && info_log->GetInfoLogLevel() <= log_level) { va_list ap; va_start(ap, format); - info_log->Logv(log_level, format, ap); + + if (log_level == InfoLogLevel::HEADER_LEVEL) { + info_log->LogHeader(format, ap); + } else { + info_log->Logv(log_level, format, ap); + } + va_end(ap); } } From b6655a679d11f42ce9a4915f54d7995f85b7556a Mon Sep 17 00:00:00 2001 From: lovro Date: Mon, 6 Jul 2015 04:24:09 -0700 Subject: [PATCH 04/18] Replace std::priority_queue in MergingIterator with custom heap Summary: While profiling compaction in our service I noticed a lot of CPU (~15% of compaction) being spent in MergingIterator and key comparison. Looking at the code I found MergingIterator was (understandably) using std::priority_queue for the multiway merge. Keys in our dataset include sequence numbers that increase with time. Adjacent keys in an L0 file are very likely to be adjacent in the full database. Consequently, compaction will often pick a chunk of rows from the same L0 file before switching to another one. It would be great to avoid the O(log K) operation per row while compacting. This diff replaces std::priority_queue with a custom binary heap implementation. It has a "replace top" operation that is cheap when the new top is the same as the old one (i.e. the priority of the top entry is decreased but it still stays on top). Test Plan: make check To test the effect on performance, I generated databases with data patterns that mimic what I describe in the summary (rows have a mostly increasing sequence number). I see a 10-15% CPU decrease for compaction (and a matching throughput improvement on tmpfs). The exact improvement depends on the number of L0 files and the amount of locality. Performance on randomly distributed keys seems on par with the old code. Reviewers: kailiu, sdong, igor Reviewed By: igor Subscribers: yoshinorim, dhruba, tnovak Differential Revision: https://reviews.facebook.net/D29133 --- table/iter_heap.h | 16 ++-- table/merger.cc | 196 ++++++++++++++++++++-------------------------- util/heap.h | 140 +++++++++++++++++++++++++++++++++ 3 files changed, 234 insertions(+), 118 deletions(-) create mode 100644 util/heap.h diff --git a/table/iter_heap.h b/table/iter_heap.h index 9569d3638..5343175c3 100644 --- a/table/iter_heap.h +++ b/table/iter_heap.h @@ -5,36 +5,34 @@ // #pragma once -#include #include "rocksdb/comparator.h" #include "table/iterator_wrapper.h" namespace rocksdb { -// Return the max of two keys. +// When used with std::priority_queue, this comparison functor puts the +// iterator with the max/largest key on top. class MaxIteratorComparator { public: MaxIteratorComparator(const Comparator* comparator) : comparator_(comparator) {} - bool operator()(IteratorWrapper* a, IteratorWrapper* b) { - return comparator_->Compare(a->key(), b->key()) <= 0; + bool operator()(IteratorWrapper* a, IteratorWrapper* b) const { + return comparator_->Compare(a->key(), b->key()) < 0; } private: const Comparator* comparator_; }; -// Return the max of two keys. +// When used with std::priority_queue, this comparison functor puts the +// iterator with the min/smallest key on top. class MinIteratorComparator { public: - // if maxHeap is set comparator returns the max value. - // else returns the min Value. - // Can use to create a minHeap or a maxHeap. MinIteratorComparator(const Comparator* comparator) : comparator_(comparator) {} - bool operator()(IteratorWrapper* a, IteratorWrapper* b) { + bool operator()(IteratorWrapper* a, IteratorWrapper* b) const { return comparator_->Compare(a->key(), b->key()) > 0; } private: diff --git a/table/merger.cc b/table/merger.cc index 9781d0dca..943a360e9 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -10,7 +10,6 @@ #include "table/merger.h" #include -#include #include "rocksdb/comparator.h" #include "rocksdb/iterator.h" @@ -18,6 +17,7 @@ #include "table/iter_heap.h" #include "table/iterator_wrapper.h" #include "util/arena.h" +#include "util/heap.h" #include "util/stop_watch.h" #include "util/perf_context_imp.h" #include "util/autovector.h" @@ -25,21 +25,8 @@ namespace rocksdb { // Without anonymous namespace here, we fail the warning -Wmissing-prototypes namespace { -typedef std::priority_queue, - MaxIteratorComparator> MergerMaxIterHeap; - -typedef std::priority_queue, - MinIteratorComparator> MergerMinIterHeap; - -// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator. -MergerMaxIterHeap NewMergerMaxIterHeap(const Comparator* comparator) { - return MergerMaxIterHeap(MaxIteratorComparator(comparator)); -} - -// Return's a new MinHeap of IteratorWrapper's using the provided Comparator. -MergerMinIterHeap NewMergerMinIterHeap(const Comparator* comparator) { - return MergerMinIterHeap(MinIteratorComparator(comparator)); -} +typedef BinaryHeap MergerMaxIterHeap; +typedef BinaryHeap MergerMinIterHeap; } // namespace const size_t kNumIterReserve = 4; @@ -51,10 +38,8 @@ class MergingIterator : public Iterator { : is_arena_mode_(is_arena_mode), comparator_(comparator), current_(nullptr), - use_heap_(true), direction_(kForward), - maxHeap_(NewMergerMaxIterHeap(comparator_)), - minHeap_(NewMergerMinIterHeap(comparator_)) { + minHeap_(comparator_) { children_.resize(n); for (int i = 0; i < n; i++) { children_[i].Set(children[i]); @@ -64,6 +49,7 @@ class MergingIterator : public Iterator { minHeap_.push(&child); } } + current_ = CurrentForward(); } virtual void AddIterator(Iterator* iter) { @@ -72,6 +58,7 @@ class MergingIterator : public Iterator { auto new_wrapper = children_.back(); if (new_wrapper.Valid()) { minHeap_.push(&new_wrapper); + current_ = CurrentForward(); } } @@ -91,27 +78,25 @@ class MergingIterator : public Iterator { minHeap_.push(&child); } } - FindSmallest(); direction_ = kForward; + current_ = CurrentForward(); } virtual void SeekToLast() override { ClearHeaps(); + InitMaxHeap(); for (auto& child : children_) { child.SeekToLast(); if (child.Valid()) { - maxHeap_.push(&child); + maxHeap_->push(&child); } } - FindLargest(); direction_ = kReverse; + current_ = CurrentReverse(); } virtual void Seek(const Slice& target) override { - // Invalidate the heap. - use_heap_ = false; - IteratorWrapper* first_child = nullptr; - + ClearHeaps(); for (auto& child : children_) { { PERF_TIMER_GUARD(seek_child_seek_time); @@ -120,36 +105,15 @@ class MergingIterator : public Iterator { PERF_COUNTER_ADD(seek_child_seek_count, 1); if (child.Valid()) { - // This child has valid key - if (!use_heap_) { - if (first_child == nullptr) { - // It's the first child has valid key. Only put it int - // current_. Now the values in the heap should be invalid. - first_child = &child; - } else { - // We have more than one children with valid keys. Initialize - // the heap and put the first child into the heap. - PERF_TIMER_GUARD(seek_min_heap_time); - ClearHeaps(); - minHeap_.push(first_child); - } - } - if (use_heap_) { - PERF_TIMER_GUARD(seek_min_heap_time); - minHeap_.push(&child); - } + PERF_TIMER_GUARD(seek_min_heap_time); + minHeap_.push(&child); } } - if (use_heap_) { - // If heap is valid, need to put the smallest key to curent_. - PERF_TIMER_GUARD(seek_min_heap_time); - FindSmallest(); - } else { - // The heap is not valid, then the current_ iterator is the first - // one, or null if there is no first child. - current_ = first_child; - } direction_ = kForward; + { + PERF_TIMER_GUARD(seek_min_heap_time); + current_ = CurrentForward(); + } } virtual void Next() override { @@ -157,10 +121,11 @@ class MergingIterator : public Iterator { // Ensure that all children are positioned after key(). // If we are moving in the forward direction, it is already - // true for all of the non-current_ children since current_ is - // the smallest child and key() == current_->key(). Otherwise, - // we explicitly position the non-current_ children. + // true for all of the non-current children since current_ is + // the smallest child and key() == current_->key(). if (direction_ != kForward) { + // Otherwise, advance the non-current children. We advance current_ + // just after the if-block. ClearHeaps(); for (auto& child : children_) { if (&child != current_) { @@ -169,36 +134,42 @@ class MergingIterator : public Iterator { comparator_->Compare(key(), child.key()) == 0) { child.Next(); } - if (child.Valid()) { - minHeap_.push(&child); - } + } + if (child.Valid()) { + minHeap_.push(&child); } } direction_ = kForward; + // The loop advanced all non-current children to be > key() so current_ + // should still be strictly the smallest key. + assert(current_ == CurrentForward()); } // as the current points to the current record. move the iterator forward. - // and if it is valid add it to the heap. current_->Next(); - if (use_heap_) { - if (current_->Valid()) { - minHeap_.push(current_); - } - FindSmallest(); - } else if (!current_->Valid()) { - current_ = nullptr; + if (current_->Valid()) { + // current is still valid after the Next() call above. Call + // replace_top() to restore the heap property. When the same child + // iterator yields a sequence of keys, this is cheap. + minHeap_.replace_top(current_); + } else { + // current stopped being valid, remove it from the heap. + minHeap_.pop(); } + current_ = CurrentForward(); } virtual void Prev() override { assert(Valid()); // Ensure that all children are positioned before key(). // If we are moving in the reverse direction, it is already - // true for all of the non-current_ children since current_ is - // the largest child and key() == current_->key(). Otherwise, - // we explicitly position the non-current_ children. + // true for all of the non-current children since current_ is + // the largest child and key() == current_->key(). if (direction_ != kReverse) { + // Otherwise, retreat the non-current children. We retreat current_ + // just after the if-block. ClearHeaps(); + InitMaxHeap(); for (auto& child : children_) { if (&child != current_) { child.Seek(key()); @@ -222,19 +193,28 @@ class MergingIterator : public Iterator { continue; } } - if (child.Valid()) { - maxHeap_.push(&child); - } + } + if (child.Valid()) { + maxHeap_->push(&child); } } direction_ = kReverse; + // The loop retreated all non-current children to be < key() so current_ + // should still be strictly the largest key. + assert(current_ == CurrentReverse()); } current_->Prev(); if (current_->Valid()) { - maxHeap_.push(current_); + // current is still valid after the Prev() call above. Call + // replace_top() to restore the heap property. When the same child + // iterator yields a sequence of keys, this is cheap. + maxHeap_->replace_top(current_); + } else { + // current stopped being valid, remove it from the heap. + maxHeap_->pop(); } - FindLargest(); + current_ = CurrentReverse(); } virtual Slice key() const override { @@ -259,56 +239,54 @@ class MergingIterator : public Iterator { } private: - void FindSmallest(); - void FindLargest(); + // Clears heaps for both directions, used when changing direction or seeking void ClearHeaps(); + // Ensures that maxHeap_ is initialized when starting to go in the reverse + // direction + void InitMaxHeap(); bool is_arena_mode_; const Comparator* comparator_; autovector children_; + + // Cached pointer to child iterator with the current key, or nullptr if no + // child iterators are valid. This is the top of minHeap_ or maxHeap_ + // depending on the direction. IteratorWrapper* current_; - // If the value is true, both of iterators in the heap and current_ - // contain valid rows. If it is false, only current_ can possibly contain - // valid rows. - // This flag is always true for reverse direction, as we always use heap for - // the reverse iterating case. - bool use_heap_; // Which direction is the iterator moving? enum Direction { kForward, kReverse }; Direction direction_; - MergerMaxIterHeap maxHeap_; MergerMinIterHeap minHeap_; + // Max heap is used for reverse iteration, which is way less common than + // forward. Lazily initialize it to save memory. + std::unique_ptr maxHeap_; + + IteratorWrapper* CurrentForward() const { + assert(direction_ == kForward); + return !minHeap_.empty() ? minHeap_.top() : nullptr; + } + + IteratorWrapper* CurrentReverse() const { + assert(direction_ == kReverse); + assert(maxHeap_); + return !maxHeap_->empty() ? maxHeap_->top() : nullptr; + } }; -void MergingIterator::FindSmallest() { - assert(use_heap_); - if (minHeap_.empty()) { - current_ = nullptr; - } else { - current_ = minHeap_.top(); - assert(current_->Valid()); - minHeap_.pop(); - } -} - -void MergingIterator::FindLargest() { - assert(use_heap_); - if (maxHeap_.empty()) { - current_ = nullptr; - } else { - current_ = maxHeap_.top(); - assert(current_->Valid()); - maxHeap_.pop(); - } -} - void MergingIterator::ClearHeaps() { - use_heap_ = true; - maxHeap_ = NewMergerMaxIterHeap(comparator_); - minHeap_ = NewMergerMinIterHeap(comparator_); + minHeap_.clear(); + if (maxHeap_) { + maxHeap_->clear(); + } +} + +void MergingIterator::InitMaxHeap() { + if (!maxHeap_) { + maxHeap_.reset(new MergerMaxIterHeap(comparator_)); + } } Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n, diff --git a/util/heap.h b/util/heap.h new file mode 100644 index 000000000..7d9e11113 --- /dev/null +++ b/util/heap.h @@ -0,0 +1,140 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include +#include +#include +#include "util/autovector.h" + +namespace rocksdb { + +// Binary heap implementation optimized for use in multi-way merge sort. +// Comparison to std::priority_queue: +// - In libstdc++, std::priority_queue::pop() usually performs just over logN +// comparisons but never fewer. +// - std::priority_queue does not have a replace-top operation, requiring a +// pop+push. If the replacement element is the new top, this requires +// around 2logN comparisons. +// - This heap's pop() uses a "schoolbook" downheap which requires up to ~2logN +// comparisons. +// - This heap provides a replace_top() operation which requires [1, 2logN] +// comparisons. When the replacement element is also the new top, this +// takes just 1 or 2 comparisons. +// +// The last property can yield an order-of-magnitude performance improvement +// when merge-sorting real-world non-random data. If the merge operation is +// likely to take chunks of elements from the same input stream, only 1 +// comparison per element is needed. In RocksDB-land, this happens when +// compacting a database where keys are not randomly distributed across L0 +// files but nearby keys are likely to be in the same L0 file. +// +// The container uses the same counterintuitive ordering as +// std::priority_queue: the comparison operator is expected to provide the +// less-than relation, but top() will return the maximum. + +template> +class BinaryHeap { + public: + BinaryHeap() { } + explicit BinaryHeap(Compare cmp) : cmp_(std::move(cmp)) { } + + void push(const T& value) { + data_.push_back(value); + upheap(data_.size() - 1); + } + + void push(T&& value) { + data_.push_back(std::move(value)); + upheap(data_.size() - 1); + } + + const T& top() const { + assert(!empty()); + return data_.front(); + } + + void replace_top(const T& value) { + assert(!empty()); + data_.front() = value; + downheap(get_root()); + } + + void replace_top(T&& value) { + assert(!empty()); + data_.front() = std::move(value); + downheap(get_root()); + } + + void pop() { + assert(!empty()); + data_.front() = std::move(data_.back()); + data_.pop_back(); + if (!empty()) { + downheap(get_root()); + } + } + + void swap(BinaryHeap &other) { + std::swap(cmp_, other.cmp_); + data_.swap(other.data_); + } + + void clear() { + data_.clear(); + } + + bool empty() const { + return data_.empty(); + } + + private: + static inline size_t get_root() { return 0; } + static inline size_t get_parent(size_t index) { return (index - 1) / 2; } + static inline size_t get_left(size_t index) { return 2 * index + 1; } + static inline size_t get_right(size_t index) { return 2 * index + 2; } + + void upheap(size_t index) { + T v = std::move(data_[index]); + while (index > get_root()) { + const size_t parent = get_parent(index); + if (!cmp_(data_[parent], v)) { + break; + } + data_[index] = std::move(data_[parent]); + index = parent; + } + data_[index] = std::move(v); + } + + void downheap(size_t index) { + T v = std::move(data_[index]); + while (1) { + const size_t left_child = get_left(index); + if (get_left(index) >= data_.size()) { + break; + } + const size_t right_child = left_child + 1; + assert(right_child == get_right(index)); + size_t picked_child = left_child; + if (right_child < data_.size() && + cmp_(data_[left_child], data_[right_child])) { + picked_child = right_child; + } + if (!cmp_(v, data_[picked_child])) { + break; + } + data_[index] = std::move(data_[picked_child]); + index = picked_child; + } + data_[index] = std::move(v); + } + + Compare cmp_; + autovector data_; +}; + +} // namespace rocksdb From 155ce60dafbc3841da73eedf0d9dc6c4fabd8404 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 6 Jul 2015 11:14:08 -0700 Subject: [PATCH 05/18] Fix compaction_job_test Summary: Two issues: * the input keys to the compaction don't include sequence number. * sequence number is set to max(seq_num), but it should be set to max(seq_num)+1, because the condition here is strictly-larger (i.e. we will only zero-out sequence number if the DB's sequence number is strictly greater than the key's sequence number): https://github.com/facebook/rocksdb/blob/master/db/compaction_job.cc#L830 Test Plan: make compaction_job_test && ./compaction_job_test Reviewers: sdong, lovro Reviewed By: lovro Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41247 --- db/compaction_job_test.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 79d12d8a7..7eed7e490 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -76,11 +76,10 @@ class CompactionJobTest : public testing::Test { largest = internal_key; largest_seqno = sequence_number; } - std::pair key_value( - {bottommost_internal_key.Encode().ToString(), value}); - contents.insert(key_value); + contents.insert({internal_key.Encode().ToString(), value}); if (i == 1 || k < kKeysPerFile / 2) { - expected_results.insert(key_value); + expected_results.insert( + {bottommost_internal_key.Encode().ToString(), value}); } } @@ -97,7 +96,7 @@ class CompactionJobTest : public testing::Test { mutable_cf_options_, &edit, &mutex_); mutex_.Unlock(); } - versions_->SetLastSequence(sequence_number); + versions_->SetLastSequence(sequence_number + 1); return expected_results; } From 58d7ab3c68b05d544499fb32692662a29a9c6b24 Mon Sep 17 00:00:00 2001 From: Andres Notzli Date: Mon, 6 Jul 2015 22:25:27 -0700 Subject: [PATCH 06/18] Added tests for ExpandWhileOverlapping() Summary: This patch adds three test cases for ExpandWhileOverlapping() to the compaction_picker_test test suite. ExpandWhileOverlapping() only has an effect if the comparison function for the internal keys allows for overlapping user keys in different SST files on the same level. Thus, this patch adds a comparator based on sequence numbers to compaction_picker_test for the new test cases. Test Plan: - make compaction_picker_test && ./compaction_picker_test -> All tests pass - Replace body of ExpandWhileOverlapping() with `return true` -> Compile and run ./compaction_picker_test as before -> New tests fail Reviewers: sdong, yhchiang, rven, anthony, IslamAbdelRahman, kradhakrishnan, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41277 --- db/compaction_picker_test.cc | 70 ++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index 149931cce..f1145afc5 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -419,6 +419,76 @@ TEST_F(CompactionPickerTest, ParentIndexResetBug) { cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); } +// This test checks ExpandWhileOverlapping() by having overlapping user keys +// ranges (with different sequence numbers) in the input files. +TEST_F(CompactionPickerTest, OverlappingUserKeys) { + NewVersionStorage(6, kCompactionStyleLevel); + Add(1, 1U, "100", "150", 1U); + // Overlapping user keys + Add(1, 2U, "200", "400", 1U); + Add(1, 3U, "400", "500", 1000000000U, 0, 0); + Add(2, 4U, "600", "700", 1U); + UpdateVersionStorageInfo(); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(1U, compaction->num_input_levels()); + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(3U, compaction->input(0, 1)->fd.GetNumber()); +} + +TEST_F(CompactionPickerTest, OverlappingUserKeys2) { + NewVersionStorage(6, kCompactionStyleLevel); + // Overlapping user keys on same level and output level + Add(1, 1U, "200", "400", 1000000000U); + Add(1, 2U, "400", "500", 1U, 0, 0); + Add(2, 3U, "400", "600", 1U); + // The following file is not in the compaction despite overlapping user keys + Add(2, 4U, "600", "700", 1U, 0, 0); + UpdateVersionStorageInfo(); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(2U, compaction->num_input_levels()); + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(1U, compaction->num_input_files(1)); + ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); + ASSERT_EQ(3U, compaction->input(1, 0)->fd.GetNumber()); +} + +TEST_F(CompactionPickerTest, OverlappingUserKeys3) { + NewVersionStorage(6, kCompactionStyleLevel); + // Chain of overlapping user key ranges (forces ExpandWhileOverlapping() to + // expand multiple times) + Add(1, 1U, "100", "150", 1U); + Add(1, 2U, "150", "200", 1U, 0, 0); + Add(1, 3U, "200", "250", 1000000000U, 0, 0); + Add(1, 4U, "250", "300", 1U, 0, 0); + Add(1, 5U, "300", "350", 1U, 0, 0); + // Output level overlaps with the beginning and the end of the chain + Add(2, 6U, "050", "100", 1U); + Add(2, 7U, "350", "400", 1U); + UpdateVersionStorageInfo(); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(2U, compaction->num_input_levels()); + ASSERT_EQ(5U, compaction->num_input_files(0)); + ASSERT_EQ(2U, compaction->num_input_files(1)); + ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); + ASSERT_EQ(3U, compaction->input(0, 2)->fd.GetNumber()); + ASSERT_EQ(4U, compaction->input(0, 3)->fd.GetNumber()); + ASSERT_EQ(5U, compaction->input(0, 4)->fd.GetNumber()); + ASSERT_EQ(6U, compaction->input(1, 0)->fd.GetNumber()); + ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber()); +} + } // namespace rocksdb int main(int argc, char** argv) { From e12b403991217c5472135a1e2f8faad1b915d011 Mon Sep 17 00:00:00 2001 From: Aaron Feldman Date: Tue, 7 Jul 2015 10:49:16 -0700 Subject: [PATCH 07/18] Initialize threads later in constructor Summary: This addresses a test failure where an exception occured in the constructor's call to CreateDirIfMissing(). The existence of unjoined threads prevented this exception from propogating properly. See http://stackoverflow.com/questions/7381757/c-terminate-called-without-an-active-exception Test Plan: Re-run tests from task #7626266 Reviewers: sdong, anthony, igor Reviewed By: igor Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D41313 --- utilities/backupable/backupable_db.cc | 41 ++++++++++++++------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index a450a4a82..0b5d07159 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -442,26 +442,6 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, copy_file_buffer_size_(kDefaultCopyFileBufferSize), read_only_(read_only) { - // set up threads perform copies from files_to_copy_ in the background - for (int t = 0; t < options_.max_background_operations; t++) { - threads_.emplace_back([&]() { - CopyWorkItem work_item; - while (files_to_copy_.read(work_item)) { - CopyResult result; - result.status = CopyFile(work_item.src_path, - work_item.dst_path, - work_item.src_env, - work_item.dst_env, - work_item.sync, - work_item.rate_limiter, - &result.size, - &result.checksum_value, - work_item.size_limit); - work_item.result.set_value(std::move(result)); - } - }); - } - if (read_only_) { Log(options_.info_log, "Starting read_only backup engine"); } @@ -581,6 +561,27 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, if (!read_only_) { PutLatestBackupFileContents(latest_backup_id_); // Ignore errors } + + // set up threads perform copies from files_to_copy_ in the background + for (int t = 0; t < options_.max_background_operations; t++) { + threads_.emplace_back([&]() { + CopyWorkItem work_item; + while (files_to_copy_.read(work_item)) { + CopyResult result; + result.status = CopyFile(work_item.src_path, + work_item.dst_path, + work_item.src_env, + work_item.dst_env, + work_item.sync, + work_item.rate_limiter, + &result.size, + &result.checksum_value, + work_item.size_limit); + work_item.result.set_value(std::move(result)); + } + }); + } + Log(options_.info_log, "Initialized BackupEngine"); } From 685582a0b43cb19cb12bf7122657f9ea0f8c52c4 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 7 Jul 2015 11:36:24 -0700 Subject: [PATCH 08/18] Revert two diffs related to DBIter::FindPrevUserKey() Summary: This diff reverts the following two previous diffs related to DBIter::FindPrevUserKey(), which makes db_stress unstable. We should bake a better fix for this. * "Fix a comparison in DBIter::FindPrevUserKey()" ec70fea4c4025351190eba7a02bd09bb5f083790. * "Fixed endless loop in DBIter::FindPrevUserKey()" acee2b08a2d37154b8f9e2dc74b1966202c15ec5. Test Plan: db_stress Reviewers: anthony, igor, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41301 --- db/db_iter.cc | 7 ++----- db/db_iter_test.cc | 8 ++++++-- db/db_test.cc | 4 +++- table/merger.cc | 13 ------------- 4 files changed, 11 insertions(+), 21 deletions(-) diff --git a/db/db_iter.cc b/db/db_iter.cc index 7ed00365e..6bee64635 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -350,9 +350,6 @@ void DBIter::MergeValuesNewToOld() { void DBIter::Prev() { assert(valid_); if (direction_ == kForward) { - if (!iter_->Valid()) { - iter_->SeekToLast(); - } FindPrevUserKey(); direction_ = kReverse; } @@ -556,7 +553,7 @@ void DBIter::FindNextUserKey() { ParsedInternalKey ikey; FindParseableKey(&ikey, kForward); while (iter_->Valid() && - user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) { + user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) != 0) { iter_->Next(); FindParseableKey(&ikey, kForward); } @@ -571,7 +568,7 @@ void DBIter::FindPrevUserKey() { ParsedInternalKey ikey; FindParseableKey(&ikey, kReverse); while (iter_->Valid() && - user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) >= 0) { + user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0) { if (num_skipped >= max_skip_) { num_skipped = 0; IterKey last_key; diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index e5c58e4d9..2cb81aab1 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -1668,7 +1668,9 @@ TEST_F(DBIteratorTest, DBIterator8) { ASSERT_EQ(db_iter->value().ToString(), "0"); } -TEST_F(DBIteratorTest, DBIterator9) { +// TODO(3.13): fix the issue of Seek() then Prev() which might not necessary +// return the biggest element smaller than the seek key. +TEST_F(DBIteratorTest, DISABLED_DBIterator9) { Options options; options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); { @@ -1716,7 +1718,9 @@ TEST_F(DBIteratorTest, DBIterator9) { } } -TEST_F(DBIteratorTest, DBIterator10) { +// TODO(3.13): fix the issue of Seek() then Prev() which might not necessary +// return the biggest element smaller than the seek key. +TEST_F(DBIteratorTest, DISABLED_DBIterator10) { Options options; TestIterator* internal_iter = new TestIterator(BytewiseComparator()); diff --git a/db/db_test.cc b/db/db_test.cc index ddf7de9a4..9a8fe6d9e 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -14111,7 +14111,9 @@ TEST_F(DBTest, RowCache) { ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1); } -TEST_F(DBTest, PrevAfterMerge) { +// TODO(3.13): fix the issue of Seek() + Prev() which might not necessary +// return the biggest key which is smaller than the seek key. +TEST_F(DBTest, DISABLED_PrevAfterMerge) { Options options; options.create_if_missing = true; options.merge_operator = MergeOperators::CreatePutOperator(); diff --git a/table/merger.cc b/table/merger.cc index 943a360e9..22886f1d6 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -179,19 +179,6 @@ class MergingIterator : public Iterator { } else { // Child has no entries >= key(). Position at last entry. child.SeekToLast(); - if (child.Valid() && comparator_->Compare(child.key(), key()) > 0) { - // Prefix bloom or prefix hash may return !Valid() if one of the - // following condition happens: 1. when prefix doesn't match. - // 2. Does not exist any row larger than the key within the prefix - // while SeekToLast() may return larger keys. - // - // Temporarily remove this child to avoid Prev() to return a key - // larger than the original keys. However, this can cause missing - // rows. - // - // TODO(3.13): need to fix. - continue; - } } } if (child.Valid()) { From 4ce5be4255f654442986f0a9c0269e41cce7d362 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 7 Jul 2015 12:10:10 -0700 Subject: [PATCH 09/18] fixed leaking log::Writers Summary: Fixes valgrind errors in column_family_test. Test Plan: `make check`, `make valgrind_check` Reviewers: igor, yhchiang Reviewed By: yhchiang Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D41181 --- db/column_family.cc | 1 + db/db_impl.cc | 1 + db/db_test.cc | 1 + db/forward_iterator.cc | 1 + db/job_context.h | 5 +++++ 5 files changed, 9 insertions(+) diff --git a/db/column_family.cc b/db/column_family.cc index 4a600f611..b39518e08 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -59,6 +59,7 @@ ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { if (job_context.HaveSomethingToDelete()) { db_->PurgeObsoleteFiles(job_context); } + job_context.Clean(); } } diff --git a/db/db_impl.cc b/db/db_impl.cc index c599f1ef2..66eac6ad0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -542,6 +542,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, } // We're just cleaning up for DB::Write(). + assert(job_context->logs_to_free.empty()); job_context->logs_to_free = logs_to_free_; logs_to_free_.clear(); diff --git a/db/db_test.cc b/db/db_test.cc index 9a8fe6d9e..cb919c09f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -12670,6 +12670,7 @@ TEST_F(DBTest, DontDeletePendingOutputs) { dbfull()->FindObsoleteFiles(&job_context, true /*force*/); dbfull()->TEST_UnlockMutex(); dbfull()->PurgeObsoleteFiles(job_context); + job_context.Clean(); }; env_->table_write_callback_ = &purge_obsolete_files_function; diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index b4410199e..5ed125930 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -169,6 +169,7 @@ void ForwardIterator::Cleanup(bool release_sv) { if (job_context.HaveSomethingToDelete()) { db_->PurgeObsoleteFiles(job_context); } + job_context.Clean(); } } } diff --git a/db/job_context.h b/db/job_context.h index d0281443e..5a54e2d85 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -83,6 +83,10 @@ struct JobContext { new_superversion = create_superversion ? new SuperVersion() : nullptr; } + // For non-empty JobContext Clean() has to be called at least once before + // before destruction (see asserts in ~JobContext()). Should be called with + // unlocked DB mutex. Destructor doesn't call Clean() to avoid accidentally + // doing potentially slow Clean() with locked DB mutex. void Clean() { // free pending memtables for (auto m : memtables_to_free) { @@ -109,6 +113,7 @@ struct JobContext { assert(memtables_to_free.size() == 0); assert(superversions_to_free.size() == 0); assert(new_superversion == nullptr); + assert(logs_to_free.size() == 0); } }; From 59b50dcef904b86dad7fe1c45a22ff59dbc2a865 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 7 Jul 2015 12:39:36 -0700 Subject: [PATCH 10/18] Update HISTORY.md for Listener Summary: Update HISTORY.md for Listener Test Plan: no code change Reviewers: igor, sdong, IslamAbdelRahman, anthony Reviewed By: anthony Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41325 --- HISTORY.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/HISTORY.md b/HISTORY.md index 89ff62f5e..7eced978e 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,9 @@ * Added experimental support for optimistic transactions. See include/rocksdb/utilities/optimistic_transaction.h for more info. * Added a new way to report QPS from db_bench (check out --report_file and --report_interval_seconds) * Added a cache for individual rows. See DBOptions::row_cache for more info. +* Several new features on EventListener (see include/rocksdb/listener.h): + - OnCompationCompleted() now returns per-compaciton job statistics, defined in include/rocksdb/compaction_job_stats.h. + - Added OnTableFileCreated() and OnTableFileDeleted(). ### Public API changes * EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters. From 57d216ea6518c7f34eaea6538690bc52e6c605d1 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 7 Jul 2015 12:45:06 -0700 Subject: [PATCH 11/18] Remove assert(current_ == CurrentReverse()) in MergingIterator::Prev() Summary: Remove assert(current_ == CurrentReverse()) in MergingIterator::Prev() because it is possible to have some keys larger than the seek-key inserted between Seek() and SeekToLast(), which makes current_ not equal to CurrentReverse(). Test Plan: db_stress Reviewers: igor, sdong, IslamAbdelRahman, anthony Reviewed By: anthony Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41331 --- table/merger.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/table/merger.cc b/table/merger.cc index 22886f1d6..f380e0137 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -186,9 +186,12 @@ class MergingIterator : public Iterator { } } direction_ = kReverse; - // The loop retreated all non-current children to be < key() so current_ - // should still be strictly the largest key. - assert(current_ == CurrentReverse()); + // Note that we don't do assert(current_ == CurrentReverse()) here + // because it is possible to have some keys larger than the seek-key + // inserted between Seek() and SeekToLast(), which makes current_ not + // equal to CurrentReverse(). + // + // assert(current_ == CurrentReverse()); } current_->Prev(); From d8e3e766f9344f9e386bb8524b204c11fabbb776 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 7 Jul 2015 14:00:03 -0700 Subject: [PATCH 12/18] Fixed a bug in test ThreadStatusSingleCompaction Summary: Fixed a bug in test ThreadStatusSingleCompaction where SyncPoint traces are not cleared before the test begins its second iteration. Test Plan: db_test Reviewers: sdong, anthony, IslamAbdelRahman, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41337 --- db/db_test.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index cb919c09f..6ae8f878a 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11249,10 +11249,10 @@ TEST_F(DBTest, ThreadStatusSingleCompaction) { {"CompactionJob::Run():Start", "DBTest::ThreadStatusSingleCompaction:1"}, {"DBTest::ThreadStatusSingleCompaction:2", "CompactionJob::Run():End"}, }); - rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - for (int tests = 0; tests < 2; ++tests) { DestroyAndReopen(options); + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); Random rnd(301); // The Put Phase. @@ -11284,8 +11284,8 @@ TEST_F(DBTest, ThreadStatusSingleCompaction) { // repeat the test with disabling thread tracking. options.enable_thread_tracking = false; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } - rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } TEST_F(DBTest, PreShutdownManualCompaction) { From c0b23dd5b07772d44eb9af3b4c475c3e3b27cfc2 Mon Sep 17 00:00:00 2001 From: Poornima Chozhiyath Raman Date: Tue, 7 Jul 2015 14:18:55 -0700 Subject: [PATCH 13/18] Enabling trivial move in universal compaction Summary: This change enables trivial move if all the input files are non onverlapping while doing Universal Compaction. Test Plan: ./compaction_picker_test and db_test ran successfully with the new testcases. Reviewers: sdong Reviewed By: sdong Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D40875 --- HISTORY.md | 1 + db/compaction.cc | 6 ++ db/compaction.h | 18 +++++ db/compaction_picker.cc | 107 +++++++++++++++++++++++++ db/compaction_picker.h | 6 ++ db/compaction_picker_test.cc | 60 ++++++++++++++ db/db_impl.cc | 32 +++++--- db/db_test.cc | 43 ++++++++++ include/rocksdb/universal_compaction.h | 8 +- 9 files changed, 267 insertions(+), 14 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 7eced978e..c67620383 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Several new features on EventListener (see include/rocksdb/listener.h): - OnCompationCompleted() now returns per-compaciton job statistics, defined in include/rocksdb/compaction_job_stats.h. - Added OnTableFileCreated() and OnTableFileDeleted(). +* Add compaction_options_universal.enable_trivial_move to true, to allow trivial move while performing universal compaction. Trivial move will happen only when all the input files are non overlapping. ### Public API changes * EventListener::OnFlushCompleted() now passes FlushJobInfo instead of a list of parameters. diff --git a/db/compaction.cc b/db/compaction.cc index 02077923f..a7f2a9742 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -167,6 +167,12 @@ bool Compaction::IsTrivialMove() const { return false; } + // Used in universal compaction, where trivial move can be done if the + // input files are non overlapping + if (cfd_->ioptions()->compaction_options_universal.allow_trivial_move) { + return is_trivial_move_; + } + return (start_level_ != output_level_ && num_input_levels() == 1 && input(0, 0)->fd.GetPathId() == GetOutputPathId() && InputCompressionMatchesOutput() && diff --git a/db/compaction.h b/db/compaction.h index beddf2363..d40864f39 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -158,6 +158,19 @@ class Compaction { // Was this compaction triggered manually by the client? bool IsManualCompaction() { return is_manual_compaction_; } + // Used when allow_trivial_move option is set in + // Universal compaction. If all the input files are + // non overlapping, then is_trivial_move_ variable + // will be set true, else false + void set_is_trivial_move(bool trivial_move) { + is_trivial_move_ = trivial_move; + } + + // Used when allow_trivial_move option is set in + // Universal compaction. Returns true, if the input files + // are non-overlapping and can be trivially moved. + bool is_trivial_move() { return is_trivial_move_; } + // Return the MutableCFOptions that should be used throughout the compaction // procedure const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; } @@ -238,6 +251,11 @@ class Compaction { // Is this compaction requested by the client? const bool is_manual_compaction_; + // True if we can do trivial move in Universal multi level + // compaction + + bool is_trivial_move_; + // "level_ptrs_" holds indices into "input_version_->levels_", where each // index remembers which file of an associated level we are currently used // to check KeyNotExistsBeyondOutputLevel() for deletion operation. diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 70e48146b..ec18498ce 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -15,6 +15,7 @@ #include #include +#include #include #include @@ -37,6 +38,64 @@ uint64_t TotalCompensatedFileSize(const std::vector& files) { return sum; } +// Used in universal compaction when trivial move is enabled. +// This structure is used for the construction of min heap +// that contains the file meta data, the level of the file +// and the index of the file in that level + +struct InputFileInfo { + FileMetaData* f; + unsigned int level; + unsigned int index; +}; + +// Used in universal compaction when trivial move is enabled. +// This comparator is used for the construction of min heap +// based on the smallest key of the file. +struct UserKeyComparator { + explicit UserKeyComparator(const Comparator* ucmp) { ucmp_ = ucmp; } + + bool operator()(InputFileInfo i1, InputFileInfo i2) const { + return (ucmp_->Compare(i1.f->smallest.user_key(), + i2.f->smallest.user_key()) > 0); + } + + private: + const Comparator* ucmp_; +}; + +typedef std::priority_queue, + UserKeyComparator> SmallestKeyHeap; + +// This function creates the heap that is used to find if the files are +// overlapping during universal compaction when the allow_trivial_move +// is set. +SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) { + SmallestKeyHeap smallest_key_priority_q = + SmallestKeyHeap(UserKeyComparator(ucmp)); + + InputFileInfo input_file; + + for (unsigned int l = 0; l < c->num_input_levels(); l++) { + if (c->num_input_files(l) != 0) { + if (l == 0 && c->start_level() == 0) { + for (size_t i = 0; i < c->num_input_files(0); i++) { + input_file.f = c->input(0, i); + input_file.level = 0; + input_file.index = i; + smallest_key_priority_q.push(std::move(input_file)); + } + } else { + input_file.f = c->input(l, 0); + input_file.level = l; + input_file.index = 0; + smallest_key_priority_q.push(std::move(input_file)); + } + } + } + return smallest_key_priority_q; +} + } // anonymous namespace // Determine compression type, based on user options, level of the output @@ -1106,6 +1165,50 @@ void GetSmallestLargestSeqno(const std::vector& files, } // namespace #endif +// Algorithm that checks to see if there are any overlapping +// files in the input +bool CompactionPicker::IsInputNonOverlapping(Compaction* c) { + auto comparator = icmp_->user_comparator(); + int first_iter = 1; + + InputFileInfo prev, curr, next; + + SmallestKeyHeap smallest_key_priority_q = + create_level_heap(c, icmp_->user_comparator()); + + while (!smallest_key_priority_q.empty()) { + curr = smallest_key_priority_q.top(); + smallest_key_priority_q.pop(); + + if (first_iter) { + prev = curr; + first_iter = 0; + } else { + if (comparator->Compare(prev.f->largest.user_key(), + curr.f->smallest.user_key()) >= 0) { + // found overlapping files, return false + return false; + } + assert(comparator->Compare(curr.f->largest.user_key(), + prev.f->largest.user_key()) > 0); + prev = curr; + } + + next.f = nullptr; + + if (curr.level != 0 && curr.index < c->num_input_files(curr.level) - 1) { + next.f = c->input(curr.level, curr.index + 1); + next.level = curr.level; + next.index = curr.index + 1; + } + + if (next.f) { + smallest_key_priority_q.push(std::move(next)); + } + } + return true; +} + // Universal style of compaction. Pick files that are contiguous in // time-range to compact. // @@ -1168,6 +1271,10 @@ Compaction* UniversalCompactionPicker::PickCompaction( return nullptr; } + if (ioptions_.compaction_options_universal.allow_trivial_move == true) { + c->set_is_trivial_move(IsInputNonOverlapping(c)); + } + // validate that all the chosen files of L0 are non overlapping in time #ifndef NDEBUG SequenceNumber prev_smallest_seqno = 0U; diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 403410196..65ca73abf 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -105,6 +105,12 @@ class CompactionPicker { const VersionStorageInfo* vstorage, const CompactionOptions& compact_options) const; + // Used in universal compaction when the enabled_trivial_move + // option is set. Checks whether there are any overlapping files + // in the input. Returns true if the input files are non + // overlapping. + bool IsInputNonOverlapping(Compaction* c); + protected: int NumberLevels() const { return ioptions_.num_levels; } diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index f1145afc5..e6b31fbfa 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -77,6 +77,8 @@ class CompactionPickerTest : public testing::Test { f->fd = FileDescriptor(file_number, path_id, file_size); f->smallest = InternalKey(smallest, smallest_seq, kTypeValue); f->largest = InternalKey(largest, largest_seq, kTypeValue); + f->smallest_seqno = smallest_seq; + f->largest_seqno = largest_seq; f->compensated_file_size = file_size; f->refs = 0; vstorage_->AddFile(level, f); @@ -365,6 +367,64 @@ TEST_F(CompactionPickerTest, NeedsCompactionUniversal) { vstorage_->CompactionScore(0) >= 1); } } +// Tests if the files can be trivially moved in multi level +// universal compaction when allow_trivial_move option is set +// In this test as the input files overlaps, they cannot +// be trivially moved. + +TEST_F(CompactionPickerTest, CannotTrivialMoveUniversal) { + const uint64_t kFileSize = 100000; + + ioptions_.compaction_options_universal.allow_trivial_move = true; + NewVersionStorage(1, kCompactionStyleUniversal); + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + // must return false when there's no files. + ASSERT_EQ(universal_compaction_picker.NeedsCompaction(vstorage_.get()), + false); + + NewVersionStorage(3, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(0, 2U, "201", "250", kFileSize, 0, 401, 450); + Add(0, 4U, "260", "300", kFileSize, 0, 260, 300); + Add(1, 5U, "100", "151", kFileSize, 0, 200, 251); + Add(1, 3U, "301", "350", kFileSize, 0, 101, 150); + Add(2, 6U, "120", "200", kFileSize, 0, 20, 100); + + UpdateVersionStorageInfo(); + + std::unique_ptr compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + + ASSERT_TRUE(!compaction->is_trivial_move()); +} +// Tests if the files can be trivially moved in multi level +// universal compaction when allow_trivial_move option is set +// In this test as the input files doesn't overlaps, they should +// be trivially moved. +TEST_F(CompactionPickerTest, AllowsTrivialMoveUniversal) { + const uint64_t kFileSize = 100000; + + ioptions_.compaction_options_universal.allow_trivial_move = true; + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + + NewVersionStorage(3, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(0, 2U, "201", "250", kFileSize, 0, 401, 450); + Add(0, 4U, "260", "300", kFileSize, 0, 260, 300); + Add(1, 5U, "010", "080", kFileSize, 0, 200, 251); + Add(2, 3U, "301", "350", kFileSize, 0, 101, 150); + + UpdateVersionStorageInfo(); + + std::unique_ptr compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + + ASSERT_TRUE(compaction->is_trivial_move()); +} TEST_F(CompactionPickerTest, NeedsCompactionFIFO) { NewVersionStorage(1, kCompactionStyleFIFO); diff --git a/db/db_impl.cc b/db/db_impl.cc index 66eac6ad0..75535c27d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2538,21 +2538,27 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, // Move files to next level int32_t moved_files = 0; int64_t moved_bytes = 0; - for (size_t i = 0; i < c->num_input_files(0); i++) { - FileMetaData* f = c->input(0, i); - c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); - c->edit()->AddFile(c->output_level(), f->fd.GetNumber(), - f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, - f->largest, f->smallest_seqno, f->largest_seqno, - f->marked_for_compaction); + for (unsigned int l = 0; l < c->num_input_levels(); l++) { + if (l == static_cast(c->output_level())) { + continue; + } + for (size_t i = 0; i < c->num_input_files(l); i++) { + FileMetaData* f = c->input(l, i); + c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); + c->edit()->AddFile(c->output_level(), f->fd.GetNumber(), + f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, + f->largest, f->smallest_seqno, f->largest_seqno, + f->marked_for_compaction); - LogToBuffer(log_buffer, - "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n", - c->column_family_data()->GetName().c_str(), f->fd.GetNumber(), - c->output_level(), f->fd.GetFileSize()); - ++moved_files; - moved_bytes += f->fd.GetFileSize(); + LogToBuffer(log_buffer, + "[%s] Moving #%" PRIu64 " to level-%d %" PRIu64 " bytes\n", + c->column_family_data()->GetName().c_str(), + f->fd.GetNumber(), c->output_level(), f->fd.GetFileSize()); + ++moved_files; + moved_bytes += f->fd.GetFileSize(); + } } + status = versions_->LogAndApply(c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); diff --git a/db/db_test.cc b/db/db_test.cc index 6ae8f878a..124f6afbb 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4528,7 +4528,50 @@ TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) { ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i)); } } +// Tests universal compaction with trivial move enabled +TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionTrivialMove) { + int32_t trivial_move = 0; + int32_t non_trivial_move = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:TrivialMove", + [&](void* arg) { trivial_move++; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial", + [&](void* arg) { non_trivial_move++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Options options; + options.compaction_style = kCompactionStyleUniversal; + options.compaction_options_universal.allow_trivial_move = true; + options.num_levels = 3; + options.write_buffer_size = 100 << 10; // 100KB + options.level0_file_num_compaction_trigger = 3; + options.max_background_compactions = 1; + options.target_file_size_base = 32 * 1024; + options = CurrentOptions(options); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + // Trigger compaction if size amplification exceeds 110% + options.compaction_options_universal.max_size_amplification_percent = 110; + options = CurrentOptions(options); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + + Random rnd(301); + int num_keys = 15000; + for (int i = 0; i < num_keys; i++) { + ASSERT_OK(Put(1, Key(i), Key(i))); + } + std::vector values; + + ASSERT_OK(Flush(1)); + dbfull()->TEST_WaitForCompact(); + + ASSERT_GT(trivial_move, 0); + ASSERT_EQ(non_trivial_move, 0); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels, DBTestUniversalCompactionMultiLevels, ::testing::Values(3, 20)); diff --git a/include/rocksdb/universal_compaction.h b/include/rocksdb/universal_compaction.h index 229e50b25..e0f9f830f 100644 --- a/include/rocksdb/universal_compaction.h +++ b/include/rocksdb/universal_compaction.h @@ -69,6 +69,11 @@ class CompactionOptionsUniversal { // Default: kCompactionStopStyleTotalSize CompactionStopStyle stop_style; + // Option to optimize the universal multi level compaction by enabling + // trivial move for non overlapping files. + // Default: false + bool allow_trivial_move; + // Default set of parameters CompactionOptionsUniversal() : size_ratio(1), @@ -76,7 +81,8 @@ class CompactionOptionsUniversal { max_merge_width(UINT_MAX), max_size_amplification_percent(200), compression_size_percent(-1), - stop_style(kCompactionStopStyleTotalSize) {} + stop_style(kCompactionStopStyleTotalSize), + allow_trivial_move(false) {} }; } // namespace rocksdb From b7a2369fb2ac8bb762553d8492c401fb80826498 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 7 Jul 2015 14:45:20 -0700 Subject: [PATCH 14/18] Revert "Replace std::priority_queue in MergingIterator with custom heap" Summary: This patch reverts "Replace std::priority_queue in MergingIterator with custom heap" (commit commit b6655a679d11f42ce9a4915f54d7995f85b7556a) as it causes db_stress failure. Test Plan: ./db_stress --test_batches_snapshots=1 --threads=32 --write_buffer_size=4194304 --destroy_db_initially=0 --reopen=20 --readpercent=45 --prefixpercent=5 --writepercent=35 --delpercent=5 --iterpercent=10 --db=/tmp/rocksdb_crashtest_KdCI5F --max_key=100000000 --mmap_read=0 --block_size=16384 --cache_size=1048576 --open_files=500000 --verify_checksum=1 --sync=0 --progress_reports=0 --disable_wal=0 --disable_data_sync=1 --target_file_size_base=2097152 --target_file_size_multiplier=2 --max_write_buffer_number=3 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --filter_deletes=0 --memtablerep=prefix_hash --prefix_size=7 --ops_per_thread=200 --kill_random_test=97 Reviewers: igor, anthony, lovro, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41343 --- table/iter_heap.h | 16 ++-- table/merger.cc | 185 ++++++++++++++++++++++++++-------------------- util/heap.h | 140 ----------------------------------- 3 files changed, 114 insertions(+), 227 deletions(-) delete mode 100644 util/heap.h diff --git a/table/iter_heap.h b/table/iter_heap.h index 5343175c3..9569d3638 100644 --- a/table/iter_heap.h +++ b/table/iter_heap.h @@ -5,34 +5,36 @@ // #pragma once +#include #include "rocksdb/comparator.h" #include "table/iterator_wrapper.h" namespace rocksdb { -// When used with std::priority_queue, this comparison functor puts the -// iterator with the max/largest key on top. +// Return the max of two keys. class MaxIteratorComparator { public: MaxIteratorComparator(const Comparator* comparator) : comparator_(comparator) {} - bool operator()(IteratorWrapper* a, IteratorWrapper* b) const { - return comparator_->Compare(a->key(), b->key()) < 0; + bool operator()(IteratorWrapper* a, IteratorWrapper* b) { + return comparator_->Compare(a->key(), b->key()) <= 0; } private: const Comparator* comparator_; }; -// When used with std::priority_queue, this comparison functor puts the -// iterator with the min/smallest key on top. +// Return the max of two keys. class MinIteratorComparator { public: + // if maxHeap is set comparator returns the max value. + // else returns the min Value. + // Can use to create a minHeap or a maxHeap. MinIteratorComparator(const Comparator* comparator) : comparator_(comparator) {} - bool operator()(IteratorWrapper* a, IteratorWrapper* b) const { + bool operator()(IteratorWrapper* a, IteratorWrapper* b) { return comparator_->Compare(a->key(), b->key()) > 0; } private: diff --git a/table/merger.cc b/table/merger.cc index f380e0137..32220571c 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -9,6 +9,7 @@ #include "table/merger.h" +#include #include #include "rocksdb/comparator.h" @@ -17,7 +18,6 @@ #include "table/iter_heap.h" #include "table/iterator_wrapper.h" #include "util/arena.h" -#include "util/heap.h" #include "util/stop_watch.h" #include "util/perf_context_imp.h" #include "util/autovector.h" @@ -25,8 +25,21 @@ namespace rocksdb { // Without anonymous namespace here, we fail the warning -Wmissing-prototypes namespace { -typedef BinaryHeap MergerMaxIterHeap; -typedef BinaryHeap MergerMinIterHeap; +typedef std::priority_queue, + MaxIteratorComparator> MergerMaxIterHeap; + +typedef std::priority_queue, + MinIteratorComparator> MergerMinIterHeap; + +// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator. +MergerMaxIterHeap NewMergerMaxIterHeap(const Comparator* comparator) { + return MergerMaxIterHeap(MaxIteratorComparator(comparator)); +} + +// Return's a new MinHeap of IteratorWrapper's using the provided Comparator. +MergerMinIterHeap NewMergerMinIterHeap(const Comparator* comparator) { + return MergerMinIterHeap(MinIteratorComparator(comparator)); +} } // namespace const size_t kNumIterReserve = 4; @@ -38,8 +51,10 @@ class MergingIterator : public Iterator { : is_arena_mode_(is_arena_mode), comparator_(comparator), current_(nullptr), + use_heap_(true), direction_(kForward), - minHeap_(comparator_) { + maxHeap_(NewMergerMaxIterHeap(comparator_)), + minHeap_(NewMergerMinIterHeap(comparator_)) { children_.resize(n); for (int i = 0; i < n; i++) { children_[i].Set(children[i]); @@ -49,7 +64,6 @@ class MergingIterator : public Iterator { minHeap_.push(&child); } } - current_ = CurrentForward(); } virtual void AddIterator(Iterator* iter) { @@ -58,7 +72,6 @@ class MergingIterator : public Iterator { auto new_wrapper = children_.back(); if (new_wrapper.Valid()) { minHeap_.push(&new_wrapper); - current_ = CurrentForward(); } } @@ -78,25 +91,27 @@ class MergingIterator : public Iterator { minHeap_.push(&child); } } + FindSmallest(); direction_ = kForward; - current_ = CurrentForward(); } virtual void SeekToLast() override { ClearHeaps(); - InitMaxHeap(); for (auto& child : children_) { child.SeekToLast(); if (child.Valid()) { - maxHeap_->push(&child); + maxHeap_.push(&child); } } + FindLargest(); direction_ = kReverse; - current_ = CurrentReverse(); } virtual void Seek(const Slice& target) override { - ClearHeaps(); + // Invalidate the heap. + use_heap_ = false; + IteratorWrapper* first_child = nullptr; + for (auto& child : children_) { { PERF_TIMER_GUARD(seek_child_seek_time); @@ -105,15 +120,36 @@ class MergingIterator : public Iterator { PERF_COUNTER_ADD(seek_child_seek_count, 1); if (child.Valid()) { - PERF_TIMER_GUARD(seek_min_heap_time); - minHeap_.push(&child); + // This child has valid key + if (!use_heap_) { + if (first_child == nullptr) { + // It's the first child has valid key. Only put it int + // current_. Now the values in the heap should be invalid. + first_child = &child; + } else { + // We have more than one children with valid keys. Initialize + // the heap and put the first child into the heap. + PERF_TIMER_GUARD(seek_min_heap_time); + ClearHeaps(); + minHeap_.push(first_child); + } + } + if (use_heap_) { + PERF_TIMER_GUARD(seek_min_heap_time); + minHeap_.push(&child); + } } } - direction_ = kForward; - { + if (use_heap_) { + // If heap is valid, need to put the smallest key to curent_. PERF_TIMER_GUARD(seek_min_heap_time); - current_ = CurrentForward(); + FindSmallest(); + } else { + // The heap is not valid, then the current_ iterator is the first + // one, or null if there is no first child. + current_ = first_child; } + direction_ = kForward; } virtual void Next() override { @@ -121,11 +157,10 @@ class MergingIterator : public Iterator { // Ensure that all children are positioned after key(). // If we are moving in the forward direction, it is already - // true for all of the non-current children since current_ is - // the smallest child and key() == current_->key(). + // true for all of the non-current_ children since current_ is + // the smallest child and key() == current_->key(). Otherwise, + // we explicitly position the non-current_ children. if (direction_ != kForward) { - // Otherwise, advance the non-current children. We advance current_ - // just after the if-block. ClearHeaps(); for (auto& child : children_) { if (&child != current_) { @@ -134,42 +169,36 @@ class MergingIterator : public Iterator { comparator_->Compare(key(), child.key()) == 0) { child.Next(); } - } - if (child.Valid()) { - minHeap_.push(&child); + if (child.Valid()) { + minHeap_.push(&child); + } } } direction_ = kForward; - // The loop advanced all non-current children to be > key() so current_ - // should still be strictly the smallest key. - assert(current_ == CurrentForward()); } // as the current points to the current record. move the iterator forward. + // and if it is valid add it to the heap. current_->Next(); - if (current_->Valid()) { - // current is still valid after the Next() call above. Call - // replace_top() to restore the heap property. When the same child - // iterator yields a sequence of keys, this is cheap. - minHeap_.replace_top(current_); - } else { - // current stopped being valid, remove it from the heap. - minHeap_.pop(); + if (use_heap_) { + if (current_->Valid()) { + minHeap_.push(current_); + } + FindSmallest(); + } else if (!current_->Valid()) { + current_ = nullptr; } - current_ = CurrentForward(); } virtual void Prev() override { assert(Valid()); // Ensure that all children are positioned before key(). // If we are moving in the reverse direction, it is already - // true for all of the non-current children since current_ is - // the largest child and key() == current_->key(). + // true for all of the non-current_ children since current_ is + // the largest child and key() == current_->key(). Otherwise, + // we explicitly position the non-current_ children. if (direction_ != kReverse) { - // Otherwise, retreat the non-current children. We retreat current_ - // just after the if-block. ClearHeaps(); - InitMaxHeap(); for (auto& child : children_) { if (&child != current_) { child.Seek(key()); @@ -180,9 +209,9 @@ class MergingIterator : public Iterator { // Child has no entries >= key(). Position at last entry. child.SeekToLast(); } - } - if (child.Valid()) { - maxHeap_->push(&child); + if (child.Valid()) { + maxHeap_.push(&child); + } } } direction_ = kReverse; @@ -196,15 +225,9 @@ class MergingIterator : public Iterator { current_->Prev(); if (current_->Valid()) { - // current is still valid after the Prev() call above. Call - // replace_top() to restore the heap property. When the same child - // iterator yields a sequence of keys, this is cheap. - maxHeap_->replace_top(current_); - } else { - // current stopped being valid, remove it from the heap. - maxHeap_->pop(); + maxHeap_.push(current_); } - current_ = CurrentReverse(); + FindLargest(); } virtual Slice key() const override { @@ -229,56 +252,58 @@ class MergingIterator : public Iterator { } private: - // Clears heaps for both directions, used when changing direction or seeking + void FindSmallest(); + void FindLargest(); void ClearHeaps(); - // Ensures that maxHeap_ is initialized when starting to go in the reverse - // direction - void InitMaxHeap(); bool is_arena_mode_; const Comparator* comparator_; autovector children_; - - // Cached pointer to child iterator with the current key, or nullptr if no - // child iterators are valid. This is the top of minHeap_ or maxHeap_ - // depending on the direction. IteratorWrapper* current_; + // If the value is true, both of iterators in the heap and current_ + // contain valid rows. If it is false, only current_ can possibly contain + // valid rows. + // This flag is always true for reverse direction, as we always use heap for + // the reverse iterating case. + bool use_heap_; // Which direction is the iterator moving? enum Direction { kForward, kReverse }; Direction direction_; + MergerMaxIterHeap maxHeap_; MergerMinIterHeap minHeap_; - // Max heap is used for reverse iteration, which is way less common than - // forward. Lazily initialize it to save memory. - std::unique_ptr maxHeap_; - - IteratorWrapper* CurrentForward() const { - assert(direction_ == kForward); - return !minHeap_.empty() ? minHeap_.top() : nullptr; - } - - IteratorWrapper* CurrentReverse() const { - assert(direction_ == kReverse); - assert(maxHeap_); - return !maxHeap_->empty() ? maxHeap_->top() : nullptr; - } }; -void MergingIterator::ClearHeaps() { - minHeap_.clear(); - if (maxHeap_) { - maxHeap_->clear(); +void MergingIterator::FindSmallest() { + assert(use_heap_); + if (minHeap_.empty()) { + current_ = nullptr; + } else { + current_ = minHeap_.top(); + assert(current_->Valid()); + minHeap_.pop(); } } -void MergingIterator::InitMaxHeap() { - if (!maxHeap_) { - maxHeap_.reset(new MergerMaxIterHeap(comparator_)); +void MergingIterator::FindLargest() { + assert(use_heap_); + if (maxHeap_.empty()) { + current_ = nullptr; + } else { + current_ = maxHeap_.top(); + assert(current_->Valid()); + maxHeap_.pop(); } } +void MergingIterator::ClearHeaps() { + use_heap_ = true; + maxHeap_ = NewMergerMaxIterHeap(comparator_); + minHeap_ = NewMergerMinIterHeap(comparator_); +} + Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n, Arena* arena) { assert(n >= 0); diff --git a/util/heap.h b/util/heap.h deleted file mode 100644 index 7d9e11113..000000000 --- a/util/heap.h +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#pragma once - -#include -#include -#include -#include "util/autovector.h" - -namespace rocksdb { - -// Binary heap implementation optimized for use in multi-way merge sort. -// Comparison to std::priority_queue: -// - In libstdc++, std::priority_queue::pop() usually performs just over logN -// comparisons but never fewer. -// - std::priority_queue does not have a replace-top operation, requiring a -// pop+push. If the replacement element is the new top, this requires -// around 2logN comparisons. -// - This heap's pop() uses a "schoolbook" downheap which requires up to ~2logN -// comparisons. -// - This heap provides a replace_top() operation which requires [1, 2logN] -// comparisons. When the replacement element is also the new top, this -// takes just 1 or 2 comparisons. -// -// The last property can yield an order-of-magnitude performance improvement -// when merge-sorting real-world non-random data. If the merge operation is -// likely to take chunks of elements from the same input stream, only 1 -// comparison per element is needed. In RocksDB-land, this happens when -// compacting a database where keys are not randomly distributed across L0 -// files but nearby keys are likely to be in the same L0 file. -// -// The container uses the same counterintuitive ordering as -// std::priority_queue: the comparison operator is expected to provide the -// less-than relation, but top() will return the maximum. - -template> -class BinaryHeap { - public: - BinaryHeap() { } - explicit BinaryHeap(Compare cmp) : cmp_(std::move(cmp)) { } - - void push(const T& value) { - data_.push_back(value); - upheap(data_.size() - 1); - } - - void push(T&& value) { - data_.push_back(std::move(value)); - upheap(data_.size() - 1); - } - - const T& top() const { - assert(!empty()); - return data_.front(); - } - - void replace_top(const T& value) { - assert(!empty()); - data_.front() = value; - downheap(get_root()); - } - - void replace_top(T&& value) { - assert(!empty()); - data_.front() = std::move(value); - downheap(get_root()); - } - - void pop() { - assert(!empty()); - data_.front() = std::move(data_.back()); - data_.pop_back(); - if (!empty()) { - downheap(get_root()); - } - } - - void swap(BinaryHeap &other) { - std::swap(cmp_, other.cmp_); - data_.swap(other.data_); - } - - void clear() { - data_.clear(); - } - - bool empty() const { - return data_.empty(); - } - - private: - static inline size_t get_root() { return 0; } - static inline size_t get_parent(size_t index) { return (index - 1) / 2; } - static inline size_t get_left(size_t index) { return 2 * index + 1; } - static inline size_t get_right(size_t index) { return 2 * index + 2; } - - void upheap(size_t index) { - T v = std::move(data_[index]); - while (index > get_root()) { - const size_t parent = get_parent(index); - if (!cmp_(data_[parent], v)) { - break; - } - data_[index] = std::move(data_[parent]); - index = parent; - } - data_[index] = std::move(v); - } - - void downheap(size_t index) { - T v = std::move(data_[index]); - while (1) { - const size_t left_child = get_left(index); - if (get_left(index) >= data_.size()) { - break; - } - const size_t right_child = left_child + 1; - assert(right_child == get_right(index)); - size_t picked_child = left_child; - if (right_child < data_.size() && - cmp_(data_[left_child], data_[right_child])) { - picked_child = right_child; - } - if (!cmp_(v, data_[picked_child])) { - break; - } - data_[index] = std::move(data_[picked_child]); - index = picked_child; - } - data_[index] = std::move(v); - } - - Compare cmp_; - autovector data_; -}; - -} // namespace rocksdb From 411c8e3d198cdc2d72751b39f4a005e024fc071f Mon Sep 17 00:00:00 2001 From: Poornima Chozhiyath Raman Date: Tue, 7 Jul 2015 15:21:17 -0700 Subject: [PATCH 15/18] Build fail fix Summary: Build fail fix. Type cast issues. Test Plan: compiled Reviewers: sdong, yhchiang Reviewed By: yhchiang Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D41349 --- db/compaction_picker.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index ec18498ce..81a7f0a25 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -45,8 +45,8 @@ uint64_t TotalCompensatedFileSize(const std::vector& files) { struct InputFileInfo { FileMetaData* f; - unsigned int level; - unsigned int index; + size_t level; + size_t index; }; // Used in universal compaction when trivial move is enabled. @@ -76,7 +76,7 @@ SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) { InputFileInfo input_file; - for (unsigned int l = 0; l < c->num_input_levels(); l++) { + for (size_t l = 0; l < c->num_input_levels(); l++) { if (c->num_input_files(l) != 0) { if (l == 0 && c->start_level() == 0) { for (size_t i = 0; i < c->num_input_files(0); i++) { From 4f56632b16d8ae62b7e9dd6087e22d6c161e49a9 Mon Sep 17 00:00:00 2001 From: agiardullo Date: Tue, 7 Jul 2015 16:10:23 -0700 Subject: [PATCH 16/18] Fix occasional failure in compaction_job_test Summary: Coverage test has been occasionally failing due to this timing check. Test Plan: run test Reviewers: yhchiang Reviewed By: yhchiang Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D41367 --- db/compaction_job_test.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 7eed7e490..7aea87b91 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -168,9 +168,8 @@ void VerifyInitializationOfCompactionJobStats( void VerifyCompactionJobStats( const CompactionJobStats& compaction_job_stats, const std::vector& files, - size_t num_output_files, - uint64_t min_elapsed_time) { - ASSERT_GE(compaction_job_stats.elapsed_micros, min_elapsed_time); + size_t num_output_files) { + ASSERT_GE(compaction_job_stats.elapsed_micros, 0U); ASSERT_EQ(compaction_job_stats.num_input_files, files.size()); ASSERT_EQ(compaction_job_stats.num_output_files, num_output_files); } @@ -209,7 +208,6 @@ TEST_F(CompactionJobTest, Simple) { std::move(yield_callback), &event_logger, false, db_name, &compaction_job_stats); - auto start_micros = Env::Default()->NowMicros(); VerifyInitializationOfCompactionJobStats(compaction_job_stats); compaction_job.Prepare(); @@ -223,7 +221,7 @@ TEST_F(CompactionJobTest, Simple) { VerifyCompactionJobStats( compaction_job_stats, - files, 1, (Env::Default()->NowMicros() - start_micros) / 2); + files, 1); mock_table_factory_->AssertLatestFile(expected_results); ASSERT_EQ(yield_callback_called, 20000); From e2e3d84b2c3727d6bb05fd18b4d97136e3a144c8 Mon Sep 17 00:00:00 2001 From: krad Date: Mon, 29 Jun 2015 14:57:18 -0700 Subject: [PATCH 17/18] Added multi WAL log testing to recovery tests. Summary: Currently there is no test in the suite to test the case where there are multiple WAL files and there is a corruption in one of them. We have tests for single WAL file corruption scenarios. Added tests to mock the scenarios for all combinations of recovery modes and corruption in specified file locations. Test Plan: Run make check Reviewers: sdong igor CC: leveldb@ Task ID: #7501229 Blame Rev: --- db/db_test.cc | 264 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 178 insertions(+), 86 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 124f6afbb..a722d6eb8 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8704,28 +8704,78 @@ TEST_F(DBTest, TransactionLogIteratorCorruptedLog) { // // Test WAL recovery for the various modes available -// TODO krad: -// 1. Add tests when there are more than one log file // class RecoveryTestHelper { public: - // Recreate and fill the store with some data - static size_t FillData(DBTest* test, const Options& options) { - size_t count = 0; - test->DestroyAndReopen(options); + // Number of WAL files to generate + static const int kWALFilesCount = 10; + // Starting number for the WAL file name like 00010.log + static const int kWALFileOffset = 10; + // Keys to be written per WAL file + static const int kKeysPerWALFile = 1024; + // Size of the value + static const int kValueSize = 10; - for (int i = 0; i < 1024; i++) { - test->Put("key" + ToString(i), test->DummyString(10)); - ++count; + // Create WAL files with values filled in + static void FillData(DBTest* test, Options& options, + const size_t wal_count, size_t & count) { + DBOptions & db_options = options; + + count = 0; + + shared_ptr table_cache = NewLRUCache(50000, 16); + EnvOptions env_options; + WriteBuffer write_buffer(db_options.db_write_buffer_size); + + unique_ptr versions; + unique_ptr wal_manager; + WriteController write_controller; + + versions.reset(new VersionSet(test->dbname_, &db_options, env_options, + table_cache.get(), &write_buffer, + &write_controller)); + + wal_manager.reset(new WalManager(db_options, env_options)); + + std::unique_ptr current_log_writer; + + for (size_t j = kWALFileOffset; j < wal_count + kWALFileOffset; j++) { + uint64_t current_log_number = j; + std::string fname = LogFileName(test->dbname_, current_log_number); + unique_ptr file; + ASSERT_OK(db_options.env->NewWritableFile(fname, &file, env_options)); + current_log_writer.reset(new log::Writer(std::move(file))); + + for (int i = 0; i < kKeysPerWALFile; i++) { + std::string key = "key" + ToString(count++); + std::string value = test->DummyString(kValueSize); + assert(current_log_writer.get() != nullptr); + uint64_t seq = versions->LastSequence() + 1; + WriteBatch batch; + batch.Put(key, value); + WriteBatchInternal::SetSequence(&batch, seq); + current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch)); + versions->SetLastSequence(seq); + } } + } + + // Recreate and fill the store with some data + static size_t FillData(DBTest* test, Options& options) { + options.create_if_missing = true; + test->DestroyAndReopen(options); + test->Close(); + + size_t count = 0; + FillData(test, options, kWALFilesCount, count); return count; } // Read back all the keys we wrote and return the number of keys found static size_t GetData(DBTest* test) { size_t count = 0; - for (size_t i = 0; i < 1024; i++) { + for (size_t i = 0; i < kWALFilesCount * kKeysPerWALFile; i++) { if (test->Get("key" + ToString(i)) != "NOT_FOUND") { ++count; } @@ -8733,6 +8783,23 @@ class RecoveryTestHelper { return count; } + // Manuall corrupt the specified WAL + static void CorruptWAL(DBTest * test, Options& options, + const double off, const double len, + const int wal_file_id, const bool trunc = false) { + Env* env = options.env; + std::string fname = LogFileName(test->dbname_, wal_file_id); + uint64_t size; + ASSERT_OK(env->GetFileSize(fname, &size)); + ASSERT_GT(size, 0); + + if (trunc) { + ASSERT_EQ(0, truncate(fname.c_str(), size * off)); + } else { + InduceCorruption(fname, size * off, size * len); + } + } + // Overwrite data with 'a' from offset for length len static void InduceCorruption(const std::string& filename, uint32_t offset, uint32_t len) { @@ -8749,23 +8816,6 @@ class RecoveryTestHelper { close(fd); } - - // Corrupt the last WAL file from (filesize * off) for length (filesize * len) - static void CorruptWAL(DBTest* test, const double off, const double len, - const bool trunc = false) { - rocksdb::VectorLogPtr wal_files; - ASSERT_OK(test->dbfull()->GetSortedWalFiles(wal_files)); - ASSERT_EQ(wal_files.size(), 1); - const auto logfile_path = - test->dbname_ + "/" + wal_files.front()->PathName(); - auto size = wal_files.front()->SizeFileBytes(); - - if (trunc) { - ASSERT_EQ(0, truncate(logfile_path.c_str(), size * off)); - } else { - InduceCorruption(logfile_path, size * off, size * len); - } - } }; // Test scope: @@ -8773,26 +8823,32 @@ class RecoveryTestHelper { // at the end of any of the logs // - We do not expect to open the data store for corruption TEST_F(DBTest, kTolerateCorruptedTailRecords) { - for (auto trunc : {true, false}) { - for (int i = 0; i < 4; i++) { - // Fill data for testing - Options options = CurrentOptions(); - const size_t row_count = RecoveryTestHelper::FillData(this, options); + const int jstart = RecoveryTestHelper::kWALFileOffset; + const int jend = jstart + RecoveryTestHelper::kWALFilesCount; - // test checksum failure or parsing - RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc); + for (auto trunc : {true, false}) { /* Corruption style */ + for (int i = 0; i < 4; i++) { /* Corruption offset position */ + for (int j = jstart; j < jend; j++) { /* WAL file */ + // Fill data for testing + Options options = CurrentOptions(); + const size_t row_count = RecoveryTestHelper::FillData(this, options); + // test checksum failure or parsing + RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3, + /*len%=*/ .1, /*wal=*/ j, trunc); - if (trunc) { - options.wal_recovery_mode = - WALRecoveryMode::kTolerateCorruptedTailRecords; - ASSERT_OK(TryReopen(options)); - const size_t recovered_row_count = RecoveryTestHelper::GetData(this); - ASSERT_TRUE(i == 0 || recovered_row_count > 0); - ASSERT_LT(recovered_row_count, row_count); - } else { - options.wal_recovery_mode = - WALRecoveryMode::kTolerateCorruptedTailRecords; - ASSERT_NOK(TryReopen(options)); + if (trunc) { + options.wal_recovery_mode = + WALRecoveryMode::kTolerateCorruptedTailRecords; + options.create_if_missing = false; + ASSERT_OK(TryReopen(options)); + const size_t recovered_row_count = RecoveryTestHelper::GetData(this); + ASSERT_TRUE(i == 0 || recovered_row_count > 0); + ASSERT_LT(recovered_row_count, row_count); + } else { + options.wal_recovery_mode = + WALRecoveryMode::kTolerateCorruptedTailRecords; + ASSERT_NOK(TryReopen(options)); + } } } } @@ -8802,23 +8858,34 @@ TEST_F(DBTest, kTolerateCorruptedTailRecords) { // We don't expect the data store to be opened if there is any corruption // (leading, middle or trailing -- incomplete writes or corruption) TEST_F(DBTest, kAbsoluteConsistency) { + const int jstart = RecoveryTestHelper::kWALFileOffset; + const int jend = jstart + RecoveryTestHelper::kWALFilesCount; + + // Verify clean slate behavior Options options = CurrentOptions(); const size_t row_count = RecoveryTestHelper::FillData(this, options); options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; + options.create_if_missing = false; ASSERT_OK(TryReopen(options)); ASSERT_EQ(RecoveryTestHelper::GetData(this), row_count); - for (auto trunc : {true, false}) { - for (int i = 0; i < 4; i++) { + for (auto trunc : {true, false}) { /* Corruption style */ + for (int i = 0; i < 4; i++) { /* Corruption offset position */ if (trunc && i == 0) { continue; } - options = CurrentOptions(); - RecoveryTestHelper::FillData(this, options); - RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc); - options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; - ASSERT_NOK(TryReopen(options)); + for (int j = jstart; j < jend; j++) { /* wal files */ + // fill with new date + RecoveryTestHelper::FillData(this, options); + // corrupt the wal + RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3, + /*len%=*/.1, j, trunc); + // verify + options.wal_recovery_mode = WALRecoveryMode::kAbsoluteConsistency; + options.create_if_missing = false; + ASSERT_NOK(TryReopen(options)); + } } } } @@ -8827,34 +8894,49 @@ TEST_F(DBTest, kAbsoluteConsistency) { // - We expect to open data store under all circumstances // - We expect only data upto the point where the first error was encountered TEST_F(DBTest, kPointInTimeRecovery) { - for (auto trunc : {true, false}) { - for (int i = 0; i < 4; i++) { - // Fill data for testing - Options options = CurrentOptions(); - const size_t row_count = RecoveryTestHelper::FillData(this, options); + const int jstart = RecoveryTestHelper::kWALFileOffset; + const int jend = jstart + RecoveryTestHelper::kWALFilesCount; + const int maxkeys = RecoveryTestHelper::kWALFilesCount * + RecoveryTestHelper::kKeysPerWALFile; - // test checksum failure or parsing - RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc); + for (auto trunc : {true, false}) { /* Corruption style */ + for (int i = 0; i < 4; i++) { /* Offset of corruption */ + for (int j = jstart; j < jend; j++) { /* WAL file */ + // Fill data for testing + Options options = CurrentOptions(); + const size_t row_count = RecoveryTestHelper::FillData(this, options); - options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + // Corrupt the wal + RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3, + /*len%=*/.1, j, trunc); - ASSERT_OK(TryReopen(options)); + // Verify + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + options.create_if_missing = false; + ASSERT_OK(TryReopen(options)); - size_t recovered_row_count = RecoveryTestHelper::GetData(this); - ASSERT_LT(recovered_row_count, row_count); + // Probe data for invariants + size_t recovered_row_count = RecoveryTestHelper::GetData(this); + ASSERT_LT(recovered_row_count, row_count); - // verify that the keys are sequential and there is no break - bool expect_data = true; - for (size_t j = 0; j < 1024; ++j) { - bool found = Get("key" + ToString(i)) != "NOT_FOUND"; - if (expect_data && !found) { - expect_data = false; + bool expect_data = true; + for (size_t k = 0; k < maxkeys; ++k) { + bool found = Get("key" + ToString(i)) != "NOT_FOUND"; + if (expect_data && !found) { + expect_data = false; + } + ASSERT_EQ(found, expect_data); } - ASSERT_EQ(found, expect_data); - } - ASSERT_TRUE(i != 0 || recovered_row_count == 0); - ASSERT_TRUE(i != 1 || recovered_row_count < row_count / 2); + const size_t min = RecoveryTestHelper::kKeysPerWALFile * + (j - RecoveryTestHelper::kWALFileOffset); + ASSERT_GE(recovered_row_count, min); + if (!trunc && i != 0) { + const size_t max = RecoveryTestHelper::kKeysPerWALFile * + (j - RecoveryTestHelper::kWALFileOffset + 1); + ASSERT_LE(recovered_row_count, max); + } + } } } } @@ -8863,22 +8945,32 @@ TEST_F(DBTest, kPointInTimeRecovery) { // - We expect to open the data store under all scenarios // - We expect to have recovered records past the corruption zone TEST_F(DBTest, kSkipAnyCorruptedRecords) { - for (auto trunc : {true, false}) { - for (int i = 0; i < 4; i++) { - // Fill data for testing - Options options = CurrentOptions(); - const size_t row_count = RecoveryTestHelper::FillData(this, options); + const int jstart = RecoveryTestHelper::kWALFileOffset; + const int jend = jstart + RecoveryTestHelper::kWALFilesCount; - // induce leading corruption - RecoveryTestHelper::CorruptWAL(this, i * .3, /*len%=*/.1, trunc); + for (auto trunc : {true, false}) { /* Corruption style */ + for (int i = 0; i < 4; i++) { /* Corruption offset */ + for (int j = jstart; j < jend; j++) { /* wal files */ + // Fill data for testing + Options options = CurrentOptions(); + const size_t row_count = RecoveryTestHelper::FillData(this, options); - options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords; - ASSERT_OK(TryReopen(options)); - size_t recovered_row_count = RecoveryTestHelper::GetData(this); - ASSERT_LT(recovered_row_count, row_count); + // Corrupt the WAL + RecoveryTestHelper::CorruptWAL(this, options, /*off=*/ i * .3, + /*len%=*/.1, j, trunc); - if (!trunc) { - ASSERT_TRUE(i != 0 || recovered_row_count > 0); + // Verify behavior + options.wal_recovery_mode = WALRecoveryMode::kSkipAnyCorruptedRecords; + options.create_if_missing = false; + ASSERT_OK(TryReopen(options)); + + // Probe data for invariants + size_t recovered_row_count = RecoveryTestHelper::GetData(this); + ASSERT_LT(recovered_row_count, row_count); + + if (!trunc) { + ASSERT_TRUE(i != 0 || recovered_row_count > 0); + } } } } From 4bed00a44bdc0e64f92a57109d4c747d535e002f Mon Sep 17 00:00:00 2001 From: Poornima Chozhiyath Raman Date: Wed, 8 Jul 2015 15:21:10 -0700 Subject: [PATCH 18/18] Fix function name format according to google style Summary: Change the naming style of getter and setters according to Google C++ style in compaction.h file Test Plan: Compilation success Reviewers: sdong Reviewed By: sdong Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D41265 --- db/compaction.cc | 2 +- db/compaction.h | 14 +++++++------- db/compaction_job.cc | 18 +++++++++--------- db/db_impl.cc | 8 ++++---- db/db_test.cc | 6 +++--- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index a7f2a9742..8d4b5efda 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -174,7 +174,7 @@ bool Compaction::IsTrivialMove() const { } return (start_level_ != output_level_ && num_input_levels() == 1 && - input(0, 0)->fd.GetPathId() == GetOutputPathId() && + input(0, 0)->fd.GetPathId() == output_path_id() && InputCompressionMatchesOutput() && TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); } diff --git a/db/compaction.h b/db/compaction.h index d40864f39..64cd3565a 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -109,20 +109,20 @@ class Compaction { } // Maximum size of files to build during this compaction. - uint64_t MaxOutputFileSize() const { return max_output_file_size_; } + uint64_t max_output_file_size() const { return max_output_file_size_; } // What compression for output - CompressionType OutputCompressionType() const { return output_compression_; } + CompressionType output_compression() const { return output_compression_; } // Whether need to write output file to second DB path. - uint32_t GetOutputPathId() const { return output_path_id_; } + uint32_t output_path_id() const { return output_path_id_; } // Is this a trivial compaction that can be implemented by just // moving a single input file to the next level (no merging or splitting) bool IsTrivialMove() const; // If true, then the compaction can be done by simply deleting input files. - bool IsDeletionCompaction() const { + bool deletion_compaction() const { return deletion_compaction_; } @@ -150,13 +150,13 @@ class Compaction { double score() const { return score_; } // Is this compaction creating a file in the bottom most level? - bool BottomMostLevel() { return bottommost_level_; } + bool bottommost_level() { return bottommost_level_; } // Does this compaction include all sst files? - bool IsFullCompaction() { return is_full_compaction_; } + bool is_full_compaction() { return is_full_compaction_; } // Was this compaction triggered manually by the client? - bool IsManualCompaction() { return is_manual_compaction_; } + bool is_manual_compaction() { return is_manual_compaction_; } // Used when allow_trivial_move option is set in // Universal compaction. If all the input files are diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 9a4b7bc91..ac07851f2 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -238,12 +238,12 @@ void CompactionJob::ReportStartedCompaction( // In the current design, a CompactionJob is always created // for non-trivial compaction. assert(compaction->IsTrivialMove() == false || - compaction->IsManualCompaction() == true); + compaction->is_manual_compaction() == true); ThreadStatusUtil::SetThreadOperationProperty( ThreadStatus::COMPACTION_PROP_FLAGS, - compaction->IsManualCompaction() + - (compaction->IsDeletionCompaction() << 1)); + compaction->is_manual_compaction() + + (compaction->deletion_compaction() << 1)); ThreadStatusUtil::SetThreadOperationProperty( ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, @@ -263,7 +263,7 @@ void CompactionJob::ReportStartedCompaction( if (compaction_job_stats_) { compaction_job_stats_->is_manual_compaction = - compaction->IsManualCompaction(); + compaction->is_manual_compaction(); } } @@ -298,7 +298,7 @@ void CompactionJob::Prepare() { } // Is this compaction producing files at the bottommost level? - bottommost_level_ = compact_->compaction->BottomMostLevel(); + bottommost_level_ = compact_->compaction->bottommost_level(); } Status CompactionJob::Run() { @@ -864,7 +864,7 @@ Status CompactionJob::ProcessKeyValueCompaction(int64_t* imm_micros, // Close output file if it is big enough if (compact_->builder->FileSize() >= - compact_->compaction->MaxOutputFileSize()) { + compact_->compaction->max_output_file_size()) { status = FinishCompactionOutputFile(input); if (!status.ok()) { break; @@ -1160,7 +1160,7 @@ Status CompactionJob::OpenCompactionOutputFile() { uint64_t file_number = versions_->NewFileNumber(); // Make the output file std::string fname = TableFileName(db_options_.db_paths, file_number, - compact_->compaction->GetOutputPathId()); + compact_->compaction->output_path_id()); Status s = env_->NewWritableFile(fname, &compact_->outfile, env_options_); if (!s.ok()) { @@ -1174,7 +1174,7 @@ Status CompactionJob::OpenCompactionOutputFile() { } CompactionState::Output out; out.number = file_number; - out.path_id = compact_->compaction->GetOutputPathId(); + out.path_id = compact_->compaction->output_path_id(); out.smallest.Clear(); out.largest.Clear(); out.smallest_seqno = out.largest_seqno = 0; @@ -1198,7 +1198,7 @@ Status CompactionJob::OpenCompactionOutputFile() { compact_->builder.reset(NewTableBuilder( *cfd->ioptions(), cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), compact_->outfile.get(), - compact_->compaction->OutputCompressionType(), + compact_->compaction->output_compression(), cfd->ioptions()->compression_opts, skip_filters)); LogFlush(db_options_.info_log); return s; diff --git a/db/db_impl.cc b/db/db_impl.cc index 75535c27d..030adc8e2 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1617,7 +1617,7 @@ Status DBImpl::CompactFilesImpl( assert(c); c->SetInputVersion(version); // deletion compaction currently not allowed in CompactFiles. - assert(!c->IsDeletionCompaction()); + assert(!c->deletion_compaction()); auto yield_callback = [&]() { return CallFlushDuringCompaction( @@ -1628,7 +1628,7 @@ Status DBImpl::CompactFilesImpl( CompactionJob compaction_job( job_context->job_id, c.get(), db_options_, env_options_, versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), - directories_.GetDataDir(c->GetOutputPathId()), stats_, + directories_.GetDataDir(c->output_path_id()), stats_, snapshots_.GetAll(), table_cache_, std::move(yield_callback), &event_logger_, c->mutable_cf_options()->paranoid_file_checks, dbname_, nullptr); // Here we pass a nullptr for CompactionJobStats because @@ -2504,7 +2504,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, if (!c) { // Nothing to do LogToBuffer(log_buffer, "Compaction nothing to do"); - } else if (c->IsDeletionCompaction()) { + } else if (c->deletion_compaction()) { // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old // file if there is alive snapshot pointing to it assert(c->num_input_files(1) == 0); @@ -2597,7 +2597,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, CompactionJob compaction_job( job_context->job_id, c.get(), db_options_, env_options_, versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), - directories_.GetDataDir(c->GetOutputPathId()), stats_, + directories_.GetDataDir(c->output_path_id()), stats_, snapshots_.GetAll(), table_cache_, std::move(yield_callback), &event_logger_, c->mutable_cf_options()->paranoid_file_checks, dbname_, &compaction_job_stats); diff --git a/db/db_test.cc b/db/db_test.cc index a722d6eb8..42f4bf0cb 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -12292,7 +12292,7 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { Compaction* compaction = reinterpret_cast(arg); if (compaction->output_level() == 4) { - ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression); + ASSERT_TRUE(compaction->output_compression() == kLZ4Compression); num_lz4.fetch_add(1); } }); @@ -12327,10 +12327,10 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { Compaction* compaction = reinterpret_cast(arg); if (compaction->output_level() == 4 && compaction->start_level() == 3) { - ASSERT_TRUE(compaction->OutputCompressionType() == kZlibCompression); + ASSERT_TRUE(compaction->output_compression() == kZlibCompression); num_zlib.fetch_add(1); } else { - ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression); + ASSERT_TRUE(compaction->output_compression() == kLZ4Compression); num_lz4.fetch_add(1); } });