Skip directory fsync for filesystem btrfs (#8903)
Summary: Directory fsync might be expensive on btrfs and it may not be needed. Here are 4 directory fsync cases: 1. creating a new file: dir-fsync is not needed on btrfs, as long as the new file itself is synced. 2. renaming a file: dir-fsync is not needed if the renamed file is synced. So an API `FsyncAfterFileRename(filename, ...)` is provided to sync the file on btrfs. By default, it just calls dir-fsync. 3. deleting files: dir-fsync is forced by set `IOOptions.force_dir_fsync = true` 4. renaming multiple files (like backup and checkpoint): dir-fsync is forced, the same as above. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8903 Test Plan: run tests on btrfs and non btrfs Reviewed By: ajkr Differential Revision: D30885059 Pulled By: jay-zhuang fbshipit-source-id: dd2730b31580b0bcaedffc318a762d7dbf25de4a
This commit is contained in:
parent
081722780b
commit
29102641dd
@ -20,6 +20,8 @@
|
|||||||
* Made FileSystem extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method.
|
* Made FileSystem extend the Customizable class and added a CreateFromString method. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method.
|
||||||
* Clarified in API comments that RocksDB is not exception safe for callbacks and custom extensions. An exception propagating into RocksDB can lead to undefined behavior, including data loss, unreported corruption, deadlocks, and more.
|
* Clarified in API comments that RocksDB is not exception safe for callbacks and custom extensions. An exception propagating into RocksDB can lead to undefined behavior, including data loss, unreported corruption, deadlocks, and more.
|
||||||
* Marked `WriteBufferManager` as `final` because it is not intended for extension.
|
* Marked `WriteBufferManager` as `final` because it is not intended for extension.
|
||||||
|
* Add API `FSDirectory::FsyncWithDirOptions()`, which provides extra information like directory fsync reason in `DirFsyncOptions`. File system like btrfs is using that to skip directory fsync for creating a new file, or when renaming a file, fsync the target file instead of the directory, which improves the `DB::Open()` speed by ~20%.
|
||||||
|
* `DB::Open()` is not going be blocked by obsolete file purge if `DBOptions::avoid_unnecessary_blocking_io` is set to true.
|
||||||
|
|
||||||
## 6.26.0 (2021-10-20)
|
## 6.26.0 (2021-10-20)
|
||||||
### Bug Fixes
|
### Bug Fixes
|
||||||
|
3
Makefile
3
Makefile
@ -1926,6 +1926,9 @@ clipping_iterator_test: $(OBJ_DIR)/db/compaction/clipping_iterator_test.o $(TEST
|
|||||||
ribbon_bench: $(OBJ_DIR)/microbench/ribbon_bench.o $(LIBRARY)
|
ribbon_bench: $(OBJ_DIR)/microbench/ribbon_bench.o $(LIBRARY)
|
||||||
$(AM_LINK)
|
$(AM_LINK)
|
||||||
|
|
||||||
|
db_basic_bench: $(OBJ_DIR)/microbench/db_basic_bench.o $(LIBRARY)
|
||||||
|
$(AM_LINK)
|
||||||
|
|
||||||
cache_reservation_manager_test: $(OBJ_DIR)/cache/cache_reservation_manager_test.o $(TEST_LIBRARY) $(LIBRARY)
|
cache_reservation_manager_test: $(OBJ_DIR)/cache/cache_reservation_manager_test.o $(TEST_LIBRARY) $(LIBRARY)
|
||||||
$(AM_LINK)
|
$(AM_LINK)
|
||||||
#-------------------------------------------------
|
#-------------------------------------------------
|
||||||
|
@ -763,12 +763,16 @@ Status CompactionJob::Run() {
|
|||||||
constexpr IODebugContext* dbg = nullptr;
|
constexpr IODebugContext* dbg = nullptr;
|
||||||
|
|
||||||
if (output_directory_) {
|
if (output_directory_) {
|
||||||
io_s = output_directory_->Fsync(IOOptions(), dbg);
|
io_s = output_directory_->FsyncWithDirOptions(
|
||||||
|
IOOptions(), dbg,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
|
if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
|
||||||
blob_output_directory_ != output_directory_) {
|
blob_output_directory_ != output_directory_) {
|
||||||
io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
|
io_s = blob_output_directory_->FsyncWithDirOptions(
|
||||||
|
IOOptions(), dbg,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (io_status_.ok()) {
|
if (io_status_.ok()) {
|
||||||
@ -2442,7 +2446,8 @@ Status CompactionServiceCompactionJob::Run() {
|
|||||||
constexpr IODebugContext* dbg = nullptr;
|
constexpr IODebugContext* dbg = nullptr;
|
||||||
|
|
||||||
if (output_directory_) {
|
if (output_directory_) {
|
||||||
io_s = output_directory_->Fsync(IOOptions(), dbg);
|
io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg,
|
||||||
|
DirFsyncOptions());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (io_status_.ok()) {
|
if (io_status_.ok()) {
|
||||||
|
@ -1389,7 +1389,9 @@ Status DBImpl::SyncWAL() {
|
|||||||
IOStatusCheck(io_s);
|
IOStatusCheck(io_s);
|
||||||
}
|
}
|
||||||
if (status.ok() && need_log_dir_sync) {
|
if (status.ok() && need_log_dir_sync) {
|
||||||
status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
|
status = directories_.GetWalDir()->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
|
||||||
}
|
}
|
||||||
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
|
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
|
||||||
|
|
||||||
@ -4288,6 +4290,14 @@ Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) {
|
|||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
// Retry if the file name happen to conflict with an existing one.
|
// Retry if the file name happen to conflict with an existing one.
|
||||||
s = GetEnv()->RenameFile(file_name, options_file_name);
|
s = GetEnv()->RenameFile(file_name, options_file_name);
|
||||||
|
std::unique_ptr<FSDirectory> dir_obj;
|
||||||
|
if (s.ok()) {
|
||||||
|
s = fs_->NewDirectory(GetName(), IOOptions(), &dir_obj, nullptr);
|
||||||
|
}
|
||||||
|
if (s.ok()) {
|
||||||
|
s = dir_obj->FsyncWithDirOptions(IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(options_file_name));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
@ -974,6 +974,9 @@ class DBImpl : public DB {
|
|||||||
// is only for the special test of CancelledCompactions
|
// is only for the special test of CancelledCompactions
|
||||||
Status TEST_WaitForCompact(bool waitUnscheduled = false);
|
Status TEST_WaitForCompact(bool waitUnscheduled = false);
|
||||||
|
|
||||||
|
// Wait for any background purge
|
||||||
|
Status TEST_WaitForPurge();
|
||||||
|
|
||||||
// Get the background error status
|
// Get the background error status
|
||||||
Status TEST_GetBGError();
|
Status TEST_GetBGError();
|
||||||
|
|
||||||
|
@ -120,7 +120,9 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (io_s.ok()) {
|
if (io_s.ok()) {
|
||||||
io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
|
io_s = directories_.GetWalDir()->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
|
||||||
}
|
}
|
||||||
|
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
@ -532,7 +534,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
|
|||||||
// Sync on all distinct output directories.
|
// Sync on all distinct output directories.
|
||||||
for (auto dir : distinct_output_dirs) {
|
for (auto dir : distinct_output_dirs) {
|
||||||
if (dir != nullptr) {
|
if (dir != nullptr) {
|
||||||
Status error_status = dir->Fsync(IOOptions(), nullptr);
|
Status error_status = dir->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
|
||||||
if (!error_status.ok()) {
|
if (!error_status.ok()) {
|
||||||
s = error_status;
|
s = error_status;
|
||||||
break;
|
break;
|
||||||
|
@ -184,6 +184,14 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) {
|
|||||||
return error_handler_.GetBGError();
|
return error_handler_.GetBGError();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status DBImpl::TEST_WaitForPurge() {
|
||||||
|
InstrumentedMutexLock l(&mutex_);
|
||||||
|
while (bg_purge_scheduled_ && error_handler_.GetBGError().ok()) {
|
||||||
|
bg_cv_.Wait();
|
||||||
|
}
|
||||||
|
return error_handler_.GetBGError();
|
||||||
|
}
|
||||||
|
|
||||||
Status DBImpl::TEST_GetBGError() {
|
Status DBImpl::TEST_GetBGError() {
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
return error_handler_.GetBGError();
|
return error_handler_.GetBGError();
|
||||||
|
@ -655,7 +655,13 @@ void DBImpl::DeleteObsoleteFiles() {
|
|||||||
|
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
if (job_context.HaveSomethingToDelete()) {
|
if (job_context.HaveSomethingToDelete()) {
|
||||||
PurgeObsoleteFiles(job_context);
|
bool defer_purge = immutable_db_options_.avoid_unnecessary_blocking_io;
|
||||||
|
PurgeObsoleteFiles(job_context, defer_purge);
|
||||||
|
if (defer_purge) {
|
||||||
|
mutex_.Lock();
|
||||||
|
SchedulePurge();
|
||||||
|
mutex_.Unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
job_context.Clean();
|
job_context.Clean();
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
|
@ -1694,10 +1694,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
if (impl->two_write_queues_) {
|
if (impl->two_write_queues_) {
|
||||||
impl->log_write_mutex_.Unlock();
|
impl->log_write_mutex_.Unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
impl->DeleteObsoleteFiles();
|
|
||||||
s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr);
|
|
||||||
TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFilesAndSyncDir");
|
|
||||||
}
|
}
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
// In WritePrepared there could be gap in sequence numbers. This breaks
|
// In WritePrepared there could be gap in sequence numbers. This breaks
|
||||||
@ -1770,6 +1766,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
|
|
||||||
*dbptr = impl;
|
*dbptr = impl;
|
||||||
impl->opened_successfully_ = true;
|
impl->opened_successfully_ = true;
|
||||||
|
impl->DeleteObsoleteFiles();
|
||||||
|
TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFiles");
|
||||||
impl->MaybeScheduleFlushOrCompaction();
|
impl->MaybeScheduleFlushOrCompaction();
|
||||||
} else {
|
} else {
|
||||||
persist_options_status.PermitUncheckedError();
|
persist_options_status.PermitUncheckedError();
|
||||||
|
@ -1142,7 +1142,9 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
|
|||||||
// We only sync WAL directory the first time WAL syncing is
|
// We only sync WAL directory the first time WAL syncing is
|
||||||
// requested, so that in case users never turn on WAL sync,
|
// requested, so that in case users never turn on WAL sync,
|
||||||
// we can avoid the disk I/O in the write code path.
|
// we can avoid the disk I/O in the write code path.
|
||||||
io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
|
io_s = directories_.GetWalDir()->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -690,7 +690,7 @@ TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) {
|
|||||||
SyncPoint::GetInstance()->LoadDependency(
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
{{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0",
|
{{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0",
|
||||||
"VersionSet::ProcessManifestWrites:BeforeNewManifest"},
|
"VersionSet::ProcessManifestWrites:BeforeNewManifest"},
|
||||||
{"DBImpl::Open:AfterDeleteFilesAndSyncDir",
|
{"DBImpl::Open:AfterDeleteFiles",
|
||||||
"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:"
|
"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:"
|
||||||
"1"}});
|
"1"}});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
@ -117,10 +117,16 @@ class DeleteFileTest : public DBTestBase {
|
|||||||
manifest_cnt += (type == kDescriptorFile);
|
manifest_cnt += (type == kDescriptorFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (required_log >= 0) {
|
||||||
ASSERT_EQ(required_log, log_cnt);
|
ASSERT_EQ(required_log, log_cnt);
|
||||||
|
}
|
||||||
|
if (required_sst >= 0) {
|
||||||
ASSERT_EQ(required_sst, sst_cnt);
|
ASSERT_EQ(required_sst, sst_cnt);
|
||||||
|
}
|
||||||
|
if (required_manifest >= 0) {
|
||||||
ASSERT_EQ(required_manifest, manifest_cnt);
|
ASSERT_EQ(required_manifest, manifest_cnt);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void DoSleep(void* arg) {
|
static void DoSleep(void* arg) {
|
||||||
auto test = reinterpret_cast<DeleteFileTest*>(arg);
|
auto test = reinterpret_cast<DeleteFileTest*>(arg);
|
||||||
@ -264,6 +270,41 @@ TEST_F(DeleteFileTest, BackgroundPurgeIteratorTest) {
|
|||||||
CheckFileTypeCounts(dbname_, 0, 1, 1);
|
CheckFileTypeCounts(dbname_, 0, 1, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DeleteFileTest, PurgeDuringOpen) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
CheckFileTypeCounts(dbname_, -1, 0, -1);
|
||||||
|
Close();
|
||||||
|
std::unique_ptr<WritableFile> file;
|
||||||
|
ASSERT_OK(options.env->NewWritableFile(dbname_ + "/000002.sst", &file,
|
||||||
|
EnvOptions()));
|
||||||
|
ASSERT_OK(file->Close());
|
||||||
|
CheckFileTypeCounts(dbname_, -1, 1, -1);
|
||||||
|
options.avoid_unnecessary_blocking_io = false;
|
||||||
|
options.create_if_missing = false;
|
||||||
|
Reopen(options);
|
||||||
|
CheckFileTypeCounts(dbname_, -1, 0, -1);
|
||||||
|
Close();
|
||||||
|
|
||||||
|
// test background purge
|
||||||
|
options.avoid_unnecessary_blocking_io = true;
|
||||||
|
options.create_if_missing = false;
|
||||||
|
ASSERT_OK(options.env->NewWritableFile(dbname_ + "/000002.sst", &file,
|
||||||
|
EnvOptions()));
|
||||||
|
ASSERT_OK(file->Close());
|
||||||
|
CheckFileTypeCounts(dbname_, -1, 1, -1);
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
|
{{"DeleteFileTest::PurgeDuringOpen:1", "DBImpl::BGWorkPurge:start"}});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
Reopen(options);
|
||||||
|
// the obsolete file is not deleted until the background purge job is ran
|
||||||
|
CheckFileTypeCounts(dbname_, -1, 1, -1);
|
||||||
|
TEST_SYNC_POINT("DeleteFileTest::PurgeDuringOpen:1");
|
||||||
|
ASSERT_OK(dbfull()->TEST_WaitForPurge());
|
||||||
|
CheckFileTypeCounts(dbname_, -1, 0, -1);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(DeleteFileTest, BackgroundPurgeCFDropTest) {
|
TEST_F(DeleteFileTest, BackgroundPurgeCFDropTest) {
|
||||||
Options options = CurrentOptions();
|
Options options = CurrentOptions();
|
||||||
SetOptions(&options);
|
SetOptions(&options);
|
||||||
@ -310,6 +351,11 @@ TEST_F(DeleteFileTest, BackgroundPurgeCFDropTest) {
|
|||||||
do_test(false);
|
do_test(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
options.avoid_unnecessary_blocking_io = true;
|
||||||
|
options.create_if_missing = false;
|
||||||
|
Reopen(options);
|
||||||
|
ASSERT_OK(dbfull()->TEST_WaitForPurge());
|
||||||
|
|
||||||
SyncPoint::GetInstance()->DisableProcessing();
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
SyncPoint::GetInstance()->ClearAllCallBacks();
|
SyncPoint::GetInstance()->ClearAllCallBacks();
|
||||||
SyncPoint::GetInstance()->LoadDependency(
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
@ -317,9 +363,6 @@ TEST_F(DeleteFileTest, BackgroundPurgeCFDropTest) {
|
|||||||
"DBImpl::BGWorkPurge:start"}});
|
"DBImpl::BGWorkPurge:start"}});
|
||||||
SyncPoint::GetInstance()->EnableProcessing();
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
options.avoid_unnecessary_blocking_io = true;
|
|
||||||
options.create_if_missing = false;
|
|
||||||
Reopen(options);
|
|
||||||
{
|
{
|
||||||
SCOPED_TRACE("avoid_unnecessary_blocking_io = true");
|
SCOPED_TRACE("avoid_unnecessary_blocking_io = true");
|
||||||
do_test(true);
|
do_test(true);
|
||||||
|
@ -168,7 +168,9 @@ Status ExternalSstFileIngestionJob::Prepare(
|
|||||||
TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
|
TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
for (auto path_id : ingestion_path_ids) {
|
for (auto path_id : ingestion_path_ids) {
|
||||||
status = directories_->GetDataDir(path_id)->Fsync(IOOptions(), nullptr);
|
status = directories_->GetDataDir(path_id)->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
ROCKS_LOG_WARN(db_options_.info_log,
|
ROCKS_LOG_WARN(db_options_.info_log,
|
||||||
"Failed to sync directory %" ROCKSDB_PRIszt
|
"Failed to sync directory %" ROCKSDB_PRIszt
|
||||||
|
@ -937,7 +937,9 @@ Status FlushJob::WriteLevel0Table() {
|
|||||||
meta_.marked_for_compaction ? " (needs compaction)" : "");
|
meta_.marked_for_compaction ? " (needs compaction)" : "");
|
||||||
|
|
||||||
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
|
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
|
||||||
s = output_file_directory_->Fsync(IOOptions(), nullptr);
|
s = output_file_directory_->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
|
||||||
}
|
}
|
||||||
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
|
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
|
||||||
db_mutex_->Lock();
|
db_mutex_->Lock();
|
||||||
|
2
env/composite_env.cc
vendored
2
env/composite_env.cc
vendored
@ -272,7 +272,7 @@ class CompositeDirectoryWrapper : public Directory {
|
|||||||
Status Fsync() override {
|
Status Fsync() override {
|
||||||
IOOptions io_opts;
|
IOOptions io_opts;
|
||||||
IODebugContext dbg;
|
IODebugContext dbg;
|
||||||
return target_->Fsync(io_opts, &dbg);
|
return target_->FsyncWithDirOptions(io_opts, &dbg, DirFsyncOptions());
|
||||||
}
|
}
|
||||||
size_t GetUniqueId(char* id, size_t max_size) const override {
|
size_t GetUniqueId(char* id, size_t max_size) const override {
|
||||||
return target_->GetUniqueId(id, max_size);
|
return target_->GetUniqueId(id, max_size);
|
||||||
|
12
env/file_system.cc
vendored
12
env/file_system.cc
vendored
@ -241,4 +241,16 @@ std::string FileSystemWrapper::SerializeOptions(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
|
DirFsyncOptions::DirFsyncOptions() { reason = kDefault; }
|
||||||
|
|
||||||
|
DirFsyncOptions::DirFsyncOptions(std::string file_renamed_new_name) {
|
||||||
|
reason = kFileRenamed;
|
||||||
|
renamed_new_name = file_renamed_new_name;
|
||||||
|
}
|
||||||
|
|
||||||
|
DirFsyncOptions::DirFsyncOptions(FsyncReason fsync_reason) {
|
||||||
|
assert(fsync_reason != kFileRenamed);
|
||||||
|
reason = fsync_reason;
|
||||||
|
}
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
52
env/io_posix.cc
vendored
52
env/io_posix.cc
vendored
@ -1534,17 +1534,61 @@ PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
|
|||||||
/*
|
/*
|
||||||
* PosixDirectory
|
* PosixDirectory
|
||||||
*/
|
*/
|
||||||
|
#if !defined(BTRFS_SUPER_MAGIC)
|
||||||
|
// The magic number for BTRFS is fixed, if it's not defined, define it here
|
||||||
|
#define BTRFS_SUPER_MAGIC 0x9123683E
|
||||||
|
#endif
|
||||||
|
PosixDirectory::PosixDirectory(int fd) : fd_(fd) {
|
||||||
|
is_btrfs_ = false;
|
||||||
|
#ifdef OS_LINUX
|
||||||
|
struct statfs buf;
|
||||||
|
int ret = fstatfs(fd, &buf);
|
||||||
|
is_btrfs_ = (ret == 0 && buf.f_type == BTRFS_SUPER_MAGIC);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
PosixDirectory::~PosixDirectory() { close(fd_); }
|
PosixDirectory::~PosixDirectory() { close(fd_); }
|
||||||
|
|
||||||
IOStatus PosixDirectory::Fsync(const IOOptions& /*opts*/,
|
IOStatus PosixDirectory::Fsync(const IOOptions& opts, IODebugContext* dbg) {
|
||||||
IODebugContext* /*dbg*/) {
|
return FsyncWithDirOptions(opts, dbg, DirFsyncOptions());
|
||||||
|
}
|
||||||
|
|
||||||
|
IOStatus PosixDirectory::FsyncWithDirOptions(
|
||||||
|
const IOOptions& /*opts*/, IODebugContext* /*dbg*/,
|
||||||
|
const DirFsyncOptions& dir_fsync_options) {
|
||||||
|
IOStatus s = IOStatus::OK();
|
||||||
#ifndef OS_AIX
|
#ifndef OS_AIX
|
||||||
|
if (is_btrfs_) {
|
||||||
|
// skip dir fsync for new file creation, which is not needed for btrfs
|
||||||
|
if (dir_fsync_options.reason == DirFsyncOptions::kNewFileSynced) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
// skip dir fsync for renaming file, only need to sync new file
|
||||||
|
if (dir_fsync_options.reason == DirFsyncOptions::kFileRenamed) {
|
||||||
|
std::string new_name = dir_fsync_options.renamed_new_name;
|
||||||
|
assert(!new_name.empty());
|
||||||
|
int fd;
|
||||||
|
do {
|
||||||
|
IOSTATS_TIMER_GUARD(open_nanos);
|
||||||
|
fd = open(new_name.c_str(), O_RDONLY);
|
||||||
|
} while (fd < 0 && errno == EINTR);
|
||||||
|
if (fd < 0) {
|
||||||
|
s = IOError("While open renaming file", new_name, errno);
|
||||||
|
} else if (fsync(fd) < 0) {
|
||||||
|
s = IOError("While fsync renaming file", new_name, errno);
|
||||||
|
}
|
||||||
|
if (close(fd) < 0) {
|
||||||
|
s = IOError("While closing file after fsync", new_name, errno);
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
// fallback to dir-fsync for kDefault, kDirRenamed and kFileDeleted
|
||||||
|
}
|
||||||
if (fsync(fd_) == -1) {
|
if (fsync(fd_) == -1) {
|
||||||
return IOError("While fsync", "a directory", errno);
|
s = IOError("While fsync", "a directory", errno);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
return IOStatus::OK();
|
return s;
|
||||||
}
|
}
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
#endif
|
#endif
|
||||||
|
7
env/io_posix.h
vendored
7
env/io_posix.h
vendored
@ -391,12 +391,17 @@ struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer {
|
|||||||
|
|
||||||
class PosixDirectory : public FSDirectory {
|
class PosixDirectory : public FSDirectory {
|
||||||
public:
|
public:
|
||||||
explicit PosixDirectory(int fd) : fd_(fd) {}
|
explicit PosixDirectory(int fd);
|
||||||
~PosixDirectory();
|
~PosixDirectory();
|
||||||
virtual IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
|
virtual IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
|
||||||
|
|
||||||
|
virtual IOStatus FsyncWithDirOptions(
|
||||||
|
const IOOptions&, IODebugContext*,
|
||||||
|
const DirFsyncOptions& dir_fsync_options) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int fd_;
|
int fd_;
|
||||||
|
bool is_btrfs_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
@ -357,7 +357,9 @@ Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
|
|||||||
s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr);
|
s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr);
|
||||||
}
|
}
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
s = dir_obj->Fsync(IOOptions(), nullptr);
|
s = dir_obj->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kFileDeleted));
|
||||||
TEST_SYNC_POINT_CALLBACK(
|
TEST_SYNC_POINT_CALLBACK(
|
||||||
"DeleteScheduler::DeleteTrashFile::AfterSyncDir",
|
"DeleteScheduler::DeleteTrashFile::AfterSyncDir",
|
||||||
reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync)));
|
reinterpret_cast<void*>(const_cast<std::string*>(&dir_to_sync)));
|
||||||
|
@ -405,7 +405,8 @@ IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
|
|||||||
}
|
}
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
if (directory_to_fsync != nullptr) {
|
if (directory_to_fsync != nullptr) {
|
||||||
s = directory_to_fsync->Fsync(IOOptions(), nullptr);
|
s = directory_to_fsync->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr, DirFsyncOptions(CurrentFileName(dbname)));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
fs->DeleteFile(tmp, IOOptions(), nullptr)
|
fs->DeleteFile(tmp, IOOptions(), nullptr)
|
||||||
@ -428,9 +429,19 @@ Status SetIdentityFile(Env* env, const std::string& dbname,
|
|||||||
assert(!id.empty());
|
assert(!id.empty());
|
||||||
// Reserve the filename dbname/000000.dbtmp for the temporary identity file
|
// Reserve the filename dbname/000000.dbtmp for the temporary identity file
|
||||||
std::string tmp = TempFileName(dbname, 0);
|
std::string tmp = TempFileName(dbname, 0);
|
||||||
|
std::string identify_file_name = IdentityFileName(dbname);
|
||||||
Status s = WriteStringToFile(env, id, tmp, true);
|
Status s = WriteStringToFile(env, id, tmp, true);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
s = env->RenameFile(tmp, IdentityFileName(dbname));
|
s = env->RenameFile(tmp, identify_file_name);
|
||||||
|
}
|
||||||
|
std::unique_ptr<FSDirectory> dir_obj;
|
||||||
|
if (s.ok()) {
|
||||||
|
s = env->GetFileSystem()->NewDirectory(dbname, IOOptions(), &dir_obj,
|
||||||
|
nullptr);
|
||||||
|
}
|
||||||
|
if (s.ok()) {
|
||||||
|
s = dir_obj->FsyncWithDirOptions(IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(identify_file_name));
|
||||||
}
|
}
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
env->DeleteFile(tmp).PermitUncheckedError();
|
env->DeleteFile(tmp).PermitUncheckedError();
|
||||||
|
@ -100,7 +100,36 @@ struct IOOptions {
|
|||||||
// such as NewRandomAccessFile and NewWritableFile.
|
// such as NewRandomAccessFile and NewWritableFile.
|
||||||
std::unordered_map<std::string, std::string> property_bag;
|
std::unordered_map<std::string, std::string> property_bag;
|
||||||
|
|
||||||
IOOptions() : timeout(0), prio(IOPriority::kIOLow), type(IOType::kUnknown) {}
|
// Force directory fsync, some file systems like btrfs may skip directory
|
||||||
|
// fsync, set this to force the fsync
|
||||||
|
bool force_dir_fsync;
|
||||||
|
|
||||||
|
IOOptions() : IOOptions(false) {}
|
||||||
|
|
||||||
|
explicit IOOptions(bool force_dir_fsync_)
|
||||||
|
: timeout(std::chrono::microseconds::zero()),
|
||||||
|
prio(IOPriority::kIOLow),
|
||||||
|
type(IOType::kUnknown),
|
||||||
|
force_dir_fsync(force_dir_fsync_) {}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct DirFsyncOptions {
|
||||||
|
enum FsyncReason : uint8_t {
|
||||||
|
kNewFileSynced,
|
||||||
|
kFileRenamed,
|
||||||
|
kDirRenamed,
|
||||||
|
kFileDeleted,
|
||||||
|
kDefault,
|
||||||
|
} reason;
|
||||||
|
|
||||||
|
std::string renamed_new_name; // for kFileRenamed
|
||||||
|
// add other options for other FsyncReason
|
||||||
|
|
||||||
|
DirFsyncOptions();
|
||||||
|
|
||||||
|
explicit DirFsyncOptions(std::string file_renamed_new_name);
|
||||||
|
|
||||||
|
explicit DirFsyncOptions(FsyncReason fsync_reason);
|
||||||
};
|
};
|
||||||
|
|
||||||
// File scope options that control how a file is opened/created and accessed
|
// File scope options that control how a file is opened/created and accessed
|
||||||
@ -1111,6 +1140,15 @@ class FSDirectory {
|
|||||||
// Fsync directory. Can be called concurrently from multiple threads.
|
// Fsync directory. Can be called concurrently from multiple threads.
|
||||||
virtual IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) = 0;
|
virtual IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) = 0;
|
||||||
|
|
||||||
|
// FsyncWithDirOptions after renaming a file. Depends on the filesystem, it
|
||||||
|
// may fsync directory or just the renaming file (e.g. btrfs). By default, it
|
||||||
|
// just calls directory fsync.
|
||||||
|
virtual IOStatus FsyncWithDirOptions(
|
||||||
|
const IOOptions& options, IODebugContext* dbg,
|
||||||
|
const DirFsyncOptions& /*dir_fsync_options*/) {
|
||||||
|
return Fsync(options, dbg);
|
||||||
|
}
|
||||||
|
|
||||||
virtual size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const {
|
virtual size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -1623,6 +1661,13 @@ class FSDirectoryWrapper : public FSDirectory {
|
|||||||
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
|
IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
|
||||||
return target_->Fsync(options, dbg);
|
return target_->Fsync(options, dbg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
IOStatus FsyncWithDirOptions(
|
||||||
|
const IOOptions& options, IODebugContext* dbg,
|
||||||
|
const DirFsyncOptions& dir_fsync_options) override {
|
||||||
|
return target_->FsyncWithDirOptions(options, dbg, dir_fsync_options);
|
||||||
|
}
|
||||||
|
|
||||||
size_t GetUniqueId(char* id, size_t max_size) const override {
|
size_t GetUniqueId(char* id, size_t max_size) const override {
|
||||||
return target_->GetUniqueId(id, max_size);
|
return target_->GetUniqueId(id, max_size);
|
||||||
}
|
}
|
||||||
|
134
microbench/db_basic_bench.cc
Normal file
134
microbench/db_basic_bench.cc
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||||
|
// This source code is licensed under both the GPLv2 (found in the
|
||||||
|
// COPYING file in the root directory) and Apache 2.0 License
|
||||||
|
// (found in the LICENSE.Apache file in the root directory).
|
||||||
|
|
||||||
|
// this is a simple micro-benchmark for compare ribbon filter vs. other filter
|
||||||
|
// for more comprehensive, please check the dedicate util/filter_bench.
|
||||||
|
#include <benchmark/benchmark.h>
|
||||||
|
|
||||||
|
#include "rocksdb/db.h"
|
||||||
|
#include "rocksdb/options.h"
|
||||||
|
#include "util/random.h"
|
||||||
|
|
||||||
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
|
||||||
|
static void DBOpen(benchmark::State& state) {
|
||||||
|
// create DB
|
||||||
|
DB* db;
|
||||||
|
Options options;
|
||||||
|
auto env = Env::Default();
|
||||||
|
std::string db_path;
|
||||||
|
auto s = env->GetTestDirectory(&db_path);
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
std::string db_name = db_path + "/bench_dbopen";
|
||||||
|
|
||||||
|
DestroyDB(db_name, options);
|
||||||
|
|
||||||
|
options.create_if_missing = true;
|
||||||
|
s = DB::Open(options, db_name, &db);
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
db->Close();
|
||||||
|
|
||||||
|
options.create_if_missing = false;
|
||||||
|
|
||||||
|
auto rnd = Random(12345);
|
||||||
|
|
||||||
|
for (auto _ : state) {
|
||||||
|
s = DB::Open(options, db_name, &db);
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
}
|
||||||
|
state.PauseTiming();
|
||||||
|
auto wo = WriteOptions();
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
for (int j = 0; j < 100; j++) {
|
||||||
|
s = db->Put(wo, rnd.RandomString(10), rnd.RandomString(100));
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s = db->Flush(FlushOptions());
|
||||||
|
}
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
}
|
||||||
|
s = db->Close();
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
}
|
||||||
|
state.ResumeTiming();
|
||||||
|
}
|
||||||
|
DestroyDB(db_name, options);
|
||||||
|
}
|
||||||
|
|
||||||
|
BENCHMARK(DBOpen)->Iterations(200); // specify iteration number as the db size
|
||||||
|
// is impacted by iteration number
|
||||||
|
|
||||||
|
static void DBClose(benchmark::State& state) {
|
||||||
|
// create DB
|
||||||
|
DB* db;
|
||||||
|
Options options;
|
||||||
|
auto env = Env::Default();
|
||||||
|
std::string db_path;
|
||||||
|
auto s = env->GetTestDirectory(&db_path);
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
std::string db_name = db_path + "/bench_dbclose";
|
||||||
|
|
||||||
|
DestroyDB(db_name, options);
|
||||||
|
|
||||||
|
options.create_if_missing = true;
|
||||||
|
s = DB::Open(options, db_name, &db);
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
db->Close();
|
||||||
|
|
||||||
|
options.create_if_missing = false;
|
||||||
|
|
||||||
|
auto rnd = Random(12345);
|
||||||
|
|
||||||
|
for (auto _ : state) {
|
||||||
|
state.PauseTiming();
|
||||||
|
s = DB::Open(options, db_name, &db);
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
}
|
||||||
|
auto wo = WriteOptions();
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
for (int j = 0; j < 100; j++) {
|
||||||
|
s = db->Put(wo, rnd.RandomString(10), rnd.RandomString(100));
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s = db->Flush(FlushOptions());
|
||||||
|
}
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
}
|
||||||
|
state.ResumeTiming();
|
||||||
|
s = db->Close();
|
||||||
|
if (!s.ok()) {
|
||||||
|
state.SkipWithError(s.ToString().c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
DestroyDB(db_name, options);
|
||||||
|
}
|
||||||
|
|
||||||
|
BENCHMARK(DBClose)->Iterations(200); // specify iteration number as the db size
|
||||||
|
// is impacted by iteration number
|
||||||
|
|
||||||
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
|
||||||
|
BENCHMARK_MAIN();
|
@ -6,6 +6,7 @@
|
|||||||
#include "util/timer.h"
|
#include "util/timer.h"
|
||||||
|
|
||||||
#include "db/db_test_util.h"
|
#include "db/db_test_util.h"
|
||||||
|
#include "rocksdb/file_system.h"
|
||||||
#include "test_util/mock_time_env.h"
|
#include "test_util/mock_time_env.h"
|
||||||
|
|
||||||
namespace ROCKSDB_NAMESPACE {
|
namespace ROCKSDB_NAMESPACE {
|
||||||
|
@ -1438,19 +1438,24 @@ IOStatus BackupEngineImpl::CreateNewBackupWithMetadata(
|
|||||||
io_options_, &backup_private_directory, nullptr)
|
io_options_, &backup_private_directory, nullptr)
|
||||||
.PermitUncheckedError();
|
.PermitUncheckedError();
|
||||||
if (backup_private_directory != nullptr) {
|
if (backup_private_directory != nullptr) {
|
||||||
io_s = backup_private_directory->Fsync(io_options_, nullptr);
|
io_s = backup_private_directory->FsyncWithDirOptions(io_options_, nullptr,
|
||||||
|
DirFsyncOptions());
|
||||||
}
|
}
|
||||||
if (io_s.ok() && private_directory_ != nullptr) {
|
if (io_s.ok() && private_directory_ != nullptr) {
|
||||||
io_s = private_directory_->Fsync(io_options_, nullptr);
|
io_s = private_directory_->FsyncWithDirOptions(io_options_, nullptr,
|
||||||
|
DirFsyncOptions());
|
||||||
}
|
}
|
||||||
if (io_s.ok() && meta_directory_ != nullptr) {
|
if (io_s.ok() && meta_directory_ != nullptr) {
|
||||||
io_s = meta_directory_->Fsync(io_options_, nullptr);
|
io_s = meta_directory_->FsyncWithDirOptions(io_options_, nullptr,
|
||||||
|
DirFsyncOptions());
|
||||||
}
|
}
|
||||||
if (io_s.ok() && shared_directory_ != nullptr) {
|
if (io_s.ok() && shared_directory_ != nullptr) {
|
||||||
io_s = shared_directory_->Fsync(io_options_, nullptr);
|
io_s = shared_directory_->FsyncWithDirOptions(io_options_, nullptr,
|
||||||
|
DirFsyncOptions());
|
||||||
}
|
}
|
||||||
if (io_s.ok() && backup_directory_ != nullptr) {
|
if (io_s.ok() && backup_directory_ != nullptr) {
|
||||||
io_s = backup_directory_->Fsync(io_options_, nullptr);
|
io_s = backup_directory_->FsyncWithDirOptions(io_options_, nullptr,
|
||||||
|
DirFsyncOptions());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1832,15 +1837,17 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// When enabled, the first Fsync is to ensure all files are fully persisted
|
// When enabled, the first FsyncWithDirOptions is to ensure all files are
|
||||||
// before renaming CURRENT.tmp
|
// fully persisted before renaming CURRENT.tmp
|
||||||
if (io_s.ok() && db_dir_for_fsync) {
|
if (io_s.ok() && db_dir_for_fsync) {
|
||||||
ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n");
|
ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n");
|
||||||
io_s = db_dir_for_fsync->Fsync(io_options_, nullptr);
|
io_s = db_dir_for_fsync->FsyncWithDirOptions(io_options_, nullptr,
|
||||||
|
DirFsyncOptions());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (io_s.ok() && wal_dir_for_fsync) {
|
if (io_s.ok() && wal_dir_for_fsync) {
|
||||||
io_s = wal_dir_for_fsync->Fsync(io_options_, nullptr);
|
io_s = wal_dir_for_fsync->FsyncWithDirOptions(io_options_, nullptr,
|
||||||
|
DirFsyncOptions());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (io_s.ok() && !temporary_current_file.empty()) {
|
if (io_s.ok() && !temporary_current_file.empty()) {
|
||||||
@ -1851,11 +1858,12 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (io_s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) {
|
if (io_s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) {
|
||||||
// Second Fsync is to ensure the final atomic rename of DB restore is
|
// Second FsyncWithDirOptions is to ensure the final atomic rename of DB
|
||||||
// fully persisted even if power goes out right after restore operation
|
// restore is fully persisted even if power goes out right after restore
|
||||||
// returns success
|
// operation returns success
|
||||||
assert(db_dir_for_fsync);
|
assert(db_dir_for_fsync);
|
||||||
io_s = db_dir_for_fsync->Fsync(io_options_, nullptr);
|
io_s = db_dir_for_fsync->FsyncWithDirOptions(
|
||||||
|
io_options_, nullptr, DirFsyncOptions(final_current_file));
|
||||||
}
|
}
|
||||||
|
|
||||||
ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
|
ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
|
||||||
|
@ -177,7 +177,8 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
|
|||||||
"Failed to create blob_dir %s, status: %s",
|
"Failed to create blob_dir %s, status: %s",
|
||||||
blob_dir_.c_str(), s.ToString().c_str());
|
blob_dir_.c_str(), s.ToString().c_str());
|
||||||
}
|
}
|
||||||
s = env_->NewDirectory(blob_dir_, &dir_ent_);
|
s = env_->GetFileSystem()->NewDirectory(blob_dir_, IOOptions(), &dir_ent_,
|
||||||
|
nullptr);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
ROCKS_LOG_ERROR(db_options_.info_log,
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
||||||
"Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
|
"Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
|
||||||
@ -1913,7 +1914,7 @@ Status BlobDBImpl::SyncBlobFiles() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s = dir_ent_->Fsync();
|
s = dir_ent_->FsyncWithDirOptions(IOOptions(), nullptr, DirFsyncOptions());
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
ROCKS_LOG_ERROR(db_options_.info_log,
|
ROCKS_LOG_ERROR(db_options_.info_log,
|
||||||
"Failed to sync blob directory, status: %s",
|
"Failed to sync blob directory, status: %s",
|
||||||
@ -2005,7 +2006,9 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
|
|||||||
|
|
||||||
// directory change. Fsync
|
// directory change. Fsync
|
||||||
if (file_deleted) {
|
if (file_deleted) {
|
||||||
Status s = dir_ent_->Fsync();
|
Status s = dir_ent_->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kFileDeleted));
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
|
ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
|
||||||
blob_dir_.c_str(), s.ToString().c_str());
|
blob_dir_.c_str(), s.ToString().c_str());
|
||||||
|
@ -419,7 +419,7 @@ class BlobDBImpl : public BlobDB {
|
|||||||
std::string blob_dir_;
|
std::string blob_dir_;
|
||||||
|
|
||||||
// pointer to directory
|
// pointer to directory
|
||||||
std::unique_ptr<Directory> dir_ent_;
|
std::unique_ptr<FSDirectory> dir_ent_;
|
||||||
|
|
||||||
// Read Write Mutex, which protects all the data structures
|
// Read Write Mutex, which protects all the data structures
|
||||||
// HEAVILY TRAFFICKED
|
// HEAVILY TRAFFICKED
|
||||||
|
@ -160,10 +160,13 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
|
|||||||
s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
|
s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
|
||||||
}
|
}
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
std::unique_ptr<Directory> checkpoint_directory;
|
std::unique_ptr<FSDirectory> checkpoint_directory;
|
||||||
s = db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
|
s = db_->GetFileSystem()->NewDirectory(checkpoint_dir, IOOptions(),
|
||||||
|
&checkpoint_directory, nullptr);
|
||||||
if (s.ok() && checkpoint_directory != nullptr) {
|
if (s.ok() && checkpoint_directory != nullptr) {
|
||||||
s = checkpoint_directory->Fsync();
|
s = checkpoint_directory->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kDirRenamed));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -348,11 +351,14 @@ Status CheckpointImpl::ExportColumnFamily(
|
|||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
// Fsync export directory.
|
// Fsync export directory.
|
||||||
moved_to_user_specified_dir = true;
|
moved_to_user_specified_dir = true;
|
||||||
std::unique_ptr<Directory> dir_ptr;
|
std::unique_ptr<FSDirectory> dir_ptr;
|
||||||
s = db_->GetEnv()->NewDirectory(export_dir, &dir_ptr);
|
s = db_->GetFileSystem()->NewDirectory(export_dir, IOOptions(), &dir_ptr,
|
||||||
|
nullptr);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
assert(dir_ptr != nullptr);
|
assert(dir_ptr != nullptr);
|
||||||
s = dir_ptr->Fsync();
|
s = dir_ptr->FsyncWithDirOptions(
|
||||||
|
IOOptions(), nullptr,
|
||||||
|
DirFsyncOptions(DirFsyncOptions::FsyncReason::kDirRenamed));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,6 +108,29 @@ IOStatus TestFSDirectory::Fsync(const IOOptions& options, IODebugContext* dbg) {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
IOStatus TestFSDirectory::FsyncWithDirOptions(
|
||||||
|
const IOOptions& options, IODebugContext* dbg,
|
||||||
|
const DirFsyncOptions& dir_fsync_options) {
|
||||||
|
if (!fs_->IsFilesystemActive()) {
|
||||||
|
return fs_->GetError();
|
||||||
|
}
|
||||||
|
{
|
||||||
|
IOStatus in_s = fs_->InjectMetadataWriteError();
|
||||||
|
if (!in_s.ok()) {
|
||||||
|
return in_s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fs_->SyncDir(dirname_);
|
||||||
|
IOStatus s = dir_->FsyncWithDirOptions(options, dbg, dir_fsync_options);
|
||||||
|
{
|
||||||
|
IOStatus in_s = fs_->InjectMetadataWriteError();
|
||||||
|
if (!in_s.ok()) {
|
||||||
|
return in_s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
TestFSWritableFile::TestFSWritableFile(const std::string& fname,
|
TestFSWritableFile::TestFSWritableFile(const std::string& fname,
|
||||||
const FileOptions& file_opts,
|
const FileOptions& file_opts,
|
||||||
std::unique_ptr<FSWritableFile>&& f,
|
std::unique_ptr<FSWritableFile>&& f,
|
||||||
|
@ -177,6 +177,10 @@ class TestFSDirectory : public FSDirectory {
|
|||||||
virtual IOStatus Fsync(const IOOptions& options,
|
virtual IOStatus Fsync(const IOOptions& options,
|
||||||
IODebugContext* dbg) override;
|
IODebugContext* dbg) override;
|
||||||
|
|
||||||
|
virtual IOStatus FsyncWithDirOptions(
|
||||||
|
const IOOptions& options, IODebugContext* dbg,
|
||||||
|
const DirFsyncOptions& dir_fsync_options) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
FaultInjectionTestFS* fs_;
|
FaultInjectionTestFS* fs_;
|
||||||
std::string dirname_;
|
std::string dirname_;
|
||||||
|
Loading…
Reference in New Issue
Block a user