Make DBImpl::has_unpersisted_data_ atomic
Summary: Seems to me `has_unpersisted_data_` is read from read thread and write from write thread concurrently without synchronization. Making it an atomic. I update the logic not because seeing any problem with it, but it just feel confusing. Closes https://github.com/facebook/rocksdb/pull/1869 Differential Revision: D4555837 Pulled By: yiwu-arbug fbshipit-source-id: eff2ab8
This commit is contained in:
parent
eb912a927e
commit
c2247dc1c7
@ -388,7 +388,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
|
|||||||
"Shutdown: canceling all background work");
|
"Shutdown: canceling all background work");
|
||||||
|
|
||||||
if (!shutting_down_.load(std::memory_order_acquire) &&
|
if (!shutting_down_.load(std::memory_order_acquire) &&
|
||||||
has_unpersisted_data_ &&
|
has_unpersisted_data_.load(std::memory_order_relaxed) &&
|
||||||
!mutable_db_options_.avoid_flush_during_shutdown) {
|
!mutable_db_options_.avoid_flush_during_shutdown) {
|
||||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||||
if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) {
|
if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) {
|
||||||
@ -4036,8 +4036,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
|
|||||||
LookupKey lkey(key, snapshot);
|
LookupKey lkey(key, snapshot);
|
||||||
PERF_TIMER_STOP(get_snapshot_time);
|
PERF_TIMER_STOP(get_snapshot_time);
|
||||||
|
|
||||||
bool skip_memtable =
|
bool skip_memtable = (read_options.read_tier == kPersistedTier &&
|
||||||
(read_options.read_tier == kPersistedTier && has_unpersisted_data_);
|
has_unpersisted_data_.load(std::memory_order_relaxed));
|
||||||
bool done = false;
|
bool done = false;
|
||||||
if (!skip_memtable) {
|
if (!skip_memtable) {
|
||||||
if (sv->mem->Get(lkey, value, &s, &merge_context, &range_del_agg,
|
if (sv->mem->Get(lkey, value, &s, &merge_context, &range_del_agg,
|
||||||
@ -4142,7 +4142,8 @@ std::vector<Status> DBImpl::MultiGet(
|
|||||||
auto mgd = mgd_iter->second;
|
auto mgd = mgd_iter->second;
|
||||||
auto super_version = mgd->super_version;
|
auto super_version = mgd->super_version;
|
||||||
bool skip_memtable =
|
bool skip_memtable =
|
||||||
(read_options.read_tier == kPersistedTier && has_unpersisted_data_);
|
(read_options.read_tier == kPersistedTier &&
|
||||||
|
has_unpersisted_data_.load(std::memory_order_relaxed));
|
||||||
bool done = false;
|
bool done = false;
|
||||||
if (!skip_memtable) {
|
if (!skip_memtable) {
|
||||||
if (super_version->mem->Get(lkey, value, &s, &merge_context,
|
if (super_version->mem->Get(lkey, value, &s, &merge_context,
|
||||||
@ -4875,7 +4876,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
|||||||
PERF_TIMER_STOP(write_pre_and_post_process_time);
|
PERF_TIMER_STOP(write_pre_and_post_process_time);
|
||||||
|
|
||||||
if (write_options.disableWAL) {
|
if (write_options.disableWAL) {
|
||||||
has_unpersisted_data_ = true;
|
has_unpersisted_data_.store(true, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t log_size = 0;
|
uint64_t log_size = 0;
|
||||||
|
@ -998,8 +998,7 @@ class DBImpl : public DB {
|
|||||||
// A flag indicating whether the current rocksdb database has any
|
// A flag indicating whether the current rocksdb database has any
|
||||||
// data that is not yet persisted into either WAL or SST file.
|
// data that is not yet persisted into either WAL or SST file.
|
||||||
// Used when disableWAL is true.
|
// Used when disableWAL is true.
|
||||||
bool has_unpersisted_data_;
|
std::atomic<bool> has_unpersisted_data_;
|
||||||
|
|
||||||
|
|
||||||
// if an attempt was made to flush all column families that
|
// if an attempt was made to flush all column families that
|
||||||
// the oldest log depends on but uncommited data in the oldest
|
// the oldest log depends on but uncommited data in the oldest
|
||||||
|
Loading…
Reference in New Issue
Block a user