Charge file metadata memory
This commit is contained in:
parent
6a51af16b3
commit
594415d7b5
@ -10,6 +10,8 @@
|
|||||||
### New Features
|
### New Features
|
||||||
* DB::GetLiveFilesStorageInfo is ready for production use.
|
* DB::GetLiveFilesStorageInfo is ready for production use.
|
||||||
* Add new stats PREFETCHED_BYTES_DISCARDED which records number of prefetched bytes discarded by RocksDB FilePrefetchBuffer on destruction and POLL_WAIT_MICROS records wait time for FS::Poll API completion.
|
* Add new stats PREFETCHED_BYTES_DISCARDED which records number of prefetched bytes discarded by RocksDB FilePrefetchBuffer on destruction and POLL_WAIT_MICROS records wait time for FS::Poll API completion.
|
||||||
|
* Track memory usage of file metadata created for newly added files to Version. If such file metadata's memory exceeds the available space left in the block
|
||||||
|
cache at some point (i.e., causing the cache to become full under `LRUCacheOptions::strict_capacity_limit` = true), creation will fail with `Status::MemoryLimit()`. To opt in to this feature, set `BlockBasedTableOptions::cache_usage_options.options_overrides.insert({CacheEntryRole::kFileMetadata, {/*.charged = */CacheEntryRoleOptions::Decision::kEnabled}})`.
|
||||||
|
|
||||||
### Public API changes
|
### Public API changes
|
||||||
* Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions.
|
* Add rollback_deletion_type_callback to TransactionDBOptions so that write-prepared transactions know whether to issue a Delete or SingleDelete to cancel a previous key written during prior prepare phase. The PR aims to prevent mixing SingleDeletes and Deletes for the same key that can lead to undefined behaviors for write-prepared transactions.
|
||||||
|
2
cache/cache_entry_roles.cc
vendored
2
cache/cache_entry_roles.cc
vendored
@ -22,6 +22,7 @@ std::array<std::string, kNumCacheEntryRoles> kCacheEntryRoleToCamelString{{
|
|||||||
"CompressionDictionaryBuildingBuffer",
|
"CompressionDictionaryBuildingBuffer",
|
||||||
"FilterConstruction",
|
"FilterConstruction",
|
||||||
"BlockBasedTableReader",
|
"BlockBasedTableReader",
|
||||||
|
"FileMetadata",
|
||||||
"Misc",
|
"Misc",
|
||||||
}};
|
}};
|
||||||
|
|
||||||
@ -36,6 +37,7 @@ std::array<std::string, kNumCacheEntryRoles> kCacheEntryRoleToHyphenString{{
|
|||||||
"compression-dictionary-building-buffer",
|
"compression-dictionary-building-buffer",
|
||||||
"filter-construction",
|
"filter-construction",
|
||||||
"block-based-table-reader",
|
"block-based-table-reader",
|
||||||
|
"file-metadata",
|
||||||
"misc",
|
"misc",
|
||||||
}};
|
}};
|
||||||
|
|
||||||
|
1
cache/cache_reservation_manager.cc
vendored
1
cache/cache_reservation_manager.cc
vendored
@ -180,4 +180,5 @@ template class CacheReservationManagerImpl<
|
|||||||
template class CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>;
|
template class CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>;
|
||||||
template class CacheReservationManagerImpl<CacheEntryRole::kMisc>;
|
template class CacheReservationManagerImpl<CacheEntryRole::kMisc>;
|
||||||
template class CacheReservationManagerImpl<CacheEntryRole::kWriteBuffer>;
|
template class CacheReservationManagerImpl<CacheEntryRole::kWriteBuffer>;
|
||||||
|
template class CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>;
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
24
cache/cache_reservation_manager.h
vendored
24
cache/cache_reservation_manager.h
vendored
@ -36,6 +36,8 @@ class CacheReservationManager {
|
|||||||
};
|
};
|
||||||
virtual ~CacheReservationManager() {}
|
virtual ~CacheReservationManager() {}
|
||||||
virtual Status UpdateCacheReservation(std::size_t new_memory_used) = 0;
|
virtual Status UpdateCacheReservation(std::size_t new_memory_used) = 0;
|
||||||
|
virtual Status UpdateCacheReservation(std::size_t memory_used_delta,
|
||||||
|
bool increase) = 0;
|
||||||
virtual Status MakeCacheReservation(
|
virtual Status MakeCacheReservation(
|
||||||
std::size_t incremental_memory_used,
|
std::size_t incremental_memory_used,
|
||||||
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
|
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
|
||||||
@ -128,6 +130,11 @@ class CacheReservationManagerImpl
|
|||||||
// On keeping dummy entries the same, it always returns Status::OK().
|
// On keeping dummy entries the same, it always returns Status::OK().
|
||||||
Status UpdateCacheReservation(std::size_t new_memory_used) override;
|
Status UpdateCacheReservation(std::size_t new_memory_used) override;
|
||||||
|
|
||||||
|
Status UpdateCacheReservation(std::size_t /* memory_used_delta */,
|
||||||
|
bool /* increase */) override {
|
||||||
|
return Status::NotSupported();
|
||||||
|
}
|
||||||
|
|
||||||
// One of the two ways of reserving cache space and releasing is done through
|
// One of the two ways of reserving cache space and releasing is done through
|
||||||
// destruction of CacheReservationHandle.
|
// destruction of CacheReservationHandle.
|
||||||
// See UpdateCacheReservation() for the other way.
|
// See UpdateCacheReservation() for the other way.
|
||||||
@ -254,6 +261,23 @@ class ConcurrentCacheReservationManager
|
|||||||
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
|
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
|
||||||
return cache_res_mgr_->UpdateCacheReservation(new_memory_used);
|
return cache_res_mgr_->UpdateCacheReservation(new_memory_used);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline Status UpdateCacheReservation(std::size_t memory_used_delta,
|
||||||
|
bool increase) override {
|
||||||
|
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
|
||||||
|
std::size_t total_mem_used = cache_res_mgr_->GetTotalMemoryUsed();
|
||||||
|
Status s;
|
||||||
|
if (!increase) {
|
||||||
|
assert(total_mem_used >= memory_used_delta);
|
||||||
|
s = cache_res_mgr_->UpdateCacheReservation(total_mem_used -
|
||||||
|
memory_used_delta);
|
||||||
|
} else {
|
||||||
|
s = cache_res_mgr_->UpdateCacheReservation(total_mem_used +
|
||||||
|
memory_used_delta);
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
inline Status MakeCacheReservation(
|
inline Status MakeCacheReservation(
|
||||||
std::size_t incremental_memory_used,
|
std::size_t incremental_memory_used,
|
||||||
std::unique_ptr<CacheReservationManager::CacheReservationHandle> *handle)
|
std::unique_ptr<CacheReservationManager::CacheReservationHandle> *handle)
|
||||||
|
@ -613,6 +613,23 @@ ColumnFamilyData::ColumnFamilyData(
|
|||||||
}
|
}
|
||||||
|
|
||||||
RecalculateWriteStallConditions(mutable_cf_options_);
|
RecalculateWriteStallConditions(mutable_cf_options_);
|
||||||
|
|
||||||
|
if (cf_options.table_factory->IsInstanceOf(
|
||||||
|
TableFactory::kBlockBasedTableName()) &&
|
||||||
|
cf_options.table_factory->GetOptions<BlockBasedTableOptions>()) {
|
||||||
|
const BlockBasedTableOptions* bbto =
|
||||||
|
cf_options.table_factory->GetOptions<BlockBasedTableOptions>();
|
||||||
|
const auto& options_overrides = bbto->cache_usage_options.options_overrides;
|
||||||
|
const auto file_metadata_charged =
|
||||||
|
options_overrides.at(CacheEntryRole::kFileMetadata).charged;
|
||||||
|
if (bbto->block_cache &&
|
||||||
|
file_metadata_charged == CacheEntryRoleOptions::Decision::kEnabled) {
|
||||||
|
file_metadata_cache_res_mgr_.reset(new ConcurrentCacheReservationManager(
|
||||||
|
std::make_shared<
|
||||||
|
CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>>(
|
||||||
|
bbto->block_cache)));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DB mutex held
|
// DB mutex held
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
#include "cache/cache_reservation_manager.h"
|
||||||
#include "db/memtable_list.h"
|
#include "db/memtable_list.h"
|
||||||
#include "db/table_cache.h"
|
#include "db/table_cache.h"
|
||||||
#include "db/table_properties_collector.h"
|
#include "db/table_properties_collector.h"
|
||||||
@ -520,6 +521,10 @@ class ColumnFamilyData {
|
|||||||
|
|
||||||
ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
|
ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); }
|
||||||
WriteBufferManager* write_buffer_mgr() { return write_buffer_manager_; }
|
WriteBufferManager* write_buffer_mgr() { return write_buffer_manager_; }
|
||||||
|
std::shared_ptr<CacheReservationManager>
|
||||||
|
GetFileMetadataCacheReservationManager() {
|
||||||
|
return file_metadata_cache_res_mgr_;
|
||||||
|
}
|
||||||
|
|
||||||
static const uint32_t kDummyColumnFamilyDataId;
|
static const uint32_t kDummyColumnFamilyDataId;
|
||||||
|
|
||||||
@ -618,6 +623,8 @@ class ColumnFamilyData {
|
|||||||
bool db_paths_registered_;
|
bool db_paths_registered_;
|
||||||
|
|
||||||
std::string full_history_ts_low_;
|
std::string full_history_ts_low_;
|
||||||
|
|
||||||
|
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
|
||||||
};
|
};
|
||||||
|
|
||||||
// ColumnFamilySet has interesting thread-safety requirements
|
// ColumnFamilySet has interesting thread-safety requirements
|
||||||
|
@ -1740,5 +1740,6 @@ template class TargetCacheChargeTrackingCache<
|
|||||||
CacheEntryRole::kFilterConstruction>;
|
CacheEntryRole::kFilterConstruction>;
|
||||||
template class TargetCacheChargeTrackingCache<
|
template class TargetCacheChargeTrackingCache<
|
||||||
CacheEntryRole::kBlockBasedTableReader>;
|
CacheEntryRole::kBlockBasedTableReader>;
|
||||||
|
template class TargetCacheChargeTrackingCache<CacheEntryRole::kFileMetadata>;
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
@ -320,7 +320,7 @@ class InternalKey {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Slice user_key() const { return ExtractUserKey(rep_); }
|
Slice user_key() const { return ExtractUserKey(rep_); }
|
||||||
size_t size() { return rep_.size(); }
|
size_t size() const { return rep_.size(); }
|
||||||
|
|
||||||
void Set(const Slice& _user_key, SequenceNumber s, ValueType t) {
|
void Set(const Slice& _user_key, SequenceNumber s, ValueType t) {
|
||||||
SetFrom(ParsedInternalKey(_user_key, s, t));
|
SetFrom(ParsedInternalKey(_user_key, s, t));
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
#include <utility>
|
#include <utility>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
|
#include "cache/cache_reservation_manager.h"
|
||||||
#include "db/blob/blob_file_meta.h"
|
#include "db/blob/blob_file_meta.h"
|
||||||
#include "db/dbformat.h"
|
#include "db/dbformat.h"
|
||||||
#include "db/internal_stats.h"
|
#include "db/internal_stats.h"
|
||||||
@ -255,10 +256,13 @@ class VersionBuilder::Rep {
|
|||||||
// version edits.
|
// version edits.
|
||||||
std::map<uint64_t, MutableBlobFileMetaData> mutable_blob_file_metas_;
|
std::map<uint64_t, MutableBlobFileMetaData> mutable_blob_file_metas_;
|
||||||
|
|
||||||
|
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
|
Rep(const FileOptions& file_options, const ImmutableCFOptions* ioptions,
|
||||||
TableCache* table_cache, VersionStorageInfo* base_vstorage,
|
TableCache* table_cache, VersionStorageInfo* base_vstorage,
|
||||||
VersionSet* version_set)
|
VersionSet* version_set,
|
||||||
|
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr)
|
||||||
: file_options_(file_options),
|
: file_options_(file_options),
|
||||||
ioptions_(ioptions),
|
ioptions_(ioptions),
|
||||||
table_cache_(table_cache),
|
table_cache_(table_cache),
|
||||||
@ -266,7 +270,8 @@ class VersionBuilder::Rep {
|
|||||||
version_set_(version_set),
|
version_set_(version_set),
|
||||||
num_levels_(base_vstorage->num_levels()),
|
num_levels_(base_vstorage->num_levels()),
|
||||||
has_invalid_levels_(false),
|
has_invalid_levels_(false),
|
||||||
level_nonzero_cmp_(base_vstorage_->InternalComparator()) {
|
level_nonzero_cmp_(base_vstorage_->InternalComparator()),
|
||||||
|
file_metadata_cache_res_mgr_(file_metadata_cache_res_mgr) {
|
||||||
assert(ioptions_);
|
assert(ioptions_);
|
||||||
|
|
||||||
levels_ = new LevelState[num_levels_];
|
levels_ = new LevelState[num_levels_];
|
||||||
@ -291,6 +296,12 @@ class VersionBuilder::Rep {
|
|||||||
table_cache_->ReleaseHandle(f->table_reader_handle);
|
table_cache_->ReleaseHandle(f->table_reader_handle);
|
||||||
f->table_reader_handle = nullptr;
|
f->table_reader_handle = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (file_metadata_cache_res_mgr_) {
|
||||||
|
Status s = file_metadata_cache_res_mgr_->UpdateCacheReservation(
|
||||||
|
f->ApproximateMemoryUsage(), false /* increase */);
|
||||||
|
s.PermitUncheckedError();
|
||||||
|
}
|
||||||
delete f;
|
delete f;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -763,6 +774,22 @@ class VersionBuilder::Rep {
|
|||||||
FileMetaData* const f = new FileMetaData(meta);
|
FileMetaData* const f = new FileMetaData(meta);
|
||||||
f->refs = 1;
|
f->refs = 1;
|
||||||
|
|
||||||
|
if (file_metadata_cache_res_mgr_) {
|
||||||
|
Status s = file_metadata_cache_res_mgr_->UpdateCacheReservation(
|
||||||
|
f->ApproximateMemoryUsage(), true /* increase */);
|
||||||
|
if (!s.ok()) {
|
||||||
|
delete f;
|
||||||
|
s = Status::MemoryLimit(
|
||||||
|
"Can't allocate " +
|
||||||
|
kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
|
||||||
|
CacheEntryRole::kFileMetadata)] +
|
||||||
|
" due to exceeding the memory limit "
|
||||||
|
"based on "
|
||||||
|
"cache capacity");
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auto& add_files = level_state.added_files;
|
auto& add_files = level_state.added_files;
|
||||||
assert(add_files.find(file_number) == add_files.end());
|
assert(add_files.find(file_number) == add_files.end());
|
||||||
add_files.emplace(file_number, f);
|
add_files.emplace(file_number, f);
|
||||||
@ -1239,13 +1266,13 @@ class VersionBuilder::Rep {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
VersionBuilder::VersionBuilder(const FileOptions& file_options,
|
VersionBuilder::VersionBuilder(
|
||||||
const ImmutableCFOptions* ioptions,
|
const FileOptions& file_options, const ImmutableCFOptions* ioptions,
|
||||||
TableCache* table_cache,
|
TableCache* table_cache, VersionStorageInfo* base_vstorage,
|
||||||
VersionStorageInfo* base_vstorage,
|
VersionSet* version_set,
|
||||||
VersionSet* version_set)
|
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr)
|
||||||
: rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
|
: rep_(new Rep(file_options, ioptions, table_cache, base_vstorage,
|
||||||
version_set)) {}
|
version_set, file_metadata_cache_res_mgr)) {}
|
||||||
|
|
||||||
VersionBuilder::~VersionBuilder() = default;
|
VersionBuilder::~VersionBuilder() = default;
|
||||||
|
|
||||||
@ -1280,7 +1307,8 @@ BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
|
|||||||
: version_builder_(new VersionBuilder(
|
: version_builder_(new VersionBuilder(
|
||||||
cfd->current()->version_set()->file_options(), cfd->ioptions(),
|
cfd->current()->version_set()->file_options(), cfd->ioptions(),
|
||||||
cfd->table_cache(), cfd->current()->storage_info(),
|
cfd->table_cache(), cfd->current()->storage_info(),
|
||||||
cfd->current()->version_set())),
|
cfd->current()->version_set(),
|
||||||
|
cfd->GetFileMetadataCacheReservationManager())),
|
||||||
version_(cfd->current()) {
|
version_(cfd->current()) {
|
||||||
version_->Ref();
|
version_->Ref();
|
||||||
}
|
}
|
||||||
@ -1289,7 +1317,8 @@ BaseReferencedVersionBuilder::BaseReferencedVersionBuilder(
|
|||||||
ColumnFamilyData* cfd, Version* v)
|
ColumnFamilyData* cfd, Version* v)
|
||||||
: version_builder_(new VersionBuilder(
|
: version_builder_(new VersionBuilder(
|
||||||
cfd->current()->version_set()->file_options(), cfd->ioptions(),
|
cfd->current()->version_set()->file_options(), cfd->ioptions(),
|
||||||
cfd->table_cache(), v->storage_info(), v->version_set())),
|
cfd->table_cache(), v->storage_info(), v->version_set(),
|
||||||
|
cfd->GetFileMetadataCacheReservationManager())),
|
||||||
version_(v) {
|
version_(v) {
|
||||||
assert(version_ != cfd->current());
|
assert(version_ != cfd->current());
|
||||||
}
|
}
|
||||||
|
@ -25,6 +25,7 @@ class InternalStats;
|
|||||||
class Version;
|
class Version;
|
||||||
class VersionSet;
|
class VersionSet;
|
||||||
class ColumnFamilyData;
|
class ColumnFamilyData;
|
||||||
|
class CacheReservationManager;
|
||||||
|
|
||||||
// A helper class so we can efficiently apply a whole sequence
|
// A helper class so we can efficiently apply a whole sequence
|
||||||
// of edits to a particular state without creating intermediate
|
// of edits to a particular state without creating intermediate
|
||||||
@ -33,7 +34,9 @@ class VersionBuilder {
|
|||||||
public:
|
public:
|
||||||
VersionBuilder(const FileOptions& file_options,
|
VersionBuilder(const FileOptions& file_options,
|
||||||
const ImmutableCFOptions* ioptions, TableCache* table_cache,
|
const ImmutableCFOptions* ioptions, TableCache* table_cache,
|
||||||
VersionStorageInfo* base_vstorage, VersionSet* version_set);
|
VersionStorageInfo* base_vstorage, VersionSet* version_set,
|
||||||
|
std::shared_ptr<CacheReservationManager>
|
||||||
|
file_metadata_cache_res_mgr = nullptr);
|
||||||
~VersionBuilder();
|
~VersionBuilder();
|
||||||
|
|
||||||
bool CheckConsistencyForNumLevels();
|
bool CheckConsistencyForNumLevels();
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
#include "db/dbformat.h"
|
#include "db/dbformat.h"
|
||||||
#include "db/wal_edit.h"
|
#include "db/wal_edit.h"
|
||||||
#include "memory/arena.h"
|
#include "memory/arena.h"
|
||||||
|
#include "port/malloc.h"
|
||||||
#include "rocksdb/advanced_options.h"
|
#include "rocksdb/advanced_options.h"
|
||||||
#include "rocksdb/cache.h"
|
#include "rocksdb/cache.h"
|
||||||
#include "table/table_reader.h"
|
#include "table/table_reader.h"
|
||||||
@ -285,6 +286,19 @@ struct FileMetaData {
|
|||||||
}
|
}
|
||||||
return kUnknownFileCreationTime;
|
return kUnknownFileCreationTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t ApproximateMemoryUsage() const {
|
||||||
|
size_t usage = 0;
|
||||||
|
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
|
||||||
|
usage += malloc_usable_size(const_cast<FileMetaData*>(this));
|
||||||
|
#else
|
||||||
|
usage += sizeof(*this);
|
||||||
|
#endif // ROCKSDB_MALLOC_USABLE_SIZE
|
||||||
|
usage += smallest.size() + largest.size() + file_checksum.size() +
|
||||||
|
file_checksum_func_name.size() + min_timestamp.size() +
|
||||||
|
max_timestamp.size();
|
||||||
|
return usage;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// A compressed copy of file meta data that just contain minimum data needed
|
// A compressed copy of file meta data that just contain minimum data needed
|
||||||
|
@ -753,7 +753,8 @@ Version::~Version() {
|
|||||||
uint32_t path_id = f->fd.GetPathId();
|
uint32_t path_id = f->fd.GetPathId();
|
||||||
assert(path_id < cfd_->ioptions()->cf_paths.size());
|
assert(path_id < cfd_->ioptions()->cf_paths.size());
|
||||||
vset_->obsolete_files_.push_back(
|
vset_->obsolete_files_.push_back(
|
||||||
ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
|
ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path,
|
||||||
|
cfd_->GetFileMetadataCacheReservationManager()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -921,8 +921,12 @@ struct ObsoleteFileInfo {
|
|||||||
std::string path;
|
std::string path;
|
||||||
|
|
||||||
ObsoleteFileInfo() noexcept : metadata(nullptr) {}
|
ObsoleteFileInfo() noexcept : metadata(nullptr) {}
|
||||||
ObsoleteFileInfo(FileMetaData* f, const std::string& file_path)
|
ObsoleteFileInfo(FileMetaData* f, const std::string& file_path,
|
||||||
: metadata(f), path(file_path) {}
|
std::shared_ptr<CacheReservationManager>
|
||||||
|
file_metadata_cache_res_mgr = nullptr)
|
||||||
|
: metadata(f),
|
||||||
|
path(file_path),
|
||||||
|
file_metadata_cache_res_mgr_(file_metadata_cache_res_mgr) {}
|
||||||
|
|
||||||
ObsoleteFileInfo(const ObsoleteFileInfo&) = delete;
|
ObsoleteFileInfo(const ObsoleteFileInfo&) = delete;
|
||||||
ObsoleteFileInfo& operator=(const ObsoleteFileInfo&) = delete;
|
ObsoleteFileInfo& operator=(const ObsoleteFileInfo&) = delete;
|
||||||
@ -936,14 +940,24 @@ struct ObsoleteFileInfo {
|
|||||||
path = std::move(rhs.path);
|
path = std::move(rhs.path);
|
||||||
metadata = rhs.metadata;
|
metadata = rhs.metadata;
|
||||||
rhs.metadata = nullptr;
|
rhs.metadata = nullptr;
|
||||||
|
file_metadata_cache_res_mgr_ = rhs.file_metadata_cache_res_mgr_;
|
||||||
|
rhs.file_metadata_cache_res_mgr_ = nullptr;
|
||||||
|
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
void DeleteMetadata() {
|
void DeleteMetadata() {
|
||||||
|
if (file_metadata_cache_res_mgr_) {
|
||||||
|
Status s = file_metadata_cache_res_mgr_->UpdateCacheReservation(
|
||||||
|
metadata->ApproximateMemoryUsage(), false /* increase */);
|
||||||
|
s.PermitUncheckedError();
|
||||||
|
}
|
||||||
delete metadata;
|
delete metadata;
|
||||||
metadata = nullptr;
|
metadata = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::shared_ptr<CacheReservationManager> file_metadata_cache_res_mgr_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObsoleteBlobFileInfo {
|
class ObsoleteBlobFileInfo {
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
|
||||||
#include "db/db_impl/db_impl.h"
|
#include "db/db_impl/db_impl.h"
|
||||||
|
#include "db/db_test_util.h"
|
||||||
#include "db/log_writer.h"
|
#include "db/log_writer.h"
|
||||||
#include "rocksdb/advanced_options.h"
|
#include "rocksdb/advanced_options.h"
|
||||||
#include "rocksdb/convenience.h"
|
#include "rocksdb/convenience.h"
|
||||||
@ -3445,6 +3446,124 @@ TEST_F(VersionSetTestMissingFiles, MinLogNumberToKeep2PC) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class ChargeFileMetadataTest : public DBTestBase {
|
||||||
|
public:
|
||||||
|
ChargeFileMetadataTest()
|
||||||
|
: DBTestBase("charge_file_metadata_test", /*env_do_fsync=*/true) {}
|
||||||
|
};
|
||||||
|
|
||||||
|
class ChargeFileMetadataTestWithParam
|
||||||
|
: public ChargeFileMetadataTest,
|
||||||
|
public testing::WithParamInterface<CacheEntryRoleOptions::Decision> {
|
||||||
|
public:
|
||||||
|
ChargeFileMetadataTestWithParam() {}
|
||||||
|
};
|
||||||
|
|
||||||
|
#ifndef ROCKSDB_LITE
|
||||||
|
INSTANTIATE_TEST_CASE_P(
|
||||||
|
ChargeFileMetadataTestWithParam, ChargeFileMetadataTestWithParam,
|
||||||
|
::testing::Values(CacheEntryRoleOptions::Decision::kEnabled,
|
||||||
|
CacheEntryRoleOptions::Decision::kDisabled));
|
||||||
|
|
||||||
|
TEST_P(ChargeFileMetadataTestWithParam, Basic) {
|
||||||
|
Options options;
|
||||||
|
std::shared_ptr<TargetCacheChargeTrackingCache<CacheEntryRole::kFileMetadata>>
|
||||||
|
file_metadata_charge_only_cache = std::make_shared<
|
||||||
|
TargetCacheChargeTrackingCache<CacheEntryRole::kFileMetadata>>(
|
||||||
|
NewLRUCache(
|
||||||
|
4 * CacheReservationManagerImpl<
|
||||||
|
CacheEntryRole::kFileMetadata>::GetDummyEntrySize(),
|
||||||
|
0 /* num_shard_bits */, true /* strict_capacity_limit */));
|
||||||
|
BlockBasedTableOptions table_options;
|
||||||
|
CacheEntryRoleOptions::Decision charge_file_metadata = GetParam();
|
||||||
|
table_options.cache_usage_options.options_overrides.insert(
|
||||||
|
{CacheEntryRole::kFileMetadata, {/*.charged = */ charge_file_metadata}});
|
||||||
|
table_options.block_cache = file_metadata_charge_only_cache;
|
||||||
|
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.disable_auto_compactions = true;
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
|
||||||
|
// Create 128 file metadata, each of which is roughly 1024 bytes.
|
||||||
|
// This results in 1 *
|
||||||
|
// CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>::GetDummyEntrySize()
|
||||||
|
// cache reservation for file metadata.
|
||||||
|
for (int i = 1; i <= 128; ++i) {
|
||||||
|
ASSERT_OK(Put(std::string(1024, 'a'), "va"));
|
||||||
|
ASSERT_OK(Put("b", "vb"));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
}
|
||||||
|
if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) {
|
||||||
|
EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(),
|
||||||
|
1 * CacheReservationManagerImpl<
|
||||||
|
CacheEntryRole::kFileMetadata>::GetDummyEntrySize());
|
||||||
|
|
||||||
|
} else {
|
||||||
|
EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create another 128 file metadata.
|
||||||
|
// This increases the file metadata cache reservation to 2 *
|
||||||
|
// CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>::GetDummyEntrySize().
|
||||||
|
for (int i = 1; i <= 128; ++i) {
|
||||||
|
ASSERT_OK(Put(std::string(1024, 'a'), "vva"));
|
||||||
|
ASSERT_OK(Put("b", "vvb"));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
}
|
||||||
|
if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) {
|
||||||
|
EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(),
|
||||||
|
2 * CacheReservationManagerImpl<
|
||||||
|
CacheEntryRole::kFileMetadata>::GetDummyEntrySize());
|
||||||
|
} else {
|
||||||
|
EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), 0);
|
||||||
|
}
|
||||||
|
// Compaction will create 1 new file metadata, obsolete and delete all 256
|
||||||
|
// file metadata above. This results in 1 *
|
||||||
|
// CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>::GetDummyEntrySize()
|
||||||
|
// cache reservation for file metadata.
|
||||||
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||||
|
ASSERT_EQ("0,1", FilesPerLevel(0));
|
||||||
|
|
||||||
|
if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) {
|
||||||
|
EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(),
|
||||||
|
1 * CacheReservationManagerImpl<
|
||||||
|
CacheEntryRole::kFileMetadata>::GetDummyEntrySize());
|
||||||
|
} else {
|
||||||
|
EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Destroying the db will delete the remaining 1 new file metadata
|
||||||
|
// This results in no cache reservation for file metadata.
|
||||||
|
Destroy(options);
|
||||||
|
EXPECT_EQ(file_metadata_charge_only_cache->GetCacheCharge(),
|
||||||
|
0 * CacheReservationManagerImpl<
|
||||||
|
CacheEntryRole::kFileMetadata>::GetDummyEntrySize());
|
||||||
|
|
||||||
|
// Reopen the db with a smaller cache in order to test failure in allocating
|
||||||
|
// file metadata based on memory limit based on cache capacity
|
||||||
|
file_metadata_charge_only_cache = std::make_shared<
|
||||||
|
TargetCacheChargeTrackingCache<CacheEntryRole::kFileMetadata>>(
|
||||||
|
NewLRUCache(1 * CacheReservationManagerImpl<
|
||||||
|
CacheEntryRole::kFileMetadata>::GetDummyEntrySize(),
|
||||||
|
0 /* num_shard_bits */, true /* strict_capacity_limit */));
|
||||||
|
table_options.block_cache = file_metadata_charge_only_cache;
|
||||||
|
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
|
||||||
|
Reopen(options);
|
||||||
|
ASSERT_OK(Put(std::string(1024, 'a'), "va"));
|
||||||
|
ASSERT_OK(Put("b", "vb"));
|
||||||
|
Status s = Flush();
|
||||||
|
if (charge_file_metadata == CacheEntryRoleOptions::Decision::kEnabled) {
|
||||||
|
EXPECT_TRUE(s.IsMemoryLimit());
|
||||||
|
EXPECT_TRUE(s.ToString().find(
|
||||||
|
kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
|
||||||
|
CacheEntryRole::kFileMetadata)]) != std::string::npos);
|
||||||
|
EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
|
||||||
|
std::string::npos);
|
||||||
|
} else {
|
||||||
|
EXPECT_TRUE(s.ok());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif // ROCKSDB_LITE
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -138,6 +138,7 @@ DECLARE_bool(cache_index_and_filter_blocks);
|
|||||||
DECLARE_bool(charge_compression_dictionary_building_buffer);
|
DECLARE_bool(charge_compression_dictionary_building_buffer);
|
||||||
DECLARE_bool(charge_filter_construction);
|
DECLARE_bool(charge_filter_construction);
|
||||||
DECLARE_bool(charge_table_reader);
|
DECLARE_bool(charge_table_reader);
|
||||||
|
DECLARE_bool(charge_file_metadata);
|
||||||
DECLARE_int32(top_level_index_pinning);
|
DECLARE_int32(top_level_index_pinning);
|
||||||
DECLARE_int32(partition_pinning);
|
DECLARE_int32(partition_pinning);
|
||||||
DECLARE_int32(unpartitioned_pinning);
|
DECLARE_int32(unpartitioned_pinning);
|
||||||
|
@ -319,6 +319,11 @@ DEFINE_bool(charge_table_reader, false,
|
|||||||
"CacheEntryRoleOptions::charged of"
|
"CacheEntryRoleOptions::charged of"
|
||||||
"CacheEntryRole::kBlockBasedTableReader");
|
"CacheEntryRole::kBlockBasedTableReader");
|
||||||
|
|
||||||
|
DEFINE_bool(charge_file_metadata, false,
|
||||||
|
"Setting for "
|
||||||
|
"CacheEntryRoleOptions::charged of"
|
||||||
|
"kFileMetadata");
|
||||||
|
|
||||||
DEFINE_int32(
|
DEFINE_int32(
|
||||||
top_level_index_pinning,
|
top_level_index_pinning,
|
||||||
static_cast<int32_t>(ROCKSDB_NAMESPACE::PinningTier::kFallback),
|
static_cast<int32_t>(ROCKSDB_NAMESPACE::PinningTier::kFallback),
|
||||||
|
@ -2354,6 +2354,11 @@ void StressTest::Open(SharedState* shared) {
|
|||||||
{/*.charged = */ FLAGS_charge_table_reader
|
{/*.charged = */ FLAGS_charge_table_reader
|
||||||
? CacheEntryRoleOptions::Decision::kEnabled
|
? CacheEntryRoleOptions::Decision::kEnabled
|
||||||
: CacheEntryRoleOptions::Decision::kDisabled}});
|
: CacheEntryRoleOptions::Decision::kDisabled}});
|
||||||
|
block_based_options.cache_usage_options.options_overrides.insert(
|
||||||
|
{CacheEntryRole::kFileMetadata,
|
||||||
|
{/*.charged = */ FLAGS_charge_file_metadata
|
||||||
|
? CacheEntryRoleOptions::Decision::kEnabled
|
||||||
|
: CacheEntryRoleOptions::Decision::kDisabled}});
|
||||||
block_based_options.format_version =
|
block_based_options.format_version =
|
||||||
static_cast<uint32_t>(FLAGS_format_version);
|
static_cast<uint32_t>(FLAGS_format_version);
|
||||||
block_based_options.index_block_restart_interval =
|
block_based_options.index_block_restart_interval =
|
||||||
|
@ -9,8 +9,8 @@
|
|||||||
|
|
||||||
#ifdef GFLAGS
|
#ifdef GFLAGS
|
||||||
#include "db_stress_tool/db_stress_common.h"
|
#include "db_stress_tool/db_stress_common.h"
|
||||||
#include "utilities/fault_injection_fs.h"
|
|
||||||
#include "rocksdb/utilities/transaction_db.h"
|
#include "rocksdb/utilities/transaction_db.h"
|
||||||
|
#include "utilities/fault_injection_fs.h"
|
||||||
|
|
||||||
namespace ROCKSDB_NAMESPACE {
|
namespace ROCKSDB_NAMESPACE {
|
||||||
class NonBatchedOpsStressTest : public StressTest {
|
class NonBatchedOpsStressTest : public StressTest {
|
||||||
|
@ -570,6 +570,9 @@ enum class CacheEntryRole {
|
|||||||
// BlockBasedTableReader's charge to account for
|
// BlockBasedTableReader's charge to account for
|
||||||
// its memory usage
|
// its memory usage
|
||||||
kBlockBasedTableReader,
|
kBlockBasedTableReader,
|
||||||
|
// FileMetadata's charge to account for
|
||||||
|
// its memory usage
|
||||||
|
kFileMetadata,
|
||||||
// Default bucket, for miscellaneous cache entries. Do not use for
|
// Default bucket, for miscellaneous cache entries. Do not use for
|
||||||
// entries that could potentially add up to large usage.
|
// entries that could potentially add up to large usage.
|
||||||
kMisc,
|
kMisc,
|
||||||
|
@ -370,7 +370,20 @@ struct BlockBasedTableOptions {
|
|||||||
// (iii) Compatible existing behavior:
|
// (iii) Compatible existing behavior:
|
||||||
// Same as kDisabled.
|
// Same as kDisabled.
|
||||||
//
|
//
|
||||||
// (d) Other CacheEntryRole
|
// (d) CacheEntryRole::kFileMetadata
|
||||||
|
// (i) If kEnabled:
|
||||||
|
// Charge memory usage of file metadata created
|
||||||
|
// for newly added files to Version.
|
||||||
|
// If such file metadata's
|
||||||
|
// memory exceeds the available space left in the block cache at some point
|
||||||
|
// (i.e., causing a cache full under `LRUCacheOptions::strict_capacity_limit` =
|
||||||
|
// true), creation will fail with Status::MemoryLimit().
|
||||||
|
// (ii) If kDisabled:
|
||||||
|
// Does not charge the memory usage mentioned above.
|
||||||
|
// (iii) Compatible existing behavior:
|
||||||
|
// Same as kDisabled.
|
||||||
|
//
|
||||||
|
// (e) Other CacheEntryRole
|
||||||
// Not supported.
|
// Not supported.
|
||||||
// `Status::kNotSupported` will be returned if
|
// `Status::kNotSupported` will be returned if
|
||||||
// `CacheEntryRoleOptions::charged` is set to {`kEnabled`, `kDisabled`}.
|
// `CacheEntryRoleOptions::charged` is set to {`kEnabled`, `kDisabled`}.
|
||||||
|
@ -5,6 +5,8 @@
|
|||||||
|
|
||||||
package org.rocksdb;
|
package org.rocksdb;
|
||||||
|
|
||||||
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -14,8 +16,6 @@ import java.util.Map;
|
|||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.rocksdb.util.Environment;
|
import org.rocksdb.util.Environment;
|
||||||
|
|
||||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A RocksDB is a persistent ordered map from keys to values. It is safe for
|
* A RocksDB is a persistent ordered map from keys to values. It is safe for
|
||||||
* concurrent access from multiple threads without any external synchronization.
|
* concurrent access from multiple threads without any external synchronization.
|
||||||
|
@ -689,7 +689,7 @@ Status BlockBasedTableFactory::ValidateOptions(
|
|||||||
static const std::set<CacheEntryRole> kMemoryChargingSupported = {
|
static const std::set<CacheEntryRole> kMemoryChargingSupported = {
|
||||||
CacheEntryRole::kCompressionDictionaryBuildingBuffer,
|
CacheEntryRole::kCompressionDictionaryBuildingBuffer,
|
||||||
CacheEntryRole::kFilterConstruction,
|
CacheEntryRole::kFilterConstruction,
|
||||||
CacheEntryRole::kBlockBasedTableReader};
|
CacheEntryRole::kBlockBasedTableReader, CacheEntryRole::kFileMetadata};
|
||||||
if (options.charged != CacheEntryRoleOptions::Decision::kFallback &&
|
if (options.charged != CacheEntryRoleOptions::Decision::kFallback &&
|
||||||
kMemoryChargingSupported.count(role) == 0) {
|
kMemoryChargingSupported.count(role) == 0) {
|
||||||
return Status::NotSupported(
|
return Status::NotSupported(
|
||||||
|
@ -1144,6 +1144,11 @@ DEFINE_bool(charge_table_reader, false,
|
|||||||
"CacheEntryRoleOptions::charged of"
|
"CacheEntryRoleOptions::charged of"
|
||||||
"CacheEntryRole::kBlockBasedTableReader");
|
"CacheEntryRole::kBlockBasedTableReader");
|
||||||
|
|
||||||
|
DEFINE_bool(charge_file_metadata, false,
|
||||||
|
"Setting for "
|
||||||
|
"CacheEntryRoleOptions::charged of"
|
||||||
|
"CacheEntryRole::kFileMetadata");
|
||||||
|
|
||||||
static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
|
static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
|
||||||
const char* ctype) {
|
const char* ctype) {
|
||||||
assert(ctype);
|
assert(ctype);
|
||||||
@ -4187,6 +4192,11 @@ class Benchmark {
|
|||||||
{/*.charged = */ FLAGS_charge_table_reader
|
{/*.charged = */ FLAGS_charge_table_reader
|
||||||
? CacheEntryRoleOptions::Decision::kEnabled
|
? CacheEntryRoleOptions::Decision::kEnabled
|
||||||
: CacheEntryRoleOptions::Decision::kDisabled}});
|
: CacheEntryRoleOptions::Decision::kDisabled}});
|
||||||
|
block_based_options.cache_usage_options.options_overrides.insert(
|
||||||
|
{CacheEntryRole::kFileMetadata,
|
||||||
|
{/*.charged = */ FLAGS_charge_file_metadata
|
||||||
|
? CacheEntryRoleOptions::Decision::kEnabled
|
||||||
|
: CacheEntryRoleOptions::Decision::kDisabled}});
|
||||||
block_based_options.block_cache_compressed = compressed_cache_;
|
block_based_options.block_cache_compressed = compressed_cache_;
|
||||||
block_based_options.block_size = FLAGS_block_size;
|
block_based_options.block_size = FLAGS_block_size;
|
||||||
block_based_options.block_restart_interval = FLAGS_block_restart_interval;
|
block_based_options.block_restart_interval = FLAGS_block_restart_interval;
|
||||||
|
@ -44,6 +44,7 @@ default_params = {
|
|||||||
"charge_compression_dictionary_building_buffer": lambda: random.choice([0, 1]),
|
"charge_compression_dictionary_building_buffer": lambda: random.choice([0, 1]),
|
||||||
"charge_filter_construction": lambda: random.choice([0, 1]),
|
"charge_filter_construction": lambda: random.choice([0, 1]),
|
||||||
"charge_table_reader": lambda: random.choice([0, 1]),
|
"charge_table_reader": lambda: random.choice([0, 1]),
|
||||||
|
"charge_file_metadata": lambda: random.choice([0, 1]),
|
||||||
"checkpoint_one_in": 1000000,
|
"checkpoint_one_in": 1000000,
|
||||||
"compression_type": lambda: random.choice(
|
"compression_type": lambda: random.choice(
|
||||||
["none", "snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]),
|
["none", "snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]),
|
||||||
|
Loading…
Reference in New Issue
Block a user