delete superversions in BackgroundCallPurge (#6146)
Summary: I found that CleanupSuperVersion() may block Get() for 30ms+ (per MemTable is 256MB). Then I found "delete sv" in ~SuperVersion() takes the time. The backtrace looks like this DBImpl::GetImpl() -> DBImpl::ReturnAndCleanupSuperVersion() -> DBImpl::CleanupSuperVersion() : delete sv; -> ~SuperVersion() I think it's better to delete in a background thread, please review it。 Pull Request resolved: https://github.com/facebook/rocksdb/pull/6146 Differential Revision: D18972066 fbshipit-source-id: 0f7b0b70b9bb1e27ad6fc1c8a408fbbf237ae08c
This commit is contained in:
parent
7168d16103
commit
924bc5fb95
@ -64,7 +64,7 @@ Status ArenaWrappedDBIter::Refresh() {
|
|||||||
arena_.~Arena();
|
arena_.~Arena();
|
||||||
new (&arena_) Arena();
|
new (&arena_) Arena();
|
||||||
|
|
||||||
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex());
|
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
|
||||||
if (read_callback_) {
|
if (read_callback_) {
|
||||||
read_callback_->Refresh(latest_seq);
|
read_callback_->Refresh(latest_seq);
|
||||||
}
|
}
|
||||||
|
@ -1080,9 +1080,8 @@ Compaction* ColumnFamilyData::CompactRange(
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
|
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) {
|
||||||
InstrumentedMutex* db_mutex) {
|
SuperVersion* sv = GetThreadLocalSuperVersion(db);
|
||||||
SuperVersion* sv = GetThreadLocalSuperVersion(db_mutex);
|
|
||||||
sv->Ref();
|
sv->Ref();
|
||||||
if (!ReturnThreadLocalSuperVersion(sv)) {
|
if (!ReturnThreadLocalSuperVersion(sv)) {
|
||||||
// This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
|
// This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion()
|
||||||
@ -1094,8 +1093,7 @@ SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
|
|||||||
return sv;
|
return sv;
|
||||||
}
|
}
|
||||||
|
|
||||||
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
|
SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) {
|
||||||
InstrumentedMutex* db_mutex) {
|
|
||||||
// The SuperVersion is cached in thread local storage to avoid acquiring
|
// The SuperVersion is cached in thread local storage to avoid acquiring
|
||||||
// mutex when SuperVersion does not change since the last use. When a new
|
// mutex when SuperVersion does not change since the last use. When a new
|
||||||
// SuperVersion is installed, the compaction or flush thread cleans up
|
// SuperVersion is installed, the compaction or flush thread cleans up
|
||||||
@ -1122,16 +1120,21 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
|
|||||||
|
|
||||||
if (sv && sv->Unref()) {
|
if (sv && sv->Unref()) {
|
||||||
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
|
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
|
||||||
db_mutex->Lock();
|
db->mutex()->Lock();
|
||||||
// NOTE: underlying resources held by superversion (sst files) might
|
// NOTE: underlying resources held by superversion (sst files) might
|
||||||
// not be released until the next background job.
|
// not be released until the next background job.
|
||||||
sv->Cleanup();
|
sv->Cleanup();
|
||||||
sv_to_delete = sv;
|
if (db->immutable_db_options().avoid_unnecessary_blocking_io) {
|
||||||
|
db->AddSuperVersionsToFreeQueue(sv);
|
||||||
|
db->SchedulePurge();
|
||||||
} else {
|
} else {
|
||||||
db_mutex->Lock();
|
sv_to_delete = sv;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
db->mutex()->Lock();
|
||||||
}
|
}
|
||||||
sv = super_version_->Ref();
|
sv = super_version_->Ref();
|
||||||
db_mutex->Unlock();
|
db->mutex()->Unlock();
|
||||||
|
|
||||||
delete sv_to_delete;
|
delete sv_to_delete;
|
||||||
}
|
}
|
||||||
|
@ -430,11 +430,11 @@ class ColumnFamilyData {
|
|||||||
SuperVersion* GetSuperVersion() { return super_version_; }
|
SuperVersion* GetSuperVersion() { return super_version_; }
|
||||||
// thread-safe
|
// thread-safe
|
||||||
// Return a already referenced SuperVersion to be used safely.
|
// Return a already referenced SuperVersion to be used safely.
|
||||||
SuperVersion* GetReferencedSuperVersion(InstrumentedMutex* db_mutex);
|
SuperVersion* GetReferencedSuperVersion(DBImpl* db);
|
||||||
// thread-safe
|
// thread-safe
|
||||||
// Get SuperVersion stored in thread local storage. If it does not exist,
|
// Get SuperVersion stored in thread local storage. If it does not exist,
|
||||||
// get a reference from a current SuperVersion.
|
// get a reference from a current SuperVersion.
|
||||||
SuperVersion* GetThreadLocalSuperVersion(InstrumentedMutex* db_mutex);
|
SuperVersion* GetThreadLocalSuperVersion(DBImpl* db);
|
||||||
// Try to return SuperVersion back to thread local storage. Retrun true on
|
// Try to return SuperVersion back to thread local storage. Retrun true on
|
||||||
// success and false on failure. It fails when the thread local storage
|
// success and false on failure. It fails when the thread local storage
|
||||||
// contains anything other than SuperVersion::kSVInUse flag.
|
// contains anything other than SuperVersion::kSVInUse flag.
|
||||||
|
@ -879,7 +879,7 @@ Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
|
|||||||
column_family);
|
column_family);
|
||||||
ColumnFamilyData* cfd = cfh->cfd();
|
ColumnFamilyData* cfd = cfh->cfd();
|
||||||
|
|
||||||
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
|
||||||
Version* version = super_version->current;
|
Version* version = super_version->current;
|
||||||
|
|
||||||
Status s =
|
Status s =
|
||||||
@ -895,7 +895,6 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
|
|||||||
AddToLogsToFreeQueue(l);
|
AddToLogsToFreeQueue(l);
|
||||||
}
|
}
|
||||||
job_context->logs_to_free.clear();
|
job_context->logs_to_free.clear();
|
||||||
SchedulePurge();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1322,6 +1321,14 @@ void DBImpl::BackgroundCallPurge() {
|
|||||||
delete log_writer;
|
delete log_writer;
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
}
|
}
|
||||||
|
while (!superversions_to_free_queue_.empty()) {
|
||||||
|
assert(!superversions_to_free_queue_.empty());
|
||||||
|
SuperVersion* sv = superversions_to_free_queue_.front();
|
||||||
|
superversions_to_free_queue_.pop_front();
|
||||||
|
mutex_.Unlock();
|
||||||
|
delete sv;
|
||||||
|
mutex_.Lock();
|
||||||
|
}
|
||||||
for (const auto& file : purge_files_) {
|
for (const auto& file : purge_files_) {
|
||||||
const PurgeFileInfo& purge_file = file.second;
|
const PurgeFileInfo& purge_file = file.second;
|
||||||
const std::string& fname = purge_file.fname;
|
const std::string& fname = purge_file.fname;
|
||||||
@ -1374,10 +1381,14 @@ static void CleanupIteratorState(void* arg1, void* /*arg2*/) {
|
|||||||
state->db->FindObsoleteFiles(&job_context, false, true);
|
state->db->FindObsoleteFiles(&job_context, false, true);
|
||||||
if (state->background_purge) {
|
if (state->background_purge) {
|
||||||
state->db->ScheduleBgLogWriterClose(&job_context);
|
state->db->ScheduleBgLogWriterClose(&job_context);
|
||||||
|
state->db->AddSuperVersionsToFreeQueue(state->super_version);
|
||||||
|
state->db->SchedulePurge();
|
||||||
}
|
}
|
||||||
state->mu->Unlock();
|
state->mu->Unlock();
|
||||||
|
|
||||||
|
if (!state->background_purge) {
|
||||||
delete state->super_version;
|
delete state->super_version;
|
||||||
|
}
|
||||||
if (job_context.HaveSomethingToDelete()) {
|
if (job_context.HaveSomethingToDelete()) {
|
||||||
if (state->background_purge) {
|
if (state->background_purge) {
|
||||||
// PurgeObsoleteFiles here does not delete files. Instead, it adds the
|
// PurgeObsoleteFiles here does not delete files. Instead, it adds the
|
||||||
@ -2452,7 +2463,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
|
|||||||
result = nullptr;
|
result = nullptr;
|
||||||
|
|
||||||
#else
|
#else
|
||||||
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
|
||||||
auto iter = new ForwardIterator(this, read_options, cfd, sv);
|
auto iter = new ForwardIterator(this, read_options, cfd, sv);
|
||||||
result = NewDBIterator(
|
result = NewDBIterator(
|
||||||
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
|
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
|
||||||
@ -2478,7 +2489,7 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
|
|||||||
ReadCallback* read_callback,
|
ReadCallback* read_callback,
|
||||||
bool allow_blob,
|
bool allow_blob,
|
||||||
bool allow_refresh) {
|
bool allow_refresh) {
|
||||||
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
|
||||||
|
|
||||||
// Try to generate a DB iterator tree in continuous memory area to be
|
// Try to generate a DB iterator tree in continuous memory area to be
|
||||||
// cache friendly. Here is an example of result:
|
// cache friendly. Here is an example of result:
|
||||||
@ -2557,7 +2568,7 @@ Status DBImpl::NewIterators(
|
|||||||
#else
|
#else
|
||||||
for (auto cfh : column_families) {
|
for (auto cfh : column_families) {
|
||||||
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
|
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
|
||||||
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
|
||||||
auto iter = new ForwardIterator(this, read_options, cfd, sv);
|
auto iter = new ForwardIterator(this, read_options, cfd, sv);
|
||||||
iterators->push_back(NewDBIterator(
|
iterators->push_back(NewDBIterator(
|
||||||
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
|
env_, read_options, *cfd->ioptions(), sv->mutable_cf_options,
|
||||||
@ -2884,7 +2895,7 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
|
|||||||
|
|
||||||
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
|
SuperVersion* DBImpl::GetAndRefSuperVersion(ColumnFamilyData* cfd) {
|
||||||
// TODO(ljin): consider using GetReferencedSuperVersion() directly
|
// TODO(ljin): consider using GetReferencedSuperVersion() directly
|
||||||
return cfd->GetThreadLocalSuperVersion(&mutex_);
|
return cfd->GetThreadLocalSuperVersion(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
// REQUIRED: this function should only be called on the write thread or if the
|
// REQUIRED: this function should only be called on the write thread or if the
|
||||||
@ -2902,11 +2913,19 @@ SuperVersion* DBImpl::GetAndRefSuperVersion(uint32_t column_family_id) {
|
|||||||
void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
|
void DBImpl::CleanupSuperVersion(SuperVersion* sv) {
|
||||||
// Release SuperVersion
|
// Release SuperVersion
|
||||||
if (sv->Unref()) {
|
if (sv->Unref()) {
|
||||||
|
bool defer_purge =
|
||||||
|
immutable_db_options().avoid_unnecessary_blocking_io;
|
||||||
{
|
{
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
sv->Cleanup();
|
sv->Cleanup();
|
||||||
|
if (defer_purge) {
|
||||||
|
AddSuperVersionsToFreeQueue(sv);
|
||||||
|
SchedulePurge();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if (!defer_purge) {
|
||||||
delete sv;
|
delete sv;
|
||||||
|
}
|
||||||
RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
|
RecordTick(stats_, NUMBER_SUPERVERSION_CLEANUPS);
|
||||||
}
|
}
|
||||||
RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
|
RecordTick(stats_, NUMBER_SUPERVERSION_RELEASES);
|
||||||
@ -3912,7 +3931,7 @@ Status DBImpl::IngestExternalFiles(
|
|||||||
start_file_number += args[i - 1].external_files.size();
|
start_file_number += args[i - 1].external_files.size();
|
||||||
auto* cfd =
|
auto* cfd =
|
||||||
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
|
static_cast<ColumnFamilyHandleImpl*>(args[i].column_family)->cfd();
|
||||||
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
|
||||||
exec_results[i].second = ingestion_jobs[i].Prepare(
|
exec_results[i].second = ingestion_jobs[i].Prepare(
|
||||||
args[i].external_files, start_file_number, super_version);
|
args[i].external_files, start_file_number, super_version);
|
||||||
exec_results[i].first = true;
|
exec_results[i].first = true;
|
||||||
@ -3923,7 +3942,7 @@ Status DBImpl::IngestExternalFiles(
|
|||||||
{
|
{
|
||||||
auto* cfd =
|
auto* cfd =
|
||||||
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
|
static_cast<ColumnFamilyHandleImpl*>(args[0].column_family)->cfd();
|
||||||
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
|
||||||
exec_results[0].second = ingestion_jobs[0].Prepare(
|
exec_results[0].second = ingestion_jobs[0].Prepare(
|
||||||
args[0].external_files, next_file_number, super_version);
|
args[0].external_files, next_file_number, super_version);
|
||||||
exec_results[0].first = true;
|
exec_results[0].first = true;
|
||||||
@ -4192,7 +4211,7 @@ Status DBImpl::CreateColumnFamilyWithImport(
|
|||||||
dummy_sv_ctx.Clean();
|
dummy_sv_ctx.Clean();
|
||||||
|
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
|
||||||
status = import_job.Prepare(next_file_number, sv);
|
status = import_job.Prepare(next_file_number, sv);
|
||||||
CleanupSuperVersion(sv);
|
CleanupSuperVersion(sv);
|
||||||
}
|
}
|
||||||
@ -4269,7 +4288,7 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
|
|||||||
}
|
}
|
||||||
std::vector<SuperVersion*> sv_list;
|
std::vector<SuperVersion*> sv_list;
|
||||||
for (auto cfd : cfd_list) {
|
for (auto cfd : cfd_list) {
|
||||||
sv_list.push_back(cfd->GetReferencedSuperVersion(&mutex_));
|
sv_list.push_back(cfd->GetReferencedSuperVersion(this));
|
||||||
}
|
}
|
||||||
for (auto& sv : sv_list) {
|
for (auto& sv : sv_list) {
|
||||||
VersionStorageInfo* vstorage = sv->current->storage_info();
|
VersionStorageInfo* vstorage = sv->current->storage_info();
|
||||||
@ -4294,14 +4313,23 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
bool defer_purge =
|
||||||
|
immutable_db_options().avoid_unnecessary_blocking_io;
|
||||||
{
|
{
|
||||||
InstrumentedMutexLock l(&mutex_);
|
InstrumentedMutexLock l(&mutex_);
|
||||||
for (auto sv : sv_list) {
|
for (auto sv : sv_list) {
|
||||||
if (sv && sv->Unref()) {
|
if (sv && sv->Unref()) {
|
||||||
sv->Cleanup();
|
sv->Cleanup();
|
||||||
|
if (defer_purge) {
|
||||||
|
AddSuperVersionsToFreeQueue(sv);
|
||||||
|
} else {
|
||||||
delete sv;
|
delete sv;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if (defer_purge) {
|
||||||
|
SchedulePurge();
|
||||||
|
}
|
||||||
for (auto cfd : cfd_list) {
|
for (auto cfd : cfd_list) {
|
||||||
cfd->Unref();
|
cfd->Unref();
|
||||||
}
|
}
|
||||||
|
@ -798,6 +798,10 @@ class DBImpl : public DB {
|
|||||||
logs_to_free_queue_.push_back(log_writer);
|
logs_to_free_queue_.push_back(log_writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void AddSuperVersionsToFreeQueue(SuperVersion* sv) {
|
||||||
|
superversions_to_free_queue_.push_back(sv);
|
||||||
|
}
|
||||||
|
|
||||||
void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
|
void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
|
||||||
|
|
||||||
// Fill JobContext with snapshot information needed by flush and compaction.
|
// Fill JobContext with snapshot information needed by flush and compaction.
|
||||||
@ -1890,6 +1894,7 @@ class DBImpl : public DB {
|
|||||||
|
|
||||||
// A queue to store log writers to close
|
// A queue to store log writers to close
|
||||||
std::deque<log::Writer*> logs_to_free_queue_;
|
std::deque<log::Writer*> logs_to_free_queue_;
|
||||||
|
std::deque<SuperVersion*> superversions_to_free_queue_;
|
||||||
int unscheduled_flushes_;
|
int unscheduled_flushes_;
|
||||||
int unscheduled_compactions_;
|
int unscheduled_compactions_;
|
||||||
|
|
||||||
|
@ -659,7 +659,7 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
|
|||||||
// one/both sides of the interval are unbounded. But it requires more
|
// one/both sides of the interval are unbounded. But it requires more
|
||||||
// changes to RangesOverlapWithMemtables.
|
// changes to RangesOverlapWithMemtables.
|
||||||
Range range(*begin, *end);
|
Range range(*begin, *end);
|
||||||
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
|
||||||
cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
|
cfd->RangesOverlapWithMemtables({range}, super_version, &flush_needed);
|
||||||
CleanupSuperVersion(super_version);
|
CleanupSuperVersion(super_version);
|
||||||
}
|
}
|
||||||
|
@ -406,7 +406,7 @@ ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
|
|||||||
const ReadOptions& read_options, ColumnFamilyData* cfd,
|
const ReadOptions& read_options, ColumnFamilyData* cfd,
|
||||||
SequenceNumber snapshot, ReadCallback* read_callback) {
|
SequenceNumber snapshot, ReadCallback* read_callback) {
|
||||||
assert(nullptr != cfd);
|
assert(nullptr != cfd);
|
||||||
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
|
SuperVersion* super_version = cfd->GetReferencedSuperVersion(this);
|
||||||
auto db_iter = NewArenaWrappedDbIterator(
|
auto db_iter = NewArenaWrappedDbIterator(
|
||||||
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
|
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
|
||||||
snapshot,
|
snapshot,
|
||||||
|
@ -4210,6 +4210,42 @@ TEST_F(DBTest2, SeekFileRangeDeleteTail) {
|
|||||||
}
|
}
|
||||||
db_->ReleaseSnapshot(s1);
|
db_->ReleaseSnapshot(s1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBTest2, BackgroundPurgeTest) {
|
||||||
|
Options options = CurrentOptions();
|
||||||
|
options.write_buffer_manager = std::make_shared<rocksdb::WriteBufferManager>(1 << 20);
|
||||||
|
options.avoid_unnecessary_blocking_io = true;
|
||||||
|
DestroyAndReopen(options);
|
||||||
|
size_t base_value = options.write_buffer_manager->memory_usage();
|
||||||
|
|
||||||
|
ASSERT_OK(Put("a", "a"));
|
||||||
|
Iterator* iter = db_->NewIterator(ReadOptions());
|
||||||
|
ASSERT_OK(Flush());
|
||||||
|
size_t value = options.write_buffer_manager->memory_usage();
|
||||||
|
ASSERT_GT(value, base_value);
|
||||||
|
|
||||||
|
db_->GetEnv()->SetBackgroundThreads(1, Env::Priority::HIGH);
|
||||||
|
test::SleepingBackgroundTask sleeping_task_after;
|
||||||
|
db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
||||||
|
&sleeping_task_after, Env::Priority::HIGH);
|
||||||
|
delete iter;
|
||||||
|
|
||||||
|
Env::Default()->SleepForMicroseconds(100000);
|
||||||
|
value = options.write_buffer_manager->memory_usage();
|
||||||
|
ASSERT_GT(value, base_value);
|
||||||
|
|
||||||
|
sleeping_task_after.WakeUp();
|
||||||
|
sleeping_task_after.WaitUntilDone();
|
||||||
|
|
||||||
|
test::SleepingBackgroundTask sleeping_task_after2;
|
||||||
|
db_->GetEnv()->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
|
||||||
|
&sleeping_task_after2, Env::Priority::HIGH);
|
||||||
|
sleeping_task_after2.WakeUp();
|
||||||
|
sleeping_task_after2.WaitUntilDone();
|
||||||
|
|
||||||
|
value = options.write_buffer_manager->memory_usage();
|
||||||
|
ASSERT_EQ(base_value, value);
|
||||||
|
}
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
||||||
|
@ -239,9 +239,13 @@ void ForwardIterator::SVCleanup(DBImpl* db, SuperVersion* sv,
|
|||||||
db->FindObsoleteFiles(&job_context, false, true);
|
db->FindObsoleteFiles(&job_context, false, true);
|
||||||
if (background_purge_on_iterator_cleanup) {
|
if (background_purge_on_iterator_cleanup) {
|
||||||
db->ScheduleBgLogWriterClose(&job_context);
|
db->ScheduleBgLogWriterClose(&job_context);
|
||||||
|
db->AddSuperVersionsToFreeQueue(sv);
|
||||||
|
db->SchedulePurge();
|
||||||
}
|
}
|
||||||
db->mutex_.Unlock();
|
db->mutex_.Unlock();
|
||||||
|
if (!background_purge_on_iterator_cleanup) {
|
||||||
delete sv;
|
delete sv;
|
||||||
|
}
|
||||||
if (job_context.HaveSomethingToDelete()) {
|
if (job_context.HaveSomethingToDelete()) {
|
||||||
db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
|
db->PurgeObsoleteFiles(job_context, background_purge_on_iterator_cleanup);
|
||||||
}
|
}
|
||||||
@ -614,7 +618,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
|
|||||||
Cleanup(refresh_sv);
|
Cleanup(refresh_sv);
|
||||||
if (refresh_sv) {
|
if (refresh_sv) {
|
||||||
// New
|
// New
|
||||||
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
|
sv_ = cfd_->GetReferencedSuperVersion(db_);
|
||||||
}
|
}
|
||||||
ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
|
ReadRangeDelAggregator range_del_agg(&cfd_->internal_comparator(),
|
||||||
kMaxSequenceNumber /* upper_bound */);
|
kMaxSequenceNumber /* upper_bound */);
|
||||||
@ -668,7 +672,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
|
|||||||
void ForwardIterator::RenewIterators() {
|
void ForwardIterator::RenewIterators() {
|
||||||
SuperVersion* svnew;
|
SuperVersion* svnew;
|
||||||
assert(sv_);
|
assert(sv_);
|
||||||
svnew = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
|
svnew = cfd_->GetReferencedSuperVersion(db_);
|
||||||
|
|
||||||
if (mutable_iter_ != nullptr) {
|
if (mutable_iter_ != nullptr) {
|
||||||
DeleteIterator(mutable_iter_, true /* is_arena */);
|
DeleteIterator(mutable_iter_, true /* is_arena */);
|
||||||
|
@ -1079,10 +1079,10 @@ struct DBOptions {
|
|||||||
// independently if the process crashes later and tries to recover.
|
// independently if the process crashes later and tries to recover.
|
||||||
bool atomic_flush = false;
|
bool atomic_flush = false;
|
||||||
|
|
||||||
// If true, ColumnFamilyHandle's and Iterator's destructors won't delete
|
// If true, working thread may avoid doing unnecessary and long-latency
|
||||||
// obsolete files directly and will instead schedule a background job
|
// operation (such as deleting obsolete files directly or deleting memtable)
|
||||||
// to do it. Use it if you're destroying iterators or ColumnFamilyHandle-s
|
// and will instead schedule a background job to do it.
|
||||||
// from latency-sensitive threads.
|
// Use it if you're latency-sensitive.
|
||||||
// If set to true, takes precedence over
|
// If set to true, takes precedence over
|
||||||
// ReadOptions::background_purge_on_iterator_cleanup.
|
// ReadOptions::background_purge_on_iterator_cleanup.
|
||||||
bool avoid_unnecessary_blocking_io = false;
|
bool avoid_unnecessary_blocking_io = false;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user