320d9a8e8a
Summary: The patch replaces `std::map` with a sorted `std::vector` for `VersionStorageInfo::blob_files_` and preallocates the space for the `vector` before saving the `BlobFileMetaData` into the new `VersionStorageInfo` in `VersionBuilder::Rep::SaveBlobFilesTo`. These changes reduce the time the DB mutex is held while saving new `Version`s, and using a sorted `vector` also makes lookups faster thanks to better memory locality. In addition, the patch introduces helper methods `VersionStorageInfo::GetBlobFileMetaData` and `VersionStorageInfo::GetBlobFileMetaDataLB` that can be used by clients to perform lookups in the `vector`, and does some general cleanup in the parts of code where blob file metadata are used. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9526 Test Plan: Ran `make check` and the crash test script for a while. Performance was tested using a load-optimized benchmark (`fillseq` with vector memtable, no WAL) and small file sizes so that a significant number of files are produced: ``` numactl --interleave=all ./db_bench --benchmarks=fillseq --allow_concurrent_memtable_write=false --level0_file_num_compaction_trigger=4 --level0_slowdown_writes_trigger=20 --level0_stop_writes_trigger=30 --max_background_jobs=8 --max_write_buffer_number=8 --db=/data/ltamasi-dbbench --wal_dir=/data/ltamasi-dbbench --num=800000000 --num_levels=8 --key_size=20 --value_size=400 --block_size=8192 --cache_size=51539607552 --cache_numshardbits=6 --compression_max_dict_bytes=0 --compression_ratio=0.5 --compression_type=lz4 --bytes_per_sync=8388608 --cache_index_and_filter_blocks=1 --cache_high_pri_pool_ratio=0.5 --benchmark_write_rate_limit=0 --write_buffer_size=16777216 --target_file_size_base=16777216 --max_bytes_for_level_base=67108864 --verify_checksum=1 --delete_obsolete_files_period_micros=62914560 --max_bytes_for_level_multiplier=8 --statistics=0 --stats_per_interval=1 --stats_interval_seconds=20 --histogram=1 --memtablerep=skip_list --bloom_bits=10 
--open_files=-1 --subcompactions=1 --compaction_style=0 --min_level_to_compress=3 --level_compaction_dynamic_level_bytes=true --pin_l0_filter_and_index_blocks_in_cache=1 --soft_pending_compaction_bytes_limit=167503724544 --hard_pending_compaction_bytes_limit=335007449088 --min_level_to_compress=0 --use_existing_db=0 --sync=0 --threads=1 --memtablerep=vector --allow_concurrent_memtable_write=false --disable_wal=1 --enable_blob_files=1 --blob_file_size=16777216 --min_blob_size=0 --blob_compression_type=lz4 --enable_blob_garbage_collection=1 --seed=<some value> ``` Final statistics before the patch: ``` Cumulative writes: 0 writes, 700M keys, 0 commit groups, 0.0 writes per commit group, ingest: 284.62 GB, 121.27 MB/s Interval writes: 0 writes, 334K keys, 0 commit groups, 0.0 writes per commit group, ingest: 139.28 MB, 72.46 MB/s ``` With the patch: ``` Cumulative writes: 0 writes, 760M keys, 0 commit groups, 0.0 writes per commit group, ingest: 308.66 GB, 131.52 MB/s Interval writes: 0 writes, 445K keys, 0 commit groups, 0.0 writes per commit group, ingest: 185.35 MB, 93.15 MB/s ``` Total time to complete the benchmark is 2611 seconds with the patch, down from 2986 secs. Reviewed By: riversand963 Differential Revision: D34082728 Pulled By: ltamasi fbshipit-source-id: fc598abf676dce436734d06bb9d2d99a26a004fc
321 lines
9.3 KiB
C++
321 lines
9.3 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#ifndef NDEBUG
|
|
|
|
#include "db/column_family.h"
|
|
#include "db/db_impl/db_impl.h"
|
|
#include "db/error_handler.h"
|
|
#include "db/periodic_work_scheduler.h"
|
|
#include "monitoring/thread_status_updater.h"
|
|
#include "util/cast_util.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
uint64_t DBImpl::TEST_GetLevel0TotalSize() {
|
|
InstrumentedMutexLock l(&mutex_);
|
|
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
|
|
}
|
|
|
|
// Test hook: calls SwitchWAL() between a TEST_BeginWrite()/TEST_EndWrite()
// pair and returns its result.
Status DBImpl::TEST_SwitchWAL() {
  // Declared ahead of the lock guard, so it is destroyed after the guard.
  WriteContext switch_context;
  InstrumentedMutexLock guard(&mutex_);

  void* write_token = TEST_BeginWrite();
  auto result = SwitchWAL(&switch_context);
  TEST_EndWrite(write_token);

  return result;
}
|
|
|
|
bool DBImpl::TEST_WALBufferIsEmpty(bool lock) {
|
|
if (lock) {
|
|
log_write_mutex_.Lock();
|
|
}
|
|
log::Writer* cur_log_writer = logs_.back().writer;
|
|
auto res = cur_log_writer->TEST_BufferIsEmpty();
|
|
if (lock) {
|
|
log_write_mutex_.Unlock();
|
|
}
|
|
return res;
|
|
}
|
|
|
|
// Test hook: returns MaxNextLevelOverlappingBytes() from the storage info of
// the given column family's current version.
//
// column_family: handle to inspect; nullptr selects the default column family.
uint64_t DBImpl::TEST_MaxNextLevelOverlappingBytes(
    ColumnFamilyHandle* column_family) {
  // The column family is resolved before the mutex is taken.
  ColumnFamilyData* cfd =
      (column_family == nullptr)
          ? default_cf_handle_->cfd()
          : static_cast_with_check<ColumnFamilyHandleImpl>(column_family)
                ->cfd();

  InstrumentedMutexLock guard(&mutex_);
  auto* storage = cfd->current()->storage_info();
  return storage->MaxNextLevelOverlappingBytes();
}
|
|
|
|
// Test hook: copies the per-level file metadata (and optionally the blob file
// metadata) of a column family's current version.
//
// column_family: handle to inspect; asserted to be non-null after the cast.
// metadata:      required output; resized to NumberLevels() entries, with
//                entry i overwritten by copies of the FileMetaData objects of
//                level i.
// blob_metadata: optional output; when non-null it is assigned the result of
//                GetBlobFiles() on the version's storage info.
void DBImpl::TEST_GetFilesMetaData(
    ColumnFamilyHandle* column_family,
    std::vector<std::vector<FileMetaData>>* metadata,
    std::vector<std::shared_ptr<BlobFileMetaData>>* blob_metadata) {
  assert(metadata);

  auto* const handle_impl =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  assert(handle_impl);

  auto* const cf_data = handle_impl->cfd();
  assert(cf_data);

  InstrumentedMutexLock guard(&mutex_);

  const auto* version = cf_data->current();
  assert(version);

  const auto* storage = version->storage_info();
  assert(storage);

  // NOTE(review): the level count comes from DBImpl::NumberLevels(), not from
  // `cf_data`; presumably all column families inspected by tests share the
  // same number of levels -- confirm.
  metadata->resize(NumberLevels());

  for (int level = 0; level < NumberLevels(); ++level) {
    const std::vector<FileMetaData*>& level_files = storage->LevelFiles(level);
    std::vector<FileMetaData>& out = (*metadata)[level];

    out.clear();
    out.reserve(level_files.size());

    for (const FileMetaData* file : level_files) {
      out.push_back(*file);
    }
  }

  if (blob_metadata != nullptr) {
    *blob_metadata = storage->GetBlobFiles();
  }
}
|
|
|
|
uint64_t DBImpl::TEST_Current_Manifest_FileNo() {
|
|
return versions_->manifest_file_number();
|
|
}
|
|
|
|
uint64_t DBImpl::TEST_Current_Next_FileNo() {
|
|
return versions_->current_next_file_number();
|
|
}
|
|
|
|
// Test hook: runs a manual compaction of `level` over [begin, end] via
// RunManualCompaction() with default CompactRangeOptions.
//
// level:                 input level.
// begin, end:            key range, forwarded unchanged.
// column_family:         target column family; nullptr selects the default.
// disallow_trivial_move: forwarded unchanged.
//
// For the universal and FIFO compaction styles the output level equals the
// input level; for every other style it is level + 1.
Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
                                 const Slice* end,
                                 ColumnFamilyHandle* column_family,
                                 bool disallow_trivial_move) {
  ColumnFamilyData* cfd =
      (column_family == nullptr)
          ? default_cf_handle_->cfd()
          : static_cast_with_check<ColumnFamilyHandleImpl>(column_family)
                ->cfd();

  const auto style = cfd->ioptions()->compaction_style;
  const bool output_to_same_level =
      style == kCompactionStyleUniversal || style == kCompactionStyleFIFO;

  int output_level = level + 1;
  if (output_to_same_level) {
    output_level = level;
  }

  return RunManualCompaction(cfd, level, output_level, CompactRangeOptions(),
                             begin, end, true, disallow_trivial_move,
                             port::kMaxUint64 /*max_file_num_to_ignore*/);
}
|
|
|
|
// Test hook: calls SwitchMemtable() for `cfd` between a TEST_BeginWrite() /
// TEST_EndWrite() pair and returns its status.
//
// cfd: column family to switch; nullptr selects the default column family.
//
// When two_write_queues_ is set, the call is additionally bracketed by
// EnterUnbatched()/ExitUnbatched() on nonmem_write_thread_.
Status DBImpl::TEST_SwitchMemtable(ColumnFamilyData* cfd) {
  // Declared ahead of the lock guard, so it is destroyed after the guard.
  WriteContext switch_context;
  InstrumentedMutexLock guard(&mutex_);

  if (cfd == nullptr) {
    cfd = default_cf_handle_->cfd();
  }

  Status status;
  void* write_token = TEST_BeginWrite();

  if (!two_write_queues_) {
    status = SwitchMemtable(cfd, &switch_context);
  } else {
    WriteThread::Writer nonmem_writer;
    nonmem_write_thread_.EnterUnbatched(&nonmem_writer, &mutex_);
    status = SwitchMemtable(cfd, &switch_context);
    nonmem_write_thread_.ExitUnbatched(&nonmem_writer);
  }

  TEST_EndWrite(write_token);
  return status;
}
|
|
|
|
// Test hook: flushes the memtable of a column family with
// FlushReason::kTest.
//
// wait:              copied into FlushOptions::wait.
// allow_write_stall: copied into FlushOptions::allow_write_stall.
// cfh:               target column family; nullptr selects the default.
Status DBImpl::TEST_FlushMemTable(bool wait, bool allow_write_stall,
                                  ColumnFamilyHandle* cfh) {
  FlushOptions flush_options;
  flush_options.wait = wait;
  flush_options.allow_write_stall = allow_write_stall;

  ColumnFamilyData* target =
      (cfh == nullptr)
          ? default_cf_handle_->cfd()
          : static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd();

  return FlushMemTable(target, flush_options, FlushReason::kTest);
}
|
|
|
|
// Test hook: forwards `cfd` and `flush_opts` to FlushMemTable() with
// FlushReason::kTest and returns its status.
Status DBImpl::TEST_FlushMemTable(ColumnFamilyData* cfd,
                                  const FlushOptions& flush_opts) {
  Status flush_status = FlushMemTable(cfd, flush_opts, FlushReason::kTest);
  return flush_status;
}
|
|
|
|
// Test hook: forwards `cfds` and `flush_opts` to AtomicFlushMemTables() with
// FlushReason::kTest and returns its status.
Status DBImpl::TEST_AtomicFlushMemTables(
    const autovector<ColumnFamilyData*>& cfds, const FlushOptions& flush_opts) {
  Status flush_status =
      AtomicFlushMemTables(cfds, flush_opts, FlushReason::kTest);
  return flush_status;
}
|
|
|
|
// Test hook: calls WaitForBackgroundWork() with an InstrumentedMutexLock on
// mutex_ in scope, then returns the error handler's background error.
Status DBImpl::TEST_WaitForBackgroundWork() {
  InstrumentedMutexLock guard(&mutex_);
  WaitForBackgroundWork();
  Status bg_error = error_handler_.GetBGError();
  return bg_error;
}
|
|
|
|
// Test hook: calls WaitForFlushMemTable(cfd, nullptr, false) for the given
// column family and returns its status.
//
// column_family: target column family; nullptr selects the default.
Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
  ColumnFamilyData* target =
      (column_family == nullptr)
          ? default_cf_handle_->cfd()
          : static_cast_with_check<ColumnFamilyHandleImpl>(column_family)
                ->cfd();

  return WaitForFlushMemTable(target, nullptr, false);
}
|
|
|
|
// Test hook: blocks until compaction completes by delegating to
// WaitForCompact(), forwarding `wait_unscheduled` unchanged.
Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) {
  Status wait_status = WaitForCompact(wait_unscheduled);
  return wait_status;
}
|
|
|
|
// Test hook: waits on bg_cv_ for as long as bg_purge_scheduled_ is non-zero
// and the background error is OK, then returns the background error.
Status DBImpl::TEST_WaitForPurge() {
  InstrumentedMutexLock guard(&mutex_);

  while (true) {
    // Stop once no purge is scheduled; the error is only consulted while a
    // purge is still pending.
    if (!bg_purge_scheduled_ || !error_handler_.GetBGError().ok()) {
      break;
    }
    bg_cv_.Wait();
  }

  return error_handler_.GetBGError();
}
|
|
|
|
// Test hook: returns the error handler's background error, read with an
// InstrumentedMutexLock on mutex_ in scope.
Status DBImpl::TEST_GetBGError() {
  InstrumentedMutexLock guard(&mutex_);
  Status bg_error = error_handler_.GetBGError();
  return bg_error;
}
|
|
|
|
// Test hook: locks mutex_. The lock is not released by this function.
void DBImpl::TEST_LockMutex() {
  mutex_.Lock();
}
|
|
|
|
// Test hook: unlocks mutex_.
void DBImpl::TEST_UnlockMutex() {
  mutex_.Unlock();
}
|
|
|
|
void* DBImpl::TEST_BeginWrite() {
|
|
auto w = new WriteThread::Writer();
|
|
write_thread_.EnterUnbatched(w, &mutex_);
|
|
return reinterpret_cast<void*>(w);
|
|
}
|
|
|
|
void DBImpl::TEST_EndWrite(void* w) {
|
|
auto writer = reinterpret_cast<WriteThread::Writer*>(w);
|
|
write_thread_.ExitUnbatched(writer);
|
|
delete writer;
|
|
}
|
|
|
|
size_t DBImpl::TEST_LogsToFreeSize() {
|
|
InstrumentedMutexLock l(&mutex_);
|
|
return logs_to_free_.size();
|
|
}
|
|
|
|
uint64_t DBImpl::TEST_LogfileNumber() {
|
|
InstrumentedMutexLock l(&mutex_);
|
|
return logfile_number_;
|
|
}
|
|
|
|
// Test hook: fills `iopts_map` with one entry per column family, mapping the
// column family name to its ioptions() pointer. Always returns Status::OK().
//
// iopts_map: output map; cleared before being filled. It is dereferenced
//            without a null check.
Status DBImpl::TEST_GetAllImmutableCFOptions(
    std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map) {
  std::vector<std::string> names;
  std::vector<const ImmutableCFOptions*> options;

  // Only the snapshot of names/pointers is taken inside the locked scope; the
  // output map is populated after the guard is gone.
  {
    InstrumentedMutexLock guard(&mutex_);
    for (auto cfd : *versions_->GetColumnFamilySet()) {
      names.push_back(cfd->GetName());
      options.push_back(cfd->ioptions());
    }
  }

  iopts_map->clear();
  const size_t count = names.size();
  for (size_t idx = 0; idx != count; ++idx) {
    // Like insert(), emplace() keeps the first entry for a duplicate key.
    iopts_map->emplace(names[idx], options[idx]);
  }

  return Status::OK();
}
|
|
|
|
uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() {
|
|
return logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();
|
|
}
|
|
|
|
size_t DBImpl::TEST_PreparedSectionCompletedSize() {
|
|
return logs_with_prep_tracker_.TEST_PreparedSectionCompletedSize();
|
|
}
|
|
|
|
size_t DBImpl::TEST_LogsWithPrepSize() {
|
|
return logs_with_prep_tracker_.TEST_LogsWithPrepSize();
|
|
}
|
|
|
|
uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
|
|
autovector<MemTable*> empty_list;
|
|
return FindMinPrepLogReferencedByMemTable(versions_.get(), nullptr,
|
|
empty_list);
|
|
}
|
|
|
|
// Test hook: copies the column family's latest mutable options into
// `*mutable_cf_options`. Always returns Status::OK().
//
// column_family:      handle whose options are read; dereferenced without a
//                     null check in this function.
// mutable_cf_options: output; overwritten with a copy of
//                     GetLatestMutableCFOptions().
Status DBImpl::TEST_GetLatestMutableCFOptions(
    ColumnFamilyHandle* column_family, MutableCFOptions* mutable_cf_options) {
  InstrumentedMutexLock guard(&mutex_);

  auto* handle_impl =
      static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  ColumnFamilyData* cf_data = handle_impl->cfd();
  *mutable_cf_options = *cf_data->GetLatestMutableCFOptions();

  return Status::OK();
}
|
|
|
|
int DBImpl::TEST_BGCompactionsAllowed() const {
|
|
InstrumentedMutexLock l(&mutex_);
|
|
return GetBGJobLimits().max_compactions;
|
|
}
|
|
|
|
int DBImpl::TEST_BGFlushesAllowed() const {
|
|
InstrumentedMutexLock l(&mutex_);
|
|
return GetBGJobLimits().max_flushes;
|
|
}
|
|
|
|
// Test hook: returns versions_->LastSequence() when
// last_seq_same_as_publish_seq_ is set, and
// versions_->LastAllocatedSequence() otherwise.
SequenceNumber DBImpl::TEST_GetLastVisibleSequence() const {
  if (!last_seq_same_as_publish_seq_) {
    return versions_->LastAllocatedSequence();
  }
  return versions_->LastSequence();
}
|
|
|
|
size_t DBImpl::TEST_GetWalPreallocateBlockSize(
|
|
uint64_t write_buffer_size) const {
|
|
InstrumentedMutexLock l(&mutex_);
|
|
return GetWalPreallocateBlockSize(write_buffer_size);
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
void DBImpl::TEST_WaitForStatsDumpRun(std::function<void()> callback) const {
|
|
if (periodic_work_scheduler_ != nullptr) {
|
|
static_cast<PeriodicWorkTestScheduler*>(periodic_work_scheduler_)
|
|
->TEST_WaitForRun(callback);
|
|
}
|
|
}
|
|
|
|
// Test hook: returns periodic_work_scheduler_ cast to
// PeriodicWorkTestScheduler*. The cast is an unchecked static_cast.
PeriodicWorkTestScheduler* DBImpl::TEST_GetPeriodicWorkScheduler() const {
  auto* test_scheduler =
      static_cast<PeriodicWorkTestScheduler*>(periodic_work_scheduler_);
  return test_scheduler;
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
size_t DBImpl::TEST_EstimateInMemoryStatsHistorySize() const {
|
|
return EstimateInMemoryStatsHistorySize();
|
|
}
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
#endif // NDEBUG
|