Real fix for race in backup custom checksum checking (#7309)
Summary: This is a "real" fix for the issue worked around in https://github.com/facebook/rocksdb/issues/7294. To get DB checksum info for live files, we now read the manifest file that will become part of the checkpoint/backup. This requires a little extra handling in taking a custom checkpoint, including only reading the manifest file up to the size prescribed by the checkpoint. This moves GetFileChecksumsFromManifest from backup code to file_checksum_helper.{h,cc} and removes apparently unnecessary checking related to column families. Updated HISTORY.md and warned potential future users of DB::GetLiveFilesChecksumInfo() Pull Request resolved: https://github.com/facebook/rocksdb/pull/7309 Test Plan: updated unit test, before and after Reviewed By: ajkr Differential Revision: D23311994 Pulled By: pdillinger fbshipit-source-id: 741e30a2dc1830e8208f7648fcc8c5f000d4e2d5
This commit is contained in:
parent
722814e357
commit
9aad24da55
@ -5,7 +5,7 @@
|
|||||||
* Fix a possible corruption to the LSM state (overlapping files within a level) when a `CompactRange()` for refitting levels (`CompactRangeOptions::change_level == true`) and another manual compaction are executed in parallel.
|
* Fix a possible corruption to the LSM state (overlapping files within a level) when a `CompactRange()` for refitting levels (`CompactRangeOptions::change_level == true`) and another manual compaction are executed in parallel.
|
||||||
* Sanitize `recycle_log_file_num` to zero when the user attempts to enable it in combination with `WALRecoveryMode::kTolerateCorruptedTailRecords`. Previously the two features were allowed together, which compromised the user's configured crash-recovery guarantees.
|
* Sanitize `recycle_log_file_num` to zero when the user attempts to enable it in combination with `WALRecoveryMode::kTolerateCorruptedTailRecords`. Previously the two features were allowed together, which compromised the user's configured crash-recovery guarantees.
|
||||||
* Fix a bug where a level refitting in CompactRange() might race with an automatic compaction that puts the data to the target level of the refitting. The bug has been there for years.
|
* Fix a bug where a level refitting in CompactRange() might race with an automatic compaction that puts the data to the target level of the refitting. The bug has been there for years.
|
||||||
* BackupEngine::CreateNewBackup could fail intermittently with non-OK status when backing up a read-write DB configured with a DBOptions::file_checksum_gen_factory. This issue has been worked-around such that CreateNewBackup should succeed, but (until fully fixed) BackupEngine might not see all checksums available in the DB.
|
* Fixed a bug in version 6.12 in which BackupEngine::CreateNewBackup could fail intermittently with non-OK status when backing up a read-write DB configured with a DBOptions::file_checksum_gen_factory.
|
||||||
* Fix a bug where immutable flushed memtable is never destroyed because a memtable is not added to delete list because of refernce hold by super version and super version doesn't switch because of empty delete list. So memory usage increases beyond write_buffer_size + max_write_buffer_size_to_maintain.
|
* Fix a bug where immutable flushed memtable is never destroyed because a memtable is not added to delete list because of refernce hold by super version and super version doesn't switch because of empty delete list. So memory usage increases beyond write_buffer_size + max_write_buffer_size_to_maintain.
|
||||||
* Fix useless no-op compactions scheduled upon snapshot release when options.disable-auto-compactions = true.
|
* Fix useless no-op compactions scheduled upon snapshot release when options.disable-auto-compactions = true.
|
||||||
|
|
||||||
|
@ -202,6 +202,10 @@ uint64_t Reader::LastRecordOffset() {
|
|||||||
return last_record_offset_;
|
return last_record_offset_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t Reader::LastRecordEnd() {
|
||||||
|
return end_of_buffer_offset_ - buffer_.size();
|
||||||
|
}
|
||||||
|
|
||||||
void Reader::UnmarkEOF() {
|
void Reader::UnmarkEOF() {
|
||||||
if (read_error_) {
|
if (read_error_) {
|
||||||
return;
|
return;
|
||||||
|
@ -71,6 +71,11 @@ class Reader {
|
|||||||
// Undefined before the first call to ReadRecord.
|
// Undefined before the first call to ReadRecord.
|
||||||
uint64_t LastRecordOffset();
|
uint64_t LastRecordOffset();
|
||||||
|
|
||||||
|
// Returns the first physical offset after the last record returned by
|
||||||
|
// ReadRecord, or zero before first call to ReadRecord. This can also be
|
||||||
|
// thought of as the "current" position in processing the file bytes.
|
||||||
|
uint64_t LastRecordEnd();
|
||||||
|
|
||||||
// returns true if the reader has encountered an eof condition.
|
// returns true if the reader has encountered an eof condition.
|
||||||
bool IsEOF() {
|
bool IsEOF() {
|
||||||
return eof_;
|
return eof_;
|
||||||
|
@ -1354,7 +1354,9 @@ class DB {
|
|||||||
virtual void GetLiveFilesMetaData(
|
virtual void GetLiveFilesMetaData(
|
||||||
std::vector<LiveFileMetaData>* /*metadata*/) {}
|
std::vector<LiveFileMetaData>* /*metadata*/) {}
|
||||||
|
|
||||||
// Return a list of all table checksum info
|
// Return a list of all table file checksum info.
|
||||||
|
// Note: This function might be of limited use because it cannot be
|
||||||
|
// synchronized with GetLiveFiles.
|
||||||
virtual Status GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) = 0;
|
virtual Status GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) = 0;
|
||||||
|
|
||||||
// Obtains the meta data of the specified column family of the DB.
|
// Obtains the meta data of the specified column family of the DB.
|
||||||
|
@ -9,6 +9,12 @@
|
|||||||
|
|
||||||
#include "util/file_checksum_helper.h"
|
#include "util/file_checksum_helper.h"
|
||||||
|
|
||||||
|
#include <unordered_set>
|
||||||
|
|
||||||
|
#include "db/log_reader.h"
|
||||||
|
#include "db/version_edit.h"
|
||||||
|
#include "file/sequence_file_reader.h"
|
||||||
|
|
||||||
namespace ROCKSDB_NAMESPACE {
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
|
||||||
void FileChecksumListImpl::reset() { checksum_map_.clear(); }
|
void FileChecksumListImpl::reset() { checksum_map_.clear(); }
|
||||||
@ -83,4 +89,66 @@ std::shared_ptr<FileChecksumGenFactory> GetFileChecksumGenCrc32cFactory() {
|
|||||||
return default_crc32c_gen_factory;
|
return default_crc32c_gen_factory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status GetFileChecksumsFromManifest(Env* src_env, const std::string& abs_path,
|
||||||
|
uint64_t manifest_file_size,
|
||||||
|
FileChecksumList* checksum_list) {
|
||||||
|
if (checksum_list == nullptr) {
|
||||||
|
return Status::InvalidArgument("checksum_list is nullptr");
|
||||||
|
}
|
||||||
|
|
||||||
|
checksum_list->reset();
|
||||||
|
Status s;
|
||||||
|
|
||||||
|
std::unique_ptr<SequentialFileReader> file_reader;
|
||||||
|
{
|
||||||
|
std::unique_ptr<FSSequentialFile> file;
|
||||||
|
const std::shared_ptr<FileSystem>& fs = src_env->GetFileSystem();
|
||||||
|
s = fs->NewSequentialFile(abs_path,
|
||||||
|
fs->OptimizeForManifestRead(FileOptions()), &file,
|
||||||
|
nullptr /* dbg */);
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
file_reader.reset(new SequentialFileReader(std::move(file), abs_path));
|
||||||
|
}
|
||||||
|
|
||||||
|
struct LogReporter : public log::Reader::Reporter {
|
||||||
|
Status* status_ptr;
|
||||||
|
virtual void Corruption(size_t /*bytes*/, const Status& st) override {
|
||||||
|
if (status_ptr->ok()) {
|
||||||
|
*status_ptr = st;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} reporter;
|
||||||
|
reporter.status_ptr = &s;
|
||||||
|
log::Reader reader(nullptr, std::move(file_reader), &reporter,
|
||||||
|
true /* checksum */, 0 /* log_number */);
|
||||||
|
Slice record;
|
||||||
|
std::string scratch;
|
||||||
|
while (reader.LastRecordEnd() < manifest_file_size &&
|
||||||
|
reader.ReadRecord(&record, &scratch) && s.ok()) {
|
||||||
|
VersionEdit edit;
|
||||||
|
s = edit.DecodeFrom(record);
|
||||||
|
if (!s.ok()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the deleted files from the checksum_list
|
||||||
|
for (const auto& deleted_file : edit.GetDeletedFiles()) {
|
||||||
|
checksum_list->RemoveOneFileChecksum(deleted_file.second);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the new files to the checksum_list
|
||||||
|
for (const auto& new_file : edit.GetNewFiles()) {
|
||||||
|
checksum_list->InsertOneFileChecksum(
|
||||||
|
new_file.second.fd.GetNumber(), new_file.second.file_checksum,
|
||||||
|
new_file.second.file_checksum_func_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert(!s.ok() ||
|
||||||
|
manifest_file_size == std::numeric_limits<uint64_t>::max() ||
|
||||||
|
reader.LastRecordEnd() == manifest_file_size);
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
@ -89,4 +89,10 @@ class FileChecksumListImpl : public FileChecksumList {
|
|||||||
checksum_map_;
|
checksum_map_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// If manifest_file_size < std::numeric_limits<uint64_t>::max(), only use
|
||||||
|
// that length prefix of the manifest file.
|
||||||
|
Status GetFileChecksumsFromManifest(Env* src_env, const std::string& abs_path,
|
||||||
|
uint64_t manifest_file_size,
|
||||||
|
FileChecksumList* checksum_list);
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
@ -42,6 +42,7 @@
|
|||||||
#include "util/channel.h"
|
#include "util/channel.h"
|
||||||
#include "util/coding.h"
|
#include "util/coding.h"
|
||||||
#include "util/crc32c.h"
|
#include "util/crc32c.h"
|
||||||
|
#include "util/file_checksum_helper.h"
|
||||||
#include "util/string_util.h"
|
#include "util/string_util.h"
|
||||||
#include "utilities/checkpoint/checkpoint_impl.h"
|
#include "utilities/checkpoint/checkpoint_impl.h"
|
||||||
|
|
||||||
@ -485,9 +486,6 @@ class BackupEngineImpl : public BackupEngine {
|
|||||||
const BackupMeta* backup,
|
const BackupMeta* backup,
|
||||||
FileChecksumList* checksum_list);
|
FileChecksumList* checksum_list);
|
||||||
|
|
||||||
Status GetFileChecksumsFromManifest(Env* src_env, const std::string& abs_path,
|
|
||||||
FileChecksumList* checksum_list);
|
|
||||||
|
|
||||||
Status VerifyFileWithCrc32c(Env* src_env, const BackupMeta* backup,
|
Status VerifyFileWithCrc32c(Env* src_env, const BackupMeta* backup,
|
||||||
const std::string& rel_path);
|
const std::string& rel_path);
|
||||||
|
|
||||||
@ -2159,6 +2157,7 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Status BackupEngineImpl::GetFileChecksumsFromManifestInBackup(
|
Status BackupEngineImpl::GetFileChecksumsFromManifestInBackup(
|
||||||
Env* src_env, const BackupID& backup_id, const BackupMeta* backup,
|
Env* src_env, const BackupID& backup_id, const BackupMeta* backup,
|
||||||
FileChecksumList* checksum_list) {
|
FileChecksumList* checksum_list) {
|
||||||
@ -2197,87 +2196,11 @@ Status BackupEngineImpl::GetFileChecksumsFromManifestInBackup(
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
s = GetFileChecksumsFromManifest(src_env, GetAbsolutePath(manifest_rel_path),
|
// Read whole manifest file in backup
|
||||||
checksum_list);
|
s = GetFileChecksumsFromManifest(
|
||||||
return s;
|
src_env, GetAbsolutePath(manifest_rel_path),
|
||||||
}
|
std::numeric_limits<uint64_t>::max() /*manifest_file_size*/,
|
||||||
|
checksum_list);
|
||||||
Status BackupEngineImpl::GetFileChecksumsFromManifest(
|
|
||||||
Env* src_env, const std::string& abs_path,
|
|
||||||
FileChecksumList* checksum_list) {
|
|
||||||
if (checksum_list == nullptr) {
|
|
||||||
return Status::InvalidArgument("checksum_list is nullptr");
|
|
||||||
}
|
|
||||||
|
|
||||||
checksum_list->reset();
|
|
||||||
Status s;
|
|
||||||
|
|
||||||
std::unique_ptr<SequentialFileReader> file_reader;
|
|
||||||
{
|
|
||||||
std::unique_ptr<FSSequentialFile> file;
|
|
||||||
const std::shared_ptr<FileSystem>& fs = src_env->GetFileSystem();
|
|
||||||
s = fs->NewSequentialFile(abs_path,
|
|
||||||
fs->OptimizeForManifestRead(FileOptions()), &file,
|
|
||||||
nullptr /* dbg */);
|
|
||||||
if (!s.ok()) {
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
file_reader.reset(new SequentialFileReader(std::move(file), abs_path));
|
|
||||||
}
|
|
||||||
|
|
||||||
LogReporter reporter;
|
|
||||||
reporter.status = &s;
|
|
||||||
log::Reader reader(nullptr, std::move(file_reader), &reporter,
|
|
||||||
true /* checksum */, 0 /* log_number */);
|
|
||||||
Slice record;
|
|
||||||
std::string scratch;
|
|
||||||
// Set of column families initialized with default CF
|
|
||||||
std::unordered_set<uint32_t> cf_set = {0};
|
|
||||||
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
|
|
||||||
VersionEdit edit;
|
|
||||||
s = edit.DecodeFrom(record);
|
|
||||||
if (!s.ok()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
// Check current CF status
|
|
||||||
uint32_t column_family = edit.GetColumnFamily();
|
|
||||||
auto cf_set_itr = cf_set.find(column_family);
|
|
||||||
bool cf_exist = (cf_set_itr != cf_set.end());
|
|
||||||
if (edit.IsColumnFamilyAdd()) {
|
|
||||||
if (cf_exist) {
|
|
||||||
s = Status::Corruption("Manifest adding the same column family twice");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
cf_set.insert(column_family);
|
|
||||||
} else if (edit.IsColumnFamilyDrop()) {
|
|
||||||
if (!cf_exist) {
|
|
||||||
s = Status::Corruption(
|
|
||||||
"Manifest dropping non-existing column family: " +
|
|
||||||
ToString(column_family));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
cf_set.erase(cf_set_itr);
|
|
||||||
} else {
|
|
||||||
if (!cf_exist) {
|
|
||||||
s = Status::Corruption("Manifest referencing unknown column family: " +
|
|
||||||
ToString(column_family));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
assert(cf_set.find(column_family) != cf_set.end());
|
|
||||||
|
|
||||||
// Remove the deleted files from the checksum_list
|
|
||||||
for (const auto& deleted_file : edit.GetDeletedFiles()) {
|
|
||||||
checksum_list->RemoveOneFileChecksum(deleted_file.second);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the new files to the checksum_list
|
|
||||||
for (const auto& new_file : edit.GetNewFiles()) {
|
|
||||||
checksum_list->InsertOneFileChecksum(
|
|
||||||
new_file.second.fd.GetNumber(), new_file.second.file_checksum,
|
|
||||||
new_file.second.file_checksum_func_name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
|
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
|
#include <regex>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
@ -1823,13 +1824,13 @@ TEST_F(BackupableDBTest, FlushCompactDuringBackupCheckpoint) {
|
|||||||
// That FillDB leaves a mix of flushed and unflushed data
|
// That FillDB leaves a mix of flushed and unflushed data
|
||||||
SyncPoint::GetInstance()->LoadDependency(
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
{{"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1",
|
{{"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1",
|
||||||
"BackupableDBTest::FlushDuringBackupCheckpoint:BeforeFlush"},
|
"BackupableDBTest::FlushCompactDuringBackupCheckpoint:Before"},
|
||||||
{"BackupableDBTest::FlushDuringBackupCheckpoint:AfterFlush",
|
{"BackupableDBTest::FlushCompactDuringBackupCheckpoint:After",
|
||||||
"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2"}});
|
"CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2"}});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
ROCKSDB_NAMESPACE::port::Thread flush_thread{[this]() {
|
ROCKSDB_NAMESPACE::port::Thread flush_thread{[this]() {
|
||||||
TEST_SYNC_POINT(
|
TEST_SYNC_POINT(
|
||||||
"BackupableDBTest::FlushDuringBackupCheckpoint:BeforeFlush");
|
"BackupableDBTest::FlushCompactDuringBackupCheckpoint:Before");
|
||||||
FillDB(db_.get(), keys_iteration, 2 * keys_iteration);
|
FillDB(db_.get(), keys_iteration, 2 * keys_iteration);
|
||||||
ASSERT_OK(db_->Flush(FlushOptions()));
|
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||||
DBImpl* dbi = static_cast<DBImpl*>(db_.get());
|
DBImpl* dbi = static_cast<DBImpl*>(db_.get());
|
||||||
@ -1837,11 +1838,28 @@ TEST_F(BackupableDBTest, FlushCompactDuringBackupCheckpoint) {
|
|||||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
|
||||||
dbi->TEST_WaitForCompact();
|
dbi->TEST_WaitForCompact();
|
||||||
TEST_SYNC_POINT(
|
TEST_SYNC_POINT(
|
||||||
"BackupableDBTest::FlushDuringBackupCheckpoint:AfterFlush");
|
"BackupableDBTest::FlushCompactDuringBackupCheckpoint:After");
|
||||||
}};
|
}};
|
||||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
|
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get()));
|
||||||
flush_thread.join();
|
flush_thread.join();
|
||||||
CloseDBAndBackupEngine();
|
CloseDBAndBackupEngine();
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
|
if (sopt == kShareWithChecksum) {
|
||||||
|
// Ensure we actually got DB manifest checksums by inspecting
|
||||||
|
// shared_checksum file names for hex checksum component
|
||||||
|
std::regex expected("[^_]+_[0-9A-F]{8}_[^_]+.sst");
|
||||||
|
std::vector<FileAttributes> children;
|
||||||
|
const std::string dir = backupdir_ + "/shared_checksum";
|
||||||
|
ASSERT_OK(file_manager_->GetChildrenFileAttributes(dir, &children));
|
||||||
|
for (const auto& child : children) {
|
||||||
|
if (child.name == "." || child.name == ".." || child.size_bytes == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
const std::string match("match");
|
||||||
|
EXPECT_EQ(match, std::regex_replace(child.name, expected, match));
|
||||||
|
}
|
||||||
|
}
|
||||||
AssertBackupConsistency(0, 0, keys_iteration);
|
AssertBackupConsistency(0, 0, keys_iteration);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,7 @@
|
|||||||
#include "rocksdb/utilities/checkpoint.h"
|
#include "rocksdb/utilities/checkpoint.h"
|
||||||
#include "test_util/sync_point.h"
|
#include "test_util/sync_point.h"
|
||||||
#include "util/cast_util.h"
|
#include "util/cast_util.h"
|
||||||
|
#include "util/file_checksum_helper.h"
|
||||||
|
|
||||||
namespace ROCKSDB_NAMESPACE {
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
|
||||||
@ -180,7 +181,7 @@ Status CheckpointImpl::CreateCustomCheckpoint(
|
|||||||
FileType type)>
|
FileType type)>
|
||||||
create_file_cb,
|
create_file_cb,
|
||||||
uint64_t* sequence_number, uint64_t log_size_for_flush,
|
uint64_t* sequence_number, uint64_t log_size_for_flush,
|
||||||
const bool& get_live_table_checksum) {
|
bool get_live_table_checksum) {
|
||||||
Status s;
|
Status s;
|
||||||
std::vector<std::string> live_files;
|
std::vector<std::string> live_files;
|
||||||
uint64_t manifest_file_size = 0;
|
uint64_t manifest_file_size = 0;
|
||||||
@ -262,19 +263,17 @@ Status CheckpointImpl::CreateCustomCheckpoint(
|
|||||||
|
|
||||||
size_t wal_size = live_wal_files.size();
|
size_t wal_size = live_wal_files.size();
|
||||||
|
|
||||||
// get table file checksums if get_live_table_checksum is true
|
// process live files, non-table files first
|
||||||
std::unique_ptr<FileChecksumList> checksum_list(NewFileChecksumList());
|
|
||||||
Status checksum_status;
|
|
||||||
if (get_live_table_checksum) {
|
|
||||||
checksum_status = db_->GetLiveFilesChecksumInfo(checksum_list.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
// copy/hard link live_files
|
|
||||||
std::string manifest_fname, current_fname;
|
std::string manifest_fname, current_fname;
|
||||||
for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
|
// record table files for processing next
|
||||||
|
std::vector<std::pair<std::string, uint64_t>> live_table_files;
|
||||||
|
for (auto& live_file : live_files) {
|
||||||
|
if (!s.ok()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
uint64_t number;
|
uint64_t number;
|
||||||
FileType type;
|
FileType type;
|
||||||
bool ok = ParseFileName(live_files[i], &number, &type);
|
bool ok = ParseFileName(live_file, &number, &type);
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
s = Status::Corruption("Can't parse file name. This is very bad");
|
s = Status::Corruption("Can't parse file name. This is very bad");
|
||||||
break;
|
break;
|
||||||
@ -282,54 +281,80 @@ Status CheckpointImpl::CreateCustomCheckpoint(
|
|||||||
// we should only get sst, options, manifest and current files here
|
// we should only get sst, options, manifest and current files here
|
||||||
assert(type == kTableFile || type == kDescriptorFile ||
|
assert(type == kTableFile || type == kDescriptorFile ||
|
||||||
type == kCurrentFile || type == kOptionsFile);
|
type == kCurrentFile || type == kOptionsFile);
|
||||||
assert(live_files[i].size() > 0 && live_files[i][0] == '/');
|
assert(live_file.size() > 0 && live_file[0] == '/');
|
||||||
if (type == kCurrentFile) {
|
if (type == kCurrentFile) {
|
||||||
// We will craft the current file manually to ensure it's consistent with
|
// We will craft the current file manually to ensure it's consistent with
|
||||||
// the manifest number. This is necessary because current's file contents
|
// the manifest number. This is necessary because current's file contents
|
||||||
// can change during checkpoint creation.
|
// can change during checkpoint creation.
|
||||||
current_fname = live_files[i];
|
current_fname = live_file;
|
||||||
continue;
|
continue;
|
||||||
} else if (type == kDescriptorFile) {
|
} else if (type == kDescriptorFile) {
|
||||||
manifest_fname = live_files[i];
|
manifest_fname = live_file;
|
||||||
}
|
}
|
||||||
std::string src_fname = live_files[i];
|
if (type != kTableFile) {
|
||||||
|
// copy non-table files here
|
||||||
|
// * if it's kDescriptorFile, limit the size to manifest_file_size
|
||||||
|
s = copy_file_cb(db_->GetName(), live_file,
|
||||||
|
(type == kDescriptorFile) ? manifest_file_size : 0, type,
|
||||||
|
kUnknownFileChecksumFuncName, kUnknownFileChecksum);
|
||||||
|
} else {
|
||||||
|
// process table files below
|
||||||
|
live_table_files.push_back(make_pair(live_file, number));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// get checksum info for table files
|
||||||
|
// get table file checksums if get_live_table_checksum is true
|
||||||
|
std::unique_ptr<FileChecksumList> checksum_list;
|
||||||
|
|
||||||
|
if (s.ok() && get_live_table_checksum) {
|
||||||
|
checksum_list.reset(NewFileChecksumList());
|
||||||
|
// should succeed even without checksum info present, else manifest
|
||||||
|
// is corrupt
|
||||||
|
s = GetFileChecksumsFromManifest(db_->GetEnv(),
|
||||||
|
db_->GetName() + manifest_fname,
|
||||||
|
manifest_file_size, checksum_list.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
// copy/hard link live table files
|
||||||
|
for (auto& ltf : live_table_files) {
|
||||||
|
if (!s.ok()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::string& src_fname = ltf.first;
|
||||||
|
uint64_t number = ltf.second;
|
||||||
|
|
||||||
// rules:
|
// rules:
|
||||||
// * if it's kTableFile, then it's shared
|
// * for kTableFile, attempt hard link instead of copy.
|
||||||
// * if it's kDescriptorFile, limit the size to manifest_file_size
|
// * but can't hard link across filesystems.
|
||||||
// * always copy if cross-device link
|
if (same_fs) {
|
||||||
if ((type == kTableFile) && same_fs) {
|
s = link_file_cb(db_->GetName(), src_fname, kTableFile);
|
||||||
s = link_file_cb(db_->GetName(), src_fname, type);
|
|
||||||
if (s.IsNotSupported()) {
|
if (s.IsNotSupported()) {
|
||||||
same_fs = false;
|
same_fs = false;
|
||||||
s = Status::OK();
|
s = Status::OK();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ((type != kTableFile) || (!same_fs)) {
|
if (!same_fs) {
|
||||||
std::string checksum_name = kUnknownFileChecksumFuncName;
|
std::string checksum_name = kUnknownFileChecksumFuncName;
|
||||||
std::string checksum_value = kUnknownFileChecksum;
|
std::string checksum_value = kUnknownFileChecksum;
|
||||||
|
|
||||||
// we ignore the checksums either they are not required or we failed to
|
// we ignore the checksums either they are not required or we failed to
|
||||||
// obtain the checksum lsit for old table files that have no file
|
// obtain the checksum lsit for old table files that have no file
|
||||||
// checksums
|
// checksums
|
||||||
if (type == kTableFile && get_live_table_checksum &&
|
if (get_live_table_checksum) {
|
||||||
checksum_status.ok()) {
|
|
||||||
// find checksum info for table files
|
// find checksum info for table files
|
||||||
s = checksum_list->SearchOneFileChecksum(number, &checksum_value,
|
Status search = checksum_list->SearchOneFileChecksum(
|
||||||
&checksum_name);
|
number, &checksum_value, &checksum_name);
|
||||||
|
|
||||||
// XXX/FIXME(peterd): There's currently a race between GetLiveFiles
|
// could be a legacy file lacking checksum info. overall OK if
|
||||||
// and GetLiveFilesChecksumInfo that could lead to not finding
|
// not found
|
||||||
// checksum info on a file that has it. For now, we can accept
|
if (!search.ok()) {
|
||||||
// that and treat it like a legacy file lacking checksum info.
|
|
||||||
if (!s.ok()) {
|
|
||||||
assert(checksum_name == kUnknownFileChecksumFuncName);
|
assert(checksum_name == kUnknownFileChecksumFuncName);
|
||||||
assert(checksum_value == kUnknownFileChecksum);
|
assert(checksum_value == kUnknownFileChecksum);
|
||||||
s = Status::OK();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s = copy_file_cb(db_->GetName(), src_fname,
|
s = copy_file_cb(db_->GetName(), src_fname, 0, kTableFile, checksum_name,
|
||||||
(type == kDescriptorFile) ? manifest_file_size : 0, type,
|
checksum_value);
|
||||||
checksum_name, checksum_value);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
|
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
|
||||||
|
@ -58,7 +58,7 @@ class CheckpointImpl : public Checkpoint {
|
|||||||
const std::string& contents, FileType type)>
|
const std::string& contents, FileType type)>
|
||||||
create_file_cb,
|
create_file_cb,
|
||||||
uint64_t* sequence_number, uint64_t log_size_for_flush,
|
uint64_t* sequence_number, uint64_t log_size_for_flush,
|
||||||
const bool& get_live_table_checksum = false);
|
bool get_live_table_checksum = false);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void CleanStagingDirectory(const std::string& path, Logger* info_log);
|
void CleanStagingDirectory(const std::string& path, Logger* info_log);
|
||||||
|
Loading…
Reference in New Issue
Block a user