Forge current file for checkpoint
Summary: This fixes a similar issue as D54711: "CURRENT" file can mutate between GetLiveFiles() and copy to the tmp directory, in which case it would reference the wrong manifest filename. To fix this, I forge the "CURRENT" file such that it simply contains the filename for the manifest returned by GetLiveFiles(). - Changed CreateCheckpoint() to forge current file - Added CreateFile() utility function - Added test case that rolls manifest during checkpoint creation Test Plan: $ ./checkpoint_test Reviewers: sdong, IslamAbdelRahman Reviewed By: IslamAbdelRahman Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D55065
This commit is contained in:
parent
33d568611d
commit
72224104d3
@ -66,6 +66,22 @@ Status CopyFile(Env* env, const std::string& source,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Utility function to create a file with the provided contents
|
||||
Status CreateFile(Env* env, const std::string& destination,
|
||||
const std::string& contents) {
|
||||
const EnvOptions soptions;
|
||||
Status s;
|
||||
unique_ptr<WritableFileWriter> dest_writer;
|
||||
|
||||
unique_ptr<WritableFile> destfile;
|
||||
s = env->NewWritableFile(destination, &destfile, soptions);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
dest_writer.reset(new WritableFileWriter(std::move(destfile), soptions));
|
||||
return dest_writer->Append(Slice(contents));
|
||||
}
|
||||
|
||||
Status DeleteSSTFile(const DBOptions* db_options, const std::string& fname,
|
||||
uint32_t path_id) {
|
||||
// TODO(tec): support sst_file_manager for multiple path_ids
|
||||
|
@ -16,6 +16,9 @@ namespace rocksdb {
|
||||
extern Status CopyFile(Env* env, const std::string& source,
|
||||
const std::string& destination, uint64_t size = 0);
|
||||
|
||||
extern Status CreateFile(Env* env, const std::string& destination,
|
||||
const std::string& contents);
|
||||
|
||||
extern Status DeleteSSTFile(const DBOptions* db_options,
|
||||
const std::string& fname, uint32_t path_id);
|
||||
|
||||
|
@ -24,6 +24,7 @@
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/transaction_log.h"
|
||||
#include "util/file_util.h"
|
||||
#include "util/sync_point.h"
|
||||
#include "port/port.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -76,7 +77,9 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
s = db_->DisableFileDeletions();
|
||||
if (s.ok()) {
|
||||
// this will return live_files prefixed with "/"
|
||||
s = db_->GetLiveFiles(live_files, &manifest_file_size, true);
|
||||
s = db_->GetLiveFiles(live_files, &manifest_file_size);
|
||||
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
|
||||
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
|
||||
}
|
||||
// if we have more than one column family, we need to also get WAL files
|
||||
if (s.ok()) {
|
||||
@ -98,6 +101,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
s = db_->GetEnv()->CreateDir(full_private_path);
|
||||
|
||||
// copy/hard link live_files
|
||||
std::string manifest_fname, current_fname;
|
||||
for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
@ -110,6 +114,15 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
assert(type == kTableFile || type == kDescriptorFile ||
|
||||
type == kCurrentFile);
|
||||
assert(live_files[i].size() > 0 && live_files[i][0] == '/');
|
||||
if (type == kCurrentFile) {
|
||||
// We will craft the current file manually to ensure it's consistent with
|
||||
// the manifest number. This is necessary because current's file contents
|
||||
// can change during checkpoint creation.
|
||||
current_fname = live_files[i];
|
||||
continue;
|
||||
} else if (type == kDescriptorFile) {
|
||||
manifest_fname = live_files[i];
|
||||
}
|
||||
std::string src_fname = live_files[i];
|
||||
|
||||
// rules:
|
||||
@ -132,6 +145,10 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
(type == kDescriptorFile) ? manifest_file_size : 0);
|
||||
}
|
||||
}
|
||||
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
|
||||
s = CreateFile(db_->GetEnv(), full_private_path + current_fname,
|
||||
manifest_fname.substr(1) + "\n");
|
||||
}
|
||||
Log(db_->GetOptions().info_log, "Number of log files %" ROCKSDB_PRIszt,
|
||||
live_wal_files.size());
|
||||
|
||||
|
@ -346,6 +346,50 @@ TEST_F(DBTest, CheckpointCF) {
|
||||
ASSERT_OK(DestroyDB(snapshot_name, options));
|
||||
}
|
||||
|
||||
TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing) {
|
||||
const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot";
|
||||
ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions()));
|
||||
env_->DeleteDir(kSnapshotName);
|
||||
|
||||
Options options = CurrentOptions();
|
||||
options.max_manifest_file_size = 0; // always rollover manifest for file add
|
||||
Reopen(options);
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->LoadDependency(
|
||||
{// Get past the flush in the checkpoint thread before adding any keys to
|
||||
// the db so the checkpoint thread won't hit the WriteManifest
|
||||
// syncpoints.
|
||||
{"DBImpl::GetLiveFiles:1",
|
||||
"DBTest::CurrentFileModifiedWhileCheckpointing:PrePut"},
|
||||
// Roll the manifest during checkpointing right after live files are
|
||||
// snapshotted.
|
||||
{"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
|
||||
"VersionSet::LogAndApply:WriteManifest"},
|
||||
{"VersionSet::LogAndApply:WriteManifestDone",
|
||||
"CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}});
|
||||
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
std::thread t([&]() {
|
||||
Checkpoint* checkpoint;
|
||||
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
|
||||
ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName));
|
||||
delete checkpoint;
|
||||
});
|
||||
TEST_SYNC_POINT("DBTest::CurrentFileModifiedWhileCheckpointing:PrePut");
|
||||
ASSERT_OK(Put("Default", "Default1"));
|
||||
ASSERT_OK(Flush());
|
||||
t.join();
|
||||
|
||||
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
||||
|
||||
DB* snapshotDB;
|
||||
// Successful Open() implies that CURRENT pointed to the manifest in the
|
||||
// checkpoint.
|
||||
ASSERT_OK(DB::Open(options, kSnapshotName, &snapshotDB));
|
||||
delete snapshotDB;
|
||||
snapshotDB = nullptr;
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
Loading…
Reference in New Issue
Block a user