Always fsync the file after file copying
Summary: File copying happens when creating checkpoints and bulkloading files from different FS partition. We should fsync the files when copying them to guarantee durability. A side effect will be that the dirty pages in file system buffers won't grow too large. Closes https://github.com/facebook/rocksdb/pull/1728 Differential Revision: D4371083 Pulled By: siying fbshipit-source-id: 579e14c
This commit is contained in:
parent
2fb70dc798
commit
17a4b75cc3
@ -7,6 +7,7 @@
|
||||
|
||||
### Bug Fixes
|
||||
* Fix the bug that if 2PC is enabled, checkpoints may loss some recent transactions.
|
||||
* When file copying is needed when creating checkpoints or bulk loading files, fsync the file after the file copying.
|
||||
|
||||
## 5.0.0 (11/17/2016)
|
||||
### Public API Change
|
||||
|
@ -96,10 +96,12 @@ Status ExternalSstFileIngestionJob::Prepare(
|
||||
status = env_->LinkFile(path_outside_db, path_inside_db);
|
||||
if (status.IsNotSupported()) {
|
||||
// Original file is on a different FS, use copy instead of hard linking
|
||||
status = CopyFile(env_, path_outside_db, path_inside_db, 0);
|
||||
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
|
||||
db_options_.use_fsync);
|
||||
}
|
||||
} else {
|
||||
status = CopyFile(env_, path_outside_db, path_inside_db, 0);
|
||||
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
|
||||
db_options_.use_fsync);
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
|
||||
if (!status.ok()) {
|
||||
|
@ -16,7 +16,7 @@ namespace rocksdb {
|
||||
|
||||
// Utility function to copy a file up to a specified length
|
||||
Status CopyFile(Env* env, const std::string& source,
|
||||
const std::string& destination, uint64_t size) {
|
||||
const std::string& destination, uint64_t size, bool use_fsync) {
|
||||
const EnvOptions soptions;
|
||||
Status s;
|
||||
unique_ptr<SequentialFileReader> src_reader;
|
||||
@ -62,6 +62,7 @@ Status CopyFile(Env* env, const std::string& source,
|
||||
}
|
||||
size -= slice.size();
|
||||
}
|
||||
dest_writer->Sync(use_fsync);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
@ -12,9 +12,11 @@
|
||||
#include "util/db_options.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
// use_fsync maps to options.use_fsync, which determines the way that
|
||||
// the file is synced after copying.
|
||||
extern Status CopyFile(Env* env, const std::string& source,
|
||||
const std::string& destination, uint64_t size = 0);
|
||||
const std::string& destination, uint64_t size,
|
||||
bool use_fsync);
|
||||
|
||||
extern Status CreateFile(Env* env, const std::string& destination,
|
||||
const std::string& contents);
|
||||
|
@ -62,7 +62,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
Status s;
|
||||
std::vector<std::string> live_files;
|
||||
uint64_t manifest_file_size = 0;
|
||||
bool allow_2pc = db_->GetDBOptions().allow_2pc;
|
||||
DBOptions db_options = db_->GetDBOptions();
|
||||
uint64_t min_log_num = port::kMaxUint64;
|
||||
uint64_t sequence_number = db_->GetLatestSequenceNumber();
|
||||
bool same_fs = true;
|
||||
@ -81,7 +81,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
// this will return live_files prefixed with "/"
|
||||
s = db_->GetLiveFiles(live_files, &manifest_file_size);
|
||||
|
||||
if (s.ok() && allow_2pc) {
|
||||
if (s.ok() && db_options.allow_2pc) {
|
||||
// If 2PC is enabled, we need to get minimum log number after the flush.
|
||||
// Need to refetch the live files to recapture the snapshot.
|
||||
if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
|
||||
@ -122,7 +122,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
}
|
||||
|
||||
size_t wal_size = live_wal_files.size();
|
||||
Log(db_->GetOptions().info_log,
|
||||
Log(db_options.info_log,
|
||||
"Started the snapshot process -- creating snapshot in directory %s",
|
||||
checkpoint_dir.c_str());
|
||||
|
||||
@ -161,7 +161,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
// * if it's kDescriptorFile, limit the size to manifest_file_size
|
||||
// * always copy if cross-device link
|
||||
if ((type == kTableFile) && same_fs) {
|
||||
Log(db_->GetOptions().info_log, "Hard Linking %s", src_fname.c_str());
|
||||
Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());
|
||||
s = db_->GetEnv()->LinkFile(db_->GetName() + src_fname,
|
||||
full_private_path + src_fname);
|
||||
if (s.IsNotSupported()) {
|
||||
@ -170,17 +170,18 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
}
|
||||
}
|
||||
if ((type != kTableFile) || (!same_fs)) {
|
||||
Log(db_->GetOptions().info_log, "Copying %s", src_fname.c_str());
|
||||
Log(db_options.info_log, "Copying %s", src_fname.c_str());
|
||||
s = CopyFile(db_->GetEnv(), db_->GetName() + src_fname,
|
||||
full_private_path + src_fname,
|
||||
(type == kDescriptorFile) ? manifest_file_size : 0);
|
||||
(type == kDescriptorFile) ? manifest_file_size : 0,
|
||||
db_options.use_fsync);
|
||||
}
|
||||
}
|
||||
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
|
||||
s = CreateFile(db_->GetEnv(), full_private_path + current_fname,
|
||||
manifest_fname.substr(1) + "\n");
|
||||
}
|
||||
Log(db_->GetOptions().info_log, "Number of log files %" ROCKSDB_PRIszt,
|
||||
Log(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
|
||||
live_wal_files.size());
|
||||
|
||||
// Link WAL files. Copy exact size of last one because it is the only one
|
||||
@ -190,20 +191,20 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
(live_wal_files[i]->StartSequence() >= sequence_number ||
|
||||
live_wal_files[i]->LogNumber() >= min_log_num)) {
|
||||
if (i + 1 == wal_size) {
|
||||
Log(db_->GetOptions().info_log, "Copying %s",
|
||||
Log(db_options.info_log, "Copying %s",
|
||||
live_wal_files[i]->PathName().c_str());
|
||||
s = CopyFile(db_->GetEnv(),
|
||||
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
|
||||
db_options.wal_dir + live_wal_files[i]->PathName(),
|
||||
full_private_path + live_wal_files[i]->PathName(),
|
||||
live_wal_files[i]->SizeFileBytes());
|
||||
live_wal_files[i]->SizeFileBytes(), db_options.use_fsync);
|
||||
break;
|
||||
}
|
||||
if (same_fs) {
|
||||
// we only care about live log files
|
||||
Log(db_->GetOptions().info_log, "Hard Linking %s",
|
||||
Log(db_options.info_log, "Hard Linking %s",
|
||||
live_wal_files[i]->PathName().c_str());
|
||||
s = db_->GetEnv()->LinkFile(
|
||||
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
|
||||
db_options.wal_dir + live_wal_files[i]->PathName(),
|
||||
full_private_path + live_wal_files[i]->PathName());
|
||||
if (s.IsNotSupported()) {
|
||||
same_fs = false;
|
||||
@ -211,11 +212,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
}
|
||||
}
|
||||
if (!same_fs) {
|
||||
Log(db_->GetOptions().info_log, "Copying %s",
|
||||
Log(db_options.info_log, "Copying %s",
|
||||
live_wal_files[i]->PathName().c_str());
|
||||
s = CopyFile(db_->GetEnv(),
|
||||
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
|
||||
full_private_path + live_wal_files[i]->PathName(), 0);
|
||||
db_options.wal_dir + live_wal_files[i]->PathName(),
|
||||
full_private_path + live_wal_files[i]->PathName(), 0,
|
||||
db_options.use_fsync);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -237,27 +239,26 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
|
||||
|
||||
if (!s.ok()) {
|
||||
// clean all the files we might have created
|
||||
Log(db_->GetOptions().info_log, "Snapshot failed -- %s",
|
||||
s.ToString().c_str());
|
||||
Log(db_options.info_log, "Snapshot failed -- %s", s.ToString().c_str());
|
||||
// we have to delete the dir and all its children
|
||||
std::vector<std::string> subchildren;
|
||||
db_->GetEnv()->GetChildren(full_private_path, &subchildren);
|
||||
for (auto& subchild : subchildren) {
|
||||
std::string subchild_path = full_private_path + "/" + subchild;
|
||||
Status s1 = db_->GetEnv()->DeleteFile(subchild_path);
|
||||
Log(db_->GetOptions().info_log, "Delete file %s -- %s",
|
||||
subchild_path.c_str(), s1.ToString().c_str());
|
||||
Log(db_options.info_log, "Delete file %s -- %s", subchild_path.c_str(),
|
||||
s1.ToString().c_str());
|
||||
}
|
||||
// finally delete the private dir
|
||||
Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
|
||||
Log(db_->GetOptions().info_log, "Delete dir %s -- %s",
|
||||
full_private_path.c_str(), s1.ToString().c_str());
|
||||
Log(db_options.info_log, "Delete dir %s -- %s", full_private_path.c_str(),
|
||||
s1.ToString().c_str());
|
||||
return s;
|
||||
}
|
||||
|
||||
// here we know that we succeeded and installed the new snapshot
|
||||
Log(db_->GetOptions().info_log, "Snapshot DONE. All is good");
|
||||
Log(db_->GetOptions().info_log, "Snapshot sequence number: %" PRIu64,
|
||||
Log(db_options.info_log, "Snapshot DONE. All is good");
|
||||
Log(db_options.info_log, "Snapshot sequence number: %" PRIu64,
|
||||
sequence_number);
|
||||
|
||||
return s;
|
||||
|
Loading…
x
Reference in New Issue
Block a user