6837a17621
Summary: CreateColumnFamily() releases DB mutex after adding column family to the set and install super version (to write option file), so if users call GetAggregatedIntProperty() in the middle, then super version will be null and the process will crash. Fix it by skipping those column families without super version installed. Maybe we should also fix the problem of releasing the lock when reading option file, but it is more risky. so I'm doing a quick and safer fix and we can investigate it later. Closes https://github.com/facebook/rocksdb/pull/2475 Differential Revision: D5298053 Pulled By: siying fbshipit-source-id: 4b3c8f91c60400b163fcc6cda8a0c77723be0ef6
1140 lines
42 KiB
C++
1140 lines
42 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
// This source code is also licensed under the GPLv2 license found in the
|
|
// COPYING file in the root directory of this source tree.
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/column_family.h"
|
|
|
|
#ifndef __STDC_FORMAT_MACROS
|
|
#define __STDC_FORMAT_MACROS
|
|
#endif
|
|
|
|
#include <inttypes.h>
|
|
#include <vector>
|
|
#include <string>
|
|
#include <algorithm>
|
|
#include <limits>
|
|
|
|
#include "db/compaction_picker.h"
|
|
#include "db/compaction_picker_universal.h"
|
|
#include "db/db_impl.h"
|
|
#include "db/internal_stats.h"
|
|
#include "db/job_context.h"
|
|
#include "db/table_properties_collector.h"
|
|
#include "db/version_set.h"
|
|
#include "db/write_controller.h"
|
|
#include "memtable/hash_skiplist_rep.h"
|
|
#include "monitoring/thread_status_util.h"
|
|
#include "options/options_helper.h"
|
|
#include "table/block_based_table_factory.h"
|
|
#include "util/autovector.h"
|
|
#include "util/compression.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
|
|
ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
|
|
: cfd_(column_family_data), db_(db), mutex_(mutex) {
|
|
if (cfd_ != nullptr) {
|
|
cfd_->Ref();
|
|
}
|
|
}
|
|
|
|
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
|
|
if (cfd_ != nullptr) {
|
|
#ifndef ROCKSDB_LITE
|
|
for (auto& listener : cfd_->ioptions()->listeners) {
|
|
listener->OnColumnFamilyHandleDeletionStarted(this);
|
|
}
|
|
#endif // ROCKSDB_LITE
|
|
// Job id == 0 means that this is not our background process, but rather
|
|
// user thread
|
|
JobContext job_context(0);
|
|
mutex_->Lock();
|
|
if (cfd_->Unref()) {
|
|
delete cfd_;
|
|
}
|
|
db_->FindObsoleteFiles(&job_context, false, true);
|
|
mutex_->Unlock();
|
|
if (job_context.HaveSomethingToDelete()) {
|
|
db_->PurgeObsoleteFiles(job_context);
|
|
}
|
|
job_context.Clean();
|
|
}
|
|
}
|
|
|
|
uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); }
|
|
|
|
const std::string& ColumnFamilyHandleImpl::GetName() const {
|
|
return cfd()->GetName();
|
|
}
|
|
|
|
Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) {
|
|
#ifndef ROCKSDB_LITE
|
|
// accessing mutable cf-options requires db mutex.
|
|
InstrumentedMutexLock l(mutex_);
|
|
*desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions());
|
|
return Status::OK();
|
|
#else
|
|
return Status::NotSupported();
|
|
#endif // !ROCKSDB_LITE
|
|
}
|
|
|
|
const Comparator* ColumnFamilyHandleImpl::GetComparator() const {
|
|
return cfd()->user_comparator();
|
|
}
|
|
|
|
void GetIntTblPropCollectorFactory(
|
|
const ImmutableCFOptions& ioptions,
|
|
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
|
|
int_tbl_prop_collector_factories) {
|
|
auto& collector_factories = ioptions.table_properties_collector_factories;
|
|
for (size_t i = 0; i < ioptions.table_properties_collector_factories.size();
|
|
++i) {
|
|
assert(collector_factories[i]);
|
|
int_tbl_prop_collector_factories->emplace_back(
|
|
new UserKeyTablePropertiesCollectorFactory(collector_factories[i]));
|
|
}
|
|
// Add collector to collect internal key statistics
|
|
int_tbl_prop_collector_factories->emplace_back(
|
|
new InternalKeyPropertiesCollectorFactory);
|
|
}
|
|
|
|
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
|
|
if (!cf_options.compression_per_level.empty()) {
|
|
for (size_t level = 0; level < cf_options.compression_per_level.size();
|
|
++level) {
|
|
if (!CompressionTypeSupported(cf_options.compression_per_level[level])) {
|
|
return Status::InvalidArgument(
|
|
"Compression type " +
|
|
CompressionTypeToString(cf_options.compression_per_level[level]) +
|
|
" is not linked with the binary.");
|
|
}
|
|
}
|
|
} else {
|
|
if (!CompressionTypeSupported(cf_options.compression)) {
|
|
return Status::InvalidArgument(
|
|
"Compression type " +
|
|
CompressionTypeToString(cf_options.compression) +
|
|
" is not linked with the binary.");
|
|
}
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
|
|
if (cf_options.inplace_update_support) {
|
|
return Status::InvalidArgument(
|
|
"In-place memtable updates (inplace_update_support) is not compatible "
|
|
"with concurrent writes (allow_concurrent_memtable_write)");
|
|
}
|
|
if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) {
|
|
return Status::InvalidArgument(
|
|
"Memtable doesn't concurrent writes (allow_concurrent_memtable_write)");
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
|
|
const ColumnFamilyOptions& src) {
|
|
ColumnFamilyOptions result = src;
|
|
size_t clamp_max = std::conditional<
|
|
sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>,
|
|
std::integral_constant<uint64_t, 64ull << 30>>::type::value;
|
|
ClipToRange(&result.write_buffer_size, ((size_t)64) << 10, clamp_max);
|
|
// if user sets arena_block_size, we trust user to use this value. Otherwise,
|
|
// calculate a proper value from writer_buffer_size;
|
|
if (result.arena_block_size <= 0) {
|
|
result.arena_block_size = result.write_buffer_size / 8;
|
|
|
|
// Align up to 4k
|
|
const size_t align = 4 * 1024;
|
|
result.arena_block_size =
|
|
((result.arena_block_size + align - 1) / align) * align;
|
|
}
|
|
result.min_write_buffer_number_to_merge =
|
|
std::min(result.min_write_buffer_number_to_merge,
|
|
result.max_write_buffer_number - 1);
|
|
if (result.min_write_buffer_number_to_merge < 1) {
|
|
result.min_write_buffer_number_to_merge = 1;
|
|
}
|
|
|
|
if (result.num_levels < 1) {
|
|
result.num_levels = 1;
|
|
}
|
|
if (result.compaction_style == kCompactionStyleLevel &&
|
|
result.num_levels < 2) {
|
|
result.num_levels = 2;
|
|
}
|
|
|
|
if (result.compaction_style == kCompactionStyleUniversal &&
|
|
db_options.allow_ingest_behind && result.num_levels < 3) {
|
|
result.num_levels = 3;
|
|
}
|
|
|
|
if (result.max_write_buffer_number < 2) {
|
|
result.max_write_buffer_number = 2;
|
|
}
|
|
if (result.max_write_buffer_number_to_maintain < 0) {
|
|
result.max_write_buffer_number_to_maintain = result.max_write_buffer_number;
|
|
}
|
|
// bloom filter size shouldn't exceed 1/4 of memtable size.
|
|
if (result.memtable_prefix_bloom_size_ratio > 0.25) {
|
|
result.memtable_prefix_bloom_size_ratio = 0.25;
|
|
} else if (result.memtable_prefix_bloom_size_ratio < 0) {
|
|
result.memtable_prefix_bloom_size_ratio = 0;
|
|
}
|
|
|
|
if (!result.prefix_extractor) {
|
|
assert(result.memtable_factory);
|
|
Slice name = result.memtable_factory->Name();
|
|
if (name.compare("HashSkipListRepFactory") == 0 ||
|
|
name.compare("HashLinkListRepFactory") == 0) {
|
|
result.memtable_factory = std::make_shared<SkipListFactory>();
|
|
}
|
|
}
|
|
|
|
if (result.compaction_style == kCompactionStyleFIFO) {
|
|
result.num_levels = 1;
|
|
// since we delete level0 files in FIFO compaction when there are too many
|
|
// of them, these options don't really mean anything
|
|
result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
|
|
result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
|
|
}
|
|
|
|
if (result.max_bytes_for_level_multiplier <= 0) {
|
|
result.max_bytes_for_level_multiplier = 1;
|
|
}
|
|
|
|
if (result.level0_file_num_compaction_trigger == 0) {
|
|
ROCKS_LOG_WARN(db_options.info_log.get(),
|
|
"level0_file_num_compaction_trigger cannot be 0");
|
|
result.level0_file_num_compaction_trigger = 1;
|
|
}
|
|
|
|
if (result.level0_stop_writes_trigger <
|
|
result.level0_slowdown_writes_trigger ||
|
|
result.level0_slowdown_writes_trigger <
|
|
result.level0_file_num_compaction_trigger) {
|
|
ROCKS_LOG_WARN(db_options.info_log.get(),
|
|
"This condition must be satisfied: "
|
|
"level0_stop_writes_trigger(%d) >= "
|
|
"level0_slowdown_writes_trigger(%d) >= "
|
|
"level0_file_num_compaction_trigger(%d)",
|
|
result.level0_stop_writes_trigger,
|
|
result.level0_slowdown_writes_trigger,
|
|
result.level0_file_num_compaction_trigger);
|
|
if (result.level0_slowdown_writes_trigger <
|
|
result.level0_file_num_compaction_trigger) {
|
|
result.level0_slowdown_writes_trigger =
|
|
result.level0_file_num_compaction_trigger;
|
|
}
|
|
if (result.level0_stop_writes_trigger <
|
|
result.level0_slowdown_writes_trigger) {
|
|
result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger;
|
|
}
|
|
ROCKS_LOG_WARN(db_options.info_log.get(),
|
|
"Adjust the value to "
|
|
"level0_stop_writes_trigger(%d)"
|
|
"level0_slowdown_writes_trigger(%d)"
|
|
"level0_file_num_compaction_trigger(%d)",
|
|
result.level0_stop_writes_trigger,
|
|
result.level0_slowdown_writes_trigger,
|
|
result.level0_file_num_compaction_trigger);
|
|
}
|
|
|
|
if (result.soft_pending_compaction_bytes_limit == 0) {
|
|
result.soft_pending_compaction_bytes_limit =
|
|
result.hard_pending_compaction_bytes_limit;
|
|
} else if (result.hard_pending_compaction_bytes_limit > 0 &&
|
|
result.soft_pending_compaction_bytes_limit >
|
|
result.hard_pending_compaction_bytes_limit) {
|
|
result.soft_pending_compaction_bytes_limit =
|
|
result.hard_pending_compaction_bytes_limit;
|
|
}
|
|
|
|
if (result.level_compaction_dynamic_level_bytes) {
|
|
if (result.compaction_style != kCompactionStyleLevel ||
|
|
db_options.db_paths.size() > 1U) {
|
|
// 1. level_compaction_dynamic_level_bytes only makes sense for
|
|
// level-based compaction.
|
|
// 2. we don't yet know how to make both of this feature and multiple
|
|
// DB path work.
|
|
result.level_compaction_dynamic_level_bytes = false;
|
|
}
|
|
}
|
|
|
|
if (result.max_compaction_bytes == 0) {
|
|
result.max_compaction_bytes = result.target_file_size_base * 25;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
int SuperVersion::dummy = 0;
|
|
void* const SuperVersion::kSVInUse = &SuperVersion::dummy;
|
|
void* const SuperVersion::kSVObsolete = nullptr;
|
|
|
|
SuperVersion::~SuperVersion() {
|
|
for (auto td : to_delete) {
|
|
delete td;
|
|
}
|
|
}
|
|
|
|
SuperVersion* SuperVersion::Ref() {
|
|
refs.fetch_add(1, std::memory_order_relaxed);
|
|
return this;
|
|
}
|
|
|
|
bool SuperVersion::Unref() {
|
|
// fetch_sub returns the previous value of ref
|
|
uint32_t previous_refs = refs.fetch_sub(1);
|
|
assert(previous_refs > 0);
|
|
return previous_refs == 1;
|
|
}
|
|
|
|
void SuperVersion::Cleanup() {
|
|
assert(refs.load(std::memory_order_relaxed) == 0);
|
|
imm->Unref(&to_delete);
|
|
MemTable* m = mem->Unref();
|
|
if (m != nullptr) {
|
|
auto* memory_usage = current->cfd()->imm()->current_memory_usage();
|
|
assert(*memory_usage >= m->ApproximateMemoryUsage());
|
|
*memory_usage -= m->ApproximateMemoryUsage();
|
|
to_delete.push_back(m);
|
|
}
|
|
current->Unref();
|
|
}
|
|
|
|
void SuperVersion::Init(MemTable* new_mem, MemTableListVersion* new_imm,
|
|
Version* new_current) {
|
|
mem = new_mem;
|
|
imm = new_imm;
|
|
current = new_current;
|
|
mem->Ref();
|
|
imm->Ref();
|
|
current->Ref();
|
|
refs.store(1, std::memory_order_relaxed);
|
|
}
|
|
|
|
namespace {
|
|
void SuperVersionUnrefHandle(void* ptr) {
|
|
// UnrefHandle is called when a thread exists or a ThreadLocalPtr gets
|
|
// destroyed. When former happens, the thread shouldn't see kSVInUse.
|
|
// When latter happens, we are in ~ColumnFamilyData(), no get should happen as
|
|
// well.
|
|
SuperVersion* sv = static_cast<SuperVersion*>(ptr);
|
|
if (sv->Unref()) {
|
|
sv->db_mutex->Lock();
|
|
sv->Cleanup();
|
|
sv->db_mutex->Unlock();
|
|
delete sv;
|
|
}
|
|
}
|
|
} // anonymous namespace
|
|
|
|
ColumnFamilyData::ColumnFamilyData(
|
|
uint32_t id, const std::string& name, Version* _dummy_versions,
|
|
Cache* _table_cache, WriteBufferManager* write_buffer_manager,
|
|
const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options,
|
|
const EnvOptions& env_options, ColumnFamilySet* column_family_set)
|
|
: id_(id),
|
|
name_(name),
|
|
dummy_versions_(_dummy_versions),
|
|
current_(nullptr),
|
|
refs_(0),
|
|
initialized_(false),
|
|
dropped_(false),
|
|
internal_comparator_(cf_options.comparator),
|
|
initial_cf_options_(SanitizeOptions(db_options, cf_options)),
|
|
ioptions_(db_options, initial_cf_options_),
|
|
mutable_cf_options_(initial_cf_options_),
|
|
is_delete_range_supported_(strcmp(cf_options.table_factory->Name(),
|
|
BlockBasedTableFactory().Name()) == 0),
|
|
write_buffer_manager_(write_buffer_manager),
|
|
mem_(nullptr),
|
|
imm_(ioptions_.min_write_buffer_number_to_merge,
|
|
ioptions_.max_write_buffer_number_to_maintain),
|
|
super_version_(nullptr),
|
|
super_version_number_(0),
|
|
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
|
|
next_(nullptr),
|
|
prev_(nullptr),
|
|
log_number_(0),
|
|
column_family_set_(column_family_set),
|
|
pending_flush_(false),
|
|
pending_compaction_(false),
|
|
prev_compaction_needed_bytes_(0),
|
|
allow_2pc_(db_options.allow_2pc) {
|
|
Ref();
|
|
|
|
// Convert user defined table properties collector factories to internal ones.
|
|
GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_);
|
|
|
|
// if _dummy_versions is nullptr, then this is a dummy column family.
|
|
if (_dummy_versions != nullptr) {
|
|
internal_stats_.reset(
|
|
new InternalStats(ioptions_.num_levels, db_options.env, this));
|
|
table_cache_.reset(new TableCache(ioptions_, env_options, _table_cache));
|
|
if (ioptions_.compaction_style == kCompactionStyleLevel) {
|
|
compaction_picker_.reset(
|
|
new LevelCompactionPicker(ioptions_, &internal_comparator_));
|
|
#ifndef ROCKSDB_LITE
|
|
} else if (ioptions_.compaction_style == kCompactionStyleUniversal) {
|
|
compaction_picker_.reset(
|
|
new UniversalCompactionPicker(ioptions_, &internal_comparator_));
|
|
} else if (ioptions_.compaction_style == kCompactionStyleFIFO) {
|
|
compaction_picker_.reset(
|
|
new FIFOCompactionPicker(ioptions_, &internal_comparator_));
|
|
} else if (ioptions_.compaction_style == kCompactionStyleNone) {
|
|
compaction_picker_.reset(new NullCompactionPicker(
|
|
ioptions_, &internal_comparator_));
|
|
ROCKS_LOG_WARN(ioptions_.info_log,
|
|
"Column family %s does not use any background compaction. "
|
|
"Compactions can only be done via CompactFiles\n",
|
|
GetName().c_str());
|
|
#endif // !ROCKSDB_LITE
|
|
} else {
|
|
ROCKS_LOG_ERROR(ioptions_.info_log,
|
|
"Unable to recognize the specified compaction style %d. "
|
|
"Column family %s will use kCompactionStyleLevel.\n",
|
|
ioptions_.compaction_style, GetName().c_str());
|
|
compaction_picker_.reset(
|
|
new LevelCompactionPicker(ioptions_, &internal_comparator_));
|
|
}
|
|
|
|
if (column_family_set_->NumberOfColumnFamilies() < 10) {
|
|
ROCKS_LOG_INFO(ioptions_.info_log,
|
|
"--------------- Options for column family [%s]:\n",
|
|
name.c_str());
|
|
initial_cf_options_.Dump(ioptions_.info_log);
|
|
} else {
|
|
ROCKS_LOG_INFO(ioptions_.info_log, "\t(skipping printing options)\n");
|
|
}
|
|
}
|
|
|
|
RecalculateWriteStallConditions(mutable_cf_options_);
|
|
}
|
|
|
|
// DB mutex held
|
|
ColumnFamilyData::~ColumnFamilyData() {
|
|
assert(refs_.load(std::memory_order_relaxed) == 0);
|
|
// remove from linked list
|
|
auto prev = prev_;
|
|
auto next = next_;
|
|
prev->next_ = next;
|
|
next->prev_ = prev;
|
|
|
|
if (!dropped_ && column_family_set_ != nullptr) {
|
|
// If it's dropped, it's already removed from column family set
|
|
// If column_family_set_ == nullptr, this is dummy CFD and not in
|
|
// ColumnFamilySet
|
|
column_family_set_->RemoveColumnFamily(this);
|
|
}
|
|
|
|
if (current_ != nullptr) {
|
|
current_->Unref();
|
|
}
|
|
|
|
// It would be wrong if this ColumnFamilyData is in flush_queue_ or
|
|
// compaction_queue_ and we destroyed it
|
|
assert(!pending_flush_);
|
|
assert(!pending_compaction_);
|
|
|
|
if (super_version_ != nullptr) {
|
|
// Release SuperVersion reference kept in ThreadLocalPtr.
|
|
// This must be done outside of mutex_ since unref handler can lock mutex.
|
|
super_version_->db_mutex->Unlock();
|
|
local_sv_.reset();
|
|
super_version_->db_mutex->Lock();
|
|
|
|
bool is_last_reference __attribute__((unused));
|
|
is_last_reference = super_version_->Unref();
|
|
assert(is_last_reference);
|
|
super_version_->Cleanup();
|
|
delete super_version_;
|
|
super_version_ = nullptr;
|
|
}
|
|
|
|
if (dummy_versions_ != nullptr) {
|
|
// List must be empty
|
|
assert(dummy_versions_->TEST_Next() == dummy_versions_);
|
|
bool deleted __attribute__((unused)) = dummy_versions_->Unref();
|
|
assert(deleted);
|
|
}
|
|
|
|
if (mem_ != nullptr) {
|
|
delete mem_->Unref();
|
|
}
|
|
autovector<MemTable*> to_delete;
|
|
imm_.current()->Unref(&to_delete);
|
|
for (MemTable* m : to_delete) {
|
|
delete m;
|
|
}
|
|
}
|
|
|
|
void ColumnFamilyData::SetDropped() {
|
|
// can't drop default CF
|
|
assert(id_ != 0);
|
|
dropped_ = true;
|
|
write_controller_token_.reset();
|
|
|
|
// remove from column_family_set
|
|
column_family_set_->RemoveColumnFamily(this);
|
|
}
|
|
|
|
ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
|
|
return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
|
|
}
|
|
|
|
uint64_t ColumnFamilyData::OldestLogToKeep() {
|
|
auto current_log = GetLogNumber();
|
|
|
|
if (allow_2pc_) {
|
|
auto imm_prep_log = imm()->GetMinLogContainingPrepSection();
|
|
auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
|
|
|
|
if (imm_prep_log > 0 && imm_prep_log < current_log) {
|
|
current_log = imm_prep_log;
|
|
}
|
|
|
|
if (mem_prep_log > 0 && mem_prep_log < current_log) {
|
|
current_log = mem_prep_log;
|
|
}
|
|
}
|
|
|
|
return current_log;
|
|
}
|
|
|
|
const double kIncSlowdownRatio = 0.8;
|
|
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
|
|
const double kNearStopSlowdownRatio = 0.6;
|
|
const double kDelayRecoverSlowdownRatio = 1.4;
|
|
|
|
namespace {
|
|
// If penalize_stop is true, we further reduce slowdown rate.
|
|
std::unique_ptr<WriteControllerToken> SetupDelay(
|
|
WriteController* write_controller, uint64_t compaction_needed_bytes,
|
|
uint64_t prev_compaction_need_bytes, bool penalize_stop,
|
|
bool auto_comapctions_disabled) {
|
|
const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
|
|
|
|
uint64_t max_write_rate = write_controller->max_delayed_write_rate();
|
|
uint64_t write_rate = write_controller->delayed_write_rate();
|
|
|
|
if (auto_comapctions_disabled) {
|
|
// When auto compaction is disabled, always use the value user gave.
|
|
write_rate = max_write_rate;
|
|
} else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
|
|
// If user gives rate less than kMinWriteRate, don't adjust it.
|
|
//
|
|
// If already delayed, need to adjust based on previous compaction debt.
|
|
// When there are two or more column families require delay, we always
|
|
// increase or reduce write rate based on information for one single
|
|
// column family. It is likely to be OK but we can improve if there is a
|
|
// problem.
|
|
// Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes
|
|
// is only available in level-based compaction
|
|
//
|
|
// If the compaction debt stays the same as previously, we also further slow
|
|
// down. It usually means a mem table is full. It's mainly for the case
|
|
// where both of flush and compaction are much slower than the speed we
|
|
// insert to mem tables, so we need to actively slow down before we get
|
|
// feedback signal from compaction and flushes to avoid the full stop
|
|
// because of hitting the max write buffer number.
|
|
//
|
|
// If DB just falled into the stop condition, we need to further reduce
|
|
// the write rate to avoid the stop condition.
|
|
if (penalize_stop) {
|
|
// Penalize the near stop or stop condition by more aggressive slowdown.
|
|
// This is to provide the long term slowdown increase signal.
|
|
// The penalty is more than the reward of recovering to the normal
|
|
// condition.
|
|
write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
|
|
kNearStopSlowdownRatio);
|
|
if (write_rate < kMinWriteRate) {
|
|
write_rate = kMinWriteRate;
|
|
}
|
|
} else if (prev_compaction_need_bytes > 0 &&
|
|
prev_compaction_need_bytes <= compaction_needed_bytes) {
|
|
write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
|
|
kIncSlowdownRatio);
|
|
if (write_rate < kMinWriteRate) {
|
|
write_rate = kMinWriteRate;
|
|
}
|
|
} else if (prev_compaction_need_bytes > compaction_needed_bytes) {
|
|
// We are speeding up by ratio of kSlowdownRatio when we have paid
|
|
// compaction debt. But we'll never speed up to faster than the write rate
|
|
// given by users.
|
|
write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
|
|
kDecSlowdownRatio);
|
|
if (write_rate > max_write_rate) {
|
|
write_rate = max_write_rate;
|
|
}
|
|
}
|
|
}
|
|
return write_controller->GetDelayToken(write_rate);
|
|
}
|
|
|
|
int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
|
|
int level0_slowdown_writes_trigger) {
|
|
// SanitizeOptions() ensures it.
|
|
assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);
|
|
|
|
if (level0_file_num_compaction_trigger < 0) {
|
|
return std::numeric_limits<int>::max();
|
|
}
|
|
|
|
const int64_t twice_level0_trigger =
|
|
static_cast<int64_t>(level0_file_num_compaction_trigger) * 2;
|
|
|
|
const int64_t one_fourth_trigger_slowdown =
|
|
static_cast<int64_t>(level0_file_num_compaction_trigger) +
|
|
((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) /
|
|
4);
|
|
|
|
assert(twice_level0_trigger >= 0);
|
|
assert(one_fourth_trigger_slowdown >= 0);
|
|
|
|
// 1/4 of the way between L0 compaction trigger threshold and slowdown
|
|
// condition.
|
|
// Or twice as compaction trigger, if it is smaller.
|
|
int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown);
|
|
if (res >= port::kMaxInt32) {
|
|
return port::kMaxInt32;
|
|
} else {
|
|
// res fits in int
|
|
return static_cast<int>(res);
|
|
}
|
|
}
|
|
} // namespace
|
|
|
|
void ColumnFamilyData::RecalculateWriteStallConditions(
|
|
const MutableCFOptions& mutable_cf_options) {
|
|
if (current_ != nullptr) {
|
|
auto* vstorage = current_->storage_info();
|
|
auto write_controller = column_family_set_->write_controller_;
|
|
uint64_t compaction_needed_bytes =
|
|
vstorage->estimated_compaction_needed_bytes();
|
|
|
|
bool was_stopped = write_controller->IsStopped();
|
|
bool needed_delay = write_controller->NeedsDelay();
|
|
|
|
if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
|
|
write_controller_token_ = write_controller->GetStopToken();
|
|
internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
|
|
ROCKS_LOG_WARN(
|
|
ioptions_.info_log,
|
|
"[%s] Stopping writes because we have %d immutable memtables "
|
|
"(waiting for flush), max_write_buffer_number is set to %d",
|
|
name_.c_str(), imm()->NumNotFlushed(),
|
|
mutable_cf_options.max_write_buffer_number);
|
|
} else if (!mutable_cf_options.disable_auto_compactions &&
|
|
vstorage->l0_delay_trigger_count() >=
|
|
mutable_cf_options.level0_stop_writes_trigger) {
|
|
write_controller_token_ = write_controller->GetStopToken();
|
|
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
|
|
if (compaction_picker_->IsLevel0CompactionInProgress()) {
|
|
internal_stats_->AddCFStats(
|
|
InternalStats::LEVEL0_NUM_FILES_WITH_COMPACTION, 1);
|
|
}
|
|
ROCKS_LOG_WARN(ioptions_.info_log,
|
|
"[%s] Stopping writes because we have %d level-0 files",
|
|
name_.c_str(), vstorage->l0_delay_trigger_count());
|
|
} else if (!mutable_cf_options.disable_auto_compactions &&
|
|
mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
|
|
compaction_needed_bytes >=
|
|
mutable_cf_options.hard_pending_compaction_bytes_limit) {
|
|
write_controller_token_ = write_controller->GetStopToken();
|
|
internal_stats_->AddCFStats(
|
|
InternalStats::HARD_PENDING_COMPACTION_BYTES_LIMIT, 1);
|
|
ROCKS_LOG_WARN(
|
|
ioptions_.info_log,
|
|
"[%s] Stopping writes because of estimated pending compaction "
|
|
"bytes %" PRIu64,
|
|
name_.c_str(), compaction_needed_bytes);
|
|
} else if (mutable_cf_options.max_write_buffer_number > 3 &&
|
|
imm()->NumNotFlushed() >=
|
|
mutable_cf_options.max_write_buffer_number - 1) {
|
|
write_controller_token_ =
|
|
SetupDelay(write_controller, compaction_needed_bytes,
|
|
prev_compaction_needed_bytes_, was_stopped,
|
|
mutable_cf_options.disable_auto_compactions);
|
|
internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
|
|
ROCKS_LOG_WARN(
|
|
ioptions_.info_log,
|
|
"[%s] Stalling writes because we have %d immutable memtables "
|
|
"(waiting for flush), max_write_buffer_number is set to %d "
|
|
"rate %" PRIu64,
|
|
name_.c_str(), imm()->NumNotFlushed(),
|
|
mutable_cf_options.max_write_buffer_number,
|
|
write_controller->delayed_write_rate());
|
|
} else if (!mutable_cf_options.disable_auto_compactions &&
|
|
mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
|
|
vstorage->l0_delay_trigger_count() >=
|
|
mutable_cf_options.level0_slowdown_writes_trigger) {
|
|
// L0 is the last two files from stopping.
|
|
bool near_stop = vstorage->l0_delay_trigger_count() >=
|
|
mutable_cf_options.level0_stop_writes_trigger - 2;
|
|
write_controller_token_ =
|
|
SetupDelay(write_controller, compaction_needed_bytes,
|
|
prev_compaction_needed_bytes_, was_stopped || near_stop,
|
|
mutable_cf_options.disable_auto_compactions);
|
|
internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
|
|
if (compaction_picker_->IsLevel0CompactionInProgress()) {
|
|
internal_stats_->AddCFStats(
|
|
InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
|
|
}
|
|
ROCKS_LOG_WARN(ioptions_.info_log,
|
|
"[%s] Stalling writes because we have %d level-0 files "
|
|
"rate %" PRIu64,
|
|
name_.c_str(), vstorage->l0_delay_trigger_count(),
|
|
write_controller->delayed_write_rate());
|
|
} else if (!mutable_cf_options.disable_auto_compactions &&
|
|
mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
|
|
vstorage->estimated_compaction_needed_bytes() >=
|
|
mutable_cf_options.soft_pending_compaction_bytes_limit) {
|
|
// If the distance to hard limit is less than 1/4 of the gap between soft
|
|
// and
|
|
// hard bytes limit, we think it is near stop and speed up the slowdown.
|
|
bool near_stop =
|
|
mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
|
|
(compaction_needed_bytes -
|
|
mutable_cf_options.soft_pending_compaction_bytes_limit) >
|
|
3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
|
|
mutable_cf_options.soft_pending_compaction_bytes_limit) /
|
|
4;
|
|
|
|
write_controller_token_ =
|
|
SetupDelay(write_controller, compaction_needed_bytes,
|
|
prev_compaction_needed_bytes_, was_stopped || near_stop,
|
|
mutable_cf_options.disable_auto_compactions);
|
|
internal_stats_->AddCFStats(
|
|
InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
|
|
ROCKS_LOG_WARN(
|
|
ioptions_.info_log,
|
|
"[%s] Stalling writes because of estimated pending compaction "
|
|
"bytes %" PRIu64 " rate %" PRIu64,
|
|
name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
|
|
write_controller->delayed_write_rate());
|
|
} else {
|
|
if (vstorage->l0_delay_trigger_count() >=
|
|
GetL0ThresholdSpeedupCompaction(
|
|
mutable_cf_options.level0_file_num_compaction_trigger,
|
|
mutable_cf_options.level0_slowdown_writes_trigger)) {
|
|
write_controller_token_ =
|
|
write_controller->GetCompactionPressureToken();
|
|
ROCKS_LOG_INFO(
|
|
ioptions_.info_log,
|
|
"[%s] Increasing compaction threads because we have %d level-0 "
|
|
"files ",
|
|
name_.c_str(), vstorage->l0_delay_trigger_count());
|
|
} else if (vstorage->estimated_compaction_needed_bytes() >=
|
|
mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
|
|
// Increase compaction threads if bytes needed for compaction exceeds
|
|
// 1/4 of threshold for slowing down.
|
|
// If soft pending compaction byte limit is not set, always speed up
|
|
// compaction.
|
|
write_controller_token_ =
|
|
write_controller->GetCompactionPressureToken();
|
|
if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
|
|
ROCKS_LOG_INFO(
|
|
ioptions_.info_log,
|
|
"[%s] Increasing compaction threads because of estimated pending "
|
|
"compaction "
|
|
"bytes %" PRIu64,
|
|
name_.c_str(), vstorage->estimated_compaction_needed_bytes());
|
|
}
|
|
} else {
|
|
write_controller_token_.reset();
|
|
}
|
|
// If the DB recovers from delay conditions, we reward with reducing
|
|
// double the slowdown ratio. This is to balance the long term slowdown
|
|
// increase signal.
|
|
if (needed_delay) {
|
|
uint64_t write_rate = write_controller->delayed_write_rate();
|
|
write_controller->set_delayed_write_rate(static_cast<uint64_t>(
|
|
static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
|
|
// Set the low pri limit to be 1/4 the delayed write rate.
|
|
// Note we don't reset this value even after delay condition is relased.
|
|
// Low-pri rate will continue to apply if there is a compaction
|
|
// pressure.
|
|
write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate /
|
|
4);
|
|
}
|
|
}
|
|
prev_compaction_needed_bytes_ = compaction_needed_bytes;
|
|
}
|
|
}
|
|
|
|
const EnvOptions* ColumnFamilyData::soptions() const {
|
|
return &(column_family_set_->env_options_);
|
|
}
|
|
|
|
void ColumnFamilyData::SetCurrent(Version* current_version) {
|
|
current_ = current_version;
|
|
}
|
|
|
|
uint64_t ColumnFamilyData::GetNumLiveVersions() const {
|
|
return VersionSet::GetNumLiveVersions(dummy_versions_);
|
|
}
|
|
|
|
uint64_t ColumnFamilyData::GetTotalSstFilesSize() const {
|
|
return VersionSet::GetTotalSstFilesSize(dummy_versions_);
|
|
}
|
|
|
|
MemTable* ColumnFamilyData::ConstructNewMemtable(
|
|
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
|
|
return new MemTable(internal_comparator_, ioptions_, mutable_cf_options,
|
|
write_buffer_manager_, earliest_seq, id_);
|
|
}
|
|
|
|
void ColumnFamilyData::CreateNewMemtable(
|
|
const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) {
|
|
if (mem_ != nullptr) {
|
|
delete mem_->Unref();
|
|
}
|
|
SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq));
|
|
mem_->Ref();
|
|
}
|
|
|
|
bool ColumnFamilyData::NeedsCompaction() const {
|
|
return compaction_picker_->NeedsCompaction(current_->storage_info());
|
|
}
|
|
|
|
Compaction* ColumnFamilyData::PickCompaction(
|
|
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
|
|
auto* result = compaction_picker_->PickCompaction(
|
|
GetName(), mutable_options, current_->storage_info(), log_buffer);
|
|
if (result != nullptr) {
|
|
result->SetInputVersion(current_);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
bool ColumnFamilyData::RangeOverlapWithCompaction(
|
|
const Slice& smallest_user_key, const Slice& largest_user_key,
|
|
int level) const {
|
|
return compaction_picker_->RangeOverlapWithCompaction(
|
|
smallest_user_key, largest_user_key, level);
|
|
}
|
|
|
|
const int ColumnFamilyData::kCompactAllLevels = -1;
|
|
const int ColumnFamilyData::kCompactToBaseLevel = -2;
|
|
|
|
Compaction* ColumnFamilyData::CompactRange(
|
|
const MutableCFOptions& mutable_cf_options, int input_level,
|
|
int output_level, uint32_t output_path_id, const InternalKey* begin,
|
|
const InternalKey* end, InternalKey** compaction_end, bool* conflict) {
|
|
auto* result = compaction_picker_->CompactRange(
|
|
GetName(), mutable_cf_options, current_->storage_info(), input_level,
|
|
output_level, output_path_id, begin, end, compaction_end, conflict);
|
|
if (result != nullptr) {
|
|
result->SetInputVersion(current_);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
|
|
InstrumentedMutex* db_mutex) {
|
|
SuperVersion* sv = nullptr;
|
|
sv = GetThreadLocalSuperVersion(db_mutex);
|
|
sv->Ref();
|
|
if (!ReturnThreadLocalSuperVersion(sv)) {
|
|
sv->Unref();
|
|
}
|
|
return sv;
|
|
}
|
|
|
|
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
|
|
InstrumentedMutex* db_mutex) {
|
|
SuperVersion* sv = nullptr;
|
|
// The SuperVersion is cached in thread local storage to avoid acquiring
|
|
// mutex when SuperVersion does not change since the last use. When a new
|
|
// SuperVersion is installed, the compaction or flush thread cleans up
|
|
// cached SuperVersion in all existing thread local storage. To avoid
|
|
// acquiring mutex for this operation, we use atomic Swap() on the thread
|
|
// local pointer to guarantee exclusive access. If the thread local pointer
|
|
// is being used while a new SuperVersion is installed, the cached
|
|
// SuperVersion can become stale. In that case, the background thread would
|
|
// have swapped in kSVObsolete. We re-check the value at when returning
|
|
// SuperVersion back to thread local, with an atomic compare and swap.
|
|
// The superversion will need to be released if detected to be stale.
|
|
void* ptr = local_sv_->Swap(SuperVersion::kSVInUse);
|
|
// Invariant:
|
|
// (1) Scrape (always) installs kSVObsolete in ThreadLocal storage
|
|
// (2) the Swap above (always) installs kSVInUse, ThreadLocal storage
|
|
// should only keep kSVInUse before ReturnThreadLocalSuperVersion call
|
|
// (if no Scrape happens).
|
|
assert(ptr != SuperVersion::kSVInUse);
|
|
sv = static_cast<SuperVersion*>(ptr);
|
|
if (sv == SuperVersion::kSVObsolete ||
|
|
sv->version_number != super_version_number_.load()) {
|
|
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
|
|
SuperVersion* sv_to_delete = nullptr;
|
|
|
|
if (sv && sv->Unref()) {
|
|
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
|
|
db_mutex->Lock();
|
|
// NOTE: underlying resources held by superversion (sst files) might
|
|
// not be released until the next background job.
|
|
sv->Cleanup();
|
|
sv_to_delete = sv;
|
|
} else {
|
|
db_mutex->Lock();
|
|
}
|
|
sv = super_version_->Ref();
|
|
db_mutex->Unlock();
|
|
|
|
delete sv_to_delete;
|
|
}
|
|
assert(sv != nullptr);
|
|
return sv;
|
|
}
|
|
|
|
bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) {
|
|
assert(sv != nullptr);
|
|
// Put the SuperVersion back
|
|
void* expected = SuperVersion::kSVInUse;
|
|
if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) {
|
|
// When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal
|
|
// storage has not been altered and no Scrape has happened. The
|
|
// SuperVersion is still current.
|
|
return true;
|
|
} else {
|
|
// ThreadLocal scrape happened in the process of this GetImpl call (after
|
|
// thread local Swap() at the beginning and before CompareAndSwap()).
|
|
// This means the SuperVersion it holds is obsolete.
|
|
assert(expected == SuperVersion::kSVObsolete);
|
|
}
|
|
return false;
|
|
}
|
|
|
|
SuperVersion* ColumnFamilyData::InstallSuperVersion(
|
|
SuperVersion* new_superversion, InstrumentedMutex* db_mutex) {
|
|
db_mutex->AssertHeld();
|
|
return InstallSuperVersion(new_superversion, db_mutex, mutable_cf_options_);
|
|
}
|
|
|
|
SuperVersion* ColumnFamilyData::InstallSuperVersion(
|
|
SuperVersion* new_superversion, InstrumentedMutex* db_mutex,
|
|
const MutableCFOptions& mutable_cf_options) {
|
|
new_superversion->db_mutex = db_mutex;
|
|
new_superversion->mutable_cf_options = mutable_cf_options;
|
|
new_superversion->Init(mem_, imm_.current(), current_);
|
|
SuperVersion* old_superversion = super_version_;
|
|
super_version_ = new_superversion;
|
|
++super_version_number_;
|
|
super_version_->version_number = super_version_number_;
|
|
// Reset SuperVersions cached in thread local storage
|
|
ResetThreadLocalSuperVersions();
|
|
|
|
RecalculateWriteStallConditions(mutable_cf_options);
|
|
|
|
if (old_superversion != nullptr && old_superversion->Unref()) {
|
|
old_superversion->Cleanup();
|
|
return old_superversion; // will let caller delete outside of mutex
|
|
}
|
|
return nullptr;
|
|
}
|
|
|
|
void ColumnFamilyData::ResetThreadLocalSuperVersions() {
|
|
autovector<void*> sv_ptrs;
|
|
local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete);
|
|
for (auto ptr : sv_ptrs) {
|
|
assert(ptr);
|
|
if (ptr == SuperVersion::kSVInUse) {
|
|
continue;
|
|
}
|
|
auto sv = static_cast<SuperVersion*>(ptr);
|
|
if (sv->Unref()) {
|
|
sv->Cleanup();
|
|
delete sv;
|
|
}
|
|
}
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
Status ColumnFamilyData::SetOptions(
|
|
const std::unordered_map<std::string, std::string>& options_map) {
|
|
MutableCFOptions new_mutable_cf_options;
|
|
Status s = GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
|
|
&new_mutable_cf_options);
|
|
if (s.ok()) {
|
|
mutable_cf_options_ = new_mutable_cf_options;
|
|
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
|
|
}
|
|
return s;
|
|
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
|
|
const ImmutableDBOptions* db_options,
|
|
const EnvOptions& env_options,
|
|
Cache* table_cache,
|
|
WriteBufferManager* write_buffer_manager,
|
|
WriteController* write_controller)
|
|
: max_column_family_(0),
|
|
dummy_cfd_(new ColumnFamilyData(0, "", nullptr, nullptr, nullptr,
|
|
ColumnFamilyOptions(), *db_options,
|
|
env_options, nullptr)),
|
|
default_cfd_cache_(nullptr),
|
|
db_name_(dbname),
|
|
db_options_(db_options),
|
|
env_options_(env_options),
|
|
table_cache_(table_cache),
|
|
write_buffer_manager_(write_buffer_manager),
|
|
write_controller_(write_controller) {
|
|
// initialize linked list
|
|
dummy_cfd_->prev_ = dummy_cfd_;
|
|
dummy_cfd_->next_ = dummy_cfd_;
|
|
}
|
|
|
|
ColumnFamilySet::~ColumnFamilySet() {
|
|
while (column_family_data_.size() > 0) {
|
|
// cfd destructor will delete itself from column_family_data_
|
|
auto cfd = column_family_data_.begin()->second;
|
|
cfd->Unref();
|
|
delete cfd;
|
|
}
|
|
dummy_cfd_->Unref();
|
|
delete dummy_cfd_;
|
|
}
|
|
|
|
ColumnFamilyData* ColumnFamilySet::GetDefault() const {
|
|
assert(default_cfd_cache_ != nullptr);
|
|
return default_cfd_cache_;
|
|
}
|
|
|
|
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const {
|
|
auto cfd_iter = column_family_data_.find(id);
|
|
if (cfd_iter != column_family_data_.end()) {
|
|
return cfd_iter->second;
|
|
} else {
|
|
return nullptr;
|
|
}
|
|
}
|
|
|
|
ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name)
|
|
const {
|
|
auto cfd_iter = column_families_.find(name);
|
|
if (cfd_iter != column_families_.end()) {
|
|
auto cfd = GetColumnFamily(cfd_iter->second);
|
|
assert(cfd != nullptr);
|
|
return cfd;
|
|
} else {
|
|
return nullptr;
|
|
}
|
|
}
|
|
|
|
uint32_t ColumnFamilySet::GetNextColumnFamilyID() {
|
|
return ++max_column_family_;
|
|
}
|
|
|
|
uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; }
|
|
|
|
void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t new_max_column_family) {
|
|
max_column_family_ = std::max(new_max_column_family, max_column_family_);
|
|
}
|
|
|
|
size_t ColumnFamilySet::NumberOfColumnFamilies() const {
|
|
return column_families_.size();
|
|
}
|
|
|
|
// under a DB mutex AND write thread
|
|
ColumnFamilyData* ColumnFamilySet::CreateColumnFamily(
|
|
const std::string& name, uint32_t id, Version* dummy_versions,
|
|
const ColumnFamilyOptions& options) {
|
|
assert(column_families_.find(name) == column_families_.end());
|
|
ColumnFamilyData* new_cfd = new ColumnFamilyData(
|
|
id, name, dummy_versions, table_cache_, write_buffer_manager_, options,
|
|
*db_options_, env_options_, this);
|
|
column_families_.insert({name, id});
|
|
column_family_data_.insert({id, new_cfd});
|
|
max_column_family_ = std::max(max_column_family_, id);
|
|
// add to linked list
|
|
new_cfd->next_ = dummy_cfd_;
|
|
auto prev = dummy_cfd_->prev_;
|
|
new_cfd->prev_ = prev;
|
|
prev->next_ = new_cfd;
|
|
dummy_cfd_->prev_ = new_cfd;
|
|
if (id == 0) {
|
|
default_cfd_cache_ = new_cfd;
|
|
}
|
|
return new_cfd;
|
|
}
|
|
|
|
// REQUIRES: DB mutex held
|
|
void ColumnFamilySet::FreeDeadColumnFamilies() {
|
|
autovector<ColumnFamilyData*> to_delete;
|
|
for (auto cfd = dummy_cfd_->next_; cfd != dummy_cfd_; cfd = cfd->next_) {
|
|
if (cfd->refs_.load(std::memory_order_relaxed) == 0) {
|
|
to_delete.push_back(cfd);
|
|
}
|
|
}
|
|
for (auto cfd : to_delete) {
|
|
// this is very rare, so it's not a problem that we do it under a mutex
|
|
delete cfd;
|
|
}
|
|
}
|
|
|
|
// under a DB mutex AND from a write thread
|
|
void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) {
|
|
auto cfd_iter = column_family_data_.find(cfd->GetID());
|
|
assert(cfd_iter != column_family_data_.end());
|
|
column_family_data_.erase(cfd_iter);
|
|
column_families_.erase(cfd->GetName());
|
|
}
|
|
|
|
// under a DB mutex OR from a write thread
|
|
bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) {
|
|
if (column_family_id == 0) {
|
|
// optimization for common case
|
|
current_ = column_family_set_->GetDefault();
|
|
} else {
|
|
current_ = column_family_set_->GetColumnFamily(column_family_id);
|
|
}
|
|
handle_.SetCFD(current_);
|
|
return current_ != nullptr;
|
|
}
|
|
|
|
uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const {
|
|
assert(current_ != nullptr);
|
|
return current_->GetLogNumber();
|
|
}
|
|
|
|
MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const {
|
|
assert(current_ != nullptr);
|
|
return current_->mem();
|
|
}
|
|
|
|
ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() {
|
|
assert(current_ != nullptr);
|
|
return &handle_;
|
|
}
|
|
|
|
uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
|
|
uint32_t column_family_id = 0;
|
|
if (column_family != nullptr) {
|
|
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
|
|
column_family_id = cfh->GetID();
|
|
}
|
|
return column_family_id;
|
|
}
|
|
|
|
const Comparator* GetColumnFamilyUserComparator(
|
|
ColumnFamilyHandle* column_family) {
|
|
if (column_family != nullptr) {
|
|
return column_family->GetComparator();
|
|
}
|
|
return nullptr;
|
|
}
|
|
|
|
} // namespace rocksdb
|