Add Temperature info in NewSequentialFile()
(#9499)
Summary: Add Temperature hints information from RocksDB in API `NewSequentialFile()`. backup and checkpoint operations need to open the source files with `NewSequentialFile()`, which will have the temperature hints. Other operations are not covered. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9499 Test Plan: Added unittest Reviewed By: pdillinger Differential Revision: D34006115 Pulled By: jay-zhuang fbshipit-source-id: 568b34602b76520e53128672bd07e9d886786a2f
This commit is contained in:
parent
559525dcbb
commit
d3a2f284d9
@ -1182,6 +1182,7 @@ if(WITH_TESTS)
|
||||
db/db_compaction_filter_test.cc
|
||||
db/db_compaction_test.cc
|
||||
db/db_dynamic_level_test.cc
|
||||
db/db_encryption_test.cc
|
||||
db/db_flush_test.cc
|
||||
db/db_inplace_update_test.cc
|
||||
db/db_io_failure_test.cc
|
||||
@ -1208,6 +1209,7 @@ if(WITH_TESTS)
|
||||
db/db_universal_compaction_test.cc
|
||||
db/db_wal_test.cc
|
||||
db/db_with_timestamp_compaction_test.cc
|
||||
db/db_write_buffer_manager_test.cc
|
||||
db/db_write_test.cc
|
||||
db/dbformat_test.cc
|
||||
db/deletefile_test.cc
|
||||
@ -1219,6 +1221,7 @@ if(WITH_TESTS)
|
||||
db/file_indexer_test.cc
|
||||
db/filename_test.cc
|
||||
db/flush_job_test.cc
|
||||
db/import_column_family_test.cc
|
||||
db/listener_test.cc
|
||||
db/log_test.cc
|
||||
db/manual_compaction_test.cc
|
||||
@ -1314,6 +1317,7 @@ if(WITH_TESTS)
|
||||
utilities/cassandra/cassandra_row_merge_test.cc
|
||||
utilities/cassandra/cassandra_serialize_test.cc
|
||||
utilities/checkpoint/checkpoint_test.cc
|
||||
utilities/env_timed_test.cc
|
||||
utilities/memory/memory_test.cc
|
||||
utilities/merge_operators/string_append/stringappend_test.cc
|
||||
utilities/object_registry_test.cc
|
||||
@ -1331,6 +1335,7 @@ if(WITH_TESTS)
|
||||
utilities/transactions/write_unprepared_transaction_test.cc
|
||||
utilities/transactions/lock/range/range_locking_test.cc
|
||||
utilities/ttl/ttl_test.cc
|
||||
utilities/util_merge_operators_test.cc
|
||||
utilities/write_batch_with_index/write_batch_with_index_test.cc
|
||||
)
|
||||
endif()
|
||||
|
@ -73,6 +73,7 @@
|
||||
* `ColumnFamilyOptions::OldDefaults` and `DBOptions::OldDefaults` are marked deprecated, as they are no longer maintained.
|
||||
* Add subcompaction callback APIs: `OnSubcompactionBegin()` and `OnSubcompactionCompleted()`.
|
||||
* Add file Temperature information to `FileOperationInfo` in event listener API.
|
||||
* Add Temperature hints information from RocksDB in API `NewSequentialFile()`. backup and checkpoint operations need to open the source files with `NewSequentialFile()`, which will have the temperature hints. Other operations are not covered.
|
||||
|
||||
### Behavior Changes
|
||||
* Disallow the combination of DBOptions.use_direct_io_for_flush_and_compaction == true and DBOptions.writable_file_max_buffer_size == 0. This combination can cause WritableFileWriter::Append() to loop forever, and it does not make much sense in direct IO.
|
||||
|
169
db/db_test2.cc
169
db/db_test2.cc
@ -21,6 +21,7 @@
|
||||
#include "rocksdb/persistent_cache.h"
|
||||
#include "rocksdb/trace_record.h"
|
||||
#include "rocksdb/trace_record_result.h"
|
||||
#include "rocksdb/utilities/backup_engine.h"
|
||||
#include "rocksdb/utilities/replayer.h"
|
||||
#include "rocksdb/wal_filter.h"
|
||||
#include "test_util/testutil.h"
|
||||
@ -6899,6 +6900,174 @@ TEST_F(DBTest2, LastLevelStatistics) {
|
||||
ASSERT_EQ(options.statistics->getTickerCount(LAST_LEVEL_READ_COUNT),
|
||||
options.statistics->getTickerCount(WARM_FILE_READ_COUNT));
|
||||
}
|
||||
|
||||
class FileTemperatureTestFS : public FileSystemWrapper {
|
||||
public:
|
||||
explicit FileTemperatureTestFS(SpecialEnv* env)
|
||||
: FileSystemWrapper(env->GetFileSystem()) {}
|
||||
|
||||
static const char* kClassName() { return "TestFileSystem"; }
|
||||
const char* Name() const override { return kClassName(); }
|
||||
|
||||
IOStatus NewSequentialFile(const std::string& fname, const FileOptions& opts,
|
||||
std::unique_ptr<FSSequentialFile>* result,
|
||||
IODebugContext* dbg) override {
|
||||
auto filename = GetFileName(fname);
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
auto r = ParseFileName(filename, &number, &type);
|
||||
assert(r);
|
||||
if (type == kTableFile) {
|
||||
auto emplaced =
|
||||
requested_sst_file_temperatures_.emplace(number, opts.temperature);
|
||||
assert(emplaced.second); // assume no duplication
|
||||
}
|
||||
return target()->NewSequentialFile(fname, opts, result, dbg);
|
||||
}
|
||||
|
||||
IOStatus LinkFile(const std::string& s, const std::string& t,
|
||||
const IOOptions& options, IODebugContext* dbg) override {
|
||||
auto filename = GetFileName(s);
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
auto r = ParseFileName(filename, &number, &type);
|
||||
assert(r);
|
||||
// return not supported to force checkpoint copy the file instead of just
|
||||
// link
|
||||
if (type == kTableFile) {
|
||||
return IOStatus::NotSupported();
|
||||
}
|
||||
return target()->LinkFile(s, t, options, dbg);
|
||||
}
|
||||
|
||||
const std::map<uint64_t, Temperature>& RequestedSstFileTemperatures() {
|
||||
return requested_sst_file_temperatures_;
|
||||
}
|
||||
|
||||
void ClearRequestedFileTemperatures() {
|
||||
requested_sst_file_temperatures_.clear();
|
||||
}
|
||||
|
||||
private:
|
||||
std::map<uint64_t, Temperature> requested_sst_file_temperatures_;
|
||||
|
||||
std::string GetFileName(const std::string& fname) {
|
||||
auto filename = fname.substr(fname.find_last_of(kFilePathSeparator) + 1);
|
||||
// workaround only for Windows that the file path could contain both Windows
|
||||
// FilePathSeparator and '/'
|
||||
filename = filename.substr(filename.find_last_of('/') + 1);
|
||||
return filename;
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(DBTest2, BackupFileTemperature) {
|
||||
std::shared_ptr<FileTemperatureTestFS> test_fs =
|
||||
std::make_shared<FileTemperatureTestFS>(env_);
|
||||
std::unique_ptr<Env> backup_env(new CompositeEnvWrapper(env_, test_fs));
|
||||
Options options = CurrentOptions();
|
||||
options.bottommost_temperature = Temperature::kWarm;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
Reopen(options);
|
||||
|
||||
// generate a bottommost file and a non-bottommost file
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("bar", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("bar", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(dbfull()->TEST_WaitForCompact());
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("bar", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
auto size = GetSstSizeHelper(Temperature::kWarm);
|
||||
ASSERT_GT(size, 0);
|
||||
|
||||
std::map<uint64_t, Temperature> temperatures;
|
||||
std::vector<LiveFileStorageInfo> infos;
|
||||
ASSERT_OK(
|
||||
dbfull()->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(), &infos));
|
||||
for (auto info : infos) {
|
||||
temperatures.emplace(info.file_number, info.temperature);
|
||||
}
|
||||
BackupEngine* backup_engine;
|
||||
auto backup_options = BackupEngineOptions(
|
||||
dbname_ + kFilePathSeparator + "tempbk", backup_env.get());
|
||||
auto s = BackupEngine::Open(backup_env.get(), backup_options, &backup_engine);
|
||||
ASSERT_OK(s);
|
||||
s = backup_engine->CreateNewBackup(db_);
|
||||
ASSERT_OK(s);
|
||||
|
||||
// checking src file src_temperature hints: 2 sst files: 1 sst is kWarm,
|
||||
// another is kUnknown
|
||||
auto file_temperatures = test_fs->RequestedSstFileTemperatures();
|
||||
ASSERT_EQ(file_temperatures.size(), 2);
|
||||
bool has_only_one_warm_sst = false;
|
||||
for (const auto& file_temperature : file_temperatures) {
|
||||
ASSERT_EQ(temperatures.at(file_temperature.first), file_temperature.second);
|
||||
if (file_temperature.second == Temperature::kWarm) {
|
||||
ASSERT_FALSE(has_only_one_warm_sst);
|
||||
has_only_one_warm_sst = true;
|
||||
}
|
||||
}
|
||||
ASSERT_TRUE(has_only_one_warm_sst);
|
||||
Close();
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, CheckpointFileTemperature) {
|
||||
std::shared_ptr<FileTemperatureTestFS> test_fs =
|
||||
std::make_shared<FileTemperatureTestFS>(env_);
|
||||
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, test_fs));
|
||||
Options options = CurrentOptions();
|
||||
options.bottommost_temperature = Temperature::kWarm;
|
||||
options.level0_file_num_compaction_trigger = 2;
|
||||
options.env = env.get();
|
||||
Reopen(options);
|
||||
|
||||
// generate a bottommost file and a non-bottommost file
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("bar", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("bar", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
ASSERT_OK(dbfull()->TEST_WaitForCompact());
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_OK(Put("bar", "bar"));
|
||||
ASSERT_OK(Flush());
|
||||
auto size = GetSstSizeHelper(Temperature::kWarm);
|
||||
ASSERT_GT(size, 0);
|
||||
|
||||
std::map<uint64_t, Temperature> temperatures;
|
||||
std::vector<LiveFileStorageInfo> infos;
|
||||
ASSERT_OK(
|
||||
dbfull()->GetLiveFilesStorageInfo(LiveFilesStorageInfoOptions(), &infos));
|
||||
for (auto info : infos) {
|
||||
temperatures.emplace(info.file_number, info.temperature);
|
||||
}
|
||||
|
||||
test_fs->ClearRequestedFileTemperatures();
|
||||
Checkpoint* checkpoint;
|
||||
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
|
||||
ASSERT_OK(
|
||||
checkpoint->CreateCheckpoint(dbname_ + kFilePathSeparator + "tempcp"));
|
||||
|
||||
// checking src file src_temperature hints: 2 sst files: 1 sst is kWarm,
|
||||
// another is kUnknown
|
||||
auto file_temperatures = test_fs->RequestedSstFileTemperatures();
|
||||
ASSERT_EQ(file_temperatures.size(), 2);
|
||||
bool has_only_one_warm_sst = false;
|
||||
for (const auto& file_temperature : file_temperatures) {
|
||||
ASSERT_EQ(temperatures.at(file_temperature.first), file_temperature.second);
|
||||
if (file_temperature.second == Temperature::kWarm) {
|
||||
ASSERT_FALSE(has_only_one_warm_sst);
|
||||
has_only_one_warm_sst = true;
|
||||
}
|
||||
}
|
||||
ASSERT_TRUE(has_only_one_warm_sst);
|
||||
delete checkpoint;
|
||||
Close();
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
// WAL recovery mode is WALRecoveryMode::kPointInTimeRecovery.
|
||||
|
@ -151,8 +151,9 @@ Status ExternalSstFileIngestionJob::Prepare(
|
||||
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
|
||||
nullptr);
|
||||
// CopyFile also sync the new file.
|
||||
status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
|
||||
db_options_.use_fsync, io_tracer_);
|
||||
status =
|
||||
CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
|
||||
db_options_.use_fsync, io_tracer_, Temperature::kUnknown);
|
||||
}
|
||||
TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
|
||||
if (!status.ok()) {
|
||||
|
@ -100,8 +100,9 @@ Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
|
||||
}
|
||||
}
|
||||
if (!hardlink_files) {
|
||||
status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
|
||||
db_options_.use_fsync, io_tracer_);
|
||||
status =
|
||||
CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
|
||||
db_options_.use_fsync, io_tracer_, Temperature::kUnknown);
|
||||
}
|
||||
if (!status.ok()) {
|
||||
break;
|
||||
|
@ -269,9 +269,9 @@ Status FileExpectedStateManager::SaveAtAndAfter(DB* db) {
|
||||
|
||||
// Populate a tempfile and then rename it to atomically create "<seqno>.state"
|
||||
// with contents from "LATEST.state"
|
||||
Status s =
|
||||
CopyFile(FileSystem::Default(), latest_file_path, state_file_temp_path,
|
||||
0 /* size */, false /* use_fsync */);
|
||||
Status s = CopyFile(FileSystem::Default(), latest_file_path,
|
||||
state_file_temp_path, 0 /* size */, false /* use_fsync */,
|
||||
nullptr /* io_tracer */, Temperature::kUnknown);
|
||||
if (s.ok()) {
|
||||
s = FileSystem::Default()->RenameFile(state_file_temp_path, state_file_path,
|
||||
IOOptions(), nullptr /* dbg */);
|
||||
@ -481,7 +481,8 @@ Status FileExpectedStateManager::Restore(DB* db) {
|
||||
// "LATEST.state". Start off by creating a tempfile so we can later make the
|
||||
// new "LATEST.state" appear atomically using `RenameFile()`.
|
||||
s = CopyFile(FileSystem::Default(), state_file_path, latest_file_temp_path,
|
||||
0 /* size */, false /* use_fsync */);
|
||||
0 /* size */, false /* use_fsync */, nullptr /* io_tracer */,
|
||||
Temperature::kUnknown);
|
||||
}
|
||||
|
||||
{
|
||||
|
@ -19,13 +19,15 @@ namespace ROCKSDB_NAMESPACE {
|
||||
// Utility function to copy a file up to a specified length
|
||||
IOStatus CopyFile(FileSystem* fs, const std::string& source,
|
||||
const std::string& destination, uint64_t size, bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer) {
|
||||
const FileOptions soptions;
|
||||
const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const Temperature temperature) {
|
||||
FileOptions soptions;
|
||||
IOStatus io_s;
|
||||
std::unique_ptr<SequentialFileReader> src_reader;
|
||||
std::unique_ptr<WritableFileWriter> dest_writer;
|
||||
|
||||
{
|
||||
soptions.temperature = temperature;
|
||||
std::unique_ptr<FSSequentialFile> srcfile;
|
||||
io_s = fs->NewSequentialFile(source, soptions, &srcfile, nullptr);
|
||||
if (!io_s.ok()) {
|
||||
|
@ -22,15 +22,17 @@ namespace ROCKSDB_NAMESPACE {
|
||||
extern IOStatus CopyFile(FileSystem* fs, const std::string& source,
|
||||
const std::string& destination, uint64_t size,
|
||||
bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer = nullptr);
|
||||
const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const Temperature temperature);
|
||||
inline IOStatus CopyFile(const std::shared_ptr<FileSystem>& fs,
|
||||
const std::string& source,
|
||||
const std::string& destination, uint64_t size,
|
||||
bool use_fsync,
|
||||
const std::shared_ptr<IOTracer>& io_tracer = nullptr) {
|
||||
return CopyFile(fs.get(), source, destination, size, use_fsync, io_tracer);
|
||||
const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const Temperature temperature) {
|
||||
return CopyFile(fs.get(), source, destination, size, use_fsync, io_tracer,
|
||||
temperature);
|
||||
}
|
||||
|
||||
extern IOStatus CreateFile(FileSystem* fs, const std::string& destination,
|
||||
const std::string& contents, bool use_fsync);
|
||||
|
||||
|
@ -552,6 +552,7 @@ class BackupEngineImpl {
|
||||
const EnvOptions& src_env_options, bool sync,
|
||||
RateLimiter* rate_limiter,
|
||||
std::function<void()> progress_callback,
|
||||
const Temperature src_temperature,
|
||||
uint64_t* bytes_toward_next_callback,
|
||||
uint64_t* size, std::string* checksum_hex);
|
||||
|
||||
@ -559,7 +560,8 @@ class BackupEngineImpl {
|
||||
const std::shared_ptr<FileSystem>& src_fs,
|
||||
const EnvOptions& src_env_options,
|
||||
uint64_t size_limit,
|
||||
std::string* checksum_hex) const;
|
||||
std::string* checksum_hex,
|
||||
const Temperature src_temperature) const;
|
||||
|
||||
// Obtain db_id and db_session_id from the table properties of file_path
|
||||
Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
|
||||
@ -605,6 +607,7 @@ class BackupEngineImpl {
|
||||
std::string src_checksum_hex;
|
||||
std::string db_id;
|
||||
std::string db_session_id;
|
||||
Temperature src_temperature;
|
||||
|
||||
CopyOrCreateWorkItem()
|
||||
: src_path(""),
|
||||
@ -620,7 +623,8 @@ class BackupEngineImpl {
|
||||
src_checksum_func_name(kUnknownFileChecksumFuncName),
|
||||
src_checksum_hex(""),
|
||||
db_id(""),
|
||||
db_session_id("") {}
|
||||
db_session_id(""),
|
||||
src_temperature(Temperature::kUnknown) {}
|
||||
|
||||
CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
|
||||
CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
|
||||
@ -646,6 +650,7 @@ class BackupEngineImpl {
|
||||
src_checksum_hex = std::move(o.src_checksum_hex);
|
||||
db_id = std::move(o.db_id);
|
||||
db_session_id = std::move(o.db_session_id);
|
||||
src_temperature = o.src_temperature;
|
||||
return *this;
|
||||
}
|
||||
|
||||
@ -657,7 +662,8 @@ class BackupEngineImpl {
|
||||
const std::string& _src_checksum_func_name =
|
||||
kUnknownFileChecksumFuncName,
|
||||
const std::string& _src_checksum_hex = "",
|
||||
const std::string& _db_id = "", const std::string& _db_session_id = "")
|
||||
const std::string& _db_id = "", const std::string& _db_session_id = "",
|
||||
const Temperature _src_temperature = Temperature::kUnknown)
|
||||
: src_path(std::move(_src_path)),
|
||||
dst_path(std::move(_dst_path)),
|
||||
contents(std::move(_contents)),
|
||||
@ -672,7 +678,8 @@ class BackupEngineImpl {
|
||||
src_checksum_func_name(_src_checksum_func_name),
|
||||
src_checksum_hex(_src_checksum_hex),
|
||||
db_id(_db_id),
|
||||
db_session_id(_db_session_id) {}
|
||||
db_session_id(_db_session_id),
|
||||
src_temperature(_src_temperature) {}
|
||||
};
|
||||
|
||||
struct BackupAfterCopyOrCreateWorkItem {
|
||||
@ -780,7 +787,8 @@ class BackupEngineImpl {
|
||||
std::function<void()> progress_callback = []() {},
|
||||
const std::string& contents = std::string(),
|
||||
const std::string& src_checksum_func_name = kUnknownFileChecksumFuncName,
|
||||
const std::string& src_checksum_str = kUnknownFileChecksum);
|
||||
const std::string& src_checksum_str = kUnknownFileChecksum,
|
||||
const Temperature src_temperature = Temperature::kUnknown);
|
||||
|
||||
// backup state data
|
||||
BackupID latest_backup_id_;
|
||||
@ -1194,8 +1202,8 @@ IOStatus BackupEngineImpl::Initialize() {
|
||||
work_item.src_path, work_item.dst_path, work_item.contents,
|
||||
work_item.size_limit, work_item.src_env, work_item.dst_env,
|
||||
work_item.src_env_options, work_item.sync, work_item.rate_limiter,
|
||||
work_item.progress_callback, &bytes_toward_next_callback,
|
||||
&result.size, &result.checksum_hex);
|
||||
work_item.progress_callback, work_item.src_temperature,
|
||||
&bytes_toward_next_callback, &result.size, &result.checksum_hex);
|
||||
|
||||
RecordTick(work_item.stats, BACKUP_READ_BYTES,
|
||||
IOSTATS(bytes_read) - prev_bytes_read);
|
||||
@ -1238,7 +1246,6 @@ IOStatus BackupEngineImpl::Initialize() {
|
||||
});
|
||||
}
|
||||
ROCKS_LOG_INFO(options_.info_log, "Initialized BackupEngine");
|
||||
|
||||
return IOStatus::OK();
|
||||
}
|
||||
|
||||
@ -1344,7 +1351,8 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
|
||||
[&](const std::string& src_dirname, const std::string& fname,
|
||||
uint64_t size_limit_bytes, FileType type,
|
||||
const std::string& checksum_func_name,
|
||||
const std::string& checksum_val) {
|
||||
const std::string& checksum_val,
|
||||
const Temperature src_temperature) {
|
||||
if (type == kWalFile && !options_.backup_log_files) {
|
||||
return IOStatus::OK();
|
||||
}
|
||||
@ -1390,7 +1398,7 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
|
||||
options_.share_files_with_checksum &&
|
||||
(type == kTableFile || type == kBlobFile),
|
||||
options.progress_callback, "" /* contents */,
|
||||
checksum_func_name, checksum_val);
|
||||
checksum_func_name, checksum_val, src_temperature);
|
||||
}
|
||||
return io_st;
|
||||
} /* copy_file_cb */,
|
||||
@ -1933,9 +1941,9 @@ IOStatus BackupEngineImpl::VerifyBackup(BackupID backup_id,
|
||||
std::string checksum_hex;
|
||||
ROCKS_LOG_INFO(options_.info_log, "Verifying %s checksum...\n",
|
||||
abs_path.c_str());
|
||||
IOStatus io_s =
|
||||
ReadFileAndComputeChecksum(abs_path, backup_fs_, EnvOptions(),
|
||||
0 /* size_limit */, &checksum_hex);
|
||||
IOStatus io_s = ReadFileAndComputeChecksum(
|
||||
abs_path, backup_fs_, EnvOptions(), 0 /* size_limit */, &checksum_hex,
|
||||
Temperature::kUnknown);
|
||||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
} else if (file_info->checksum_hex != checksum_hex) {
|
||||
@ -1954,7 +1962,7 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
|
||||
const std::string& src, const std::string& dst, const std::string& contents,
|
||||
uint64_t size_limit, Env* src_env, Env* dst_env,
|
||||
const EnvOptions& src_env_options, bool sync, RateLimiter* rate_limiter,
|
||||
std::function<void()> progress_callback,
|
||||
std::function<void()> progress_callback, const Temperature src_temperature,
|
||||
uint64_t* bytes_toward_next_callback, uint64_t* size,
|
||||
std::string* checksum_hex) {
|
||||
assert(src.empty() != contents.empty());
|
||||
@ -1977,8 +1985,10 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
|
||||
io_s = dst_env->GetFileSystem()->NewWritableFile(dst, dst_file_options,
|
||||
&dst_file, nullptr);
|
||||
if (io_s.ok() && !src.empty()) {
|
||||
io_s = src_env->GetFileSystem()->NewSequentialFile(
|
||||
src, FileOptions(src_env_options), &src_file, nullptr);
|
||||
auto src_file_options = FileOptions(src_env_options);
|
||||
src_file_options.temperature = src_temperature;
|
||||
io_s = src_env->GetFileSystem()->NewSequentialFile(src, src_file_options,
|
||||
&src_file, nullptr);
|
||||
}
|
||||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
@ -2074,7 +2084,7 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
|
||||
Statistics* stats, uint64_t size_limit, bool shared_checksum,
|
||||
std::function<void()> progress_callback, const std::string& contents,
|
||||
const std::string& src_checksum_func_name,
|
||||
const std::string& src_checksum_str) {
|
||||
const std::string& src_checksum_str, const Temperature src_temperature) {
|
||||
assert(contents.empty() != src_dir.empty());
|
||||
|
||||
std::string src_path = src_dir + "/" + fname;
|
||||
@ -2121,7 +2131,8 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
|
||||
// the shared_checksum directory.
|
||||
if (checksum_hex.empty() && db_session_id.empty()) {
|
||||
IOStatus io_s = ReadFileAndComputeChecksum(
|
||||
src_path, db_fs_, src_env_options, size_limit, &checksum_hex);
|
||||
src_path, db_fs_, src_env_options, size_limit, &checksum_hex,
|
||||
src_temperature);
|
||||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
}
|
||||
@ -2240,7 +2251,8 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
|
||||
// BackupMeta::AddFile.
|
||||
} else {
|
||||
IOStatus io_s = ReadFileAndComputeChecksum(
|
||||
src_path, db_fs_, src_env_options, size_limit, &checksum_hex);
|
||||
src_path, db_fs_, src_env_options, size_limit, &checksum_hex,
|
||||
src_temperature);
|
||||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
}
|
||||
@ -2269,7 +2281,7 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
|
||||
src_dir.empty() ? "" : src_path, *copy_dest_path, contents, db_env_,
|
||||
backup_env_, src_env_options, options_.sync, rate_limiter, size_limit,
|
||||
stats, progress_callback, src_checksum_func_name, checksum_hex, db_id,
|
||||
db_session_id);
|
||||
db_session_id, src_temperature);
|
||||
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
|
||||
copy_or_create_work_item.result.get_future(), shared, need_to_copy,
|
||||
backup_env_, temp_dest_path, final_dest_path, dst_relative);
|
||||
@ -2295,7 +2307,7 @@ IOStatus BackupEngineImpl::AddBackupFileWorkItem(
|
||||
IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
|
||||
const std::string& src, const std::shared_ptr<FileSystem>& src_fs,
|
||||
const EnvOptions& src_env_options, uint64_t size_limit,
|
||||
std::string* checksum_hex) const {
|
||||
std::string* checksum_hex, const Temperature src_temperature) const {
|
||||
if (checksum_hex == nullptr) {
|
||||
return status_to_io_status(Status::Aborted("Checksum pointer is null"));
|
||||
}
|
||||
@ -2305,8 +2317,10 @@ IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
|
||||
}
|
||||
|
||||
std::unique_ptr<SequentialFileReader> src_reader;
|
||||
IOStatus io_s = SequentialFileReader::Create(
|
||||
src_fs, src, FileOptions(src_env_options), &src_reader, nullptr);
|
||||
auto file_options = FileOptions(src_env_options);
|
||||
file_options.temperature = src_temperature;
|
||||
IOStatus io_s = SequentialFileReader::Create(src_fs, src, file_options,
|
||||
&src_reader, nullptr);
|
||||
if (!io_s.ok()) {
|
||||
return io_s;
|
||||
}
|
||||
|
@ -132,11 +132,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
|
||||
[&](const std::string& src_dirname, const std::string& fname,
|
||||
uint64_t size_limit_bytes, FileType,
|
||||
const std::string& /* checksum_func_name */,
|
||||
const std::string& /* checksum_val */) {
|
||||
const std::string& /* checksum_val */,
|
||||
const Temperature temperature) {
|
||||
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
|
||||
return CopyFile(db_->GetFileSystem(), src_dirname + "/" + fname,
|
||||
full_private_path + "/" + fname, size_limit_bytes,
|
||||
db_options.use_fsync);
|
||||
db_options.use_fsync, nullptr, temperature);
|
||||
} /* copy_file_cb */,
|
||||
[&](const std::string& fname, const std::string& contents, FileType) {
|
||||
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
|
||||
@ -191,10 +192,11 @@ Status CheckpointImpl::CreateCustomCheckpoint(
|
||||
std::function<Status(const std::string& src_dirname,
|
||||
const std::string& src_fname, FileType type)>
|
||||
link_file_cb,
|
||||
std::function<Status(
|
||||
const std::string& src_dirname, const std::string& src_fname,
|
||||
uint64_t size_limit_bytes, FileType type,
|
||||
const std::string& checksum_func_name, const std::string& checksum_val)>
|
||||
std::function<
|
||||
Status(const std::string& src_dirname, const std::string& src_fname,
|
||||
uint64_t size_limit_bytes, FileType type,
|
||||
const std::string& checksum_func_name,
|
||||
const std::string& checksum_val, const Temperature temperature)>
|
||||
copy_file_cb,
|
||||
std::function<Status(const std::string& fname, const std::string& contents,
|
||||
FileType type)>
|
||||
@ -261,11 +263,11 @@ Status CheckpointImpl::CreateCustomCheckpoint(
|
||||
if (opts.include_checksum_info) {
|
||||
s = copy_file_cb(info.directory, info.relative_filename, info.size,
|
||||
info.file_type, info.file_checksum_func_name,
|
||||
info.file_checksum);
|
||||
info.file_checksum, info.temperature);
|
||||
} else {
|
||||
s = copy_file_cb(info.directory, info.relative_filename, info.size,
|
||||
info.file_type, kUnknownFileChecksumFuncName,
|
||||
kUnknownFileChecksum);
|
||||
kUnknownFileChecksum, info.temperature);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -332,7 +334,8 @@ Status CheckpointImpl::ExportColumnFamily(
|
||||
ROCKS_LOG_INFO(db_options.info_log, "[%s] Copying %s",
|
||||
cf_name.c_str(), fname.c_str());
|
||||
return CopyFile(db_->GetFileSystem(), src_dirname + fname,
|
||||
tmp_export_dir + fname, 0, db_options.use_fsync);
|
||||
tmp_export_dir + fname, 0, db_options.use_fsync,
|
||||
nullptr, Temperature::kUnknown);
|
||||
} /*copy_file_cb*/);
|
||||
|
||||
const auto enable_status = db_->EnableFileDeletions(false /*force*/);
|
||||
|
@ -35,7 +35,8 @@ class CheckpointImpl : public Checkpoint {
|
||||
std::function<Status(const std::string& src_dirname,
|
||||
const std::string& fname, uint64_t size_limit_bytes,
|
||||
FileType type, const std::string& checksum_func_name,
|
||||
const std::string& checksum_val)>
|
||||
const std::string& checksum_val,
|
||||
const Temperature src_temperature)>
|
||||
copy_file_cb,
|
||||
std::function<Status(const std::string& fname,
|
||||
const std::string& contents, FileType type)>
|
||||
|
Loading…
x
Reference in New Issue
Block a user