diff --git a/db/column_family.cc b/db/column_family.cc index 0beb23c91..a728a3fd5 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -326,13 +326,14 @@ void ColumnFamilyData::RecalculateWriteStallConditions( auto write_controller = column_family_set_->write_controller_; - if (imm()->size() == options_.max_write_buffer_number) { + if (imm()->size() >= mutable_cf_options.max_write_buffer_number) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1); Log(ioptions_.info_log, "[%s] Stopping writes because we have %d immutable memtables " - "(waiting for flush)", - name_.c_str(), imm()->size()); + "(waiting for flush), max_write_buffer_number is set to %d", + name_.c_str(), imm()->size(), + mutable_cf_options.max_write_buffer_number); } else if (current_->NumLevelFiles(0) >= mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); @@ -353,8 +354,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "[%s] Stalling writes because we have %d level-0 files (%" PRIu64 "us)", name_.c_str(), current_->NumLevelFiles(0), slowdown); - } else if (options_.hard_rate_limit > 1.0 && - score > options_.hard_rate_limit) { + } else if (mutable_cf_options.hard_rate_limit > 1.0 && + score > mutable_cf_options.hard_rate_limit) { uint64_t kHardLimitSlowdown = 1000; write_controller_token_ = write_controller->GetDelayToken(kHardLimitSlowdown); @@ -364,10 +365,11 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "[%s] Stalling writes because we hit hard limit on level %d. " "(%" PRIu64 "us)", name_.c_str(), max_level, kHardLimitSlowdown); - } else if (options_.soft_rate_limit > 0.0 && - score > options_.soft_rate_limit) { - uint64_t slowdown = SlowdownAmount(score, options_.soft_rate_limit, - options_.hard_rate_limit); + } else if (mutable_cf_options.soft_rate_limit > 0.0 && + score > mutable_cf_options.soft_rate_limit) { + uint64_t slowdown = SlowdownAmount(score, + mutable_cf_options.soft_rate_limit, + mutable_cf_options.hard_rate_limit); write_controller_token_ = write_controller->GetDelayToken(slowdown); internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true); Log(ioptions_.info_log, diff --git a/db/db_impl.cc b/db/db_impl.cc index 5abfb4ac2..d8df10c5a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1236,9 +1236,12 @@ Status DBImpl::Recover( SetTickerCount(stats_, SEQUENCE_NUMBER, versions_->LastSequence()); } + // Initial value + max_total_in_memory_state_ = 0; for (auto cfd : *versions_->GetColumnFamilySet()) { - max_total_in_memory_state_ += cfd->options()->write_buffer_size * - cfd->options()->max_write_buffer_number; + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + max_total_in_memory_state_ += mutable_cf_options->write_buffer_size * + mutable_cf_options->max_write_buffer_number; } return s; @@ -1724,9 +1727,37 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, bool DBImpl::SetOptions(ColumnFamilyHandle* column_family, const std::unordered_map& options_map) { - auto cfh = reinterpret_cast(column_family); - MutexLock l(&mutex_); - return cfh->cfd()->SetOptions(options_map); + auto* cfd = reinterpret_cast(column_family)->cfd(); + if (options_map.empty()) { + Log(db_options_.info_log, "SetOptions() on column family [%s], empty input", + cfd->GetName().c_str()); + return false; + } + + MutableCFOptions new_options; + bool succeed = false; + { + MutexLock l(&mutex_); + if (cfd->SetOptions(options_map)) { + new_options = *cfd->GetLatestMutableCFOptions(); + succeed = true; + } + } + + Log(db_options_.info_log, "SetOptions() on column family [%s], inputs:", + cfd->GetName().c_str()); + for (const auto& o : options_map) { + Log(db_options_.info_log, "%s: %s\n", o.first.c_str(), o.second.c_str()); + } + if (succeed) { + Log(db_options_.info_log, "[%s] SetOptions succeeded", + cfd->GetName().c_str()); + new_options.Dump(db_options_.info_log.get()); + } else { + Log(db_options_.info_log, "[%s] SetOptions failed", + cfd->GetName().c_str()); + } + return succeed; } // return the same level if it cannot be moved @@ -1803,8 +1834,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, db_directory_.get()); - superversion_to_free = cfd->InstallSuperVersion( - new_superversion, &mutex_, mutable_cf_options); + superversion_to_free = InstallSuperVersion( + cfd, new_superversion, mutable_cf_options); new_superversion = nullptr; Log(db_options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(), @@ -1840,10 +1871,10 @@ int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) { return cfh->cfd()->options()->level0_stop_writes_trigger; } -Status DBImpl::Flush(const FlushOptions& options, +Status DBImpl::Flush(const FlushOptions& flush_options, ColumnFamilyHandle* column_family) { auto cfh = reinterpret_cast(column_family); - return FlushMemTable(cfh->cfd(), options); + return FlushMemTable(cfh->cfd(), flush_options); } SequenceNumber DBImpl::GetLatestSequenceNumber() const { @@ -1933,7 +1964,7 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, } Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, - const FlushOptions& options) { + const FlushOptions& flush_options) { Status s; { WriteContext context; @@ -1957,7 +1988,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, write_thread_.ExitWriteThread(&w, &w, s); } - if (s.ok() && options.wait) { + if (s.ok() && flush_options.wait) { // Wait until the compaction completes s = WaitForFlushMemTable(cfd); } @@ -2320,12 +2351,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else { // no need to refcount in iteration since it's always under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->options()->disable_auto_compactions) { + // Pick up latest mutable CF Options and use it throughout the + // compaction job + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + if (!mutable_cf_options->disable_auto_compactions) { // NOTE: try to avoid unnecessary copy of MutableCFOptions if // compaction is not necessary. Need to make sure mutex is held // until we make a copy in the following code - c.reset(cfd->PickCompaction( - *cfd->GetLatestMutableCFOptions(), log_buffer)); + c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); if (c != nullptr) { // update statistics MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, @@ -3441,7 +3474,7 @@ static void CleanupIteratorState(void* arg1, void* arg2) { } } // namespace -Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, +Iterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, ColumnFamilyData* cfd, SuperVersion* super_version, Arena* arena) { @@ -3451,11 +3484,11 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena); // Collect iterator for mutable mem merge_iter_builder.AddIterator( - super_version->mem->NewIterator(options, arena)); + super_version->mem->NewIterator(read_options, arena)); // Collect all needed child iterators for immutable memtables - super_version->imm->AddIterators(options, &merge_iter_builder); + super_version->imm->AddIterators(read_options, &merge_iter_builder); // Collect iterators for files in L0 - Ln - super_version->current->AddIterators(options, env_options_, + super_version->current->AddIterators(read_options, env_options_, &merge_iter_builder); internal_iter = merge_iter_builder.Finish(); IterState* cleanup = new IterState(this, &mutex_, super_version); @@ -3468,10 +3501,10 @@ ColumnFamilyHandle* DBImpl::DefaultColumnFamily() const { return default_cf_handle_; } -Status DBImpl::Get(const ReadOptions& options, +Status DBImpl::Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) { - return GetImpl(options, column_family, key, value); + return GetImpl(read_options, column_family, key, value); } // DeletionState gets created and destructed outside of the lock -- we @@ -3488,17 +3521,39 @@ void DBImpl::InstallSuperVersion( ColumnFamilyData* cfd, DeletionState& deletion_state, const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); - // if new_superversion == nullptr, it means somebody already used it - SuperVersion* new_superversion = - (deletion_state.new_superversion != nullptr) ? - deletion_state.new_superversion : new SuperVersion(); SuperVersion* old_superversion = - cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options); + InstallSuperVersion(cfd, deletion_state.new_superversion, + mutable_cf_options); deletion_state.new_superversion = nullptr; deletion_state.superversions_to_free.push_back(old_superversion); } -Status DBImpl::GetImpl(const ReadOptions& options, +SuperVersion* DBImpl::InstallSuperVersion( + ColumnFamilyData* cfd, SuperVersion* new_sv, + const MutableCFOptions& mutable_cf_options) { + mutex_.AssertHeld(); + auto* old = cfd->InstallSuperVersion( + new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options); + + // We want to schedule potential flush or compactions since new options may + // have been picked up in this new version. New options may cause flush + // compaction trigger condition to change. + MaybeScheduleFlushOrCompaction(); + + // Update max_total_in_memory_state_ + auto old_memtable_size = 0; + if (old) { + old_memtable_size = old->mutable_cf_options.write_buffer_size * + old->mutable_cf_options.max_write_buffer_number; + } + max_total_in_memory_state_ = + max_total_in_memory_state_ - old_memtable_size + + mutable_cf_options.write_buffer_size * + mutable_cf_options.max_write_buffer_number; + return old; +} + +Status DBImpl::GetImpl(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found) { StopWatch sw(env_, stats_, DB_GET); @@ -3508,8 +3563,9 @@ Status DBImpl::GetImpl(const ReadOptions& options, auto cfd = cfh->cfd(); SequenceNumber snapshot; - if (options.snapshot != nullptr) { - snapshot = reinterpret_cast(options.snapshot)->number_; + if (read_options.snapshot != nullptr) { + snapshot = reinterpret_cast( + read_options.snapshot)->number_; } else { snapshot = versions_->LastSequence(); } @@ -3535,7 +3591,8 @@ Status DBImpl::GetImpl(const ReadOptions& options, RecordTick(stats_, MEMTABLE_HIT); } else { PERF_TIMER_GUARD(get_from_output_files_time); - sv->current->Get(options, lkey, value, &s, &merge_context, value_found); + sv->current->Get(read_options, lkey, value, &s, &merge_context, + value_found); RecordTick(stats_, MEMTABLE_MISS); } @@ -3551,7 +3608,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, } std::vector DBImpl::MultiGet( - const ReadOptions& options, + const ReadOptions& read_options, const std::vector& column_family, const std::vector& keys, std::vector* values) { @@ -3577,8 +3634,9 @@ std::vector DBImpl::MultiGet( } mutex_.Lock(); - if (options.snapshot != nullptr) { - snapshot = reinterpret_cast(options.snapshot)->number_; + if (read_options.snapshot != nullptr) { + snapshot = reinterpret_cast( + read_options.snapshot)->number_; } else { snapshot = versions_->LastSequence(); } @@ -3621,7 +3679,8 @@ std::vector DBImpl::MultiGet( // Done } else { PERF_TIMER_GUARD(get_from_output_files_time); - super_version->current->Get(options, lkey, value, &s, &merge_context); + super_version->current->Get(read_options, lkey, value, &s, + &merge_context); } if (s.ok()) { @@ -3659,7 +3718,7 @@ std::vector DBImpl::MultiGet( return stat_list; } -Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, +Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family_name, ColumnFamilyHandle** handle) { *handle = nullptr; @@ -3674,26 +3733,23 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); edit.SetColumnFamily(new_id); edit.SetLogNumber(logfile_number_); - edit.SetComparatorName(options.comparator->Name()); + edit.SetComparatorName(cf_options.comparator->Name()); // LogAndApply will both write the creation in MANIFEST and create // ColumnFamilyData object - Options opt(db_options_, options); + Options opt(db_options_, cf_options); Status s = versions_->LogAndApply(nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), - &edit, &mutex_, db_directory_.get(), false, &options); + &edit, &mutex_, db_directory_.get(), false, &cf_options); if (s.ok()) { single_column_family_mode_ = false; auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); - delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_, - *cfd->GetLatestMutableCFOptions()); + delete InstallSuperVersion(cfd, nullptr, *cfd->GetLatestMutableCFOptions()); *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); Log(db_options_.info_log, "Created column family [%s] (ID %u)", column_family_name.c_str(), (unsigned)cfd->GetID()); - max_total_in_memory_state_ += cfd->options()->write_buffer_size * - cfd->options()->max_write_buffer_number; } else { Log(db_options_.info_log, "Creating column family [%s] FAILED -- %s", column_family_name.c_str(), s.ToString().c_str()); @@ -3712,7 +3768,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { edit.DropColumnFamily(); edit.SetColumnFamily(cfd->GetID()); - Status s; { MutexLock l(&mutex_); @@ -3732,8 +3787,9 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { if (s.ok()) { assert(cfd->IsDropped()); - max_total_in_memory_state_ -= cfd->options()->write_buffer_size * - cfd->options()->max_write_buffer_number; + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size * + mutable_cf_options->max_write_buffer_number; Log(db_options_.info_log, "Dropped column family with id %u\n", cfd->GetID()); } else { @@ -3745,14 +3801,14 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { return s; } -bool DBImpl::KeyMayExist(const ReadOptions& options, +bool DBImpl::KeyMayExist(const ReadOptions& read_options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found) { if (value_found != nullptr) { // falsify later if key-may-exist but can't fetch value *value_found = true; } - ReadOptions roptions = options; + ReadOptions roptions = read_options; roptions.read_tier = kBlockCacheTier; // read from block cache only auto s = GetImpl(roptions, column_family, key, value, value_found); @@ -3941,23 +3997,23 @@ Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, } } -Status DBImpl::Delete(const WriteOptions& options, +Status DBImpl::Delete(const WriteOptions& write_options, ColumnFamilyHandle* column_family, const Slice& key) { - return DB::Delete(options, column_family, key); + return DB::Delete(write_options, column_family, key); } -Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { +Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { if (my_batch == nullptr) { return Status::Corruption("Batch is nullptr!"); } PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(&mutex_); w.batch = my_batch; - w.sync = options.sync; - w.disableWAL = options.disableWAL; + w.sync = write_options.sync; + w.disableWAL = write_options.disableWAL; w.in_batch_group = false; w.done = false; - w.timeout_hint_us = options.timeout_hint_us; + w.timeout_hint_us = write_options.timeout_hint_us; uint64_t expiration_time = 0; bool has_timeout = false; @@ -3968,7 +4024,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { has_timeout = true; } - if (!options.disableWAL) { + if (!write_options.disableWAL) { RecordTick(stats_, WRITE_WITH_WAL); default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1); } @@ -4036,7 +4092,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { if (UNLIKELY(status.ok()) && (write_controller_.IsStopped() || write_controller_.GetDelay() > 0)) { - DelayWrite(expiration_time); + status = DelayWrite(expiration_time); } if (UNLIKELY(status.ok() && has_timeout && @@ -4074,13 +4130,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // Record statistics RecordTick(stats_, NUMBER_KEYS_WRITTEN, my_batch_count); RecordTick(stats_, BYTES_WRITTEN, WriteBatchInternal::ByteSize(updates)); - if (options.disableWAL) { + if (write_options.disableWAL) { flush_on_destroy_ = true; } PERF_TIMER_STOP(write_pre_and_post_process_time); uint64_t log_size = 0; - if (!options.disableWAL) { + if (!write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); Slice log_entry = WriteBatchInternal::Contents(updates); status = log_->AddRecord(log_entry); @@ -4089,7 +4145,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { log_empty_ = false; log_size = log_entry.size(); RecordTick(stats_, WAL_FILE_BYTES, log_size); - if (status.ok() && options.sync) { + if (status.ok() && write_options.sync) { RecordTick(stats_, WAL_FILE_SYNCED); StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); if (db_options_.use_fsync) { @@ -4104,7 +4160,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { status = WriteBatchInternal::InsertInto( updates, column_family_memtables_.get(), - options.ignore_missing_column_families, 0, this, false); + write_options.ignore_missing_column_families, 0, this, false); // A non-OK status here indicates iteration failure (either in-memory // writebatch corruption (very bad), or the client specified invalid // column family). This will later on trigger bg_error_. @@ -4123,7 +4179,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // internal stats default_cf_internal_stats_->AddDBStats( InternalStats::BYTES_WRITTEN, batch_size); - if (!options.disableWAL) { + if (!write_options.disableWAL) { default_cf_internal_stats_->AddDBStats( InternalStats::WAL_FILE_SYNCED, 1); default_cf_internal_stats_->AddDBStats( @@ -4151,7 +4207,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue -void DBImpl::DelayWrite(uint64_t expiration_time) { +Status DBImpl::DelayWrite(uint64_t expiration_time) { StopWatch sw(env_, stats_, WRITE_STALL); bool has_timeout = (expiration_time > 0); auto delay = write_controller_.GetDelay(); @@ -4161,16 +4217,18 @@ void DBImpl::DelayWrite(uint64_t expiration_time) { mutex_.Lock(); } - while (write_controller_.IsStopped()) { + while (bg_error_.ok() && write_controller_.IsStopped()) { if (has_timeout) { bg_cv_.TimedWait(expiration_time); if (env_->NowMicros() > expiration_time) { - break; + return Status::TimedOut(); } } else { bg_cv_.Wait(); } } + + return bg_error_; } Status DBImpl::ScheduleFlushes(WriteContext* context) { @@ -4219,8 +4277,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, if (s.ok()) { // Our final size should be less than write_buffer_size // (compression, etc) but err on the side of caution. - lfile->SetPreallocationBlockSize(1.1 * - cfd->options()->write_buffer_size); + lfile->SetPreallocationBlockSize( + 1.1 * mutable_cf_options.write_buffer_size); new_log = new log::Writer(std::move(lfile)); } } @@ -4232,6 +4290,9 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, new_superversion = new SuperVersion(); } } + Log(db_options_.info_log, + "[%s] New memtable created with log file: #%" PRIu64 "\n", + cfd->GetName().c_str(), new_log_number); mutex_.Lock(); if (!s.ok()) { // how do we fail if we're not creating new log? @@ -4264,11 +4325,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, cfd->imm()->Add(cfd->mem()); new_mem->Ref(); cfd->SetMemtable(new_mem); - Log(db_options_.info_log, - "[%s] New memtable created with log file: #%" PRIu64 "\n", - cfd->GetName().c_str(), logfile_number_); context->superversions_to_free_.push_back( - cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options)); + InstallSuperVersion(cfd, new_superversion, mutable_cf_options)); return s; } @@ -4614,7 +4672,7 @@ Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, } // Default implementation -- returns not supported status -Status DB::CreateColumnFamily(const ColumnFamilyOptions& options, +Status DB::CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family_name, ColumnFamilyHandle** handle) { return Status::NotSupported(""); @@ -4737,8 +4795,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_, - *cfd->GetLatestMutableCFOptions()); + delete impl->InstallSuperVersion( + cfd, nullptr, *cfd->GetLatestMutableCFOptions()); } impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); diff --git a/db/db_impl.h b/db/db_impl.h index 622df4293..2d5cfe6c2 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -367,7 +367,7 @@ class DBImpl : public DB { const autovector& mems, VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer); - void DelayWrite(uint64_t expiration_time); + Status DelayWrite(uint64_t expiration_time); Status ScheduleFlushes(WriteContext* context); @@ -630,6 +630,10 @@ class DBImpl : public DB { DeletionState& deletion_state, const MutableCFOptions& mutable_cf_options); + SuperVersion* InstallSuperVersion( + ColumnFamilyData* cfd, SuperVersion* new_sv, + const MutableCFOptions& mutable_cf_options); + // Find Super version and reference it. Based on options, it might return // the thread local cached one. inline SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd); diff --git a/db/db_test.cc b/db/db_test.cc index f516a488f..169718387 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8137,7 +8137,7 @@ TEST(DBTest, SimpleWriteTimeoutTest) { options.max_background_flushes = 0; options.max_write_buffer_number = 2; options.max_total_wal_size = std::numeric_limits::max(); - WriteOptions write_opt = WriteOptions(); + WriteOptions write_opt; write_opt.timeout_hint_us = 0; DestroyAndReopen(&options); // fill the two write buffers @@ -8173,7 +8173,7 @@ static void RandomTimeoutWriter(void* arg) { DB* db = state->db; Random rnd(1000 + thread_id); - WriteOptions write_opt = WriteOptions(); + WriteOptions write_opt; write_opt.timeout_hint_us = 500; int timeout_count = 0; int num_keys = kNumKeys * 5; @@ -8558,14 +8558,13 @@ TEST(DBTest, DynamicMemtableOptions) { auto gen_l0_kb = [this](int size) { Random rnd(301); - std::vector values; for (int i = 0; i < size; i++) { - values.push_back(RandomString(&rnd, 1024)); - ASSERT_OK(Put(Key(i), values[i])); + ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); } dbfull()->TEST_WaitForFlushMemTable(); }; + // Test write_buffer_size gen_l0_kb(64); ASSERT_EQ(NumTableFilesAtLevel(0), 1); ASSERT_TRUE(SizeAtLevel(0) < k64KB + k5KB); @@ -8587,103 +8586,299 @@ TEST(DBTest, DynamicMemtableOptions) { ASSERT_EQ(NumTableFilesAtLevel(0), 2); ASSERT_TRUE(SizeAtLevel(0) < k128KB + k64KB + 2 * k5KB); ASSERT_TRUE(SizeAtLevel(0) > k128KB + k64KB - 2 * k5KB); + + // Test max_write_buffer_number + // Block compaction thread, which will also block the flushes because + // max_background_flushes == 0, so flushes are getting executed by the + // compaction thread + env_->SetBackgroundThreads(1, Env::LOW); + SleepingBackgroundTask sleeping_task_low1; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1, + Env::Priority::LOW); + // Start from scratch and disable compaction/flush. Flush can only happen + // during compaction but trigger is pretty high + options.max_background_flushes = 0; + options.disable_auto_compactions = true; + DestroyAndReopen(&options); + + // Put until timeout, bounded by 256 puts. We should see timeout at ~128KB + int count = 0; + Random rnd(301); + WriteOptions wo; + wo.timeout_hint_us = 1000; + + while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 256) { + count++; + } + ASSERT_TRUE(count > (128 * 0.9) && count < (128 * 1.1)); + + sleeping_task_low1.WakeUp(); + sleeping_task_low1.WaitUntilDone(); + + // Increase + ASSERT_TRUE(dbfull()->SetOptions({ + {"max_write_buffer_number", "8"}, + })); + // Clean up memtable and L0 + dbfull()->CompactRange(nullptr, nullptr); + + SleepingBackgroundTask sleeping_task_low2; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2, + Env::Priority::LOW); + count = 0; + while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) { + count++; + } + ASSERT_TRUE(count > (512 * 0.9) && count < (512 * 1.1)); + sleeping_task_low2.WakeUp(); + sleeping_task_low2.WaitUntilDone(); + + // Decrease + ASSERT_TRUE(dbfull()->SetOptions({ + {"max_write_buffer_number", "4"}, + })); + // Clean up memtable and L0 + dbfull()->CompactRange(nullptr, nullptr); + + SleepingBackgroundTask sleeping_task_low3; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low3, + Env::Priority::LOW); + count = 0; + while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) { + count++; + } + ASSERT_TRUE(count > (256 * 0.9) && count < (256 * 1.1)); + sleeping_task_low3.WakeUp(); + sleeping_task_low3.WaitUntilDone(); } TEST(DBTest, DynamicCompactionOptions) { + // minimum write buffer size is enforced at 64KB + const uint64_t k32KB = 1 << 15; const uint64_t k64KB = 1 << 16; const uint64_t k128KB = 1 << 17; const uint64_t k256KB = 1 << 18; - const uint64_t k5KB = 5 * 1024; + const uint64_t k4KB = 1 << 12; Options options; options.env = env_; options.create_if_missing = true; options.compression = kNoCompression; - options.max_background_compactions = 4; options.hard_rate_limit = 1.1; - options.write_buffer_size = k128KB; + options.write_buffer_size = k64KB; options.max_write_buffer_number = 2; // Compaction related options options.level0_file_num_compaction_trigger = 3; - options.level0_slowdown_writes_trigger = 10; - options.level0_stop_writes_trigger = 20; + options.level0_slowdown_writes_trigger = 4; + options.level0_stop_writes_trigger = 8; options.max_grandparent_overlap_factor = 10; options.expanded_compaction_factor = 25; options.source_compaction_factor = 1; - options.target_file_size_base = k128KB; + options.target_file_size_base = k64KB; options.target_file_size_multiplier = 1; - options.max_bytes_for_level_base = k256KB; + options.max_bytes_for_level_base = k128KB; options.max_bytes_for_level_multiplier = 4; + + // Block flush thread and disable compaction thread + env_->SetBackgroundThreads(1, Env::LOW); + env_->SetBackgroundThreads(1, Env::HIGH); DestroyAndReopen(&options); auto gen_l0_kb = [this](int start, int size, int stride) { Random rnd(301); - std::vector values; for (int i = 0; i < size; i++) { - values.push_back(RandomString(&rnd, 1024)); - ASSERT_OK(Put(Key(start + stride * i), values[i])); + ASSERT_OK(Put(Key(start + stride * i), RandomString(&rnd, 1024))); } dbfull()->TEST_WaitForFlushMemTable(); }; // Write 3 files that have the same key range, trigger compaction and // result in one L1 file - gen_l0_kb(0, 128, 1); + gen_l0_kb(0, 64, 1); ASSERT_EQ(NumTableFilesAtLevel(0), 1); - gen_l0_kb(0, 128, 1); + gen_l0_kb(0, 64, 1); ASSERT_EQ(NumTableFilesAtLevel(0), 2); - gen_l0_kb(0, 128, 1); + gen_l0_kb(0, 64, 1); dbfull()->TEST_WaitForCompact(); ASSERT_EQ("0,1", FilesPerLevel()); std::vector metadata; db_->GetLiveFilesMetaData(&metadata); ASSERT_EQ(1U, metadata.size()); - ASSERT_LE(metadata[0].size, k128KB + k5KB); // < 128KB + 5KB - ASSERT_GE(metadata[0].size, k128KB - k5KB); // > 128B - 5KB + ASSERT_LE(metadata[0].size, k64KB + k4KB); + ASSERT_GE(metadata[0].size, k64KB - k4KB); - // Make compaction trigger and file size smaller + // Test compaction trigger and target_file_size_base ASSERT_TRUE(dbfull()->SetOptions({ {"level0_file_num_compaction_trigger", "2"}, - {"target_file_size_base", "65536"} + {"target_file_size_base", std::to_string(k32KB) } })); - gen_l0_kb(0, 128, 1); + gen_l0_kb(0, 64, 1); ASSERT_EQ("1,1", FilesPerLevel()); - gen_l0_kb(0, 128, 1); + gen_l0_kb(0, 64, 1); dbfull()->TEST_WaitForCompact(); ASSERT_EQ("0,2", FilesPerLevel()); metadata.clear(); db_->GetLiveFilesMetaData(&metadata); ASSERT_EQ(2U, metadata.size()); - ASSERT_LE(metadata[0].size, k64KB + k5KB); // < 64KB + 5KB - ASSERT_GE(metadata[0].size, k64KB - k5KB); // > 64KB - 5KB + ASSERT_LE(metadata[0].size, k32KB + k4KB); + ASSERT_GE(metadata[0].size, k32KB - k4KB); - // Change base level size to 1MB - ASSERT_TRUE(dbfull()->SetOptions({ {"max_bytes_for_level_base", "1048576"} })); - - // writing 56 x 128KB => 7MB - // (L1 + L2) = (1 + 4) * 1MB = 5MB - for (int i = 0; i < 56; ++i) { - gen_l0_kb(i, 128, 56); - } - dbfull()->TEST_WaitForCompact(); - ASSERT_TRUE(SizeAtLevel(1) < 1048576 * 1.1); - ASSERT_TRUE(SizeAtLevel(2) < 4 * 1048576 * 1.1); - - // Change multiplier to 2 with smaller base + // Test max_bytes_for_level_base ASSERT_TRUE(dbfull()->SetOptions({ - {"max_bytes_for_level_multiplier", "2"}, - {"max_bytes_for_level_base", "262144"} + {"max_bytes_for_level_base", std::to_string(k256KB) } })); - // writing 16 x 128KB - // (L1 + L2 + L3) = (1 + 2 + 4) * 256KB - for (int i = 0; i < 16; ++i) { - gen_l0_kb(i, 128, 50); + // writing 24 x 64KB => 6 * 256KB + // (L1 + L2) = (1 + 4) * 256KB + for (int i = 0; i < 24; ++i) { + gen_l0_kb(i, 64, 32); } dbfull()->TEST_WaitForCompact(); - ASSERT_TRUE(SizeAtLevel(1) < 262144 * 1.1); - ASSERT_TRUE(SizeAtLevel(2) < 2 * 262144 * 1.1); - ASSERT_TRUE(SizeAtLevel(3) < 4 * 262144 * 1.1); + ASSERT_TRUE(SizeAtLevel(1) > k256KB * 0.8 && + SizeAtLevel(1) < k256KB * 1.2); + ASSERT_TRUE(SizeAtLevel(2) > 4 * k256KB * 0.8 && + SizeAtLevel(2) < 4 * k256KB * 1.2); + + // Test max_bytes_for_level_multiplier and + // max_bytes_for_level_base (reduce) + ASSERT_TRUE(dbfull()->SetOptions({ + {"max_bytes_for_level_multiplier", "2"}, + {"max_bytes_for_level_base", std::to_string(k128KB) } + })); + + // writing 20 x 64KB = 10 x 128KB + // (L1 + L2 + L3) = (1 + 2 + 4) * 128KB + for (int i = 0; i < 20; ++i) { + gen_l0_kb(i, 64, 32); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_TRUE(SizeAtLevel(1) > k128KB * 0.8 && + SizeAtLevel(1) < k128KB * 1.2); + ASSERT_TRUE(SizeAtLevel(2) > 2 * k128KB * 0.8 && + SizeAtLevel(2) < 2 * k128KB * 1.2); + ASSERT_TRUE(SizeAtLevel(3) > 4 * k128KB * 0.8 && + SizeAtLevel(3) < 4 * k128KB * 1.2); + + // Clean up memtable and L0 + dbfull()->CompactRange(nullptr, nullptr); + // Block compaction + SleepingBackgroundTask sleeping_task_low1; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low1, + Env::Priority::LOW); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + int count = 0; + Random rnd(301); + WriteOptions wo; + wo.timeout_hint_us = 10000; + while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) { + dbfull()->TEST_FlushMemTable(true); + count++; + } + // Stop trigger = 8 + ASSERT_EQ(count, 8); + // Unblock + sleeping_task_low1.WakeUp(); + sleeping_task_low1.WaitUntilDone(); + + // Test: stop trigger (reduce) + ASSERT_TRUE(dbfull()->SetOptions({ + {"level0_stop_writes_trigger", "6"} + })); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + + // Block compaction + SleepingBackgroundTask sleeping_task_low2; + env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2, + Env::Priority::LOW); + count = 0; + while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 64) { + dbfull()->TEST_FlushMemTable(true); + count++; + } + ASSERT_EQ(count, 6); + // Unblock + sleeping_task_low2.WakeUp(); + sleeping_task_low2.WaitUntilDone(); + + // Test disable_auto_compactions + ASSERT_TRUE(dbfull()->SetOptions({ + {"disable_auto_compactions", "true"} + })); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); + // Wait for compaction so that put won't timeout + dbfull()->TEST_FlushMemTable(true); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(NumTableFilesAtLevel(0), 4); + + ASSERT_TRUE(dbfull()->SetOptions({ + {"disable_auto_compactions", "false"} + })); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + + for (int i = 0; i < 4; ++i) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024))); + // Wait for compaction so that put won't timeout + dbfull()->TEST_FlushMemTable(true); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_LT(NumTableFilesAtLevel(0), 4); + + // Test for hard_rate_limit, change max_bytes_for_level_base to make level + // size big + ASSERT_TRUE(dbfull()->SetOptions({ + {"max_bytes_for_level_base", std::to_string(k256KB) } + })); + // writing 40 x 64KB = 10 x 256KB + // (L1 + L2 + L3) = (1 + 2 + 4) * 256KB + for (int i = 0; i < 40; ++i) { + gen_l0_kb(i, 64, 32); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_TRUE(SizeAtLevel(1) > k256KB * 0.8 && + SizeAtLevel(1) < k256KB * 1.2); + ASSERT_TRUE(SizeAtLevel(2) > 2 * k256KB * 0.8 && + SizeAtLevel(2) < 2 * k256KB * 1.2); + ASSERT_TRUE(SizeAtLevel(3) > 4 * k256KB * 0.8 && + SizeAtLevel(3) < 4 * k256KB * 1.2); + // Reduce max_bytes_for_level_base and disable compaction at the same time + // This should cause score to increase + ASSERT_TRUE(dbfull()->SetOptions({ + {"disable_auto_compactions", "true"}, + {"max_bytes_for_level_base", "65536"}, + })); + ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024))); + dbfull()->TEST_FlushMemTable(true); + + // Check score is above 2 + ASSERT_TRUE(SizeAtLevel(1) / k64KB > 2 || + SizeAtLevel(2) / k64KB > 4 || + SizeAtLevel(3) / k64KB > 8); + + // Enfoce hard rate limit, L0 score is not regulated by this limit + ASSERT_TRUE(dbfull()->SetOptions({ + {"hard_rate_limit", "2"} + })); + ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024))); + dbfull()->TEST_FlushMemTable(true); + + // Hard rate limit slow down for 1000 us, so default 10ms should be ok + ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok()); + wo.timeout_hint_us = 500; + ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).IsTimedOut()); + + // Bump up limit + ASSERT_TRUE(dbfull()->SetOptions({ + {"hard_rate_limit", "100"} + })); + dbfull()->TEST_FlushMemTable(true); + ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok()); } } // namespace rocksdb diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 3f60d72ce..aa3b3c850 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -133,6 +133,8 @@ DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property, return kBackgroundErrors; } else if (in == "cur-size-active-mem-table") { return kCurSizeActiveMemTable; + } else if (in == "cur-size-all-mem-tables") { + return kCurSizeAllMemTables; } else if (in == "num-entries-active-mem-table") { return kNumEntriesInMutableMemtable; } else if (in == "num-entries-imm-mem-tables") { @@ -250,12 +252,17 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type, // Current size of the active memtable *value = cfd_->mem()->ApproximateMemoryUsage(); return true; + case kCurSizeAllMemTables: + // Current size of the active memtable + immutable memtables + *value = cfd_->mem()->ApproximateMemoryUsage() + + cfd_->imm()->ApproximateMemoryUsage(); + return true; case kNumEntriesInMutableMemtable: - // Current size of the active memtable + // Current number of entires in the active memtable *value = cfd_->mem()->GetNumEntries(); return true; case kNumEntriesInImmutableMemtable: - // Current size of the active memtable + // Current number of entries in the immutable memtables *value = cfd_->imm()->current()->GetTotalNumEntries(); return true; case kEstimatedNumKeys: diff --git a/db/internal_stats.h b/db/internal_stats.h index 18d67de5c..4d12a2512 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -36,6 +36,8 @@ enum DBPropertyType : uint32_t { kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0. kBackgroundErrors, // Return accumulated background errors encountered. kCurSizeActiveMemTable, // Return current size of the active memtable + kCurSizeAllMemTables, // Return current size of all (active + immutable) + // memtables kNumEntriesInMutableMemtable, // Return number of entries in the mutable // memtable. kNumEntriesInImmutableMemtable, // Return sum of number of entries in all diff --git a/db/version_edit.h b/db/version_edit.h index db133402c..ef883297a 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include #include #include @@ -74,7 +75,7 @@ struct FileMetaData { // Stats for compensating deletion entries during compaction // File size compensated by deletion entry. - // This is updated in Version::UpdateTemporaryStats() first time when the + // This is updated in Version::UpdateAccumulatedStats() first time when the // file is created or loaded. After it is updated, it is immutable. uint64_t compensated_file_size; uint64_t num_entries; // the number of entries. diff --git a/db/version_set.cc b/db/version_set.cc index 78241d1f0..0819196fb 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -597,7 +597,19 @@ uint64_t Version::GetEstimatedActiveKeys() { // (1) there is merge keys // (2) keys are directly overwritten // (3) deletion on non-existing keys - return num_non_deletions_ - num_deletions_; + // (4) low number of samples + if (num_samples_ == 0) { + return 0; + } + + if (num_samples_ < files_->size()) { + // casting to avoid overflowing + return static_cast(static_cast( + accumulated_num_non_deletions_ - accumulated_num_deletions_) * + files_->size() / num_samples_); + } else { + return accumulated_num_non_deletions_ - accumulated_num_deletions_; + } } void Version::AddIterators(const ReadOptions& read_options, @@ -658,17 +670,21 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset, compaction_score_(num_levels_), compaction_level_(num_levels_), version_number_(version_number), - total_file_size_(0), - total_raw_key_size_(0), - total_raw_value_size_(0), - num_non_deletions_(0), - num_deletions_(0) { + accumulated_file_size_(0), + accumulated_raw_key_size_(0), + accumulated_raw_value_size_(0), + accumulated_num_non_deletions_(0), + accumulated_num_deletions_(0), + num_samples_(0) { if (cfd != nullptr && cfd->current() != nullptr) { - total_file_size_ = cfd->current()->total_file_size_; - total_raw_key_size_ = cfd->current()->total_raw_key_size_; - total_raw_value_size_ = cfd->current()->total_raw_value_size_; - num_non_deletions_ = cfd->current()->num_non_deletions_; - num_deletions_ = cfd->current()->num_deletions_; + accumulated_file_size_ = cfd->current()->accumulated_file_size_; + accumulated_raw_key_size_ = cfd->current()->accumulated_raw_key_size_; + accumulated_raw_value_size_ = + cfd->current()->accumulated_raw_value_size_; + accumulated_num_non_deletions_ = + cfd->current()->accumulated_num_non_deletions_; + accumulated_num_deletions_ = cfd->current()->accumulated_num_deletions_; + num_samples_ = cfd->current()->num_samples_; } } @@ -748,7 +764,7 @@ void Version::GenerateFileLevels() { void Version::PrepareApply(const MutableCFOptions& mutable_cf_options, std::vector& size_being_compacted) { - UpdateTemporaryStats(); + UpdateAccumulatedStats(); ComputeCompactionScore(mutable_cf_options, size_being_compacted); UpdateFilesBySize(); UpdateNumNonEmptyLevels(); @@ -757,7 +773,8 @@ void Version::PrepareApply(const MutableCFOptions& mutable_cf_options, } bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) { - if (file_meta->init_stats_from_file) { + if (file_meta->init_stats_from_file || + file_meta->compensated_file_size > 0) { return false; } std::shared_ptr tp; @@ -778,26 +795,55 @@ bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) { return true; } -void Version::UpdateTemporaryStats() { +void Version::UpdateAccumulatedStats(FileMetaData* file_meta) { + assert(file_meta->init_stats_from_file); + accumulated_file_size_ += file_meta->fd.GetFileSize(); + accumulated_raw_key_size_ += file_meta->raw_key_size; + accumulated_raw_value_size_ += file_meta->raw_value_size; + accumulated_num_non_deletions_ += + file_meta->num_entries - file_meta->num_deletions; + accumulated_num_deletions_ += file_meta->num_deletions; + num_samples_++; +} + +void Version::UpdateAccumulatedStats() { static const int kDeletionWeightOnCompaction = 2; - // incrementally update the average value size by - // including newly added files into the global stats + // maximum number of table properties loaded from files. + const int kMaxInitCount = 20; int init_count = 0; - int total_count = 0; - for (int level = 0; level < num_levels_; level++) { + // here only the first kMaxInitCount files which haven't been + // initialized from file will be updated with num_deletions. + // The motivation here is to cap the maximum I/O per Version creation. + // The reason for choosing files from lower-level instead of higher-level + // is that such design is able to propagate the initialization from + // lower-level to higher-level: When the num_deletions of lower-level + // files are updated, it will make the lower-level files have accurate + // compensated_file_size, making lower-level to higher-level compaction + // will be triggered, which creates higher-level files whose num_deletions + // will be updated here. + for (int level = 0; + level < num_levels_ && init_count < kMaxInitCount; ++level) { for (auto* file_meta : files_[level]) { if (MaybeInitializeFileMetaData(file_meta)) { // each FileMeta will be initialized only once. - total_file_size_ += file_meta->fd.GetFileSize(); - total_raw_key_size_ += file_meta->raw_key_size; - total_raw_value_size_ += file_meta->raw_value_size; - num_non_deletions_ += - file_meta->num_entries - file_meta->num_deletions; - num_deletions_ += file_meta->num_deletions; - init_count++; + UpdateAccumulatedStats(file_meta); + if (++init_count >= kMaxInitCount) { + break; + } + } + } + } + // In case all sampled-files contain only deletion entries, then we + // load the table-property of a file in higher-level to initialize + // that value. + for (int level = num_levels_ - 1; + accumulated_raw_value_size_ == 0 && level >= 0; --level) { + for (int i = static_cast(files_[level].size()) - 1; + accumulated_raw_value_size_ == 0 && i >= 0; --i) { + if (MaybeInitializeFileMetaData(files_[level][i])) { + UpdateAccumulatedStats(files_[level][i]); } - total_count++; } } diff --git a/db/version_set.h b/db/version_set.h index 05e6e9a65..93e9e0c9d 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -212,13 +212,15 @@ class Version { uint64_t GetVersionNumber() const { return version_number_; } uint64_t GetAverageValueSize() const { - if (num_non_deletions_ == 0) { + if (accumulated_num_non_deletions_ == 0) { return 0; } - assert(total_raw_key_size_ + total_raw_value_size_ > 0); - assert(total_file_size_ > 0); - return total_raw_value_size_ / num_non_deletions_ * total_file_size_ / - (total_raw_key_size_ + total_raw_value_size_); + assert(accumulated_raw_key_size_ + accumulated_raw_value_size_ > 0); + assert(accumulated_file_size_ > 0); + return accumulated_raw_value_size_ / + accumulated_num_non_deletions_ * + accumulated_file_size_ / + (accumulated_raw_key_size_ + accumulated_raw_value_size_); } // REQUIRES: lock is held @@ -268,14 +270,17 @@ class Version { // Update num_non_empty_levels_. void UpdateNumNonEmptyLevels(); - // The helper function of UpdateTemporaryStats, which may fill the missing + // The helper function of UpdateAccumulatedStats, which may fill the missing // fields of file_mata from its associated TableProperties. // Returns true if it does initialize FileMetaData. bool MaybeInitializeFileMetaData(FileMetaData* file_meta); - // Update the temporary stats associated with the current version. - // This temporary stats will be used in compaction. - void UpdateTemporaryStats(); + // Update the accumulated stats from a file-meta. + void UpdateAccumulatedStats(FileMetaData* file_meta); + + // Update the accumulated stats associated with the current version. + // This accumulated stats will be used in compaction. + void UpdateAccumulatedStats(); // Sort all files for this version based on their file size and // record results in files_by_size_. The largest files are listed first. @@ -337,16 +342,19 @@ class Version { Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0); - // total file size - uint64_t total_file_size_; - // the total size of all raw keys. - uint64_t total_raw_key_size_; - // the total size of all raw values. - uint64_t total_raw_value_size_; + // the following are the sampled temporary stats. + // the current accumulated size of sampled files. + uint64_t accumulated_file_size_; + // the current accumulated size of all raw keys based on the sampled files. + uint64_t accumulated_raw_key_size_; + // the current accumulated size of all raw keys based on the sampled files. + uint64_t accumulated_raw_value_size_; // total number of non-deletion entries - uint64_t num_non_deletions_; + uint64_t accumulated_num_non_deletions_; // total number of deletion entries - uint64_t num_deletions_; + uint64_t accumulated_num_deletions_; + // the number of samples + uint64_t num_samples_; ~Version(); diff --git a/java/Makefile b/java/Makefile index 441238930..f06cea579 100644 --- a/java/Makefile +++ b/java/Makefile @@ -42,7 +42,7 @@ test: java java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ColumnFamilyTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.FilterTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.KeyMayExistTest - #java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest + java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOnlyTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.MergeTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOptionsTest diff --git a/java/org/rocksdb/Options.java b/java/org/rocksdb/Options.java index 586585a35..e5eb96374 100644 --- a/java/org/rocksdb/Options.java +++ b/java/org/rocksdb/Options.java @@ -112,7 +112,7 @@ public class Options extends RocksObject { */ public boolean createMissingColumnFamilies() { assert(isInitialized()); - return createIfMissing(nativeHandle_); + return createMissingColumnFamilies(nativeHandle_); } /** @@ -1154,6 +1154,7 @@ public class Options extends RocksObject { */ public Options setMemTableConfig(MemTableConfig config) throws RocksDBException { + memTableConfig_ = config; setMemTableFactory(nativeHandle_, config.newMemTableFactoryHandle()); return this; } @@ -1168,6 +1169,7 @@ public class Options extends RocksObject { * @throws RocksDBException */ public Options setRateLimiterConfig(RateLimiterConfig config) { + rateLimiterConfig_ = config; setRateLimiter(nativeHandle_, config.newRateLimiterHandle()); return this; } @@ -1191,6 +1193,7 @@ public class Options extends RocksObject { * @return the reference of the current Options. */ public Options setTableFormatConfig(TableFormatConfig config) { + tableFormatConfig_ = config; setTableFactory(nativeHandle_, config.newTableFactoryHandle()); return this; } @@ -2316,4 +2319,7 @@ public class Options extends RocksObject { long cacheSize_; int numShardBits_; RocksEnv env_; + MemTableConfig memTableConfig_; + TableFormatConfig tableFormatConfig_; + RateLimiterConfig rateLimiterConfig_; } diff --git a/java/org/rocksdb/test/FilterTest.java b/java/org/rocksdb/test/FilterTest.java index 00214d033..fc4fabf56 100644 --- a/java/org/rocksdb/test/FilterTest.java +++ b/java/org/rocksdb/test/FilterTest.java @@ -13,19 +13,30 @@ public class FilterTest { } public static void main(String[] args) { Options options = new Options(); - // test table config without filter + // test table config BlockBasedTableConfig blockConfig = new BlockBasedTableConfig(); - options.setTableFormatConfig(blockConfig); + options.setTableFormatConfig(new BlockBasedTableConfig(). + setFilter(new BloomFilter())); options.dispose(); + System.gc(); + System.runFinalization(); // new Bloom filter options = new Options(); blockConfig = new BlockBasedTableConfig(); blockConfig.setFilter(new BloomFilter()); options.setTableFormatConfig(blockConfig); - blockConfig.setFilter(new BloomFilter(10)); + BloomFilter bloomFilter = new BloomFilter(10); + blockConfig.setFilter(bloomFilter); options.setTableFormatConfig(blockConfig); + System.gc(); + System.runFinalization(); blockConfig.setFilter(new BloomFilter(10, false)); options.setTableFormatConfig(blockConfig); + options.dispose(); + options = null; + blockConfig = null; + System.gc(); + System.runFinalization(); System.out.println("Filter test passed"); } } diff --git a/java/rocksjni/filter.cc b/java/rocksjni/filter.cc index 1b5d368b6..2ce17d499 100644 --- a/java/rocksjni/filter.cc +++ b/java/rocksjni/filter.cc @@ -24,9 +24,12 @@ void Java_org_rocksdb_BloomFilter_createNewBloomFilter( JNIEnv* env, jobject jobj, jint bits_per_key, jboolean use_block_base_builder) { - const rocksdb::FilterPolicy* fp = rocksdb::NewBloomFilterPolicy(bits_per_key, - use_block_base_builder); - rocksdb::FilterJni::setHandle(env, jobj, fp); + rocksdb::FilterPolicy* fp = const_cast( + rocksdb::NewBloomFilterPolicy(bits_per_key, use_block_base_builder)); + std::shared_ptr *pFilterPolicy = + new std::shared_ptr; + *pFilterPolicy = std::shared_ptr(fp); + rocksdb::FilterJni::setHandle(env, jobj, pFilterPolicy); } /* @@ -35,6 +38,9 @@ void Java_org_rocksdb_BloomFilter_createNewBloomFilter( * Signature: (J)V */ void Java_org_rocksdb_Filter_disposeInternal( - JNIEnv* env, jobject jobj, jlong handle) { - delete reinterpret_cast(handle); + JNIEnv* env, jobject jobj, jlong jhandle) { + + std::shared_ptr *handle = + reinterpret_cast *>(jhandle); + handle->reset(); } diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h index 14b2cb98a..8300a6e66 100644 --- a/java/rocksjni/portal.h +++ b/java/rocksjni/portal.h @@ -313,14 +313,16 @@ class FilterJni { } // Get the pointer to rocksdb::FilterPolicy. - static rocksdb::FilterPolicy* getHandle(JNIEnv* env, jobject jobj) { - return reinterpret_cast( + static std::shared_ptr* getHandle( + JNIEnv* env, jobject jobj) { + return reinterpret_cast + *>( env->GetLongField(jobj, getHandleFieldID(env))); } // Pass the rocksdb::FilterPolicy pointer to the java side. static void setHandle( - JNIEnv* env, jobject jobj, const rocksdb::FilterPolicy* op) { + JNIEnv* env, jobject jobj, std::shared_ptr* op) { env->SetLongField( jobj, getHandleFieldID(env), reinterpret_cast(op)); diff --git a/java/rocksjni/table.cc b/java/rocksjni/table.cc index 846526292..1582900f3 100644 --- a/java/rocksjni/table.cc +++ b/java/rocksjni/table.cc @@ -56,8 +56,10 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle( options.block_restart_interval = block_restart_interval; options.whole_key_filtering = whole_key_filtering; if (jfilterPolicy > 0) { - options.filter_policy.reset( - reinterpret_cast(jfilterPolicy)); + std::shared_ptr *pFilterPolicy = + reinterpret_cast *>( + jfilterPolicy); + options.filter_policy = *pFilterPolicy; } options.cache_index_and_filter_blocks = cache_index_and_filter_blocks; options.hash_index_allow_collision = hash_index_allow_collision; diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 8eda39bf9..70f0c6a94 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -40,6 +40,7 @@ const string LDBCommand::ARG_FROM = "from"; const string LDBCommand::ARG_TO = "to"; const string LDBCommand::ARG_MAX_KEYS = "max_keys"; const string LDBCommand::ARG_BLOOM_BITS = "bloom_bits"; +const string LDBCommand::ARG_FIX_PREFIX_LEN = "fix_prefix_len"; const string LDBCommand::ARG_COMPRESSION_TYPE = "compression_type"; const string LDBCommand::ARG_BLOCK_SIZE = "block_size"; const string LDBCommand::ARG_AUTO_COMPACTION = "auto_compaction"; @@ -221,9 +222,11 @@ Options LDBCommand::PrepareOptionsForOpenDB() { map::const_iterator itr; BlockBasedTableOptions table_options; + bool use_table_options = false; int bits; if (ParseIntOption(option_map_, ARG_BLOOM_BITS, bits, exec_state_)) { if (bits > 0) { + use_table_options = true; table_options.filter_policy.reset(NewBloomFilterPolicy(bits)); } else { exec_state_ = LDBCommandExecuteResult::FAILED(ARG_BLOOM_BITS + @@ -234,14 +237,18 @@ Options LDBCommand::PrepareOptionsForOpenDB() { int block_size; if (ParseIntOption(option_map_, ARG_BLOCK_SIZE, block_size, exec_state_)) { if (block_size > 0) { + use_table_options = true; table_options.block_size = block_size; - opt.table_factory.reset(NewBlockBasedTableFactory(table_options)); } else { exec_state_ = LDBCommandExecuteResult::FAILED(ARG_BLOCK_SIZE + " must be > 0."); } } + if (use_table_options) { + opt.table_factory.reset(NewBlockBasedTableFactory(table_options)); + } + itr = option_map_.find(ARG_AUTO_COMPACTION); if (itr != option_map_.end()) { opt.disable_auto_compactions = ! StringToBool(itr->second); @@ -294,6 +301,18 @@ Options LDBCommand::PrepareOptionsForOpenDB() { opt.db_paths.emplace_back(db_path_, std::numeric_limits::max()); } + int fix_prefix_len; + if (ParseIntOption(option_map_, ARG_FIX_PREFIX_LEN, fix_prefix_len, + exec_state_)) { + if (fix_prefix_len > 0) { + opt.prefix_extractor.reset( + NewFixedPrefixTransform(static_cast(fix_prefix_len))); + } else { + exec_state_ = + LDBCommandExecuteResult::FAILED(ARG_FIX_PREFIX_LEN + " must be > 0."); + } + } + return opt; } diff --git a/util/ldb_cmd.h b/util/ldb_cmd.h index 0553fe64a..9ffe0eabc 100644 --- a/util/ldb_cmd.h +++ b/util/ldb_cmd.h @@ -46,6 +46,7 @@ public: static const string ARG_TO; static const string ARG_MAX_KEYS; static const string ARG_BLOOM_BITS; + static const string ARG_FIX_PREFIX_LEN; static const string ARG_COMPRESSION_TYPE; static const string ARG_BLOCK_SIZE; static const string ARG_AUTO_COMPACTION; @@ -284,9 +285,10 @@ protected: * passed in. */ vector BuildCmdLineOptions(vector options) { - vector ret = {ARG_DB, ARG_BLOOM_BITS, ARG_BLOCK_SIZE, - ARG_AUTO_COMPACTION, ARG_COMPRESSION_TYPE, - ARG_WRITE_BUFFER_SIZE, ARG_FILE_SIZE}; + vector ret = {ARG_DB, ARG_BLOOM_BITS, + ARG_BLOCK_SIZE, ARG_AUTO_COMPACTION, + ARG_COMPRESSION_TYPE, ARG_WRITE_BUFFER_SIZE, + ARG_FILE_SIZE, ARG_FIX_PREFIX_LEN}; ret.insert(ret.end(), options.begin(), options.end()); return ret; } diff --git a/util/ldb_tool.cc b/util/ldb_tool.cc index 271dba350..bb6c8ffca 100644 --- a/util/ldb_tool.cc +++ b/util/ldb_tool.cc @@ -47,6 +47,7 @@ public: " with 'put','get','scan','dump','query','batchput'" " : DB supports ttl and value is internally timestamp-suffixed\n"); ret.append(" --" + LDBCommand::ARG_BLOOM_BITS + "=\n"); + ret.append(" --" + LDBCommand::ARG_FIX_PREFIX_LEN + "=\n"); ret.append(" --" + LDBCommand::ARG_COMPRESSION_TYPE + "=\n"); ret.append(" --" + LDBCommand::ARG_BLOCK_SIZE + diff --git a/util/mutable_cf_options.cc b/util/mutable_cf_options.cc index 1c710c656..1b3197b18 100644 --- a/util/mutable_cf_options.cc +++ b/util/mutable_cf_options.cc @@ -3,8 +3,15 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include #include #include +#include +#include "rocksdb/env.h" #include "rocksdb/options.h" #include "rocksdb/immutable_options.h" #include "util/mutable_cf_options.h" @@ -69,4 +76,56 @@ uint64_t MutableCFOptions::ExpandedCompactionByteSizeLimit(int level) const { return MaxFileSizeForLevel(level) * expanded_compaction_factor; } +void MutableCFOptions::Dump(Logger* log) const { + // Memtable related options + Log(log, " write_buffer_size: %zu", write_buffer_size); + Log(log, " max_write_buffer_number: %d", + max_write_buffer_number); + Log(log, " arena_block_size: %zu", arena_block_size); + Log(log, " memtable_prefix_bloom_bits: %" PRIu32, + memtable_prefix_bloom_bits); + Log(log, " memtable_prefix_bloom_probes: %" PRIu32, + memtable_prefix_bloom_probes); + Log(log, " memtable_prefix_bloom_huge_page_tlb_size: %zu", + memtable_prefix_bloom_huge_page_tlb_size); + Log(log, " max_successive_merges: %zu", + max_successive_merges); + Log(log, " filter_deletes: %d", + filter_deletes); + Log(log, " disable_auto_compactions: %d", + disable_auto_compactions); + Log(log, " soft_rate_limit: %lf", + soft_rate_limit); + Log(log, " hard_rate_limit: %lf", + hard_rate_limit); + Log(log, " level0_file_num_compaction_trigger: %d", + level0_file_num_compaction_trigger); + Log(log, " level0_slowdown_writes_trigger: %d", + level0_slowdown_writes_trigger); + Log(log, " level0_stop_writes_trigger: %d", + level0_stop_writes_trigger); + Log(log, " max_grandparent_overlap_factor: %d", + max_grandparent_overlap_factor); + Log(log, " expanded_compaction_factor: %d", + expanded_compaction_factor); + Log(log, " source_compaction_factor: %d", + source_compaction_factor); + Log(log, " target_file_size_base: %d", + target_file_size_base); + Log(log, " target_file_size_multiplier: %d", + target_file_size_multiplier); + Log(log, " max_bytes_for_level_base: %" PRIu64, + max_bytes_for_level_base); + Log(log, " max_bytes_for_level_multiplier: %d", + max_bytes_for_level_multiplier); + std::string result; + char buf[10]; + for (const auto m : max_bytes_for_level_multiplier_additional) { + snprintf(buf, sizeof(buf), "%d, ", m); + result += buf; + } + result.resize(result.size() - 2); + Log(log, "max_bytes_for_level_multiplier_additional: %s", result.c_str()); +} + } // namespace rocksdb diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index 02f63fed4..eaecaa487 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -14,6 +14,7 @@ namespace rocksdb { struct MutableCFOptions { MutableCFOptions(const Options& options, const ImmutableCFOptions& ioptions) : write_buffer_size(options.write_buffer_size), + max_write_buffer_number(options.max_write_buffer_number), arena_block_size(options.arena_block_size), memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits), memtable_prefix_bloom_probes(options.memtable_prefix_bloom_probes), @@ -21,6 +22,9 @@ struct MutableCFOptions { options.memtable_prefix_bloom_huge_page_tlb_size), max_successive_merges(options.max_successive_merges), filter_deletes(options.filter_deletes), + disable_auto_compactions(options.disable_auto_compactions), + soft_rate_limit(options.soft_rate_limit), + hard_rate_limit(options.hard_rate_limit), level0_file_num_compaction_trigger( options.level0_file_num_compaction_trigger), level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger), @@ -39,12 +43,16 @@ struct MutableCFOptions { } MutableCFOptions() : write_buffer_size(0), + max_write_buffer_number(0), arena_block_size(0), memtable_prefix_bloom_bits(0), memtable_prefix_bloom_probes(0), memtable_prefix_bloom_huge_page_tlb_size(0), max_successive_merges(0), filter_deletes(false), + disable_auto_compactions(false), + soft_rate_limit(0), + hard_rate_limit(0), level0_file_num_compaction_trigger(0), level0_slowdown_writes_trigger(0), level0_stop_writes_trigger(0), @@ -70,8 +78,11 @@ struct MutableCFOptions { uint64_t MaxGrandParentOverlapBytes(int level) const; uint64_t ExpandedCompactionByteSizeLimit(int level) const; + void Dump(Logger* log) const; + // Memtable related options size_t write_buffer_size; + int max_write_buffer_number; size_t arena_block_size; uint32_t memtable_prefix_bloom_bits; uint32_t memtable_prefix_bloom_probes; @@ -80,6 +91,9 @@ struct MutableCFOptions { bool filter_deletes; // Compaction related options + bool disable_auto_compactions; + double soft_rate_limit; + double hard_rate_limit; int level0_file_num_compaction_trigger; int level0_slowdown_writes_trigger; int level0_stop_writes_trigger; diff --git a/util/options_helper.cc b/util/options_helper.cc index 67726dc8f..2a56a1ccf 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -92,6 +92,8 @@ bool ParseMemtableOptions(const std::string& name, const std::string& value, new_options->max_successive_merges = ParseInt64(value); } else if (name == "filter_deletes") { new_options->filter_deletes = ParseBoolean(name, value); + } else if (name == "max_write_buffer_number") { + new_options->max_write_buffer_number = ParseInt(value); } else { return false; } @@ -101,7 +103,13 @@ bool ParseMemtableOptions(const std::string& name, const std::string& value, template bool ParseCompactionOptions(const std::string& name, const std::string& value, OptionsType* new_options) { - if (name == "level0_file_num_compaction_trigger") { + if (name == "disable_auto_compactions") { + new_options->disable_auto_compactions = ParseBoolean(name, value); + } else if (name == "soft_rate_limit") { + new_options->soft_rate_limit = ParseDouble(value); + } else if (name == "hard_rate_limit") { + new_options->hard_rate_limit = ParseDouble(value); + } else if (name == "level0_file_num_compaction_trigger") { new_options->level0_file_num_compaction_trigger = ParseInt(value); } else if (name == "level0_slowdown_writes_trigger") { new_options->level0_slowdown_writes_trigger = ParseInt(value); @@ -220,8 +228,6 @@ bool GetColumnFamilyOptionsFromMap( try { if (ParseMemtableOptions(o.first, o.second, new_options)) { } else if (ParseCompactionOptions(o.first, o.second, new_options)) { - } else if (o.first == "max_write_buffer_number") { - new_options->max_write_buffer_number = ParseInt(o.second); } else if (o.first == "min_write_buffer_number_to_merge") { new_options->min_write_buffer_number_to_merge = ParseInt(o.second); } else if (o.first == "compression") { @@ -266,12 +272,6 @@ bool GetColumnFamilyOptionsFromMap( new_options->num_levels = ParseInt(o.second); } else if (o.first == "max_mem_compaction_level") { new_options->max_mem_compaction_level = ParseInt(o.second); - } else if (o.first == "soft_rate_limit") { - new_options->soft_rate_limit = ParseDouble(o.second); - } else if (o.first == "hard_rate_limit") { - new_options->hard_rate_limit = ParseDouble(o.second); - } else if (o.first == "disable_auto_compactions") { - new_options->disable_auto_compactions = ParseBoolean(o.first, o.second); } else if (o.first == "purge_redundant_kvs_while_flush") { new_options->purge_redundant_kvs_while_flush = ParseBoolean(o.first, o.second); diff --git a/utilities/write_batch_with_index/write_batch_with_index.cc b/utilities/write_batch_with_index/write_batch_with_index.cc index adfa5b324..4ba063a06 100644 --- a/utilities/write_batch_with_index/write_batch_with_index.cc +++ b/utilities/write_batch_with_index/write_batch_with_index.cc @@ -304,6 +304,10 @@ struct WriteBatchIndexEntry { WriteBatchIndexEntry(const Slice* sk, uint32_t c) : offset(0), column_family(c), search_key(sk) {} + // If this flag appears in the offset, it indicates a key that is smaller + // than any other entry for the same column family + static const size_t kFlagMin = std::numeric_limits::max(); + size_t offset; // offset of an entry in write batch's string buffer. uint32_t column_family; // column family of the entry const Slice* search_key; // if not null, instead of reading keys from @@ -354,14 +358,16 @@ class WBWIIteratorImpl : public WBWIIterator { virtual void SeekToFirst() { valid_ = true; - WriteBatchIndexEntry search_entry(nullptr, column_family_id_); + WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, + column_family_id_); skip_list_iter_.Seek(&search_entry); ReadEntry(); } virtual void SeekToLast() { valid_ = true; - WriteBatchIndexEntry search_entry(nullptr, column_family_id_ + 1); + WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, + column_family_id_ + 1); skip_list_iter_.Seek(&search_entry); if (!skip_list_iter_.Valid()) { skip_list_iter_.SeekToLast(); @@ -636,6 +642,12 @@ int WriteBatchEntryComparator::operator()( return -1; } + if (entry1->offset == WriteBatchIndexEntry::kFlagMin) { + return -1; + } else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) { + return 1; + } + Status s; Slice key1, key2; if (entry1->search_key == nullptr) { diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index 32b45e339..8667079d3 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -522,7 +522,18 @@ TEST(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { Random rnd(rand_seed); ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator()); + ColumnFamilyHandleImplDummy cf3(8, BytewiseComparator()); + WriteBatchWithIndex batch(BytewiseComparator(), 20, true); + + if (rand_seed % 2 == 0) { + batch.Put(&cf2, "zoo", "bar"); + } + if (rand_seed % 4 == 1) { + batch.Put(&cf3, "zoo", "bar"); + } + KVMap map; KVMap merged_map; for (auto key : source_strings) { @@ -619,6 +630,7 @@ TEST(WriteBatchWithIndexTest, TestRandomIteraratorWithBase) { TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) { ColumnFamilyHandleImplDummy cf1(6, BytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(2, BytewiseComparator()); WriteBatchWithIndex batch(BytewiseComparator(), 20, true); { @@ -659,7 +671,21 @@ TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) { AssertIter(iter.get(), "a", "aa"); } + // Test the case that there is one element in the write batch + batch.Put(&cf2, "zoo", "bar"); batch.Put(&cf1, "a", "aa"); + { + KVMap empty_map; + std::unique_ptr iter( + batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + + iter->SeekToFirst(); + AssertIter(iter.get(), "a", "aa"); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + batch.Delete(&cf1, "b"); batch.Put(&cf1, "c", "cc"); batch.Put(&cf1, "d", "dd"); @@ -725,6 +751,7 @@ TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) { iter->Next(); AssertIter(iter.get(), "f", "ff"); } + { KVMap empty_map; std::unique_ptr iter( @@ -763,6 +790,60 @@ TEST(WriteBatchWithIndexTest, TestIteraratorWithBase) { AssertIter(iter.get(), "c", "cc"); } } + +TEST(WriteBatchWithIndexTest, TestIteraratorWithBaseReverseCmp) { + ColumnFamilyHandleImplDummy cf1(6, ReverseBytewiseComparator()); + ColumnFamilyHandleImplDummy cf2(2, ReverseBytewiseComparator()); + WriteBatchWithIndex batch(BytewiseComparator(), 20, true); + + // Test the case that there is one element in the write batch + batch.Put(&cf2, "zoo", "bar"); + batch.Put(&cf1, "a", "aa"); + { + KVMap empty_map; + std::unique_ptr iter( + batch.NewIteratorWithBase(&cf1, new KVIter(&empty_map))); + + iter->SeekToFirst(); + AssertIter(iter.get(), "a", "aa"); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + } + + batch.Put(&cf1, "c", "cc"); + { + KVMap map; + std::unique_ptr iter( + batch.NewIteratorWithBase(&cf1, new KVIter(&map))); + + iter->SeekToFirst(); + AssertIter(iter.get(), "c", "cc"); + iter->Next(); + AssertIter(iter.get(), "a", "aa"); + iter->Next(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->SeekToLast(); + AssertIter(iter.get(), "a", "aa"); + iter->Prev(); + AssertIter(iter.get(), "c", "cc"); + iter->Prev(); + ASSERT_OK(iter->status()); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("b"); + AssertIter(iter.get(), "a", "aa"); + + iter->Prev(); + AssertIter(iter.get(), "c", "cc"); + + iter->Seek("a"); + AssertIter(iter.get(), "a", "aa"); + } +} + } // namespace int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }