Handle rename() failure in non-local FS (#8192)
Summary: In a distributed environment, a file `rename()` operation can succeed on server (remote) side, but the client can somehow return non-ok status to RocksDB. Possible reasons include network partition, connection issue, etc. This happens in `rocksdb::SetCurrentFile()`, which can be called in `LogAndApply() -> ProcessManifestWrites()` if RocksDB tries to switch to a new MANIFEST. We currently always delete the new MANIFEST if an error occurs. This is problematic in distributed world. If the server-side successfully updates the CURRENT file via renaming, then a subsequent `DB::Open()` will try to look for the new MANIFEST and fail. As a fix, we can track the execution result of IO operations on the new MANIFEST. - If IO operations on the new MANIFEST fail, then we know the CURRENT must point to the original MANIFEST. Therefore, it is safe to remove the new MANIFEST. - If IO operations on the new MANIFEST all succeed, but somehow we end up in the clean up code block, then we do not know whether CURRENT points to the new or old MANIFEST. (For local POSIX-compliant FS, it should still point to old MANIFEST, but it does not matter if we keep the new MANIFEST.) Therefore, we keep the new MANIFEST. - Any future `LogAndApply()` will switch to a new MANIFEST and update CURRENT. - If process reopens the db immediately after the failure, then the CURRENT file can point to either the new MANIFEST or the old one, both of which exist. Therefore, recovery can succeed and ignore the other. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8192 Test Plan: make check Reviewed By: zhichao-cao Differential Revision: D27804648 Pulled By: riversand963 fbshipit-source-id: 9c16f2a5ce41bc6aadf085e48449b19ede8423e4
This commit is contained in:
parent
0c6e4674a6
commit
a376c22066
@ -1,4 +1,8 @@
|
|||||||
# Rocksdb Change Log
|
# Rocksdb Change Log
|
||||||
|
## Unreleased
|
||||||
|
### Bug Fixes
|
||||||
|
* Fixed a bug in handling file rename error in distributed/network file systems when the server succeeds but client returns error. The bug can cause CURRENT file to point to non-existing MANIFEST file, thus DB cannot be opened.
|
||||||
|
|
||||||
## 6.20.0 (04/16/2021)
|
## 6.20.0 (04/16/2021)
|
||||||
### Behavior Changes
|
### Behavior Changes
|
||||||
* `ColumnFamilyOptions::sample_for_compression` now takes effect for creation of all block-based tables. Previously it only took effect for block-based tables created by flush.
|
* `ColumnFamilyOptions::sample_for_compression` now takes effect for creation of all block-based tables. Previously it only took effect for block-based tables created by flush.
|
||||||
|
@ -943,7 +943,7 @@ Status DBImpl::DeleteUnreferencedSstFiles() {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (largest_file_number > next_file_number) {
|
if (largest_file_number >= next_file_number) {
|
||||||
versions_->next_file_number_.store(largest_file_number + 1);
|
versions_->next_file_number_.store(largest_file_number + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,6 +285,9 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
|
|||||||
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
|
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
|
||||||
const std::string manifest = DescriptorFileName(dbname_, 1);
|
const std::string manifest = DescriptorFileName(dbname_, 1);
|
||||||
{
|
{
|
||||||
|
if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
|
||||||
|
fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
|
||||||
|
}
|
||||||
std::unique_ptr<FSWritableFile> file;
|
std::unique_ptr<FSWritableFile> file;
|
||||||
FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
|
FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
|
||||||
s = NewWritableFile(fs_.get(), manifest, &file, file_options);
|
s = NewWritableFile(fs_.get(), manifest, &file, file_options);
|
||||||
@ -314,7 +317,7 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
|
|||||||
manifest.substr(manifest.find_last_of("/\\") + 1));
|
manifest.substr(manifest.find_last_of("/\\") + 1));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
fs_->DeleteFile(manifest, IOOptions(), nullptr);
|
fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
|
||||||
}
|
}
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
@ -5439,6 +5439,98 @@ TEST_F(DBTest2, AutoPrefixMode1) {
|
|||||||
ASSERT_EQ("a1", iterator->key().ToString());
|
ASSERT_EQ("a1", iterator->key().ToString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class RenameCurrentTest : public DBTestBase,
|
||||||
|
public testing::WithParamInterface<std::string> {
|
||||||
|
public:
|
||||||
|
RenameCurrentTest()
|
||||||
|
: DBTestBase("rename_current_test", /*env_do_fsync=*/true),
|
||||||
|
sync_point_(GetParam()) {}
|
||||||
|
|
||||||
|
~RenameCurrentTest() override {}
|
||||||
|
|
||||||
|
void SetUp() override {
|
||||||
|
env_->no_file_overwrite_.store(true, std::memory_order_release);
|
||||||
|
}
|
||||||
|
|
||||||
|
void TearDown() override {
|
||||||
|
env_->no_file_overwrite_.store(false, std::memory_order_release);
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetupSyncPoints() {
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
SyncPoint::GetInstance()->SetCallBack(sync_point_, [&](void* arg) {
|
||||||
|
Status* s = reinterpret_cast<Status*>(arg);
|
||||||
|
assert(s);
|
||||||
|
*s = Status::IOError("Injected IO error.");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const std::string sync_point_;
|
||||||
|
};
|
||||||
|
|
||||||
|
INSTANTIATE_TEST_CASE_P(DistributedFS, RenameCurrentTest,
|
||||||
|
::testing::Values("SetCurrentFile:BeforeRename",
|
||||||
|
"SetCurrentFile:AfterRename"));
|
||||||
|
|
||||||
|
TEST_P(RenameCurrentTest, Open) {
|
||||||
|
Destroy(last_options_);
|
||||||
|
Options options = GetDefaultOptions();
|
||||||
|
options.create_if_missing = true;
|
||||||
|
SetupSyncPoints();
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
Status s = TryReopen(options);
|
||||||
|
ASSERT_NOK(s);
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
Reopen(options);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(RenameCurrentTest, Flush) {
|
||||||
|
Destroy(last_options_);
|
||||||
|
Options options = GetDefaultOptions();
|
||||||
|
options.max_manifest_file_size = 1;
|
||||||
|
options.create_if_missing = true;
|
||||||
|
Reopen(options);
|
||||||
|
ASSERT_OK(Put("key", "value"));
|
||||||
|
SetupSyncPoints();
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
ASSERT_NOK(Flush());
|
||||||
|
|
||||||
|
ASSERT_NOK(Put("foo", "value"));
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
Reopen(options);
|
||||||
|
ASSERT_EQ("value", Get("key"));
|
||||||
|
ASSERT_EQ("NOT_FOUND", Get("foo"));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_P(RenameCurrentTest, Compaction) {
|
||||||
|
Destroy(last_options_);
|
||||||
|
Options options = GetDefaultOptions();
|
||||||
|
options.max_manifest_file_size = 1;
|
||||||
|
options.create_if_missing = true;
|
||||||
|
Reopen(options);
|
||||||
|
ASSERT_OK(Put("a", "a_value"));
|
||||||
|
ASSERT_OK(Put("c", "c_value"));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
|
||||||
|
ASSERT_OK(Put("b", "b_value"));
|
||||||
|
ASSERT_OK(Put("d", "d_value"));
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
|
||||||
|
SetupSyncPoints();
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
ASSERT_NOK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
|
||||||
|
/*end=*/nullptr));
|
||||||
|
|
||||||
|
ASSERT_NOK(Put("foo", "value"));
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->DisableProcessing();
|
||||||
|
Reopen(options);
|
||||||
|
ASSERT_EQ("NOT_FOUND", Get("foo"));
|
||||||
|
ASSERT_EQ("d_value", Get("d"));
|
||||||
|
}
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
// WAL recovery mode is WALRecoveryMode::kPointInTimeRecovery.
|
// WAL recovery mode is WALRecoveryMode::kPointInTimeRecovery.
|
||||||
|
@ -44,6 +44,7 @@ SpecialEnv::SpecialEnv(Env* base, bool time_elapse_only_sleep)
|
|||||||
manifest_sync_error_.store(false, std::memory_order_release);
|
manifest_sync_error_.store(false, std::memory_order_release);
|
||||||
manifest_write_error_.store(false, std::memory_order_release);
|
manifest_write_error_.store(false, std::memory_order_release);
|
||||||
log_write_error_.store(false, std::memory_order_release);
|
log_write_error_.store(false, std::memory_order_release);
|
||||||
|
no_file_overwrite_.store(false, std::memory_order_release);
|
||||||
random_file_open_counter_.store(0, std::memory_order_relaxed);
|
random_file_open_counter_.store(0, std::memory_order_relaxed);
|
||||||
delete_count_.store(0, std::memory_order_relaxed);
|
delete_count_.store(0, std::memory_order_relaxed);
|
||||||
num_open_wal_file_.store(0);
|
num_open_wal_file_.store(0);
|
||||||
|
@ -440,6 +440,11 @@ class SpecialEnv : public EnvWrapper {
|
|||||||
std::unique_ptr<WritableFile> base_;
|
std::unique_ptr<WritableFile> base_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
if (no_file_overwrite_.load(std::memory_order_acquire) &&
|
||||||
|
target()->FileExists(f).ok()) {
|
||||||
|
return Status::NotSupported("SpecialEnv::no_file_overwrite_ is true.");
|
||||||
|
}
|
||||||
|
|
||||||
if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
|
if (non_writeable_rate_.load(std::memory_order_acquire) > 0) {
|
||||||
uint32_t random_number;
|
uint32_t random_number;
|
||||||
{
|
{
|
||||||
@ -687,6 +692,9 @@ class SpecialEnv : public EnvWrapper {
|
|||||||
// Slow down every log write, in micro-seconds.
|
// Slow down every log write, in micro-seconds.
|
||||||
std::atomic<int> log_write_slowdown_;
|
std::atomic<int> log_write_slowdown_;
|
||||||
|
|
||||||
|
// If true, returns Status::NotSupported for file overwrite.
|
||||||
|
std::atomic<bool> no_file_overwrite_;
|
||||||
|
|
||||||
// Number of WAL files that are still open for write.
|
// Number of WAL files that are still open for write.
|
||||||
std::atomic<int> num_open_wal_file_;
|
std::atomic<int> num_open_wal_file_;
|
||||||
|
|
||||||
|
@ -4083,6 +4083,7 @@ Status VersionSet::ProcessManifestWrites(
|
|||||||
uint64_t new_manifest_file_size = 0;
|
uint64_t new_manifest_file_size = 0;
|
||||||
Status s;
|
Status s;
|
||||||
IOStatus io_s;
|
IOStatus io_s;
|
||||||
|
IOStatus manifest_io_status;
|
||||||
{
|
{
|
||||||
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
|
FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
|
||||||
mu->Unlock();
|
mu->Unlock();
|
||||||
@ -4134,6 +4135,7 @@ Status VersionSet::ProcessManifestWrites(
|
|||||||
s = WriteCurrentStateToManifest(curr_state, wal_additions,
|
s = WriteCurrentStateToManifest(curr_state, wal_additions,
|
||||||
descriptor_log_.get(), io_s);
|
descriptor_log_.get(), io_s);
|
||||||
} else {
|
} else {
|
||||||
|
manifest_io_status = io_s;
|
||||||
s = io_s;
|
s = io_s;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -4171,11 +4173,13 @@ Status VersionSet::ProcessManifestWrites(
|
|||||||
io_s = descriptor_log_->AddRecord(record);
|
io_s = descriptor_log_->AddRecord(record);
|
||||||
if (!io_s.ok()) {
|
if (!io_s.ok()) {
|
||||||
s = io_s;
|
s = io_s;
|
||||||
|
manifest_io_status = io_s;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
io_s = SyncManifest(db_options_, descriptor_log_->file());
|
io_s = SyncManifest(db_options_, descriptor_log_->file());
|
||||||
|
manifest_io_status = io_s;
|
||||||
TEST_SYNC_POINT_CALLBACK(
|
TEST_SYNC_POINT_CALLBACK(
|
||||||
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
|
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
|
||||||
}
|
}
|
||||||
@ -4188,6 +4192,9 @@ Status VersionSet::ProcessManifestWrites(
|
|||||||
|
|
||||||
// If we just created a new descriptor file, install it by writing a
|
// If we just created a new descriptor file, install it by writing a
|
||||||
// new CURRENT file that points to it.
|
// new CURRENT file that points to it.
|
||||||
|
if (s.ok()) {
|
||||||
|
assert(manifest_io_status.ok());
|
||||||
|
}
|
||||||
if (s.ok() && new_descriptor_log) {
|
if (s.ok() && new_descriptor_log) {
|
||||||
io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
|
io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
|
||||||
db_directory);
|
db_directory);
|
||||||
@ -4303,11 +4310,41 @@ Status VersionSet::ProcessManifestWrites(
|
|||||||
for (auto v : versions) {
|
for (auto v : versions) {
|
||||||
delete v;
|
delete v;
|
||||||
}
|
}
|
||||||
|
if (manifest_io_status.ok()) {
|
||||||
|
manifest_file_number_ = pending_manifest_file_number_;
|
||||||
|
manifest_file_size_ = new_manifest_file_size;
|
||||||
|
}
|
||||||
// If manifest append failed for whatever reason, the file could be
|
// If manifest append failed for whatever reason, the file could be
|
||||||
// corrupted. So we need to force the next version update to start a
|
// corrupted. So we need to force the next version update to start a
|
||||||
// new manifest file.
|
// new manifest file.
|
||||||
descriptor_log_.reset();
|
descriptor_log_.reset();
|
||||||
if (new_descriptor_log) {
|
// If manifest operations failed, then we know the CURRENT file still
|
||||||
|
// points to the original MANIFEST. Therefore, we can safely delete the
|
||||||
|
// new MANIFEST.
|
||||||
|
// If manifest operations succeeded, and we are here, then it is possible
|
||||||
|
// that renaming tmp file to CURRENT failed.
|
||||||
|
//
|
||||||
|
// On local POSIX-compliant FS, the CURRENT must point to the original
|
||||||
|
// MANIFEST. We can delete the new MANIFEST for simplicity, but we can also
|
||||||
|
// keep it. Future recovery will ignore this MANIFEST. It's also ok for the
|
||||||
|
// process not to crash and continue using the db. Any future LogAndApply()
|
||||||
|
// call will switch to a new MANIFEST and update CURRENT, still ignoring
|
||||||
|
// this one.
|
||||||
|
//
|
||||||
|
// On non-local FS, it is
|
||||||
|
// possible that the rename operation succeeded on the server (remote)
|
||||||
|
// side, but the client somehow returns a non-ok status to RocksDB. Note
|
||||||
|
// that this does not violate atomicity. Should we delete the new MANIFEST
|
||||||
|
// successfully, a subsequent recovery attempt will likely see the CURRENT
|
||||||
|
// pointing to the new MANIFEST, thus fail. We will not be able to open the
|
||||||
|
// DB again. Therefore, if manifest operations succeed, we should keep the
|
||||||
|
// the new MANIFEST. If the process proceeds, any future LogAndApply() call
|
||||||
|
// will switch to a new MANIFEST and update CURRENT. If user tries to
|
||||||
|
// re-open the DB,
|
||||||
|
// a) CURRENT points to the new MANIFEST, and the new MANIFEST is present.
|
||||||
|
// b) CURRENT points to the original MANIFEST, and the original MANIFEST
|
||||||
|
// also exists.
|
||||||
|
if (new_descriptor_log && !manifest_io_status.ok()) {
|
||||||
ROCKS_LOG_INFO(db_options_->info_log,
|
ROCKS_LOG_INFO(db_options_->info_log,
|
||||||
"Deleting manifest %" PRIu64 " current manifest %" PRIu64
|
"Deleting manifest %" PRIu64 " current manifest %" PRIu64
|
||||||
"\n",
|
"\n",
|
||||||
|
@ -383,10 +383,12 @@ IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
|
|||||||
contents.remove_prefix(dbname.size() + 1);
|
contents.remove_prefix(dbname.size() + 1);
|
||||||
std::string tmp = TempFileName(dbname, descriptor_number);
|
std::string tmp = TempFileName(dbname, descriptor_number);
|
||||||
IOStatus s = WriteStringToFile(fs, contents.ToString() + "\n", tmp, true);
|
IOStatus s = WriteStringToFile(fs, contents.ToString() + "\n", tmp, true);
|
||||||
|
TEST_SYNC_POINT_CALLBACK("SetCurrentFile:BeforeRename", &s);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
TEST_KILL_RANDOM("SetCurrentFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
|
TEST_KILL_RANDOM("SetCurrentFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
|
||||||
s = fs->RenameFile(tmp, CurrentFileName(dbname), IOOptions(), nullptr);
|
s = fs->RenameFile(tmp, CurrentFileName(dbname), IOOptions(), nullptr);
|
||||||
TEST_KILL_RANDOM("SetCurrentFile:1", rocksdb_kill_odds * REDUCE_ODDS2);
|
TEST_KILL_RANDOM("SetCurrentFile:1", rocksdb_kill_odds * REDUCE_ODDS2);
|
||||||
|
TEST_SYNC_POINT_CALLBACK("SetCurrentFile:AfterRename", &s);
|
||||||
}
|
}
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
if (directory_to_fsync != nullptr) {
|
if (directory_to_fsync != nullptr) {
|
||||||
|
@ -1479,8 +1479,8 @@ public class RocksDBTest {
|
|||||||
assertThat(livefiles.manifestFileSize).isEqualTo(57);
|
assertThat(livefiles.manifestFileSize).isEqualTo(57);
|
||||||
assertThat(livefiles.files.size()).isEqualTo(3);
|
assertThat(livefiles.files.size()).isEqualTo(3);
|
||||||
assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT");
|
assertThat(livefiles.files.get(0)).isEqualTo("/CURRENT");
|
||||||
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000003");
|
assertThat(livefiles.files.get(1)).isEqualTo("/MANIFEST-000004");
|
||||||
assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000006");
|
assertThat(livefiles.files.get(2)).isEqualTo("/OPTIONS-000007");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2718,19 +2718,19 @@ TEST_F(BackupableDBTest, GarbageCollectionBeforeBackup) {
|
|||||||
OpenDBAndBackupEngine(true);
|
OpenDBAndBackupEngine(true);
|
||||||
|
|
||||||
ASSERT_OK(backup_chroot_env_->CreateDirIfMissing(backupdir_ + "/shared"));
|
ASSERT_OK(backup_chroot_env_->CreateDirIfMissing(backupdir_ + "/shared"));
|
||||||
std::string file_five = backupdir_ + "/shared/000008.sst";
|
std::string file_five = backupdir_ + "/shared/000009.sst";
|
||||||
std::string file_five_contents = "I'm not really a sst file";
|
std::string file_five_contents = "I'm not really a sst file";
|
||||||
// this depends on the fact that 00008.sst is the first file created by the DB
|
// this depends on the fact that 00009.sst is the first file created by the DB
|
||||||
ASSERT_OK(file_manager_->WriteToFile(file_five, file_five_contents));
|
ASSERT_OK(file_manager_->WriteToFile(file_five, file_five_contents));
|
||||||
|
|
||||||
FillDB(db_.get(), 0, 100);
|
FillDB(db_.get(), 0, 100);
|
||||||
// backup overwrites file 000008.sst
|
// backup overwrites file 000009.sst
|
||||||
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
|
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), true));
|
||||||
|
|
||||||
std::string new_file_five_contents;
|
std::string new_file_five_contents;
|
||||||
ASSERT_OK(ReadFileToString(backup_chroot_env_.get(), file_five,
|
ASSERT_OK(ReadFileToString(backup_chroot_env_.get(), file_five,
|
||||||
&new_file_five_contents));
|
&new_file_five_contents));
|
||||||
// file 000008.sst was overwritten
|
// file 000009.sst was overwritten
|
||||||
ASSERT_TRUE(new_file_five_contents != file_five_contents);
|
ASSERT_TRUE(new_file_five_contents != file_five_contents);
|
||||||
|
|
||||||
CloseDBAndBackupEngine();
|
CloseDBAndBackupEngine();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user