New backup meta schema, with file temperatures (#9660)

Summary:
The primary goal of this change is to add support for backing up and
restoring (applying on restore) file temperature metadata, without
committing to either the DB manifest or the FS reported "current"
temperatures being exclusive "source of truth".

To achieve this goal, we need to add temperature information to backup
metadata, which requires updated backup meta schema. Fortunately I
prepared for this in https://github.com/facebook/rocksdb/issues/8069, which began forward compatibility in version
6.19.0 for this kind of schema update. (Previously, backup meta schema
was not extensible! Making this schema update public will allow some
other "nice to have" features like taking backups with hard links, and
avoiding crc32c checksum computation when another checksum is already
available.) While schema version 2 is newly public, the default schema
version is still 1. Until we change the default, users will need to set
to 2 to enable features like temperature data backup+restore. New
metadata like temperature information will be ignored with a warning
in versions before this change and since 6.19.0. The metadata is
considered ignorable because a functioning DB can be restored without
it.

Some detail:
* Some renaming because "future schema" is now just public schema 2.
* Initialize some atomics in TestFs (linter reported)
* Add temperature hint support to SstFileDumper (used by BackupEngine)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9660

Test Plan:
related unit test majorly updated for the new functionality,
including some shared testing support for tracking temperatures in a FS.

Some other tests and testing hooks into production code also updated for
making the backup meta schema change public.

Reviewed By: ajkr

Differential Revision: D34686968

Pulled By: pdillinger

fbshipit-source-id: 3ac1fa3e67ee97ca8a5103d79cc87d872c1d862a
This commit is contained in:
Peter Dillinger 2022-03-18 11:06:17 -07:00 committed by Facebook GitHub Bot
parent 3bdbf67e1a
commit cff0d1e8e6
13 changed files with 428 additions and 152 deletions

View File

@ -6949,7 +6949,7 @@ TEST_F(DBTest2, CheckpointFileTemperature) {
temperatures.emplace(info.file_number, info.temperature);
}
test_fs->ClearRequestedFileTemperatures();
test_fs->PopRequestedSstFileTemperatures();
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(
@ -6957,17 +6957,19 @@ TEST_F(DBTest2, CheckpointFileTemperature) {
// checking src file src_temperature hints: 2 sst files: 1 sst is kWarm,
// another is kUnknown
auto file_temperatures = test_fs->RequestedSstFileTemperatures();
ASSERT_EQ(file_temperatures.size(), 2);
bool has_only_one_warm_sst = false;
for (const auto& file_temperature : file_temperatures) {
ASSERT_EQ(temperatures.at(file_temperature.first), file_temperature.second);
if (file_temperature.second == Temperature::kWarm) {
ASSERT_FALSE(has_only_one_warm_sst);
has_only_one_warm_sst = true;
std::vector<std::pair<uint64_t, Temperature>> requested_temps;
test_fs->PopRequestedSstFileTemperatures(&requested_temps);
// Two requests
ASSERT_EQ(requested_temps.size(), 2);
std::set<uint64_t> distinct_requests;
for (const auto& requested_temp : requested_temps) {
// Matching manifest temperatures
ASSERT_EQ(temperatures.at(requested_temp.first), requested_temp.second);
distinct_requests.insert(requested_temp.first);
}
}
ASSERT_TRUE(has_only_one_warm_sst);
// Each request to distinct file
ASSERT_EQ(distinct_requests.size(), requested_temps.size());
delete checkpoint;
Close();
}

View File

@ -14,6 +14,7 @@
#include <algorithm>
#include <cinttypes>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <thread>
@ -23,12 +24,15 @@
#include "db/db_impl/db_impl.h"
#include "file/filename.h"
#include "rocksdb/advanced_options.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/io_status.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/sst_file_writer.h"
@ -698,29 +702,79 @@ class FileTemperatureTestFS : public FileSystemWrapper {
IOStatus NewSequentialFile(const std::string& fname, const FileOptions& opts,
std::unique_ptr<FSSequentialFile>* result,
IODebugContext* dbg) override {
auto filename = GetFileName(fname);
IOStatus s = target()->NewSequentialFile(fname, opts, result, dbg);
uint64_t number;
FileType type;
auto r = ParseFileName(filename, &number, &type);
assert(r);
if (type == kTableFile) {
auto emplaced =
requested_sst_file_temperatures_.emplace(number, opts.temperature);
assert(emplaced.second); // assume no duplication
if (ParseFileName(GetFileName(fname), &number, &type) &&
type == kTableFile) {
MutexLock lock(&mu_);
requested_sst_file_temperatures_.emplace_back(number, opts.temperature);
if (s.ok()) {
*result = WrapWithTemperature<FSSequentialFileOwnerWrapper>(
number, std::move(*result));
}
return target()->NewSequentialFile(fname, opts, result, dbg);
}
return s;
}
const std::map<uint64_t, Temperature>& RequestedSstFileTemperatures() {
return requested_sst_file_temperatures_;
IOStatus NewRandomAccessFile(const std::string& fname,
const FileOptions& opts,
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
IOStatus s = target()->NewRandomAccessFile(fname, opts, result, dbg);
uint64_t number;
FileType type;
if (ParseFileName(GetFileName(fname), &number, &type) &&
type == kTableFile) {
MutexLock lock(&mu_);
requested_sst_file_temperatures_.emplace_back(number, opts.temperature);
if (s.ok()) {
*result = WrapWithTemperature<FSRandomAccessFileOwnerWrapper>(
number, std::move(*result));
}
}
return s;
}
void ClearRequestedFileTemperatures() {
void PopRequestedSstFileTemperatures(
std::vector<std::pair<uint64_t, Temperature>>* out = nullptr) {
MutexLock lock(&mu_);
if (out) {
*out = std::move(requested_sst_file_temperatures_);
assert(requested_sst_file_temperatures_.empty());
} else {
requested_sst_file_temperatures_.clear();
}
}
IOStatus NewWritableFile(const std::string& fname, const FileOptions& opts,
std::unique_ptr<FSWritableFile>* result,
IODebugContext* dbg) override {
uint64_t number;
FileType type;
if (ParseFileName(GetFileName(fname), &number, &type) &&
type == kTableFile) {
MutexLock lock(&mu_);
current_sst_file_temperatures_[number] = opts.temperature;
}
return target()->NewWritableFile(fname, opts, result, dbg);
}
void CopyCurrentSstFileTemperatures(std::map<uint64_t, Temperature>* out) {
MutexLock lock(&mu_);
*out = current_sst_file_temperatures_;
}
void OverrideSstFileTemperature(uint64_t number, Temperature temp) {
MutexLock lock(&mu_);
current_sst_file_temperatures_[number] = temp;
}
protected:
std::map<uint64_t, Temperature> requested_sst_file_temperatures_;
port::Mutex mu_;
std::vector<std::pair<uint64_t, Temperature>>
requested_sst_file_temperatures_;
std::map<uint64_t, Temperature> current_sst_file_temperatures_;
std::string GetFileName(const std::string& fname) {
auto filename = fname.substr(fname.find_last_of(kFilePathSeparator) + 1);
@ -729,6 +783,27 @@ class FileTemperatureTestFS : public FileSystemWrapper {
filename = filename.substr(filename.find_last_of('/') + 1);
return filename;
}
template <class FileOwnerWrapperT, /*inferred*/ class FileT>
std::unique_ptr<FileT> WrapWithTemperature(uint64_t number,
std::unique_ptr<FileT>&& t) {
class FileWithTemp : public FileOwnerWrapperT {
public:
FileWithTemp(FileTemperatureTestFS* fs, uint64_t number,
std::unique_ptr<FileT>&& t)
: FileOwnerWrapperT(std::move(t)), fs_(fs), number_(number) {}
Temperature GetTemperature() const override {
MutexLock lock(&fs_->mu_);
return fs_->current_sst_file_temperatures_[number_];
}
private:
FileTemperatureTestFS* fs_;
uint64_t number_;
};
return std::make_unique<FileWithTemp>(this, number, std::move(t));
}
};
class OnFileDeletionListener : public EventListener {

View File

@ -1451,6 +1451,11 @@ Status StressTest::TestBackupRestore(
}
}
}
if (thread->rand.OneIn(2)) {
backup_opts.schema_version = 1;
} else {
backup_opts.schema_version = 2;
}
BackupEngine* backup_engine = nullptr;
std::string from = "a backup/restore operation";
Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
@ -1458,11 +1463,11 @@ Status StressTest::TestBackupRestore(
from = "BackupEngine::Open";
}
if (s.ok()) {
if (thread->rand.OneIn(2)) {
TEST_FutureSchemaVersion2Options test_opts;
if (backup_opts.schema_version >= 2 && thread->rand.OneIn(2)) {
TEST_BackupMetaSchemaOptions test_opts;
test_opts.crc32c_checksums = thread->rand.OneIn(2) == 0;
test_opts.file_sizes = thread->rand.OneIn(2) == 0;
TEST_EnableWriteFutureSchemaVersion2(backup_engine, test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine, test_opts);
}
CreateBackupOptions create_opts;
if (FLAGS_disable_wal) {

View File

@ -204,6 +204,22 @@ struct BackupEngineOptions {
// and share_table_files are true.
ShareFilesNaming share_files_with_checksum_naming;
// Major schema version to use when writing backup meta files
// 1 (default) - compatible with very old versions of RocksDB.
// 2 - can be read by RocksDB versions >= 6.19.0. Minimum schema version for
// * (Experimental) saving and restoring file temperature metadata
int schema_version = 1;
// (Experimental - subject to change or removal) When taking a backup and
// saving file temperature info (minimum schema_version is 2), there are
// two potential sources of truth for the placement of files into temperature
// tiers: (a) the current file temperature reported by the FileSystem or
// (b) the expected file temperature recorded in DB manifest. When this
// option is false (default), (b) overrides (a) if both are not UNKNOWN.
// When true, (a) overrides (b) if both are not UNKNOWN. Regardless of this
// setting, a known temperature overrides UNKNOWN.
bool current_temperatures_override_manifest = false;
void Dump(Logger* logger) const;
explicit BackupEngineOptions(

View File

@ -321,6 +321,12 @@ std::map<CompactionStopStyle, std::string>
{kCompactionStopStyleSimilarSize, "kCompactionStopStyleSimilarSize"},
{kCompactionStopStyleTotalSize, "kCompactionStopStyleTotalSize"}};
std::map<Temperature, std::string> OptionsHelper::temperature_to_string = {
{Temperature::kUnknown, "kUnknown"},
{Temperature::kHot, "kHot"},
{Temperature::kWarm, "kWarm"},
{Temperature::kCold, "kCold"}};
std::unordered_map<std::string, ChecksumType>
OptionsHelper::checksum_type_string_map = {{"kNoChecksum", kNoChecksum},
{"kCRC32c", kCRC32c},

View File

@ -10,6 +10,7 @@
#include <string>
#include <vector>
#include "rocksdb/advanced_options.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
@ -77,6 +78,7 @@ struct OptionsHelper {
static std::map<CompactionPri, std::string> compaction_pri_to_string;
static std::map<CompactionStopStyle, std::string>
compaction_stop_style_to_string;
static std::map<Temperature, std::string> temperature_to_string;
static std::unordered_map<std::string, ChecksumType> checksum_type_string_map;
static std::unordered_map<std::string, CompressionType>
compression_type_string_map;
@ -98,6 +100,7 @@ static auto& compaction_style_to_string =
static auto& compaction_pri_to_string = OptionsHelper::compaction_pri_to_string;
static auto& compaction_stop_style_to_string =
OptionsHelper::compaction_stop_style_to_string;
static auto& temperature_to_string = OptionsHelper::temperature_to_string;
static auto& checksum_type_string_map = OptionsHelper::checksum_type_string_map;
#ifndef ROCKSDB_LITE
static auto& compaction_stop_style_string_map =

View File

@ -42,11 +42,13 @@ namespace ROCKSDB_NAMESPACE {
SstFileDumper::SstFileDumper(const Options& options,
const std::string& file_path,
size_t readahead_size, bool verify_checksum,
bool output_hex, bool decode_blob_index,
const EnvOptions& soptions, bool silent)
Temperature file_temp, size_t readahead_size,
bool verify_checksum, bool output_hex,
bool decode_blob_index, const EnvOptions& soptions,
bool silent)
: file_name_(file_path),
read_num_(0),
file_temp_(file_temp),
output_hex_(output_hex),
decode_blob_index_(decode_blob_index),
soptions_(soptions),
@ -82,8 +84,9 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
const auto& fs = options_.env->GetFileSystem();
std::unique_ptr<FSRandomAccessFile> file;
uint64_t file_size = 0;
Status s = fs->NewRandomAccessFile(file_path, FileOptions(soptions_), &file,
nullptr);
FileOptions fopts = soptions_;
fopts.temperature = file_temp_;
Status s = fs->NewRandomAccessFile(file_path, fopts, &file, nullptr);
if (s.ok()) {
s = fs->GetFileSize(file_path, IOOptions(), &file_size, nullptr);
}
@ -122,8 +125,7 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
magic_number == kLegacyPlainTableMagicNumber) {
soptions_.use_mmap_reads = true;
fs->NewRandomAccessFile(file_path, FileOptions(soptions_), &file,
nullptr);
fs->NewRandomAccessFile(file_path, fopts, &file, nullptr);
file_.reset(new RandomAccessFileReader(std::move(file), file_path));
}

View File

@ -7,17 +7,20 @@
#include <memory>
#include <string>
#include "db/dbformat.h"
#include "file/writable_file_writer.h"
#include "options/cf_options.h"
#include "rocksdb/advanced_options.h"
namespace ROCKSDB_NAMESPACE {
class SstFileDumper {
public:
explicit SstFileDumper(const Options& options, const std::string& file_name,
size_t readahead_size, bool verify_checksum,
bool output_hex, bool decode_blob_index,
Temperature file_temp, size_t readahead_size,
bool verify_checksum, bool output_hex,
bool decode_blob_index,
const EnvOptions& soptions = EnvOptions(),
bool silent = false);
@ -71,6 +74,7 @@ class SstFileDumper {
std::string file_name_;
uint64_t read_num_;
Temperature file_temp_;
bool output_hex_;
bool decode_blob_index_;
EnvOptions soptions_;

View File

@ -3471,7 +3471,8 @@ void DumpSstFile(Options options, std::string filename, bool output_hex,
// no verification
// TODO: add support for decoding blob indexes in ldb as well
ROCKSDB_NAMESPACE::SstFileDumper dumper(
options, filename, 2 * 1024 * 1024 /* readahead_size */,
options, filename, Temperature::kUnknown,
2 * 1024 * 1024 /* readahead_size */,
/* verify_checksum */ false, output_hex,
/* decode_blob_index */ false);
Status st = dumper.ReadSequential(true, std::numeric_limits<uint64_t>::max(),

View File

@ -398,9 +398,9 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
filename = std::string(dir_or_file) + "/" + filename;
}
ROCKSDB_NAMESPACE::SstFileDumper dumper(options, filename, readahead_size,
verify_checksum, output_hex,
decode_blob_index);
ROCKSDB_NAMESPACE::SstFileDumper dumper(
options, filename, Temperature::kUnknown, readahead_size,
verify_checksum, output_hex, decode_blob_index);
// Not a valid SST
if (!dumper.getStatus().ok()) {
fprintf(stderr, "%s: %s\n", filename.c_str(),

View File

@ -34,7 +34,9 @@
#include "file/writable_file_writer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "rocksdb/advanced_options.h"
#include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/statistics.h"
@ -191,13 +193,14 @@ class BackupEngineImpl {
struct FileInfo {
FileInfo(const std::string& fname, uint64_t sz, const std::string& checksum,
const std::string& id = "", const std::string& sid = "")
const std::string& id, const std::string& sid, Temperature _temp)
: refs(0),
filename(fname),
size(sz),
checksum_hex(checksum),
db_id(id),
db_session_id(sid) {}
db_session_id(sid),
temp(_temp) {}
FileInfo(const FileInfo&) = delete;
FileInfo& operator=(const FileInfo&) = delete;
@ -214,6 +217,7 @@ class BackupEngineImpl {
// db_session_id appears in the backup SST filename if the table naming
// option is kUseDbSessionId
const std::string db_session_id;
Temperature temp;
std::string GetDbFileName() {
std::string rv;
@ -420,7 +424,8 @@ class BackupEngineImpl {
RateLimiter* rate_limiter, Logger* info_log,
std::unordered_set<std::string>* reported_ignored_fields);
IOStatus StoreToFile(
bool sync, const TEST_FutureSchemaVersion2Options* test_future_options);
bool sync, int schema_version,
const TEST_BackupMetaSchemaOptions* schema_test_options);
std::string GetInfoString() {
std::ostringstream ss;
@ -546,13 +551,16 @@ class BackupEngineImpl {
//
// @param src If non-empty, the file is copied from this pathname.
// @param contents If non-empty, the file will be created with these contents.
// @param src_temperature Pass in expected temperature of src, return back
// temperature reported by FileSystem
IOStatus CopyOrCreateFile(const std::string& src, const std::string& dst,
const std::string& contents, uint64_t size_limit,
Env* src_env, Env* dst_env,
const EnvOptions& src_env_options, bool sync,
RateLimiter* rate_limiter,
std::function<void()> progress_callback,
const Temperature src_temperature,
Temperature* src_temperature,
Temperature dst_temperature,
uint64_t* bytes_toward_next_callback,
uint64_t* size, std::string* checksum_hex);
@ -566,8 +574,8 @@ class BackupEngineImpl {
// Obtain db_id and db_session_id from the table properties of file_path
Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
const std::string& file_path,
RateLimiter* rate_limiter, std::string* db_id,
std::string* db_session_id);
Temperature file_temp, RateLimiter* rate_limiter,
std::string* db_id, std::string* db_session_id);
struct CopyOrCreateResult {
~CopyOrCreateResult() {
@ -585,6 +593,8 @@ class BackupEngineImpl {
std::string db_id;
std::string db_session_id;
IOStatus io_status;
Temperature expected_src_temperature = Temperature::kUnknown;
Temperature current_src_temperature = Temperature::kUnknown;
};
// Exactly one of src_path and contents must be non-empty. If src_path is
@ -593,6 +603,8 @@ class BackupEngineImpl {
struct CopyOrCreateWorkItem {
std::string src_path;
std::string dst_path;
Temperature src_temperature;
Temperature dst_temperature;
std::string contents;
Env* src_env;
Env* dst_env;
@ -607,11 +619,12 @@ class BackupEngineImpl {
std::string src_checksum_hex;
std::string db_id;
std::string db_session_id;
Temperature src_temperature;
CopyOrCreateWorkItem()
: src_path(""),
dst_path(""),
src_temperature(Temperature::kUnknown),
dst_temperature(Temperature::kUnknown),
contents(""),
src_env(nullptr),
dst_env(nullptr),
@ -623,8 +636,7 @@ class BackupEngineImpl {
src_checksum_func_name(kUnknownFileChecksumFuncName),
src_checksum_hex(""),
db_id(""),
db_session_id(""),
src_temperature(Temperature::kUnknown) {}
db_session_id("") {}
CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
@ -636,6 +648,8 @@ class BackupEngineImpl {
CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
src_path = std::move(o.src_path);
dst_path = std::move(o.dst_path);
src_temperature = std::move(o.src_temperature);
dst_temperature = std::move(o.dst_temperature);
contents = std::move(o.contents);
src_env = o.src_env;
dst_env = o.dst_env;
@ -655,17 +669,20 @@ class BackupEngineImpl {
}
CopyOrCreateWorkItem(
std::string _src_path, std::string _dst_path, std::string _contents,
Env* _src_env, Env* _dst_env, EnvOptions _src_env_options, bool _sync,
RateLimiter* _rate_limiter, uint64_t _size_limit, Statistics* _stats,
std::string _src_path, std::string _dst_path,
const Temperature _src_temperature, const Temperature _dst_temperature,
std::string _contents, Env* _src_env, Env* _dst_env,
EnvOptions _src_env_options, bool _sync, RateLimiter* _rate_limiter,
uint64_t _size_limit, Statistics* _stats,
std::function<void()> _progress_callback = []() {},
const std::string& _src_checksum_func_name =
kUnknownFileChecksumFuncName,
const std::string& _src_checksum_hex = "",
const std::string& _db_id = "", const std::string& _db_session_id = "",
const Temperature _src_temperature = Temperature::kUnknown)
const std::string& _db_id = "", const std::string& _db_session_id = "")
: src_path(std::move(_src_path)),
dst_path(std::move(_dst_path)),
src_temperature(_src_temperature),
dst_temperature(_dst_temperature),
contents(std::move(_contents)),
src_env(_src_env),
dst_env(_dst_env),
@ -678,8 +695,7 @@ class BackupEngineImpl {
src_checksum_func_name(_src_checksum_func_name),
src_checksum_hex(_src_checksum_hex),
db_id(_db_id),
db_session_id(_db_session_id),
src_temperature(_src_temperature) {}
db_session_id(_db_session_id) {}
};
struct BackupAfterCopyOrCreateWorkItem {
@ -821,7 +837,7 @@ class BackupEngineImpl {
IOOptions io_options_ = IOOptions();
public:
std::unique_ptr<TEST_FutureSchemaVersion2Options> test_future_options_;
std::unique_ptr<TEST_BackupMetaSchemaOptions> schema_test_options_;
};
// -------- BackupEngineImplThreadSafe class ---------
@ -919,10 +935,9 @@ class BackupEngineImplThreadSafe : public BackupEngine,
}
// Not public API but used in testing
void TEST_EnableWriteFutureSchemaVersion2(
const TEST_FutureSchemaVersion2Options& options) {
impl_.test_future_options_.reset(
new TEST_FutureSchemaVersion2Options(options));
void TEST_SetBackupMetaSchemaOptions(
const TEST_BackupMetaSchemaOptions& options) {
impl_.schema_test_options_.reset(new TEST_BackupMetaSchemaOptions(options));
}
private:
@ -1198,11 +1213,12 @@ IOStatus BackupEngineImpl::Initialize() {
uint64_t prev_bytes_written = IOSTATS(bytes_written);
CopyOrCreateResult result;
Temperature temp = work_item.src_temperature;
result.io_status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
work_item.size_limit, work_item.src_env, work_item.dst_env,
work_item.src_env_options, work_item.sync, work_item.rate_limiter,
work_item.progress_callback, work_item.src_temperature,
work_item.progress_callback, &temp, work_item.dst_temperature,
&bytes_toward_next_callback, &result.size, &result.checksum_hex);
RecordTick(work_item.stats, BACKUP_READ_BYTES,
@ -1212,6 +1228,8 @@ IOStatus BackupEngineImpl::Initialize() {
result.db_id = work_item.db_id;
result.db_session_id = work_item.db_session_id;
result.expected_src_temperature = work_item.src_temperature;
result.current_src_temperature = temp;
if (result.io_status.ok() && !work_item.src_checksum_hex.empty()) {
// unknown checksum function name implies no db table file checksum in
// db manifest; work_item.src_checksum_hex not empty means
@ -1424,6 +1442,12 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
item.result.wait();
auto result = item.result.get();
item_io_status = result.io_status;
Temperature temp = result.expected_src_temperature;
if (result.current_src_temperature != Temperature::kUnknown &&
(temp == Temperature::kUnknown ||
options_.current_temperatures_override_manifest)) {
temp = result.current_src_temperature;
}
if (item_io_status.ok() && item.shared && item.needed_to_copy) {
item_io_status = item.backup_env->GetFileSystem()->RenameFile(
item.dst_path_tmp, item.dst_path, io_options_, nullptr);
@ -1431,7 +1455,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
if (item_io_status.ok()) {
item_io_status = new_backup.get()->AddFile(std::make_shared<FileInfo>(
item.dst_relative, result.size, result.checksum_hex, result.db_id,
result.db_session_id));
result.db_session_id, temp));
}
if (!item_io_status.ok()) {
io_s = item_io_status;
@ -1446,7 +1470,8 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
if (io_s.ok()) {
// persist the backup metadata on the disk
io_s = new_backup->StoreToFile(options_.sync, test_future_options_.get());
io_s = new_backup->StoreToFile(options_.sync, options_.schema_version,
schema_test_options_.get());
}
if (io_s.ok() && options_.sync) {
std::unique_ptr<FSDirectory> backup_private_directory;
@ -1823,7 +1848,8 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup(
ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
dst.c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
GetAbsolutePath(file), dst, Temperature::kUnknown /* src_temp */,
file_info->temp, "" /* contents */, backup_env_, db_env_,
EnvOptions() /* src_env_options */, options_.sync,
options_.restore_rate_limiter.get(), 0 /* size_limit */,
nullptr /* stats */);
@ -1962,15 +1988,16 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
const std::string& src, const std::string& dst, const std::string& contents,
uint64_t size_limit, Env* src_env, Env* dst_env,
const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter,
std::function<void()> progress_callback, const Temperature src_temperature,
uint64_t* bytes_toward_next_callback, uint64_t* size,
std::string* checksum_hex) {
std::function<void()> progress_callback, Temperature* src_temperature,
Temperature dst_temperature, uint64_t* bytes_toward_next_callback,
uint64_t* size, std::string* checksum_hex) {
assert(src.empty() != contents.empty());
IOStatus io_s;
std::unique_ptr<FSWritableFile> dst_file;
std::unique_ptr<FSSequentialFile> src_file;
FileOptions dst_file_options;
dst_file_options.use_mmap_writes = false;
dst_file_options.temperature = dst_temperature;
// TODO:(gzh) maybe use direct reads/writes here if possible
if (size != nullptr) {
*size = 0;
@ -1986,7 +2013,7 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
&dst_file, nullptr);
if (io_s.ok() && !src.empty()) {
auto src_file_options = FileOptions(src_env_options);
src_file_options.temperature = src_temperature;
src_file_options.temperature = *src_temperature;
io_s = src_env->GetFileSystem()->NewSequentialFile(src, src_file_options,
&src_file, nullptr);
}
@ -2003,6 +2030,9 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
std::unique_ptr<SequentialFileReader> src_reader;
std::unique_ptr<char[]> buf;
if (!src.empty()) {
// Return back current temperature in FileSystem
*src_temperature = src_file->GetTemperature();
src_reader.reset(new SequentialFileReader(std::move(src_file), src));
buf.reset(new char[buf_size]);
}
@ -2121,8 +2151,8 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
// Prepare db_session_id to add to the file name
// Ignore the returned status
// In the failed cases, db_id and db_session_id will be empty
GetFileDbIdentities(db_env_, src_env_options, src_path, rate_limiter,
&db_id, &db_session_id)
GetFileDbIdentities(db_env_, src_env_options, src_path, src_temperature,
rate_limiter, &db_id, &db_session_id)
.PermitUncheckedError();
}
// Calculate checksum if checksum and db session id are not available.
@ -2278,10 +2308,11 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
ROCKS_LOG_INFO(options_.info_log, "Copying %s to %s", fname.c_str(),
copy_dest_path->c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
src_dir.empty() ? "" : src_path, *copy_dest_path, contents, db_env_,
backup_env_, src_env_options, options_.sync, rate_limiter, size_limit,
stats, progress_callback, src_checksum_func_name, checksum_hex, db_id,
db_session_id, src_temperature);
src_dir.empty() ? "" : src_path, *copy_dest_path, src_temperature,
Temperature::kUnknown /*dst_temp*/, contents, db_env_, backup_env_,
src_env_options, options_.sync, rate_limiter, size_limit, stats,
progress_callback, src_checksum_func_name, checksum_hex, db_id,
db_session_id);
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), shared, need_to_copy,
backup_env_, temp_dest_path, final_dest_path, dst_relative);
@ -2356,17 +2387,15 @@ IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
return io_s;
}
Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
const EnvOptions& src_env_options,
const std::string& file_path,
RateLimiter* rate_limiter,
std::string* db_id,
std::string* db_session_id) {
Status BackupEngineImpl::GetFileDbIdentities(
Env* src_env, const EnvOptions& src_env_options,
const std::string& file_path, Temperature file_temp,
RateLimiter* rate_limiter, std::string* db_id, std::string* db_session_id) {
assert(db_id != nullptr || db_session_id != nullptr);
Options options;
options.env = src_env;
SstFileDumper sst_reader(options, file_path,
SstFileDumper sst_reader(options, file_path, file_temp,
2 * 1024 * 1024
/* readahead_size */,
false /* verify_checksum */, false /* output_hex */,
@ -2678,6 +2707,7 @@ const std::string kAppMetaDataFieldName{"metadata"};
// WART: The checksums are crc32c but named "crc32"
const std::string kFileCrc32cFieldName{"crc32"};
const std::string kFileSizeFieldName{"size"};
const std::string kTemperatureFieldName{"temp"};
// Marks a (future) field that should cause failure if not recognized.
// Other fields are assumed to be ignorable. For example, in the future
@ -2898,6 +2928,7 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
}
std::string checksum_hex;
Temperature temp = Temperature::kUnknown;
for (unsigned i = 1; i < components.size(); i += 2) {
const std::string& field_name = components[i];
const std::string& field_data = components[i + 1];
@ -2918,6 +2949,16 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
"For file " + filename + " expected size " + ToString(ex_size) +
" but found size" + ToString(actual_size));
}
} else if (field_name == kTemperatureFieldName) {
auto iter = temperature_string_map.find(field_data);
if (iter != temperature_string_map.end()) {
temp = iter->second;
} else {
// Could report corruption, but in case of new temperatures added
// in future, letting those map to kUnknown which should generally
// be safe.
temp = Temperature::kUnknown;
}
} else if (StartsWith(field_name, kNonIgnorableFieldPrefix)) {
return IOStatus::NotSupported("Unrecognized non-ignorable file field " +
field_name + " (from future version?)");
@ -2930,7 +2971,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
}
}
files.emplace_back(new FileInfo(filename, actual_size, checksum_hex));
files.emplace_back(new FileInfo(filename, actual_size, checksum_hex,
/*id*/ "", /*sid*/ "", temp));
}
if (footer_present) {
@ -2987,8 +3029,31 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
return IOStatus::OK();
}
namespace {
const std::vector<std::string> minor_version_strings{
"", // invalid major version 0
"", // implicit major version 1
"2.0",
};
} // namespace
IOStatus BackupEngineImpl::BackupMeta::StoreToFile(
bool sync, const TEST_FutureSchemaVersion2Options* test_future_options) {
bool sync, int schema_version,
const TEST_BackupMetaSchemaOptions* schema_test_options) {
if (schema_version < 1) {
return IOStatus::InvalidArgument(
"BackupEngineOptions::schema_version must be >= 1");
}
if (schema_version > static_cast<int>(minor_version_strings.size() - 1)) {
return IOStatus::NotSupported(
"Only BackupEngineOptions::schema_version <= " +
ToString(minor_version_strings.size() - 1) + " is supported");
}
std::string ver = minor_version_strings[schema_version];
// Need schema_version >= 2 for TEST_BackupMetaSchemaOptions
assert(schema_version >= 2 || schema_test_options == nullptr);
IOStatus io_s;
std::unique_ptr<FSWritableFile> backup_meta_file;
FileOptions file_options;
@ -3001,8 +3066,13 @@ IOStatus BackupEngineImpl::BackupMeta::StoreToFile(
}
std::ostringstream buf;
if (test_future_options) {
buf << kSchemaVersionPrefix << test_future_options->version << "\n";
if (schema_test_options) {
// override for testing
ver = schema_test_options->version;
}
if (!ver.empty()) {
assert(schema_version >= 2);
buf << kSchemaVersionPrefix << ver << "\n";
}
buf << static_cast<unsigned long long>(timestamp_) << "\n";
buf << sequence_number_ << "\n";
@ -3012,8 +3082,8 @@ IOStatus BackupEngineImpl::BackupMeta::StoreToFile(
Slice(app_metadata_).ToString(/* hex */ true);
buf << kAppMetaDataFieldName << " " << hex_encoded_metadata << "\n";
}
if (test_future_options) {
for (auto& e : test_future_options->meta_fields) {
if (schema_test_options) {
for (auto& e : schema_test_options->meta_fields) {
buf << e.first << " " << e.second << "\n";
}
}
@ -3021,26 +3091,30 @@ IOStatus BackupEngineImpl::BackupMeta::StoreToFile(
for (const auto& file : files_) {
buf << file->filename;
if (test_future_options == nullptr ||
test_future_options->crc32c_checksums) {
if (schema_test_options == nullptr ||
schema_test_options->crc32c_checksums) {
// use crc32c for now, switch to something else if needed
buf << " " << kFileCrc32cFieldName << " "
<< ChecksumHexToInt32(file->checksum_hex);
}
if (test_future_options && test_future_options->file_sizes) {
if (schema_version >= 2 && file->temp != Temperature::kUnknown) {
buf << " " << kTemperatureFieldName << " "
<< temperature_to_string[file->temp];
}
if (schema_test_options && schema_test_options->file_sizes) {
buf << " " << kFileSizeFieldName << " " << ToString(file->size);
}
if (test_future_options) {
for (auto& e : test_future_options->file_fields) {
if (schema_test_options) {
for (auto& e : schema_test_options->file_fields) {
buf << " " << e.first << " " << e.second;
}
}
buf << "\n";
}
if (test_future_options && !test_future_options->footer_fields.empty()) {
if (schema_test_options && !schema_test_options->footer_fields.empty()) {
buf << kFooterMarker << "\n";
for (auto& e : test_future_options->footer_fields) {
for (auto& e : schema_test_options->footer_fields) {
buf << e.first << " " << e.second << "\n";
}
}
@ -3078,11 +3152,11 @@ IOStatus BackupEngineReadOnly::Open(const BackupEngineOptions& options,
return IOStatus::OK();
}
void TEST_EnableWriteFutureSchemaVersion2(
BackupEngine* engine, const TEST_FutureSchemaVersion2Options& options) {
void TEST_SetBackupMetaSchemaOptions(
BackupEngine* engine, const TEST_BackupMetaSchemaOptions& options) {
BackupEngineImplThreadSafe* impl =
static_cast_with_check<BackupEngineImplThreadSafe>(engine);
impl->TEST_EnableWriteFutureSchemaVersion2(options);
impl->TEST_SetBackupMetaSchemaOptions(options);
}
} // namespace ROCKSDB_NAMESPACE

View File

@ -10,7 +10,7 @@
namespace ROCKSDB_NAMESPACE {
struct TEST_FutureSchemaVersion2Options {
struct TEST_BackupMetaSchemaOptions {
std::string version = "2";
bool crc32c_checksums = false;
bool file_sizes = true;
@ -21,9 +21,9 @@ struct TEST_FutureSchemaVersion2Options {
// Modifies the BackupEngine(Impl) to write backup meta files using the
// unpublished schema version 2, for the life of this object (not backup_dir).
// TEST_FutureSchemaVersion2Options offers some customization for testing.
void TEST_EnableWriteFutureSchemaVersion2(
BackupEngine *engine, const TEST_FutureSchemaVersion2Options &options);
// TEST_BackupMetaSchemaOptions offers some customization for testing.
void TEST_SetBackupMetaSchemaOptions(
BackupEngine *engine, const TEST_BackupMetaSchemaOptions &options);
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

View File

@ -27,6 +27,7 @@
#include "file/filename.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/advanced_options.h"
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/rate_limiter.h"
@ -422,12 +423,12 @@ class TestFs : public FileSystemWrapper {
// Keeps track of how many files of each type were successfully opened, and
// out of those, how many were opened with direct I/O.
std::atomic<int> num_rand_readers_;
std::atomic<int> num_direct_rand_readers_;
std::atomic<int> num_seq_readers_;
std::atomic<int> num_direct_seq_readers_;
std::atomic<int> num_writers_;
std::atomic<int> num_direct_writers_;
std::atomic<int> num_rand_readers_{};
std::atomic<int> num_direct_rand_readers_{};
std::atomic<int> num_seq_readers_{};
std::atomic<int> num_direct_seq_readers_{};
std::atomic<int> num_writers_{};
std::atomic<int> num_direct_writers_{};
}; // TestFs
class FileManager : public EnvWrapper {
@ -3278,27 +3279,32 @@ TEST_F(BackupEngineTest, MetadataTooLarge) {
DestroyDB(dbname_, options_);
}
TEST_F(BackupEngineTest, FutureMetaSchemaVersion2_SizeCorruption) {
OpenDBAndBackupEngine(true);
TEST_F(BackupEngineTest, MetaSchemaVersion2_SizeCorruption) {
engine_options_->schema_version = 1;
OpenDBAndBackupEngine(/*destroy_old_data*/ true);
// Backup 1: no future schema, no sizes, with checksums
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
CloseDBAndBackupEngine();
engine_options_->schema_version = 2;
OpenDBAndBackupEngine(/*destroy_old_data*/ false);
// Backup 2: no checksums, no sizes
TEST_FutureSchemaVersion2Options test_opts;
TEST_BackupMetaSchemaOptions test_opts;
test_opts.crc32c_checksums = false;
test_opts.file_sizes = false;
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
// Backup 3: no checksums, with sizes
test_opts.file_sizes = true;
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
// Backup 4: with checksums and sizes
test_opts.crc32c_checksums = true;
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
CloseDBAndBackupEngine();
@ -3341,13 +3347,14 @@ TEST_F(BackupEngineTest, FutureMetaSchemaVersion2_SizeCorruption) {
CloseBackupEngine();
}
TEST_F(BackupEngineTest, FutureMetaSchemaVersion2_NotSupported) {
TEST_FutureSchemaVersion2Options test_opts;
TEST_F(BackupEngineTest, MetaSchemaVersion2_NotSupported) {
engine_options_->schema_version = 2;
TEST_BackupMetaSchemaOptions test_opts;
std::string app_metadata = "abc\ndef";
OpenDBAndBackupEngine(true);
// Start with supported
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(
backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata));
@ -3355,30 +3362,30 @@ TEST_F(BackupEngineTest, FutureMetaSchemaVersion2_NotSupported) {
// detected on attempt to restore.
// Not supported versions
test_opts.version = "3";
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(
backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata));
test_opts.version = "23.45.67";
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(
backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata));
test_opts.version = "2";
// Non-ignorable fields
test_opts.meta_fields["ni::blah"] = "123";
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(
backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata));
test_opts.meta_fields.clear();
test_opts.file_fields["ni::123"] = "xyz";
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(
backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata));
test_opts.file_fields.clear();
test_opts.footer_fields["ni::123"] = "xyz";
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(
backup_engine_->CreateNewBackupWithMetadata(db_.get(), app_metadata));
test_opts.footer_fields.clear();
@ -3393,8 +3400,9 @@ TEST_F(BackupEngineTest, FutureMetaSchemaVersion2_NotSupported) {
CloseBackupEngine();
}
TEST_F(BackupEngineTest, FutureMetaSchemaVersion2_Restore) {
TEST_FutureSchemaVersion2Options test_opts;
TEST_F(BackupEngineTest, MetaSchemaVersion2_Restore) {
engine_options_->schema_version = 2;
TEST_BackupMetaSchemaOptions test_opts;
const int keys_iteration = 5000;
OpenDBAndBackupEngine(true, false, kShareWithChecksum);
@ -3403,7 +3411,7 @@ TEST_F(BackupEngineTest, FutureMetaSchemaVersion2_Restore) {
// based on shared files also in other backups with the metadata.
test_opts.crc32c_checksums = false;
test_opts.file_sizes = false;
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
CloseDBAndBackupEngine();
@ -3412,7 +3420,7 @@ TEST_F(BackupEngineTest, FutureMetaSchemaVersion2_Restore) {
OpenDBAndBackupEngine(false /* destroy_old_data */, false,
kShareWithChecksum);
test_opts.file_sizes = true;
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
CloseDBAndBackupEngine();
@ -3423,7 +3431,7 @@ TEST_F(BackupEngineTest, FutureMetaSchemaVersion2_Restore) {
OpenDBAndBackupEngine(false /* destroy_old_data */, false,
kShareWithChecksum);
test_opts.crc32c_checksums = true;
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
CloseDBAndBackupEngine();
@ -3451,7 +3459,7 @@ TEST_F(BackupEngineTest, FutureMetaSchemaVersion2_Restore) {
test_opts.file_fields["_7yyyyyyyyy"] = "111111111111";
test_opts.footer_fields["Qwzn.tz89"] = "ASDF!!@# ##=\t ";
test_opts.footer_fields["yes"] = "no!";
TEST_EnableWriteFutureSchemaVersion2(backup_engine_.get(), test_opts);
TEST_SetBackupMetaSchemaOptions(backup_engine_.get(), test_opts);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
CloseDBAndBackupEngine();
@ -4009,6 +4017,10 @@ TEST_F(BackupEngineTest, IOStats) {
TEST_F(BackupEngineTest, FileTemperatures) {
CloseDBAndBackupEngine();
// Required for recording+restoring temperatures
engine_options_->schema_version = 2;
// More file IO instrumentation
auto my_db_fs = std::make_shared<FileTemperatureTestFS>(db_chroot_fs_);
test_db_fs_ = std::make_shared<TestFs>(my_db_fs);
@ -4021,7 +4033,7 @@ TEST_F(BackupEngineTest, FileTemperatures) {
OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
kShareWithChecksum);
// generate a bottommost file and a non-bottommost file
// generate a bottommost file (combined from 2) and a non-bottommost file
DBImpl* dbi = static_cast_with_check<DBImpl>(db_.get());
ASSERT_OK(db_->Put(WriteOptions(), "a", "val"));
ASSERT_OK(db_->Put(WriteOptions(), "c", "val"));
@ -4033,8 +4045,10 @@ TEST_F(BackupEngineTest, FileTemperatures) {
ASSERT_OK(db_->Put(WriteOptions(), "e", "val"));
ASSERT_OK(db_->Flush(FlushOptions()));
// Get temperatures from manifest
std::map<uint64_t, Temperature> manifest_temps;
std::map<Temperature, int> manifest_temp_counts;
{
std::vector<LiveFileStorageInfo> infos;
ASSERT_OK(
db_->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(), &infos));
@ -4044,29 +4058,103 @@ TEST_F(BackupEngineTest, FileTemperatures) {
manifest_temp_counts[info.temperature]++;
}
}
}
// Verify expected manifest temperatures
ASSERT_EQ(manifest_temp_counts.size(), 2);
ASSERT_EQ(manifest_temp_counts[Temperature::kWarm], 1);
ASSERT_EQ(manifest_temp_counts[Temperature::kUnknown], 1);
// Verify manifest temperatures match FS temperatures
std::map<uint64_t, Temperature> current_temps;
my_db_fs->CopyCurrentSstFileTemperatures(&current_temps);
for (const auto& manifest_temp : manifest_temps) {
ASSERT_EQ(current_temps[manifest_temp.first], manifest_temp.second);
}
// Try a few different things
for (int i = 1; i <= 5; ++i) {
// Expected temperatures after restore are based on manifest temperatures
std::map<uint64_t, Temperature> expected_temps = manifest_temps;
if (i >= 2) {
// For iterations 2 & 3, override current temperature of one file
// and vary which temperature is authoritative (current or manifest).
// For iterations 4 & 5, override current temperature of both files
// but make sure an current temperate always takes precedence over
// unknown regardless of current_temperatures_override_manifest setting.
bool use_current = ((i % 2) == 1);
engine_options_->current_temperatures_override_manifest = use_current;
CloseBackupEngine();
OpenBackupEngine();
for (const auto& manifest_temp : manifest_temps) {
if (i <= 3) {
if (manifest_temp.second == Temperature::kWarm) {
my_db_fs->OverrideSstFileTemperature(manifest_temp.first,
Temperature::kCold);
if (use_current) {
expected_temps[manifest_temp.first] = Temperature::kCold;
}
}
} else {
assert(i <= 5);
if (manifest_temp.second == Temperature::kWarm) {
my_db_fs->OverrideSstFileTemperature(manifest_temp.first,
Temperature::kUnknown);
} else {
ASSERT_EQ(manifest_temp.second, Temperature::kUnknown);
my_db_fs->OverrideSstFileTemperature(manifest_temp.first,
Temperature::kHot);
// regardless of use_current
expected_temps[manifest_temp.first] = Temperature::kHot;
}
}
}
}
// Sample requested temperatures in opening files for backup
my_db_fs->ClearRequestedFileTemperatures();
my_db_fs->PopRequestedSstFileTemperatures();
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
// checking src file src_temperature hints: 2 sst files: 1 sst is kWarm,
// another is kUnknown
auto requested_temps = my_db_fs->RequestedSstFileTemperatures();
ASSERT_EQ(requested_temps.size(), 2);
bool has_only_one_warm_sst = false;
// Verify requested temperatures against manifest temperatures (before
// backup finds out current temperatures in FileSystem)
std::vector<std::pair<uint64_t, Temperature>> requested_temps;
my_db_fs->PopRequestedSstFileTemperatures(&requested_temps);
std::set<uint64_t> distinct_requests;
for (const auto& requested_temp : requested_temps) {
// Matching manifest temperatures
ASSERT_EQ(manifest_temps.at(requested_temp.first), requested_temp.second);
if (requested_temp.second == Temperature::kWarm) {
ASSERT_FALSE(has_only_one_warm_sst);
has_only_one_warm_sst = true;
distinct_requests.insert(requested_temp.first);
}
// Two distinct requests
ASSERT_EQ(distinct_requests.size(), 2);
// Verify against backup info file details API
BackupInfo info;
ASSERT_OK(backup_engine_->GetLatestBackupInfo(
&info, /*include_file_details*/ true));
ASSERT_GT(info.file_details.size(), 2);
for (auto& e : info.file_details) {
ASSERT_EQ(expected_temps[e.file_number], e.temperature);
}
// Restore backup to another virtual (tiered) dir
const std::string restore_dir = "/restore" + ToString(i);
ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(
RestoreOptions(), restore_dir, restore_dir));
// Verify restored FS temperatures match expectation
// (FileTemperatureTestFS doesn't distinguish directories when reporting
// current temperatures, just whatever SST was written or overridden last
// with that file number.)
my_db_fs->CopyCurrentSstFileTemperatures(&current_temps);
for (const auto& expected_temp : expected_temps) {
ASSERT_EQ(current_temps[expected_temp.first], expected_temp.second);
}
// Delete backup to force next backup to copy files
ASSERT_OK(backup_engine_->PurgeOldBackups(0));
}
ASSERT_TRUE(has_only_one_warm_sst);
}
} // anon namespace