Always fsync the file after file copying

Summary:
File copying happens when creating checkpoints and when bulk loading files from a different FS partition. We should fsync the files after copying them to guarantee durability. A side effect is that the dirty pages in the file system buffers won't grow too large.
Closes https://github.com/facebook/rocksdb/pull/1728

Differential Revision: D4371083

Pulled By: siying

fbshipit-source-id: 579e14c
This commit is contained in:
Siying Dong 2016-12-28 18:38:20 -08:00 committed by Islam AbdelRahman
parent beb5daeeac
commit b2892047fd
5 changed files with 36 additions and 28 deletions

View File

@ -4,6 +4,8 @@
* Fix the data corruption bug in the case that concurrent memtable write is enabled and 2PC is used.
* OptimizeForPointLookup() doesn't work with the default DB setting of allow_concurrent_memtable_write=true. Fix it.
* Fix a 2PC-related bug where the WAL file size grows too large.
* Fix the bug where, if 2PC is enabled, checkpoints may lose some recent transactions.
* When files need to be copied while creating checkpoints or bulk loading files, fsync each file after copying it.
## 5.0.0 (11/17/2016)
### Public API Change

View File

@ -96,10 +96,12 @@ Status ExternalSstFileIngestionJob::Prepare(
status = env_->LinkFile(path_outside_db, path_inside_db);
if (status.IsNotSupported()) {
// Original file is on a different FS, use copy instead of hard linking
status = CopyFile(env_, path_outside_db, path_inside_db, 0);
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
db_options_.use_fsync);
}
} else {
status = CopyFile(env_, path_outside_db, path_inside_db, 0);
status = CopyFile(env_, path_outside_db, path_inside_db, 0,
db_options_.use_fsync);
}
TEST_SYNC_POINT("DBImpl::AddFile:FileCopied");
if (!status.ok()) {

View File

@ -16,7 +16,7 @@ namespace rocksdb {
// Utility function to copy a file up to a specified length
Status CopyFile(Env* env, const std::string& source,
const std::string& destination, uint64_t size) {
const std::string& destination, uint64_t size, bool use_fsync) {
const EnvOptions soptions;
Status s;
unique_ptr<SequentialFileReader> src_reader;
@ -62,6 +62,7 @@ Status CopyFile(Env* env, const std::string& source,
}
size -= slice.size();
}
dest_writer->Sync(use_fsync);
return Status::OK();
}

View File

@ -12,9 +12,11 @@
#include "util/db_options.h"
namespace rocksdb {
// use_fsync maps to options.use_fsync, which determines the way that
// the file is synced after copying.
extern Status CopyFile(Env* env, const std::string& source,
const std::string& destination, uint64_t size = 0);
const std::string& destination, uint64_t size,
bool use_fsync);
extern Status CreateFile(Env* env, const std::string& destination,
const std::string& contents);

View File

@ -62,7 +62,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
Status s;
std::vector<std::string> live_files;
uint64_t manifest_file_size = 0;
bool allow_2pc = db_->GetDBOptions().allow_2pc;
DBOptions db_options = db_->GetDBOptions();
uint64_t min_log_num = port::kMaxUint64;
uint64_t sequence_number = db_->GetLatestSequenceNumber();
bool same_fs = true;
@ -81,7 +81,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
// this will return live_files prefixed with "/"
s = db_->GetLiveFiles(live_files, &manifest_file_size);
if (s.ok() && allow_2pc) {
if (s.ok() && db_options.allow_2pc) {
// If 2PC is enabled, we need to get minimum log number after the flush.
// Need to refetch the live files to recapture the snapshot.
if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
@ -122,7 +122,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
}
size_t wal_size = live_wal_files.size();
Log(db_->GetOptions().info_log,
Log(db_options.info_log,
"Started the snapshot process -- creating snapshot in directory %s",
checkpoint_dir.c_str());
@ -161,7 +161,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
// * if it's kDescriptorFile, limit the size to manifest_file_size
// * always copy if cross-device link
if ((type == kTableFile) && same_fs) {
Log(db_->GetOptions().info_log, "Hard Linking %s", src_fname.c_str());
Log(db_options.info_log, "Hard Linking %s", src_fname.c_str());
s = db_->GetEnv()->LinkFile(db_->GetName() + src_fname,
full_private_path + src_fname);
if (s.IsNotSupported()) {
@ -170,17 +170,18 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
}
}
if ((type != kTableFile) || (!same_fs)) {
Log(db_->GetOptions().info_log, "Copying %s", src_fname.c_str());
Log(db_options.info_log, "Copying %s", src_fname.c_str());
s = CopyFile(db_->GetEnv(), db_->GetName() + src_fname,
full_private_path + src_fname,
(type == kDescriptorFile) ? manifest_file_size : 0);
(type == kDescriptorFile) ? manifest_file_size : 0,
db_options.use_fsync);
}
}
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
s = CreateFile(db_->GetEnv(), full_private_path + current_fname,
manifest_fname.substr(1) + "\n");
}
Log(db_->GetOptions().info_log, "Number of log files %" ROCKSDB_PRIszt,
Log(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
live_wal_files.size());
// Link WAL files. Copy exact size of last one because it is the only one
@ -190,20 +191,20 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
(live_wal_files[i]->StartSequence() >= sequence_number ||
live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) {
Log(db_->GetOptions().info_log, "Copying %s",
Log(db_options.info_log, "Copying %s",
live_wal_files[i]->PathName().c_str());
s = CopyFile(db_->GetEnv(),
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(),
live_wal_files[i]->SizeFileBytes());
live_wal_files[i]->SizeFileBytes(), db_options.use_fsync);
break;
}
if (same_fs) {
// we only care about live log files
Log(db_->GetOptions().info_log, "Hard Linking %s",
Log(db_options.info_log, "Hard Linking %s",
live_wal_files[i]->PathName().c_str());
s = db_->GetEnv()->LinkFile(
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName());
if (s.IsNotSupported()) {
same_fs = false;
@ -211,11 +212,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
}
}
if (!same_fs) {
Log(db_->GetOptions().info_log, "Copying %s",
Log(db_options.info_log, "Copying %s",
live_wal_files[i]->PathName().c_str());
s = CopyFile(db_->GetEnv(),
db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(), 0);
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(), 0,
db_options.use_fsync);
}
}
}
@ -237,27 +239,26 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
if (!s.ok()) {
// clean all the files we might have created
Log(db_->GetOptions().info_log, "Snapshot failed -- %s",
s.ToString().c_str());
Log(db_options.info_log, "Snapshot failed -- %s", s.ToString().c_str());
// we have to delete the dir and all its children
std::vector<std::string> subchildren;
db_->GetEnv()->GetChildren(full_private_path, &subchildren);
for (auto& subchild : subchildren) {
std::string subchild_path = full_private_path + "/" + subchild;
Status s1 = db_->GetEnv()->DeleteFile(subchild_path);
Log(db_->GetOptions().info_log, "Delete file %s -- %s",
subchild_path.c_str(), s1.ToString().c_str());
Log(db_options.info_log, "Delete file %s -- %s", subchild_path.c_str(),
s1.ToString().c_str());
}
// finally delete the private dir
Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
Log(db_->GetOptions().info_log, "Delete dir %s -- %s",
full_private_path.c_str(), s1.ToString().c_str());
Log(db_options.info_log, "Delete dir %s -- %s", full_private_path.c_str(),
s1.ToString().c_str());
return s;
}
// here we know that we succeeded and installed the new snapshot
Log(db_->GetOptions().info_log, "Snapshot DONE. All is good");
Log(db_->GetOptions().info_log, "Snapshot sequence number: %" PRIu64,
Log(db_options.info_log, "Snapshot DONE. All is good");
Log(db_options.info_log, "Snapshot sequence number: %" PRIu64,
sequence_number);
return s;