diff --git a/HISTORY.md b/HISTORY.md index f18446b74..81f5080bd 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,7 @@ ### Bug Fixes * Fix the bug that if 2PC is enabled, checkpoints may loss some recent transactions. +* When a file has to be copied while creating a checkpoint or bulk loading files, fsync the copy once the copying finishes. ## 5.0.0 (11/17/2016) ### Public API Change diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index d5a073efe..993c23790 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -96,10 +96,12 @@ Status ExternalSstFileIngestionJob::Prepare( status = env_->LinkFile(path_outside_db, path_inside_db); if (status.IsNotSupported()) { // Original file is on a different FS, use copy instead of hard linking - status = CopyFile(env_, path_outside_db, path_inside_db, 0); + status = CopyFile(env_, path_outside_db, path_inside_db, 0, + db_options_.use_fsync); } } else { - status = CopyFile(env_, path_outside_db, path_inside_db, 0); + status = CopyFile(env_, path_outside_db, path_inside_db, 0, + db_options_.use_fsync); } TEST_SYNC_POINT("DBImpl::AddFile:FileCopied"); if (!status.ok()) { diff --git a/util/file_util.cc b/util/file_util.cc index 42b3dc1f3..829415aee 100644 --- a/util/file_util.cc +++ b/util/file_util.cc @@ -16,7 +16,7 @@ namespace rocksdb { // Utility function to copy a file up to a specified length Status CopyFile(Env* env, const std::string& source, - const std::string& destination, uint64_t size) { + const std::string& destination, uint64_t size, bool use_fsync) { const EnvOptions soptions; Status s; unique_ptr src_reader; @@ -62,6 +62,7 @@ Status CopyFile(Env* env, const std::string& source, } size -= slice.size(); } - return Status::OK(); + // Propagate the sync result: the copy only succeeded if the data was synced. + return dest_writer->Sync(use_fsync); } diff --git a/util/file_util.h b/util/file_util.h index f2c5c329f..56c78742c 100644 --- a/util/file_util.h +++ b/util/file_util.h @@ -12,9 +12,11 @@ #include "util/db_options.h" 
namespace rocksdb { - +// use_fsync maps to options.use_fsync, which determines the way that +// the file is synced after copying. extern Status CopyFile(Env* env, const std::string& source, - const std::string& destination, uint64_t size = 0); + const std::string& destination, uint64_t size, + bool use_fsync); extern Status CreateFile(Env* env, const std::string& destination, const std::string& contents); diff --git a/utilities/checkpoint/checkpoint.cc b/utilities/checkpoint/checkpoint.cc index ea6a1d08f..e03c67daa 100644 --- a/utilities/checkpoint/checkpoint.cc +++ b/utilities/checkpoint/checkpoint.cc @@ -62,7 +62,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { Status s; std::vector live_files; uint64_t manifest_file_size = 0; - bool allow_2pc = db_->GetDBOptions().allow_2pc; + DBOptions db_options = db_->GetDBOptions(); uint64_t min_log_num = port::kMaxUint64; uint64_t sequence_number = db_->GetLatestSequenceNumber(); bool same_fs = true; @@ -81,7 +81,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { // this will return live_files prefixed with "/" s = db_->GetLiveFiles(live_files, &manifest_file_size); - if (s.ok() && allow_2pc) { + if (s.ok() && db_options.allow_2pc) { // If 2PC is enabled, we need to get minimum log number after the flush. // Need to refetch the live files to recapture the snapshot. 
if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep, @@ -122,7 +122,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { } size_t wal_size = live_wal_files.size(); - Log(db_->GetOptions().info_log, + Log(db_options.info_log, "Started the snapshot process -- creating snapshot in directory %s", checkpoint_dir.c_str()); @@ -161,7 +161,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { // * if it's kDescriptorFile, limit the size to manifest_file_size // * always copy if cross-device link if ((type == kTableFile) && same_fs) { - Log(db_->GetOptions().info_log, "Hard Linking %s", src_fname.c_str()); + Log(db_options.info_log, "Hard Linking %s", src_fname.c_str()); s = db_->GetEnv()->LinkFile(db_->GetName() + src_fname, full_private_path + src_fname); if (s.IsNotSupported()) { @@ -170,17 +170,18 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { } } if ((type != kTableFile) || (!same_fs)) { - Log(db_->GetOptions().info_log, "Copying %s", src_fname.c_str()); + Log(db_options.info_log, "Copying %s", src_fname.c_str()); s = CopyFile(db_->GetEnv(), db_->GetName() + src_fname, full_private_path + src_fname, - (type == kDescriptorFile) ? manifest_file_size : 0); + (type == kDescriptorFile) ? manifest_file_size : 0, + db_options.use_fsync); } } if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) { s = CreateFile(db_->GetEnv(), full_private_path + current_fname, manifest_fname.substr(1) + "\n"); } - Log(db_->GetOptions().info_log, "Number of log files %" ROCKSDB_PRIszt, + Log(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt, live_wal_files.size()); // Link WAL files. 
Copy exact size of last one because it is the only one @@ -190,20 +191,20 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { (live_wal_files[i]->StartSequence() >= sequence_number || live_wal_files[i]->LogNumber() >= min_log_num)) { if (i + 1 == wal_size) { - Log(db_->GetOptions().info_log, "Copying %s", + Log(db_options.info_log, "Copying %s", live_wal_files[i]->PathName().c_str()); s = CopyFile(db_->GetEnv(), - db_->GetOptions().wal_dir + live_wal_files[i]->PathName(), + db_options.wal_dir + live_wal_files[i]->PathName(), full_private_path + live_wal_files[i]->PathName(), - live_wal_files[i]->SizeFileBytes()); + live_wal_files[i]->SizeFileBytes(), db_options.use_fsync); break; } if (same_fs) { // we only care about live log files - Log(db_->GetOptions().info_log, "Hard Linking %s", + Log(db_options.info_log, "Hard Linking %s", live_wal_files[i]->PathName().c_str()); s = db_->GetEnv()->LinkFile( - db_->GetOptions().wal_dir + live_wal_files[i]->PathName(), + db_options.wal_dir + live_wal_files[i]->PathName(), full_private_path + live_wal_files[i]->PathName()); if (s.IsNotSupported()) { same_fs = false; @@ -211,11 +212,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { } } if (!same_fs) { - Log(db_->GetOptions().info_log, "Copying %s", + Log(db_options.info_log, "Copying %s", live_wal_files[i]->PathName().c_str()); s = CopyFile(db_->GetEnv(), - db_->GetOptions().wal_dir + live_wal_files[i]->PathName(), - full_private_path + live_wal_files[i]->PathName(), 0); + db_options.wal_dir + live_wal_files[i]->PathName(), + full_private_path + live_wal_files[i]->PathName(), 0, + db_options.use_fsync); } } } @@ -237,27 +239,26 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { if (!s.ok()) { // clean all the files we might have created - Log(db_->GetOptions().info_log, "Snapshot failed -- %s", - s.ToString().c_str()); + Log(db_options.info_log, "Snapshot failed -- %s", 
s.ToString().c_str()); // we have to delete the dir and all its children std::vector subchildren; db_->GetEnv()->GetChildren(full_private_path, &subchildren); for (auto& subchild : subchildren) { std::string subchild_path = full_private_path + "/" + subchild; Status s1 = db_->GetEnv()->DeleteFile(subchild_path); - Log(db_->GetOptions().info_log, "Delete file %s -- %s", - subchild_path.c_str(), s1.ToString().c_str()); + Log(db_options.info_log, "Delete file %s -- %s", subchild_path.c_str(), + s1.ToString().c_str()); } // finally delete the private dir Status s1 = db_->GetEnv()->DeleteDir(full_private_path); - Log(db_->GetOptions().info_log, "Delete dir %s -- %s", - full_private_path.c_str(), s1.ToString().c_str()); + Log(db_options.info_log, "Delete dir %s -- %s", full_private_path.c_str(), + s1.ToString().c_str()); return s; } // here we know that we succeeded and installed the new snapshot - Log(db_->GetOptions().info_log, "Snapshot DONE. All is good"); - Log(db_->GetOptions().info_log, "Snapshot sequence number: %" PRIu64, + Log(db_options.info_log, "Snapshot DONE. All is good"); + Log(db_options.info_log, "Snapshot sequence number: %" PRIu64, sequence_number); return s;