Flush job should release reference current version if sync log failed
Summary: Fix the bug when sync log fail, FlushJob::Run() will not be execute and reference to cfd->current() will not be release. Closes https://github.com/facebook/rocksdb/pull/1792 Differential Revision: D4441316 Pulled By: yiwu-arbug fbshipit-source-id: 5523e28
This commit is contained in:
parent
512e441819
commit
7079b78827
@ -9,6 +9,7 @@
|
|||||||
|
|
||||||
#include "db/db_test_util.h"
|
#include "db/db_test_util.h"
|
||||||
#include "port/stack_trace.h"
|
#include "port/stack_trace.h"
|
||||||
|
#include "util/fault_injection_test_env.h"
|
||||||
#include "util/sync_point.h"
|
#include "util/sync_point.h"
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
@ -47,6 +48,38 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
|
|||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBFlushTest, SyncFail) {
|
||||||
|
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env(
|
||||||
|
new FaultInjectionTestEnv(Env::Default()));
|
||||||
|
Options options;
|
||||||
|
options.disable_auto_compactions = true;
|
||||||
|
options.env = fault_injection_env.get();
|
||||||
|
|
||||||
|
SyncPoint::GetInstance()->LoadDependency(
|
||||||
|
{{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
|
||||||
|
{"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
|
||||||
|
SyncPoint::GetInstance()->EnableProcessing();
|
||||||
|
|
||||||
|
Reopen(options);
|
||||||
|
Put("key", "value");
|
||||||
|
auto* cfd =
|
||||||
|
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
|
||||||
|
->cfd();
|
||||||
|
int refs_before = cfd->current()->TEST_refs();
|
||||||
|
FlushOptions flush_options;
|
||||||
|
flush_options.wait = false;
|
||||||
|
ASSERT_OK(dbfull()->Flush(flush_options));
|
||||||
|
fault_injection_env->SetFilesystemActive(false);
|
||||||
|
TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
|
||||||
|
TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
|
||||||
|
fault_injection_env->SetFilesystemActive(true);
|
||||||
|
dbfull()->TEST_WaitForFlushMemTable();
|
||||||
|
ASSERT_EQ("", FilesPerLevel()); // flush failed.
|
||||||
|
// Flush job should release ref count to current version.
|
||||||
|
ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
|
||||||
|
Destroy(options);
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -1842,6 +1842,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
|||||||
}
|
}
|
||||||
|
|
||||||
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
|
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
|
||||||
|
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
|
||||||
mutex_.AssertHeld();
|
mutex_.AssertHeld();
|
||||||
autovector<log::Writer*, 1> logs_to_sync;
|
autovector<log::Writer*, 1> logs_to_sync;
|
||||||
uint64_t current_log_number = logfile_number_;
|
uint64_t current_log_number = logfile_number_;
|
||||||
@ -1878,6 +1879,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
|
|||||||
MarkLogsSynced(current_log_number - 1, true, s);
|
MarkLogsSynced(current_log_number - 1, true, s);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
bg_error_ = s;
|
bg_error_ = s;
|
||||||
|
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1928,6 +1930,8 @@ Status DBImpl::FlushMemTableToOutputFile(
|
|||||||
// is unlocked by the current thread.
|
// is unlocked by the current thread.
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
s = flush_job.Run(&file_meta);
|
s = flush_job.Run(&file_meta);
|
||||||
|
} else {
|
||||||
|
flush_job.Cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
@ -2762,7 +2766,7 @@ void DBImpl::MarkLogsSynced(
|
|||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert(logs_.empty() || logs_[0].number > up_to ||
|
assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
|
||||||
(logs_.size() == 1 && !logs_[0].getting_synced));
|
(logs_.size() == 1 && !logs_[0].getting_synced));
|
||||||
log_sync_cv_.SignalAll();
|
log_sync_cv_.SignalAll();
|
||||||
}
|
}
|
||||||
|
@ -230,6 +230,12 @@ Status FlushJob::Run(FileMetaData* file_meta) {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FlushJob::Cancel() {
|
||||||
|
db_mutex_->AssertHeld();
|
||||||
|
assert(base_ != nullptr);
|
||||||
|
base_->Unref();
|
||||||
|
}
|
||||||
|
|
||||||
Status FlushJob::WriteLevel0Table() {
|
Status FlushJob::WriteLevel0Table() {
|
||||||
AutoThreadOperationStageUpdater stage_updater(
|
AutoThreadOperationStageUpdater stage_updater(
|
||||||
ThreadStatus::STAGE_FLUSH_WRITE_L0);
|
ThreadStatus::STAGE_FLUSH_WRITE_L0);
|
||||||
|
@ -67,9 +67,11 @@ class FlushJob {
|
|||||||
|
|
||||||
~FlushJob();
|
~FlushJob();
|
||||||
|
|
||||||
// Require db_mutex held
|
// Require db_mutex held.
|
||||||
|
// Once PickMemTable() is called, either Run() or Cancel() has to be call.
|
||||||
void PickMemTable();
|
void PickMemTable();
|
||||||
Status Run(FileMetaData* file_meta = nullptr);
|
Status Run(FileMetaData* file_meta = nullptr);
|
||||||
|
void Cancel();
|
||||||
TableProperties GetTableProperties() const { return table_properties_; }
|
TableProperties GetTableProperties() const { return table_properties_; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -520,6 +520,8 @@ class Version {
|
|||||||
return next_;
|
return next_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int TEST_refs() const { return refs_; }
|
||||||
|
|
||||||
VersionStorageInfo* storage_info() { return &storage_info_; }
|
VersionStorageInfo* storage_info() { return &storage_info_; }
|
||||||
|
|
||||||
VersionSet* version_set() { return vset_; }
|
VersionSet* version_set() { return vset_; }
|
||||||
|
@ -149,7 +149,7 @@ Status TestWritableFile::Flush() {
|
|||||||
|
|
||||||
Status TestWritableFile::Sync() {
|
Status TestWritableFile::Sync() {
|
||||||
if (!env_->IsFilesystemActive()) {
|
if (!env_->IsFilesystemActive()) {
|
||||||
return Status::OK();
|
return Status::IOError("FaultInjectionTestEnv: not active");
|
||||||
}
|
}
|
||||||
// No need to actual sync.
|
// No need to actual sync.
|
||||||
state_.pos_at_last_sync_ = state_.pos_;
|
state_.pos_at_last_sync_ = state_.pos_;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user