6b3c71f6ed
Summary: Access of `cfd->current()` needs to hold db mutex. The data race is caught by TSAN but hard to reproduce: https://gist.github.com/yiwu-arbug/0fc6dc0de915297a1740aa9610be9373 Closes https://github.com/facebook/rocksdb/pull/2894 Differential Revision: D5843884 Pulled By: yiwu-arbug fbshipit-source-id: 0a30a421bc96f51840821538ad6453dc0815a942
1915 lines
70 KiB
C++
1915 lines
70 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
#include "db/db_impl.h"
|
|
|
|
#ifndef __STDC_FORMAT_MACROS
|
|
#define __STDC_FORMAT_MACROS
|
|
#endif
|
|
#include <inttypes.h>
|
|
|
|
#include "db/builder.h"
|
|
#include "db/event_helpers.h"
|
|
#include "monitoring/iostats_context_imp.h"
|
|
#include "monitoring/perf_context_imp.h"
|
|
#include "monitoring/thread_status_updater.h"
|
|
#include "monitoring/thread_status_util.h"
|
|
#include "util/sst_file_manager_impl.h"
|
|
#include "util/sync_point.h"
|
|
|
|
namespace rocksdb {
|
|
// Syncs every WAL file whose number is below the current logfile_number_
// and then fsyncs the WAL directory.
//
// REQUIRES: mutex_ is held on entry. The mutex is released while the files
// are being synced and is re-acquired before the function returns.
//
// Returns the status of the sync. On failure the error is also passed to
// the listeners as a kFlush background error and, unless a listener resets
// it to OK, stored in bg_error_.
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
  mutex_.AssertHeld();
  autovector<log::Writer*, 1> logs_to_sync;
  uint64_t current_log_number = logfile_number_;
  // Wait while the oldest closed log is flagged as being synced.
  while (logs_.front().number < current_log_number &&
         logs_.front().getting_synced) {
    log_sync_cv_.Wait();
  }
  // Claim every closed log (number below the current one) for this sync.
  for (auto it = logs_.begin();
       it != logs_.end() && it->number < current_log_number; ++it) {
    auto& log = *it;
    assert(!log.getting_synced);
    log.getting_synced = true;
    logs_to_sync.push_back(log.writer);
  }

  Status s;
  if (!logs_to_sync.empty()) {
    mutex_.Unlock();

    for (log::Writer* log : logs_to_sync) {
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
                     log->get_log_number());
      s = log->file()->Sync(immutable_db_options_.use_fsync);
      if (!s.ok()) {
        // Stop at the first failure. Continuing would let a later successful
        // sync overwrite the error, and the failed log would then be
        // reported as synced.
        break;
      }
    }
    if (s.ok()) {
      s = directories_.GetWalDir()->Fsync();
    }

    mutex_.Lock();

    // "number <= current_log_number - 1" is equivalent to
    // "number < current_log_number".
    MarkLogsSynced(current_log_number - 1, true, s);
    if (!s.ok()) {
      Status new_bg_error = s;
      // may temporarily unlock and lock the mutex.
      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
                                            BackgroundErrorReason::kFlush,
                                            &new_bg_error, &mutex_);
      if (!new_bg_error.ok()) {
        bg_error_ = new_bg_error;
      }
      TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
      return s;
    }
  }
  return s;
}
|
|
|
|
// Runs a FlushJob for the pending immutable memtables of `cfd`.
//
// REQUIRES: mutex_ is held. Several callees (the listener notifications,
// SyncClosedLogs() and FlushJob::Run()) may release and re-acquire it.
//
// cfd                 column family to flush; must have a flush pending.
// mutable_cf_options  options passed to the flush job and the notifications.
// made_progress       optional out-parameter, set to true on success.
// job_context         supplies the job id and is handed to the flush job.
// log_buffer          receives the level summary after a successful flush.
//
// Returns the status of the WAL sync or, if that succeeded, of the flush job.
Status DBImpl::FlushMemTableToOutputFile(
    ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
    bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
  mutex_.AssertHeld();
  assert(cfd->imm()->NumNotFlushed() != 0);
  assert(cfd->imm()->IsFlushPending());

  SequenceNumber earliest_write_conflict_snapshot;
  std::vector<SequenceNumber> live_snapshots =
      snapshots_.GetAll(&earliest_write_conflict_snapshot);

  FlushJob job(
      dbname_, cfd, immutable_db_options_, mutable_cf_options, env_options_,
      versions_.get(), &mutex_, &shutting_down_, live_snapshots,
      earliest_write_conflict_snapshot, job_context, log_buffer,
      directories_.GetDbDir(), directories_.GetDataDir(0U),
      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
      &event_logger_, mutable_cf_options.report_bg_io_stats);

  FileMetaData flushed_file;

  job.PickMemTable();

#ifndef ROCKSDB_LITE
  // may temporarily unlock and lock the mutex.
  NotifyOnFlushBegin(cfd, &flushed_file, mutable_cf_options,
                     job_context->job_id, job.GetTableProperties());
#endif  // ROCKSDB_LITE

  Status status;
  if (logfile_number_ > 0 &&
      versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 0) {
    // If there are more than one column families, we need to make sure that
    // all the log files except the most recent one are synced. Otherwise if
    // the host crashes after flushing and before WAL is persistent, the
    // flushed SST may contain data from write batches whose updates to
    // other column families are missing.
    // SyncClosedLogs() may unlock and re-lock the db_mutex.
    status = SyncClosedLogs(job_context);
  }

  // Within FlushJob::Run, rocksdb may call event listener to notify
  // file creation and deletion.
  //
  // Note that FlushJob::Run will unlock and lock the db_mutex,
  // and EventListener callback will be called when the db_mutex
  // is unlocked by the current thread.
  if (status.ok()) {
    status = job.Run(&flushed_file);
  } else {
    job.Cancel();
  }

  // Failure path: optionally record a background error, then bail out.
  if (!status.ok()) {
    if (!status.IsShutdownInProgress() &&
        immutable_db_options_.paranoid_checks && bg_error_.ok()) {
      Status new_bg_error = status;
      // may temporarily unlock and lock the mutex.
      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
                                            BackgroundErrorReason::kFlush,
                                            &new_bg_error, &mutex_);
      if (!new_bg_error.ok()) {
        // if a bad error happened (not ShutdownInProgress), paranoid_checks
        // is true, and the error isn't handled by callback, mark DB
        // read-only
        bg_error_ = new_bg_error;
      }
    }
    return status;
  }

  // Success path.
  InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
                                            mutable_cf_options);
  if (made_progress != nullptr) {
    *made_progress = true;
  }
  VersionStorageInfo::LevelSummaryStorage summary_buf;
  ROCKS_LOG_BUFFER(log_buffer, "[%s] Level summary: %s\n",
                   cfd->GetName().c_str(),
                   cfd->current()->storage_info()->LevelSummary(&summary_buf));

#ifndef ROCKSDB_LITE
  // may temporarily unlock and lock the mutex.
  NotifyOnFlushCompleted(cfd, &flushed_file, mutable_cf_options,
                         job_context->job_id, job.GetTableProperties());
  auto file_manager = static_cast<SstFileManagerImpl*>(
      immutable_db_options_.sst_file_manager.get());
  if (file_manager != nullptr) {
    // Notify sst_file_manager that a new file was added
    std::string new_file_path = MakeTableFileName(
        immutable_db_options_.db_paths[0].path, flushed_file.fd.GetNumber());
    file_manager->OnAddFile(new_file_path);
    if (file_manager->IsMaxAllowedSpaceReached() && bg_error_.ok()) {
      Status new_bg_error = Status::IOError("Max allowed space was reached");
      TEST_SYNC_POINT_CALLBACK(
          "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
          &new_bg_error);
      // may temporarily unlock and lock the mutex.
      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
                                            BackgroundErrorReason::kFlush,
                                            &new_bg_error, &mutex_);
      if (!new_bg_error.ok()) {
        bg_error_ = new_bg_error;
      }
    }
  }
#endif  // ROCKSDB_LITE
  return status;
}
|
|
|
|
// Calls OnFlushBegin() on every registered listener with a FlushJobInfo
// describing the flush of `cfd`.
//
// Does nothing in ROCKSDB_LITE builds, when no listener is registered, or
// when the DB is shutting down. Otherwise mutex_ must be held; it is released
// for the duration of the callbacks and re-acquired afterwards.
void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
                                const MutableCFOptions& mutable_cf_options,
                                int job_id, TableProperties prop) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  // Read the L0 file count while the mutex is still held.
  const auto l0_file_count = cfd->current()->storage_info()->NumLevelFiles(0);
  const bool slowdown_triggered =
      l0_file_count >= mutable_cf_options.level0_slowdown_writes_trigger;
  const bool stop_triggered =
      l0_file_count >= mutable_cf_options.level0_stop_writes_trigger;
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo flush_info;
    flush_info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    //                 go to L0 in the future.
    flush_info.file_path = MakeTableFileName(
        immutable_db_options_.db_paths[0].path, file_meta->fd.GetNumber());
    flush_info.thread_id = env_->GetThreadID();
    flush_info.job_id = job_id;
    flush_info.triggered_writes_slowdown = slowdown_triggered;
    flush_info.triggered_writes_stop = stop_triggered;
    flush_info.smallest_seqno = file_meta->smallest_seqno;
    flush_info.largest_seqno = file_meta->largest_seqno;
    flush_info.table_properties = prop;
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnFlushBegin(this, flush_info);
    }
  }
  mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
#endif  // ROCKSDB_LITE
}
|
|
|
|
// Calls OnFlushCompleted() on every registered listener with a FlushJobInfo
// describing the finished flush of `cfd`.
//
// Does nothing in ROCKSDB_LITE builds, when no listener is registered, or
// when the DB is shutting down. Otherwise mutex_ must be held; it is released
// for the duration of the callbacks and re-acquired afterwards.
void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
                                    FileMetaData* file_meta,
                                    const MutableCFOptions& mutable_cf_options,
                                    int job_id, TableProperties prop) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.empty()) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  // Read the L0 file count while the mutex is still held.
  const auto l0_file_count = cfd->current()->storage_info()->NumLevelFiles(0);
  const bool slowdown_triggered =
      l0_file_count >= mutable_cf_options.level0_slowdown_writes_trigger;
  const bool stop_triggered =
      l0_file_count >= mutable_cf_options.level0_stop_writes_trigger;
  // release lock while notifying events
  mutex_.Unlock();
  {
    FlushJobInfo flush_info;
    flush_info.cf_name = cfd->GetName();
    // TODO(yhchiang): make db_paths dynamic in case flush does not
    //                 go to L0 in the future.
    flush_info.file_path = MakeTableFileName(
        immutable_db_options_.db_paths[0].path, file_meta->fd.GetNumber());
    flush_info.thread_id = env_->GetThreadID();
    flush_info.job_id = job_id;
    flush_info.triggered_writes_slowdown = slowdown_triggered;
    flush_info.triggered_writes_stop = stop_triggered;
    flush_info.smallest_seqno = file_meta->smallest_seqno;
    flush_info.largest_seqno = file_meta->largest_seqno;
    flush_info.table_properties = prop;
    for (auto listener : immutable_db_options_.listeners) {
      listener->OnFlushCompleted(this, flush_info);
    }
  }
  mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
#endif  // ROCKSDB_LITE
}
|
|
|
|
// Manually compacts the key range [*begin, *end] of `column_family`.
//
// The memtable is flushed first. For universal compaction with more than one
// level a single all-level compaction is run; otherwise one manual compaction
// per level is run, from level 0 up to the highest level (>= 1) that overlaps
// the range. If options.change_level is set, the files of the final output
// level are then refitted to options.target_level while background work is
// paused.
//
// Returns InvalidArgument for an out-of-range options.target_path_id,
// otherwise the first error from the flush, a compaction, or the refit.
Status DBImpl::CompactRange(const CompactRangeOptions& options,
                            ColumnFamilyHandle* column_family,
                            const Slice* begin, const Slice* end) {
  if (options.target_path_id >= immutable_db_options_.db_paths.size()) {
    return Status::InvalidArgument("Invalid target path ID");
  }

  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  const bool exclusive = options.exclusive_manual_compaction;

  Status s = FlushMemTable(cfd, FlushOptions());
  if (!s.ok()) {
    LogFlush(immutable_db_options_.info_log);
    return s;
  }

  // Highest level (starting from 1) holding files that overlap the range.
  // cfd->current() must only be accessed with the DB mutex held.
  int max_level_with_files = 0;
  {
    InstrumentedMutexLock l(&mutex_);
    auto* vstorage = cfd->current()->storage_info();
    for (int lvl = 1; lvl < vstorage->num_non_empty_levels(); lvl++) {
      if (vstorage->OverlapInLevel(lvl, begin, end)) {
        max_level_with_files = lvl;
      }
    }
  }

  const auto compaction_style = cfd->ioptions()->compaction_style;
  int final_output_level = 0;
  if (compaction_style == kCompactionStyleUniversal &&
      cfd->NumberLevels() > 1) {
    // Always compact all files together.
    final_output_level = cfd->NumberLevels() - 1;
    // if bottom most level is reserved
    if (immutable_db_options_.allow_ingest_behind) {
      final_output_level--;
    }
    s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels,
                            final_output_level, options.target_path_id,
                            begin, end, exclusive);
  } else {
    for (int level = 0; level <= max_level_with_files; level++) {
      int output_level;
      // in case the compaction is universal or if we're compacting the
      // bottom-most level, the output level will be the same as input one.
      // level 0 can never be the bottommost level (i.e. if all files are in
      // level 0, we will compact to level 1)
      if (compaction_style == kCompactionStyleUniversal ||
          compaction_style == kCompactionStyleFIFO) {
        output_level = level;
      } else if (level == max_level_with_files && level > 0) {
        const auto bottommost_mode = options.bottommost_level_compaction;
        if (bottommost_mode == BottommostLevelCompaction::kSkip) {
          // Skip bottommost level compaction
          continue;
        }
        if (bottommost_mode ==
                BottommostLevelCompaction::kIfHaveCompactionFilter &&
            cfd->ioptions()->compaction_filter == nullptr &&
            cfd->ioptions()->compaction_filter_factory == nullptr) {
          // Skip bottommost level compaction since we don't have a compaction
          // filter
          continue;
        }
        output_level = level;
      } else {
        output_level = level + 1;
        if (compaction_style == kCompactionStyleLevel &&
            cfd->ioptions()->level_compaction_dynamic_level_bytes &&
            level == 0) {
          output_level = ColumnFamilyData::kCompactToBaseLevel;
        }
      }
      s = RunManualCompaction(cfd, level, output_level, options.target_path_id,
                              begin, end, exclusive);
      if (!s.ok()) {
        break;
      }
      // Track the deepest level written so far.
      if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
        final_output_level = cfd->NumberLevels() - 1;
      } else if (output_level > final_output_level) {
        final_output_level = output_level;
      }
      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::1");
      TEST_SYNC_POINT("DBImpl::RunManualCompaction()::2");
    }
  }
  if (!s.ok()) {
    LogFlush(immutable_db_options_.info_log);
    return s;
  }

  if (options.change_level) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[RefitLevel] waiting for background threads to stop");
    s = PauseBackgroundWork();
    if (s.ok()) {
      s = ReFitLevel(cfd, final_output_level, options.target_level);
    }
    ContinueBackgroundWork();
  }
  LogFlush(immutable_db_options_.info_log);

  {
    InstrumentedMutexLock l(&mutex_);
    // an automatic compaction that has been scheduled might have been
    // preempted by the manual compactions. Need to schedule it back.
    MaybeScheduleFlushOrCompaction();
  }

  return s;
}
|
|
|
|
// Compacts the named table files of `column_family` into `output_level`,
// running the compaction on the calling thread.
//
// Returns NotSupported in ROCKSDB_LITE builds and InvalidArgument when
// `column_family` is null; otherwise the status of CompactFilesImpl().
// Obsolete files are collected and purged before returning, with a forced
// full scan if the compaction failed.
Status DBImpl::CompactFiles(
    const CompactionOptions& compact_options,
    ColumnFamilyHandle* column_family,
    const std::vector<std::string>& input_file_names,
    const int output_level, const int output_path_id) {
#ifdef ROCKSDB_LITE
  // not supported in lite version
  return Status::NotSupported("Not supported in ROCKSDB LITE");
#else
  if (column_family == nullptr) {
    return Status::InvalidArgument("ColumnFamilyHandle must be non-null.");
  }

  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  assert(cfd);

  Status result;
  JobContext job_context(0, true);
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());

  // Perform CompactFiles against a referenced super version.
  SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
  {
    InstrumentedMutexLock l(&mutex_);

    // This call will unlock/lock the mutex to wait for current running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    result = CompactFilesImpl(compact_options, cfd, super_version->current,
                              input_file_names, output_level, output_path_id,
                              &job_context, &log_buffer);
  }
  // Drop our reference; the last holder cleans up under the mutex and deletes
  // outside of it.
  if (super_version->Unref()) {
    mutex_.Lock();
    super_version->Cleanup();
    mutex_.Unlock();
    delete super_version;
  }

  // Find and delete obsolete files
  {
    InstrumentedMutexLock l(&mutex_);
    // If the compaction failed, we want to delete all obsolete files we
    // might have created, so we force FindObsoleteFiles(). This is because
    // job_context does not catch all created files if compaction failed.
    const bool force_scan = !result.ok();
    FindObsoleteFiles(&job_context, force_scan);
  }  // release the mutex

  // delete unnecessary files if any, this is done outside the mutex
  const bool have_obsolete_files = job_context.HaveSomethingToDelete();
  if (!have_obsolete_files && log_buffer.IsEmpty()) {
    return result;
  }

  // Flush the buffered info-log entries before purging files.
  log_buffer.FlushBufferToLog();
  if (have_obsolete_files) {
    // no mutex is locked here.  No need to Unlock() and Lock() here.
    PurgeObsoleteFiles(job_context);
  }
  job_context.Clean();

  return result;
#endif  // ROCKSDB_LITE
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Builds and runs, on the calling thread, a compaction of the table files
// named in `input_file_names` (resolved against `version`) into
// `output_level`.
//
// REQUIRES: mutex_ is held. It is released while the compaction job runs and
// re-acquired before the result is installed.
//
// output_path_id  a negative value selects path 0 when exactly one db_path is
//                 configured and is rejected with NotSupported otherwise.
//
// Returns ShutdownInProgress if the DB is shutting down, the error from
// input sanitization / input lookup, Aborted if any input file is already
// being compacted, or otherwise the status of CompactionJob::Install().
Status DBImpl::CompactFilesImpl(
    const CompactionOptions& compact_options, ColumnFamilyData* cfd,
    Version* version, const std::vector<std::string>& input_file_names,
    const int output_level, int output_path_id, JobContext* job_context,
    LogBuffer* log_buffer) {
  mutex_.AssertHeld();

  if (shutting_down_.load(std::memory_order_acquire)) {
    return Status::ShutdownInProgress();
  }

  // Translate file names into file numbers. Bind by reference so that no
  // std::string is copied per iteration.
  std::unordered_set<uint64_t> input_set;
  for (const auto& file_name : input_file_names) {
    input_set.insert(TableFileNameToNumber(file_name));
  }

  ColumnFamilyMetaData cf_meta;
  // TODO(yhchiang): can directly use version here if none of the
  // following functions call is pluggable to external developers.
  version->GetColumnFamilyMetaData(&cf_meta);

  if (output_path_id < 0) {
    if (immutable_db_options_.db_paths.size() == 1U) {
      output_path_id = 0;
    } else {
      return Status::NotSupported(
          "Automatic output path selection is not "
          "yet supported in CompactFiles()");
    }
  }

  Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
      &input_set, cf_meta, output_level);
  if (!s.ok()) {
    return s;
  }

  std::vector<CompactionInputFiles> input_files;
  s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers(
      &input_files, &input_set, version->storage_info(), compact_options);
  if (!s.ok()) {
    return s;
  }

  // Bind by reference: each element owns a vector of files that would
  // otherwise be copied just to be inspected.
  for (const auto& inputs : input_files) {
    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
      return Status::Aborted(
          "Some of the necessary compaction input "
          "files are already being compacted");
    }
  }

  // At this point, CompactFiles will be run.
  bg_compaction_scheduled_++;

  assert(cfd->compaction_picker());
  std::unique_ptr<Compaction> c(cfd->compaction_picker()->CompactFiles(
      compact_options, input_files, output_level, version->storage_info(),
      *cfd->GetLatestMutableCFOptions(), output_path_id));
  // we already sanitized the set of input files and checked for conflicts
  // without releasing the lock, so we're guaranteed a compaction can be formed.
  assert(c != nullptr);

  c->SetInputVersion(version);
  // deletion compaction currently not allowed in CompactFiles.
  assert(!c->deletion_compaction());

  SequenceNumber earliest_write_conflict_snapshot;
  std::vector<SequenceNumber> snapshot_seqs =
      snapshots_.GetAll(&earliest_write_conflict_snapshot);

  auto pending_outputs_inserted_elem =
      CaptureCurrentFileNumberInPendingOutputs();

  assert(is_snapshot_supported_ || snapshots_.empty());
  CompactionJob compaction_job(
      job_context->job_id, c.get(), immutable_db_options_, env_options_,
      versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
      directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
      snapshot_seqs, earliest_write_conflict_snapshot, table_cache_,
      &event_logger_, c->mutable_cf_options()->paranoid_file_checks,
      c->mutable_cf_options()->report_bg_io_stats, dbname_,
      nullptr);  // Here we pass a nullptr for CompactionJobStats because
                 // CompactFiles does not trigger OnCompactionCompleted(),
                 // which is the only place where CompactionJobStats is
                 // returned.  The idea of not triggering
                 // OnCompactionCompleted() is that CompactFiles runs in the
                 // caller thread, so the user should always know when it
                 // completes.  As a result, it makes less sense to notify
                 // the users something they should already know.
                 //
                 // In the future, if we would like to add CompactionJobStats
                 // support for CompactFiles, we should have CompactFiles API
                 // pass a pointer of CompactionJobStats as the out-value
                 // instead of using EventListener.

  // Creating a compaction influences the compaction score because the score
  // takes running compactions into account (by skipping files that are already
  // being compacted). Since we just changed compaction score, we recalculate it
  // here.
  version->storage_info()->ComputeCompactionScore(*cfd->ioptions(),
                                                  *c->mutable_cf_options());

  compaction_job.Prepare();

  // Run the compaction itself without holding the DB mutex.
  mutex_.Unlock();
  TEST_SYNC_POINT("CompactFilesImpl:0");
  TEST_SYNC_POINT("CompactFilesImpl:1");
  compaction_job.Run();
  TEST_SYNC_POINT("CompactFilesImpl:2");
  TEST_SYNC_POINT("CompactFilesImpl:3");
  mutex_.Lock();

  Status status = compaction_job.Install(*c->mutable_cf_options());
  if (status.ok()) {
    InstallSuperVersionAndScheduleWorkWrapper(
        c->column_family_data(), job_context, *c->mutable_cf_options());
  }
  // NOTE(review): `s` is the pre-run status and is always OK at this point;
  // the outcome of the compaction is in `status`. Confirm whether `status`
  // was intended here.
  c->ReleaseCompactionFiles(s);

  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  if (status.ok()) {
    // Done
  } else if (status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log,
                   "[%s] [JOB %d] Compaction error: %s",
                   c->column_family_data()->GetName().c_str(),
                   job_context->job_id, status.ToString().c_str());
    if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
      Status new_bg_error = status;
      // may temporarily unlock and lock the mutex.
      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
                                            BackgroundErrorReason::kCompaction,
                                            &new_bg_error, &mutex_);
      if (!new_bg_error.ok()) {
        bg_error_ = new_bg_error;
      }
    }
  }

  c.reset();

  // Wake up waiters once no compaction is scheduled any more.
  bg_compaction_scheduled_--;
  if (bg_compaction_scheduled_ == 0) {
    bg_cv_.SignalAll();
  }

  return status;
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// Increments the compaction pause counter, waits on bg_cv_ until no flush,
// compaction or bottom-priority compaction is scheduled, and then increments
// the background-work pause counter. Always returns OK.
Status DBImpl::PauseBackgroundWork() {
  InstrumentedMutexLock l(&mutex_);
  bg_compaction_paused_++;
  const auto background_work_scheduled = [this]() {
    return bg_bottom_compaction_scheduled_ > 0 ||
           bg_compaction_scheduled_ > 0 || bg_flush_scheduled_ > 0;
  };
  while (background_work_scheduled()) {
    bg_cv_.Wait();
  }
  bg_work_paused_++;
  return Status::OK();
}
|
|
|
|
// Undoes one PauseBackgroundWork() call by decrementing both pause counters.
// When the background-work counter reaches zero, scheduling of flushes and
// compactions is attempted again.
//
// Returns InvalidArgument if background work is not currently paused,
// OK otherwise.
Status DBImpl::ContinueBackgroundWork() {
  InstrumentedMutexLock l(&mutex_);
  if (bg_work_paused_ == 0) {
    return Status::InvalidArgument();
  }
  assert(bg_work_paused_ > 0);
  assert(bg_compaction_paused_ > 0);
  --bg_compaction_paused_;
  --bg_work_paused_;
  // It's sufficient to check just bg_work_paused_ here since
  // bg_work_paused_ is always no greater than bg_compaction_paused_
  const bool fully_resumed = (bg_work_paused_ == 0);
  if (fully_resumed) {
    MaybeScheduleFlushOrCompaction();
  }
  return Status::OK();
}
|
|
|
|
// Calls OnCompactionCompleted() on every registered listener with a
// CompactionJobInfo assembled from compaction `c`, its status `st` and
// `compaction_job_stats`.
//
// Does nothing in ROCKSDB_LITE builds, when no listener is registered, or
// when the DB is shutting down. Otherwise mutex_ must be held; it is released
// while the info is assembled and the callbacks run, and re-acquired
// afterwards. The current Version of `cfd` is referenced for that period so
// that table properties can be read from it without the mutex.
void DBImpl::NotifyOnCompactionCompleted(
    ColumnFamilyData* cfd, Compaction *c, const Status &st,
    const CompactionJobStats& compaction_job_stats,
    const int job_id) {
#ifndef ROCKSDB_LITE
  if (immutable_db_options_.listeners.size() == 0U) {
    return;
  }
  mutex_.AssertHeld();
  if (shutting_down_.load(std::memory_order_acquire)) {
    return;
  }
  // cfd->current() must be read under the mutex; take a reference so the
  // Version can still be used after unlocking.
  Version* current = cfd->current();
  current->Ref();
  // release lock while notifying events
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::NotifyOnCompactionCompleted::UnlockMutex");
  {
    CompactionJobInfo info;
    info.cf_name = cfd->GetName();
    info.status = st;
    info.thread_id = env_->GetThreadID();
    info.job_id = job_id;
    info.base_input_level = c->start_level();
    info.output_level = c->output_level();
    info.stats = compaction_job_stats;
    info.table_properties = c->GetOutputTableProperties();
    info.compaction_reason = c->compaction_reason();
    info.compression = c->output_compression();
    for (size_t i = 0; i < c->num_input_levels(); ++i) {
      for (const auto fmd : *c->inputs(i)) {
        auto fn = TableFileName(immutable_db_options_.db_paths,
                                fmd->fd.GetNumber(), fmd->fd.GetPathId());
        info.input_files.push_back(fn);
        // Add the input file's table properties unless they are already
        // present in the map.
        if (info.table_properties.count(fn) == 0) {
          std::shared_ptr<const TableProperties> tp;
          auto s = current->GetTableProperties(&tp, fmd, &fn);
          if (s.ok()) {
            info.table_properties[fn] = tp;
          }
        }
      }
    }
    // Bind by reference: each element carries a full file metadata record
    // that should not be copied per iteration.
    for (const auto& newf : c->edit()->GetNewFiles()) {
      info.output_files.push_back(TableFileName(immutable_db_options_.db_paths,
                                                newf.second.fd.GetNumber(),
                                                newf.second.fd.GetPathId()));
    }
    // Bind by reference to avoid copying the listener handle per iteration.
    for (const auto& listener : immutable_db_options_.listeners) {
      listener->OnCompactionCompleted(this, info);
    }
  }
  mutex_.Lock();
  current->Unref();
// bg_cv_ is not signaled here; the original note says it is signaled at the
// end of the background job.
#endif  // ROCKSDB_LITE
}
|
|
|
|
// REQUIREMENT: block all background work by calling PauseBackgroundWork()
|
|
// before calling this function
|
|
Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
|
|
assert(level < cfd->NumberLevels());
|
|
if (target_level >= cfd->NumberLevels()) {
|
|
return Status::InvalidArgument("Target level exceeds number of levels");
|
|
}
|
|
|
|
std::unique_ptr<SuperVersion> superversion_to_free;
|
|
std::unique_ptr<SuperVersion> new_superversion(new SuperVersion());
|
|
|
|
Status status;
|
|
|
|
InstrumentedMutexLock guard_lock(&mutex_);
|
|
|
|
// only allow one thread refitting
|
|
if (refitting_level_) {
|
|
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
|
"[ReFitLevel] another thread is refitting");
|
|
return Status::NotSupported("another thread is refitting");
|
|
}
|
|
refitting_level_ = true;
|
|
|
|
const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
|
|
// move to a smaller level
|
|
int to_level = target_level;
|
|
if (target_level < 0) {
|
|
to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
|
|
}
|
|
|
|
auto* vstorage = cfd->current()->storage_info();
|
|
if (to_level > level) {
|
|
if (level == 0) {
|
|
return Status::NotSupported(
|
|
"Cannot change from level 0 to other levels.");
|
|
}
|
|
// Check levels are empty for a trivial move
|
|
for (int l = level + 1; l <= to_level; l++) {
|
|
if (vstorage->NumLevelFiles(l) > 0) {
|
|
return Status::NotSupported(
|
|
"Levels between source and target are not empty for a move.");
|
|
}
|
|
}
|
|
}
|
|
if (to_level != level) {
|
|
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
|
"[%s] Before refitting:\n%s", cfd->GetName().c_str(),
|
|
cfd->current()->DebugString().data());
|
|
|
|
VersionEdit edit;
|
|
edit.SetColumnFamily(cfd->GetID());
|
|
for (const auto& f : vstorage->LevelFiles(level)) {
|
|
edit.DeleteFile(level, f->fd.GetNumber());
|
|
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
|
|
f->fd.GetFileSize(), f->smallest, f->largest,
|
|
f->smallest_seqno, f->largest_seqno,
|
|
f->marked_for_compaction);
|
|
}
|
|
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
|
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
|
|
edit.DebugString().data());
|
|
|
|
status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
|
|
directories_.GetDbDir());
|
|
superversion_to_free.reset(InstallSuperVersionAndScheduleWork(
|
|
cfd, new_superversion.release(), mutable_cf_options));
|
|
|
|
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] LogAndApply: %s\n",
|
|
cfd->GetName().c_str(), status.ToString().data());
|
|
|
|
if (status.ok()) {
|
|
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
|
"[%s] After refitting:\n%s", cfd->GetName().c_str(),
|
|
cfd->current()->DebugString().data());
|
|
}
|
|
}
|
|
|
|
refitting_level_ = false;
|
|
|
|
return status;
|
|
}
|
|
|
|
// Returns the number of levels of the column family behind `column_family`.
int DBImpl::NumberLevels(ColumnFamilyHandle* column_family) {
  return reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)
      ->cfd()
      ->NumberLevels();
}
|
|
|
|
int DBImpl::MaxMemCompactionLevel(ColumnFamilyHandle* column_family) {
|
|
return 0;
|
|
}
|
|
|
|
// Returns level0_stop_writes_trigger from the mutable options of the column
// family's installed super version. The value is read under the DB mutex.
int DBImpl::Level0StopWriteTrigger(ColumnFamilyHandle* column_family) {
  InstrumentedMutexLock l(&mutex_);
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  const auto& cf_options = cfd->GetSuperVersion()->mutable_cf_options;
  return cf_options.level0_stop_writes_trigger;
}
|
|
|
|
// Flushes the memtable of the column family behind `column_family` by
// forwarding to FlushMemTable() and returning its status.
Status DBImpl::Flush(const FlushOptions& flush_options,
                     ColumnFamilyHandle* column_family) {
  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family)->cfd();
  return FlushMemTable(cfd, flush_options);
}
|
|
|
|
// Runs one manual compaction of `cfd` from `input_level` into `output_level`
// and blocks until it is done.
//
// input_level            a level >= 0 or ColumnFamilyData::kCompactAllLevels.
// begin / end            user-key bounds; nullptr means unbounded. Both are
//                        ignored for universal and FIFO compaction styles.
// exclusive              if true, wait until no other compaction is scheduled
//                        before starting.
// disallow_trivial_move  stored in the manual compaction state.
//
// The calling thread registers a ManualCompactionState, repeatedly asks the
// column family for the next compaction in the range, hands each one to the
// LOW priority pool and waits on bg_cv_ until the state is marked done.
// Returns the status stored in that state.
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
                                   int output_level, uint32_t output_path_id,
                                   const Slice* begin, const Slice* end,
                                   bool exclusive, bool disallow_trivial_move) {
  assert(input_level == ColumnFamilyData::kCompactAllLevels ||
         input_level >= 0);

  InternalKey begin_storage, end_storage;

  bool scheduled = false;
  bool manual_conflict = false;
  ManualCompactionState manual;
  manual.cfd = cfd;
  manual.input_level = input_level;
  manual.output_level = output_level;
  manual.output_path_id = output_path_id;
  manual.done = false;
  manual.in_progress = false;
  manual.incomplete = false;
  manual.exclusive = exclusive;
  manual.disallow_trivial_move = disallow_trivial_move;
  // For universal compaction, we enforce every manual compaction to compact
  // all files.
  if (begin == nullptr ||
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
    manual.begin = nullptr;
  } else {
    begin_storage.SetMinPossibleForUserKey(*begin);
    manual.begin = &begin_storage;
  }
  if (end == nullptr ||
      cfd->ioptions()->compaction_style == kCompactionStyleUniversal ||
      cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
    manual.end = nullptr;
  } else {
    end_storage.SetMaxPossibleForUserKey(*end);
    manual.end = &end_storage;
  }

  TEST_SYNC_POINT("DBImpl::RunManualCompaction:0");
  TEST_SYNC_POINT("DBImpl::RunManualCompaction:1");
  InstrumentedMutexLock l(&mutex_);

  // When a manual compaction arrives, temporarily disable scheduling of
  // non-manual compactions and wait until the number of scheduled compaction
  // jobs drops to zero. This is needed to ensure that this manual compaction
  // can compact any range of keys/files.
  //
  // HasPendingManualCompaction() is true when at least one thread is inside
  // RunManualCompaction(), i.e. during that time no other compaction will
  // get scheduled (see MaybeScheduleFlushOrCompaction).
  //
  // Note that the following loop doesn't stop more that one thread calling
  // RunManualCompaction() from getting to the second while loop below.
  // However, only one of them will actually schedule compaction, while
  // others will wait on a condition variable until it completes.

  AddManualCompaction(&manual);
  TEST_SYNC_POINT_CALLBACK("DBImpl::RunManualCompaction:NotScheduled", &mutex_);
  if (exclusive) {
    while (bg_bottom_compaction_scheduled_ > 0 ||
           bg_compaction_scheduled_ > 0) {
      TEST_SYNC_POINT("DBImpl::RunManualCompaction:WaitScheduled");
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "[%s] Manual compaction waiting for all other scheduled background "
          "compactions to finish",
          cfd->GetName().c_str());
      bg_cv_.Wait();
    }
  }

  ROCKS_LOG_INFO(immutable_db_options_.info_log,
                 "[%s] Manual compaction starting", cfd->GetName().c_str());

  // We don't check bg_error_ here, because if we get the error in compaction,
  // the compaction will set manual.status to bg_error_ and set manual.done to
  // true.
  while (!manual.done) {
    assert(HasPendingManualCompaction());
    manual_conflict = false;
    // Initialized so the pointer never holds an indeterminate value; it is
    // assigned by the CompactRange() call embedded in the condition below.
    Compaction* compaction = nullptr;
    // The condition short-circuits: CompactRange() is only called when this
    // thread may run, nothing of ours is in progress and nothing is
    // scheduled. The wait branch is then taken only if CompactRange()
    // returned nullptr because of a conflict.
    if (ShouldntRunManualCompaction(&manual) || (manual.in_progress == true) ||
        scheduled ||
        ((manual.manual_end = &manual.tmp_storage1) &&
         ((compaction = manual.cfd->CompactRange(
               *manual.cfd->GetLatestMutableCFOptions(), manual.input_level,
               manual.output_level, manual.output_path_id, manual.begin,
               manual.end, &manual.manual_end, &manual_conflict)) == nullptr) &&
         manual_conflict)) {
      // exclusive manual compactions should not see a conflict during
      // CompactRange
      assert(!exclusive || !manual_conflict);
      // Running either this or some other manual compaction
      bg_cv_.Wait();
      if (scheduled && manual.incomplete == true) {
        // Our scheduled job finished without covering the whole range;
        // loop again to pick up the remainder.
        assert(!manual.in_progress);
        scheduled = false;
        manual.incomplete = false;
      }
    } else if (!scheduled) {
      if (compaction == nullptr) {
        // No conflict and nothing left to compact in the range.
        manual.done = true;
        bg_cv_.SignalAll();
        continue;
      }
      // Hand the picked compaction to a background thread.
      CompactionArg* ca = new CompactionArg;
      ca->db = this;
      ca->prepicked_compaction = new PrepickedCompaction;
      ca->prepicked_compaction->manual_compaction_state = &manual;
      ca->prepicked_compaction->compaction = compaction;
      manual.incomplete = false;
      bg_compaction_scheduled_++;
      env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
                     &DBImpl::UnscheduleCallback);
      scheduled = true;
    }
  }

  assert(!manual.in_progress);
  assert(HasPendingManualCompaction());
  RemoveManualCompaction(&manual);
  bg_cv_.SignalAll();
  return manual.status;
}
|
|
|
|
// Switches the active memtable of `cfd` and schedules a flush for it.
//
// Returns OK right away when there is neither an unflushed immutable memtable
// nor any data in the active memtable. Otherwise the memtable is switched
// (entering the write thread unbatched unless `writes_stopped` says the
// caller already stopped writes), a flush is requested and scheduled, and,
// if the switch succeeded and `flush_options.wait` is set, the call blocks in
// WaitForFlushMemTable().
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
                             const FlushOptions& flush_options,
                             bool writes_stopped) {
  const bool must_block_writers = !writes_stopped;
  Status result;
  {
    // Declared before the lock guard, so it is destroyed only after the
    // guard has released mutex_.
    WriteContext switch_context;
    InstrumentedMutexLock db_lock(&mutex_);

    const bool no_immutable_memtables = cfd->imm()->NumNotFlushed() == 0;
    if (no_immutable_memtables && cfd->mem()->IsEmpty()) {
      // Nothing buffered anywhere: there is nothing to flush.
      return Status::OK();
    }

    WriteThread::Writer unbatched_writer;
    if (must_block_writers) {
      write_thread_.EnterUnbatched(&unbatched_writer, &mutex_);
    }

    // SwitchMemtable() releases and reacquires the mutex while it runs.
    result = SwitchMemtable(cfd, &switch_context);

    if (must_block_writers) {
      write_thread_.ExitUnbatched(&unbatched_writer);
    }

    // The flush is requested and scheduled regardless of the switch result.
    cfd->imm()->FlushRequested();
    SchedulePendingFlush(cfd);
    MaybeScheduleFlushOrCompaction();
  }

  if (!result.ok() || !flush_options.wait) {
    return result;
  }
  // Block until the flush completes.
  return WaitForFlushMemTable(cfd);
}
|
|
|
|
// Blocks on bg_cv_ until `cfd` has no unflushed immutable memtables or a
// background error has been recorded.
//
// Returns ShutdownInProgress if shutdown is observed while waiting,
// InvalidArgument if the column family is found dropped while waiting, the
// background error if one is set when waiting stops, and OK otherwise.
Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) {
  InstrumentedMutexLock db_lock(&mutex_);
  while (true) {
    const bool flush_outstanding = cfd->imm()->NumNotFlushed() > 0;
    if (!flush_outstanding || !bg_error_.ok()) {
      break;
    }
    if (shutting_down_.load(std::memory_order_acquire)) {
      return Status::ShutdownInProgress();
    }
    if (cfd->IsDropped()) {
      // FlushJob cannot flush a dropped CF, so NumNotFlushed() would never
      // reach zero and this loop would spin forever without this exit.
      return Status::InvalidArgument("Cannot flush a dropped CF");
    }
    bg_cv_.Wait();
  }

  if (bg_error_.ok()) {
    return Status::OK();
  }
  return bg_error_;
}
|
|
|
|
// Sets "disable_auto_compactions" to "false" on every handle in
// `column_family_handles`.
//
// All handles are attempted even if some fail. Returns OK when every
// SetOptions() call succeeded, otherwise the status of the last failing call.
Status DBImpl::EnableAutoCompaction(
    const std::vector<ColumnFamilyHandle*>& column_family_handles) {
  Status last_failure;
  for (auto handle : column_family_handles) {
    Status outcome =
        this->SetOptions(handle, {{"disable_auto_compactions", "false"}});
    if (outcome.ok()) {
      continue;
    }
    last_failure = outcome;
  }
  return last_failure;
}
|
|
|
|
void DBImpl::MaybeScheduleFlushOrCompaction() {
|
|
mutex_.AssertHeld();
|
|
if (!opened_successfully_) {
|
|
// Compaction may introduce data race to DB open
|
|
return;
|
|
}
|
|
if (bg_work_paused_ > 0) {
|
|
// we paused the background work
|
|
return;
|
|
} else if (shutting_down_.load(std::memory_order_acquire)) {
|
|
// DB is being deleted; no more background compactions
|
|
return;
|
|
}
|
|
auto bg_job_limits = GetBGJobLimits();
|
|
bool is_flush_pool_empty =
|
|
env_->GetBackgroundThreads(Env::Priority::HIGH) == 0;
|
|
while (!is_flush_pool_empty && unscheduled_flushes_ > 0 &&
|
|
bg_flush_scheduled_ < bg_job_limits.max_flushes) {
|
|
unscheduled_flushes_--;
|
|
bg_flush_scheduled_++;
|
|
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH, this);
|
|
}
|
|
|
|
// special case -- if high-pri (flush) thread pool is empty, then schedule
|
|
// flushes in low-pri (compaction) thread pool.
|
|
if (is_flush_pool_empty) {
|
|
while (unscheduled_flushes_ > 0 &&
|
|
bg_flush_scheduled_ + bg_compaction_scheduled_ <
|
|
bg_job_limits.max_flushes) {
|
|
unscheduled_flushes_--;
|
|
bg_flush_scheduled_++;
|
|
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::LOW, this);
|
|
}
|
|
}
|
|
|
|
if (bg_compaction_paused_ > 0) {
|
|
// we paused the background compaction
|
|
return;
|
|
}
|
|
|
|
if (HasExclusiveManualCompaction()) {
|
|
// only manual compactions are allowed to run. don't schedule automatic
|
|
// compactions
|
|
return;
|
|
}
|
|
|
|
while (bg_compaction_scheduled_ < bg_job_limits.max_compactions &&
|
|
unscheduled_compactions_ > 0) {
|
|
CompactionArg* ca = new CompactionArg;
|
|
ca->db = this;
|
|
ca->prepicked_compaction = nullptr;
|
|
bg_compaction_scheduled_++;
|
|
unscheduled_compactions_--;
|
|
env_->Schedule(&DBImpl::BGWorkCompaction, ca, Env::Priority::LOW, this,
|
|
&DBImpl::UnscheduleCallback);
|
|
}
|
|
}
|
|
|
|
// Computes the background job limits from the DB's current options and the
// write controller's speed-up hint. Requires mutex_ to be held.
DBImpl::BGJobLimits DBImpl::GetBGJobLimits() const {
  mutex_.AssertHeld();
  const int flush_cap = immutable_db_options_.max_background_flushes;
  const int compaction_cap = mutable_db_options_.max_background_compactions;
  const int job_cap = mutable_db_options_.max_background_jobs;
  const bool speed_up = write_controller_.NeedSpeedupCompaction();
  return GetBGJobLimits(flush_cap, compaction_cap, job_cap, speed_up);
}
|
|
|
|
// Derives flush and compaction limits from the raw option values.
//
// When both `max_background_flushes` and `max_background_compactions` are -1,
// the limits come from `max_background_jobs`: a quarter of the jobs (at least
// one) go to flushes and the remainder (at least one) to compactions.
// Otherwise the two explicit values are used, each clamped to at least one.
// When `parallelize_compactions` is false the compaction limit is forced to 1.
DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
                                           int max_background_compactions,
                                           int max_background_jobs,
                                           bool parallelize_compactions) {
  // True when the user relies on max_background_jobs rather than the two
  // older per-kind options (kept for compatibility).
  const bool derive_from_job_budget =
      max_background_flushes == -1 && max_background_compactions == -1;

  BGJobLimits limits;
  limits.max_flushes =
      std::max(1, derive_from_job_budget ? max_background_jobs / 4
                                         : max_background_flushes);
  limits.max_compactions =
      std::max(1, derive_from_job_budget
                      ? max_background_jobs - limits.max_flushes
                      : max_background_compactions);

  if (!parallelize_compactions) {
    // Throttle background compactions until parallelism is deemed necessary.
    limits.max_compactions = 1;
  }
  return limits;
}
|
|
|
|
// Appends `cfd` to the compaction queue, taking a reference on it and marking
// it as pending compaction. `cfd` must not already be marked pending.
void DBImpl::AddToCompactionQueue(ColumnFamilyData* cfd) {
  assert(!cfd->pending_compaction());
  // The queue entry holds its own reference.
  cfd->Ref();
  compaction_queue_.emplace_back(cfd);
  cfd->set_pending_compaction(true);
}
|
|
|
|
// Removes and returns the column family at the head of the compaction queue,
// clearing its pending-compaction mark. The queue must not be empty. No
// Unref() happens here; BackgroundCompaction() unreferences the returned
// column family itself.
ColumnFamilyData* DBImpl::PopFirstFromCompactionQueue() {
  assert(!compaction_queue_.empty());
  ColumnFamilyData* head = compaction_queue_.front();
  compaction_queue_.pop_front();
  assert(head->pending_compaction());
  head->set_pending_compaction(false);
  return head;
}
|
|
|
|
// Appends `cfd` to the flush queue, taking a reference on it and marking it
// as pending flush. `cfd` must not already be marked pending.
void DBImpl::AddToFlushQueue(ColumnFamilyData* cfd) {
  assert(!cfd->pending_flush());
  // The queue entry holds its own reference.
  cfd->Ref();
  flush_queue_.emplace_back(cfd);
  cfd->set_pending_flush(true);
}
|
|
|
|
// Removes and returns the column family at the head of the flush queue,
// clearing its pending-flush mark. The queue must not be empty. No Unref()
// happens here; BackgroundFlush() unreferences the returned column family
// itself.
ColumnFamilyData* DBImpl::PopFirstFromFlushQueue() {
  assert(!flush_queue_.empty());
  ColumnFamilyData* head = flush_queue_.front();
  flush_queue_.pop_front();
  assert(head->pending_flush());
  head->set_pending_flush(false);
  return head;
}
|
|
|
|
// Queues `cfd` for flushing and bumps the unscheduled-flush counter, unless
// it is already queued or its immutable memtable list reports no pending
// flush.
void DBImpl::SchedulePendingFlush(ColumnFamilyData* cfd) {
  if (cfd->pending_flush() || !cfd->imm()->IsFlushPending()) {
    return;
  }
  AddToFlushQueue(cfd);
  ++unscheduled_flushes_;
}
|
|
|
|
// Queues `cfd` for compaction and bumps the unscheduled-compaction counter,
// unless it is already queued or reports that it needs no compaction.
void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
  if (cfd->pending_compaction() || !cfd->NeedsCompaction()) {
    return;
  }
  AddToCompactionQueue(cfd);
  ++unscheduled_compactions_;
}
|
|
|
|
// Records a file in the purge queue, described by its name, type, number,
// path id and the id of the job that queued it. Requires mutex_ to be held.
void DBImpl::SchedulePendingPurge(std::string fname, FileType type,
                                  uint64_t number, uint32_t path_id,
                                  int job_id) {
  mutex_.AssertHeld();
  purge_queue_.push_back(PurgeFileInfo(fname, type, number, path_id, job_id));
}
|
|
|
|
void DBImpl::BGWorkFlush(void* db) {
|
|
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
|
|
TEST_SYNC_POINT("DBImpl::BGWorkFlush");
|
|
reinterpret_cast<DBImpl*>(db)->BackgroundCallFlush();
|
|
TEST_SYNC_POINT("DBImpl::BGWorkFlush:done");
|
|
}
|
|
|
|
void DBImpl::BGWorkCompaction(void* arg) {
|
|
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
|
|
delete reinterpret_cast<CompactionArg*>(arg);
|
|
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::LOW);
|
|
TEST_SYNC_POINT("DBImpl::BGWorkCompaction");
|
|
auto prepicked_compaction =
|
|
static_cast<PrepickedCompaction*>(ca.prepicked_compaction);
|
|
reinterpret_cast<DBImpl*>(ca.db)->BackgroundCallCompaction(
|
|
prepicked_compaction, Env::Priority::LOW);
|
|
delete prepicked_compaction;
|
|
}
|
|
|
|
void DBImpl::BGWorkBottomCompaction(void* arg) {
|
|
CompactionArg ca = *(static_cast<CompactionArg*>(arg));
|
|
delete static_cast<CompactionArg*>(arg);
|
|
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::BOTTOM);
|
|
TEST_SYNC_POINT("DBImpl::BGWorkBottomCompaction");
|
|
auto* prepicked_compaction = ca.prepicked_compaction;
|
|
assert(prepicked_compaction && prepicked_compaction->compaction &&
|
|
!prepicked_compaction->manual_compaction_state);
|
|
ca.db->BackgroundCallCompaction(prepicked_compaction, Env::Priority::BOTTOM);
|
|
delete prepicked_compaction;
|
|
}
|
|
|
|
void DBImpl::BGWorkPurge(void* db) {
|
|
IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);
|
|
TEST_SYNC_POINT("DBImpl::BGWorkPurge:start");
|
|
reinterpret_cast<DBImpl*>(db)->BackgroundCallPurge();
|
|
TEST_SYNC_POINT("DBImpl::BGWorkPurge:end");
|
|
}
|
|
|
|
void DBImpl::UnscheduleCallback(void* arg) {
|
|
CompactionArg ca = *(reinterpret_cast<CompactionArg*>(arg));
|
|
delete reinterpret_cast<CompactionArg*>(arg);
|
|
if (ca.prepicked_compaction != nullptr) {
|
|
if (ca.prepicked_compaction->compaction != nullptr) {
|
|
delete ca.prepicked_compaction->compaction;
|
|
}
|
|
delete ca.prepicked_compaction;
|
|
}
|
|
TEST_SYNC_POINT("DBImpl::UnscheduleCallback");
|
|
}
|
|
|
|
// Flushes the first flushable column family in the flush queue.
// Requires mutex_ to be held.
//
// Returns the background error if one is set, ShutdownInProgress if shutdown
// has started, OK if no queued column family is flushable, and otherwise the
// result of FlushMemTableToOutputFile().
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
                               LogBuffer* log_buffer) {
  mutex_.AssertHeld();

  if (!bg_error_.ok()) {
    return bg_error_;
  }
  if (shutting_down_.load(std::memory_order_acquire)) {
    return Status::ShutdownInProgress();
  }

  // Drain the queue until a flushable column family turns up. Every queued
  // entry already carries a reference (taken in AddToFlushQueue).
  ColumnFamilyData* flush_target = nullptr;
  while (flush_target == nullptr && !flush_queue_.empty()) {
    ColumnFamilyData* candidate = PopFirstFromFlushQueue();
    if (!candidate->IsDropped() && candidate->imm()->IsFlushPending()) {
      flush_target = candidate;
    } else if (candidate->Unref()) {
      // Cannot flush this one, and the queue held its last reference.
      delete candidate;
    }
  }
  if (flush_target == nullptr) {
    return Status::OK();
  }

  const MutableCFOptions cf_options_snapshot =
      *flush_target->GetLatestMutableCFOptions();
  auto limits = GetBGJobLimits();
  ROCKS_LOG_BUFFER(
      log_buffer,
      "Calling FlushMemTableToOutputFile with column "
      "family [%s], flush slots available %d, compaction slots available %d, "
      "flush slots scheduled %d, compaction slots scheduled %d",
      flush_target->GetName().c_str(), limits.max_flushes,
      limits.max_compactions, bg_flush_scheduled_, bg_compaction_scheduled_);

  Status flush_status =
      FlushMemTableToOutputFile(flush_target, cf_options_snapshot,
                                made_progress, job_context, log_buffer);
  // Drop the reference that came with the queue entry.
  if (flush_target->Unref()) {
    delete flush_target;
  }
  return flush_status;
}
|
|
|
|
void DBImpl::BackgroundCallFlush() {
|
|
bool made_progress = false;
|
|
JobContext job_context(next_job_id_.fetch_add(1), true);
|
|
|
|
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:start");
|
|
|
|
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
|
|
immutable_db_options_.info_log.get());
|
|
{
|
|
InstrumentedMutexLock l(&mutex_);
|
|
assert(bg_flush_scheduled_);
|
|
num_running_flushes_++;
|
|
|
|
auto pending_outputs_inserted_elem =
|
|
CaptureCurrentFileNumberInPendingOutputs();
|
|
|
|
Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer);
|
|
if (!s.ok() && !s.IsShutdownInProgress()) {
|
|
// Wait a little bit before retrying background flush in
|
|
// case this is an environmental problem and we do not want to
|
|
// chew up resources for failed flushes for the duration of
|
|
// the problem.
|
|
uint64_t error_cnt =
|
|
default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
|
|
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
|
|
mutex_.Unlock();
|
|
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
|
|
"Waiting after background flush error: %s"
|
|
"Accumulated background error counts: %" PRIu64,
|
|
s.ToString().c_str(), error_cnt);
|
|
log_buffer.FlushBufferToLog();
|
|
LogFlush(immutable_db_options_.info_log);
|
|
env_->SleepForMicroseconds(1000000);
|
|
mutex_.Lock();
|
|
}
|
|
|
|
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
|
|
|
|
// If flush failed, we want to delete all temporary files that we might have
|
|
// created. Thus, we force full scan in FindObsoleteFiles()
|
|
FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
|
|
// delete unnecessary files if any, this is done outside the mutex
|
|
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
|
|
mutex_.Unlock();
|
|
// Have to flush the info logs before bg_flush_scheduled_--
|
|
// because if bg_flush_scheduled_ becomes 0 and the lock is
|
|
// released, the deconstructor of DB can kick in and destroy all the
|
|
// states of DB so info_log might not be available after that point.
|
|
// It also applies to access other states that DB owns.
|
|
log_buffer.FlushBufferToLog();
|
|
if (job_context.HaveSomethingToDelete()) {
|
|
PurgeObsoleteFiles(job_context);
|
|
}
|
|
job_context.Clean();
|
|
mutex_.Lock();
|
|
}
|
|
|
|
assert(num_running_flushes_ > 0);
|
|
num_running_flushes_--;
|
|
bg_flush_scheduled_--;
|
|
// See if there's more work to be done
|
|
MaybeScheduleFlushOrCompaction();
|
|
bg_cv_.SignalAll();
|
|
// IMPORTANT: there should be no code after calling SignalAll. This call may
|
|
// signal the DB destructor that it's OK to proceed with destruction. In
|
|
// that case, all DB variables will be dealloacated and referencing them
|
|
// will cause trouble.
|
|
}
|
|
}
|
|
|
|
// Body of one background compaction job running on the `bg_thread_pri` pool
// (LOW or BOTTOM): runs BackgroundCompaction() under mutex_, then backs off
// after a failure, releases the captured pending-output marker, purges
// obsolete files outside the mutex, updates the scheduling counters and wakes
// up waiters on bg_cv_ when someone may be waiting.
//
// `prepicked_compaction` may be null; it is forwarded unchanged to
// BackgroundCompaction().
void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
                                      Env::Priority bg_thread_pri) {
  bool made_progress = false;
  JobContext job_context(next_job_id_.fetch_add(1), true);
  TEST_SYNC_POINT("BackgroundCallCompaction:0");
  MaybeDumpStats();
  LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
                       immutable_db_options_.info_log.get());
  {
    InstrumentedMutexLock db_lock(&mutex_);

    // Unlocks and relocks the mutex while waiting for any running
    // IngestExternalFile() calls to finish.
    WaitForIngestFile();

    num_running_compactions_++;

    auto pending_outputs_marker = CaptureCurrentFileNumberInPendingOutputs();

    assert((bg_thread_pri == Env::Priority::BOTTOM &&
            bg_bottom_compaction_scheduled_) ||
           (bg_thread_pri == Env::Priority::LOW && bg_compaction_scheduled_));
    Status compaction_status = BackgroundCompaction(
        &made_progress, &job_context, &log_buffer, prepicked_compaction);
    TEST_SYNC_POINT("BackgroundCallCompaction:1");

    // A shutdown in progress is not treated as a failure.
    const bool hard_failure =
        !compaction_status.ok() && !compaction_status.IsShutdownInProgress();
    if (hard_failure) {
      // Back off for a second before the next attempt: if the problem is
      // environmental, retrying immediately would only burn resources for as
      // long as it lasts.
      uint64_t error_count =
          default_cf_internal_stats_->BumpAndGetBackgroundErrorCount();
      bg_cv_.SignalAll();  // In case a waiter can proceed despite the error
      mutex_.Unlock();
      log_buffer.FlushBufferToLog();
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                      "Waiting after background compaction error: %s, "
                      "Accumulated background error counts: %" PRIu64,
                      compaction_status.ToString().c_str(), error_count);
      LogFlush(immutable_db_options_.info_log);
      env_->SleepForMicroseconds(1000000);
      mutex_.Lock();
    }

    ReleaseFileNumberFromPendingOutputs(pending_outputs_marker);

    // After a failure, temporary files may exist that were never recorded in
    // job_context, so force a full scan in FindObsoleteFiles().
    FindObsoleteFiles(&job_context, hard_failure);

    // File deletion and log flushing happen outside the mutex.
    const bool have_files_to_delete = job_context.HaveSomethingToDelete();
    if (have_files_to_delete || !log_buffer.IsEmpty()) {
      mutex_.Unlock();
      // The info log must be flushed before the scheduled-job counter is
      // decremented: once the counter reaches 0 and the lock is released,
      // the DB destructor can run and destroy all DB state, so info_log (and
      // any other DB-owned state) may no longer be available.
      log_buffer.FlushBufferToLog();
      if (have_files_to_delete) {
        PurgeObsoleteFiles(job_context);
      }
      job_context.Clean();
      mutex_.Lock();
    }

    assert(num_running_compactions_ > 0);
    num_running_compactions_--;
    if (bg_thread_pri == Env::Priority::LOW) {
      bg_compaction_scheduled_--;
    } else {
      assert(bg_thread_pri == Env::Priority::BOTTOM);
      bg_bottom_compaction_scheduled_--;
    }

    versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();

    // See if there's more work to be done
    MaybeScheduleFlushOrCompaction();

    // Signal only when somebody can be waiting:
    // * made_progress -- need to wake up DelayWrite
    // * no compaction scheduled in either pool -- need to wake up ~DBImpl
    // * HasPendingManualCompaction -- need to wake up RunManualCompaction
    const bool nothing_scheduled = bg_compaction_scheduled_ == 0 &&
                                   bg_bottom_compaction_scheduled_ == 0;
    if (made_progress || nothing_scheduled || HasPendingManualCompaction()) {
      bg_cv_.SignalAll();
    }
    // IMPORTANT: there should be no code after calling SignalAll. This call may
    // signal the DB destructor that it's OK to proceed with destruction. In
    // that case, all DB variables will be deallocated and referencing them
    // will cause trouble.
  }
}
|
|
|
|
// Runs one unit of compaction work. Requires mutex_ to be held; the mutex is
// released while a non-trivial compaction job runs.
//
// The compaction comes from `prepicked_compaction` when it carries one (a
// manual compaction, or one forwarded to the bottom-priority pool); otherwise
// the column family at the head of the compaction queue is asked to pick one.
// The compaction is then handled as a file deletion, a trivial move, a
// forward to the bottom-priority pool, or a full CompactionJob.
//
// *made_progress is reset to false on entry and set to true when a compaction
// was carried out. For a manual compaction, the ManualCompactionState's
// status/done/in_progress/incomplete fields are updated for the thread
// waiting in RunManualCompaction().
//
// Returns the background error or ShutdownInProgress when either prevents the
// work from starting, otherwise the status of the compaction (OK when there
// was nothing to do).
Status DBImpl::BackgroundCompaction(bool* made_progress,
                                    JobContext* job_context,
                                    LogBuffer* log_buffer,
                                    PrepickedCompaction* prepicked_compaction) {
  ManualCompactionState* manual_compaction =
      prepicked_compaction == nullptr
          ? nullptr
          : prepicked_compaction->manual_compaction_state;
  *made_progress = false;
  mutex_.AssertHeld();
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Start");

  bool is_manual = (manual_compaction != nullptr);
  // Takes ownership of the prepicked Compaction, if there is one.
  unique_ptr<Compaction> c;
  if (prepicked_compaction != nullptr &&
      prepicked_compaction->compaction != nullptr) {
    c.reset(prepicked_compaction->compaction);
  }
  bool is_prepicked = is_manual || c;

  // (manual_compaction->in_progress == false);
  bool trivial_move_disallowed =
      is_manual && manual_compaction->disallow_trivial_move;

  CompactionJobStats compaction_job_stats;
  Status status = bg_error_;
  if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
    status = Status::ShutdownInProgress();
  }

  if (!status.ok()) {
    if (is_manual) {
      manual_compaction->status = status;
      manual_compaction->done = true;
      manual_compaction->in_progress = false;
      manual_compaction = nullptr;
    }
    if (c) {
      // A prepicked compaction is abandoned here without running. Release
      // its input files, as every other exit path does, instead of only
      // destroying the Compaction object.
      c->ReleaseCompactionFiles(status);
      c.reset();
    }
    return status;
  }

  if (is_manual) {
    // another thread cannot pick up the same work
    manual_compaction->in_progress = true;
  }

  // InternalKey manual_end_storage;
  // InternalKey* manual_end = &manual_end_storage;
  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    assert(m->in_progress);
    if (!c) {
      m->done = true;
      m->manual_end = nullptr;
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] Manual compaction from level-%d from %s .. "
                       "%s; nothing to do\n",
                       m->cfd->GetName().c_str(), m->input_level,
                       (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
                       (m->end ? m->end->DebugString().c_str() : "(end)"));
    } else {
      ROCKS_LOG_BUFFER(
          log_buffer,
          "[%s] Manual compaction from level-%d to level-%d from %s .. "
          "%s; will stop at %s\n",
          m->cfd->GetName().c_str(), m->input_level, c->output_level(),
          (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
          (m->end ? m->end->DebugString().c_str() : "(end)"),
          ((m->done || m->manual_end == nullptr)
               ? "(end)"
               : m->manual_end->DebugString().c_str()));
    }
  } else if (!is_prepicked && !compaction_queue_.empty()) {
    if (HaveManualCompaction(compaction_queue_.front())) {
      // Can't compact right now, but try again later
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction()::Conflict");

      // Stay in the compaction queue.
      unscheduled_compactions_++;

      return Status::OK();
    }

    // cfd is referenced here
    auto cfd = PopFirstFromCompactionQueue();
    // We unreference here because the following code will take a Ref() on
    // this cfd if it is going to use it (Compaction class holds a
    // reference).
    // This will all happen under a mutex so we don't have to be afraid of
    // somebody else deleting it.
    if (cfd->Unref()) {
      delete cfd;
      // This was the last reference of the column family, so no need to
      // compact.
      return Status::OK();
    }

    // Pick up latest mutable CF Options and use it throughout the
    // compaction job
    // Compaction makes a copy of the latest MutableCFOptions. It should be used
    // throughout the compaction procedure to make sure consistency. It will
    // eventually be installed into SuperVersion
    auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
    if (!mutable_cf_options->disable_auto_compactions && !cfd->IsDropped()) {
      // NOTE: try to avoid unnecessary copy of MutableCFOptions if
      // compaction is not necessary. Need to make sure mutex is held
      // until we make a copy in the following code
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
      c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
      TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
      if (c != nullptr) {
        // update statistics
        MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
                    c->inputs(0)->size());
        // There are three things that can change compaction score:
        // 1) When flush or compaction finish. This case is covered by
        // InstallSuperVersionAndScheduleWork
        // 2) When MutableCFOptions changes. This case is also covered by
        // InstallSuperVersionAndScheduleWork, because this is when the new
        // options take effect.
        // 3) When we Pick a new compaction, we "remove" those files being
        // compacted from the calculation, which then influences compaction
        // score. Here we check if we need the new compaction even without the
        // files that are currently being compacted. If we need another
        // compaction, we might be able to execute it in parallel, so we add it
        // to the queue and schedule a new thread.
        if (cfd->NeedsCompaction()) {
          // Yes, we need more compactions!
          AddToCompactionQueue(cfd);
          ++unscheduled_compactions_;
          MaybeScheduleFlushOrCompaction();
        }
      }
    }
  }

  if (!c) {
    // Nothing to do
    ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
  } else if (c->deletion_compaction()) {
    // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
    // file if there is alive snapshot pointing to it
    assert(c->num_input_files(1) == 0);
    assert(c->level() == 0);
    assert(c->column_family_data()->ioptions()->compaction_style ==
           kCompactionStyleFIFO);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    for (const auto& f : *c->inputs(0)) {
      c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
    }
    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    InstallSuperVersionAndScheduleWorkWrapper(
        c->column_family_data(), job_context, *c->mutable_cf_options());
    ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n",
                     c->column_family_data()->GetName().c_str(),
                     c->num_input_files(0));
    *made_progress = true;
  } else if (!trivial_move_disallowed && c->IsTrivialMove()) {
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
    // Instrument for event update
    // TODO(yhchiang): add op details for showing trivial-move.
    ThreadStatusUtil::SetColumnFamily(
        c->column_family_data(), c->column_family_data()->ioptions()->env,
        immutable_db_options_.enable_thread_tracking);
    ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

    compaction_job_stats.num_input_files = c->num_input_files(0);

    // Move files to next level
    int32_t moved_files = 0;
    int64_t moved_bytes = 0;
    for (unsigned int l = 0; l < c->num_input_levels(); l++) {
      if (c->level(l) == c->output_level()) {
        continue;
      }
      for (size_t i = 0; i < c->num_input_files(l); i++) {
        FileMetaData* f = c->input(l, i);
        c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
        c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
                           f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
                           f->largest, f->smallest_seqno, f->largest_seqno,
                           f->marked_for_compaction);

        ROCKS_LOG_BUFFER(log_buffer, "[%s] Moving #%" PRIu64
                                     " to level-%d %" PRIu64 " bytes\n",
                         c->column_family_data()->GetName().c_str(),
                         f->fd.GetNumber(), c->output_level(),
                         f->fd.GetFileSize());
        ++moved_files;
        moved_bytes += f->fd.GetFileSize();
      }
    }

    status = versions_->LogAndApply(c->column_family_data(),
                                    *c->mutable_cf_options(), c->edit(),
                                    &mutex_, directories_.GetDbDir());
    // Use latest MutableCFOptions
    InstallSuperVersionAndScheduleWorkWrapper(
        c->column_family_data(), job_context, *c->mutable_cf_options());

    VersionStorageInfo::LevelSummaryStorage tmp;
    c->column_family_data()->internal_stats()->IncBytesMoved(c->output_level(),
                                                             moved_bytes);
    {
      event_logger_.LogToBuffer(log_buffer)
          << "job" << job_context->job_id << "event"
          << "trivial_move"
          << "destination_level" << c->output_level() << "files" << moved_files
          << "total_files_size" << moved_bytes;
    }
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] Moved #%d files to level-%d %" PRIu64 " bytes %s: %s\n",
        c->column_family_data()->GetName().c_str(), moved_files,
        c->output_level(), moved_bytes, status.ToString().c_str(),
        c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
    *made_progress = true;

    // Clear Instrument
    ThreadStatusUtil::ResetThreadStatus();
  } else if (c->column_family_data()->ioptions()->compaction_style ==
                 kCompactionStyleUniversal &&
             !is_prepicked && c->output_level() > 0 &&
             c->output_level() ==
                 c->column_family_data()
                     ->current()
                     ->storage_info()
                     ->MaxOutputLevel(
                         immutable_db_options_.allow_ingest_behind) &&
             env_->GetBackgroundThreads(Env::Priority::BOTTOM) > 0) {
    // Forward universal compactions involving last level to the bottom pool
    // if it exists, such that long-running compactions can't block short-
    // lived ones, like L0->L0s.
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:ForwardToBottomPriPool");
    CompactionArg* ca = new CompactionArg;
    ca->db = this;
    ca->prepicked_compaction = new PrepickedCompaction;
    // Ownership of the Compaction moves to the forwarded job; `c` is null
    // from here on, so the release/notify step below is skipped.
    ca->prepicked_compaction->compaction = c.release();
    ca->prepicked_compaction->manual_compaction_state = nullptr;
    ++bg_bottom_compaction_scheduled_;
    env_->Schedule(&DBImpl::BGWorkBottomCompaction, ca, Env::Priority::BOTTOM,
                   this, &DBImpl::UnscheduleCallback);
  } else {
    int output_level __attribute__((unused)) = c->output_level();
    TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:NonTrivial",
                             &output_level);

    SequenceNumber earliest_write_conflict_snapshot;
    std::vector<SequenceNumber> snapshot_seqs =
        snapshots_.GetAll(&earliest_write_conflict_snapshot);

    assert(is_snapshot_supported_ || snapshots_.empty());
    CompactionJob compaction_job(
        job_context->job_id, c.get(), immutable_db_options_, env_options_,
        versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(),
        directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
        &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
        table_cache_, &event_logger_,
        c->mutable_cf_options()->paranoid_file_checks,
        c->mutable_cf_options()->report_bg_io_stats, dbname_,
        &compaction_job_stats);
    compaction_job.Prepare();

    // The job itself runs without the DB mutex.
    mutex_.Unlock();
    compaction_job.Run();
    TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
    mutex_.Lock();

    status = compaction_job.Install(*c->mutable_cf_options());
    if (status.ok()) {
      InstallSuperVersionAndScheduleWorkWrapper(
          c->column_family_data(), job_context, *c->mutable_cf_options());
    }
    *made_progress = true;
  }
  if (c != nullptr) {
    c->ReleaseCompactionFiles(status);
    *made_progress = true;
    NotifyOnCompactionCompleted(
        c->column_family_data(), c.get(), status,
        compaction_job_stats, job_context->job_id);
  }
  // this will unref its input_version and column_family_data
  c.reset();

  if (status.ok()) {
    // Done
  } else if (status.IsShutdownInProgress()) {
    // Ignore compaction errors found during shutting down
  } else {
    ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                   status.ToString().c_str());
    if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
      Status new_bg_error = status;
      // may temporarily unlock and lock the mutex.
      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
                                            BackgroundErrorReason::kCompaction,
                                            &new_bg_error, &mutex_);
      if (!new_bg_error.ok()) {
        bg_error_ = new_bg_error;
      }
    }
  }

  if (is_manual) {
    ManualCompactionState* m = manual_compaction;
    if (!status.ok()) {
      m->status = status;
      m->done = true;
    }
    // For universal compaction:
    //   Because universal compaction always happens at level 0, so one
    //   compaction will pick up all overlapped files. No files will be
    //   filtered out due to size limit and left for a successive compaction.
    //   So we can safely conclude the current compaction.
    //
    //   Also note that, if we don't stop here, then the current compaction
    //   writes a new file back to level 0, which will be used in successive
    //   compaction. Hence the manual compaction will never finish.
    //
    // Stop the compaction if manual_end points to nullptr -- this means
    // that we compacted the whole range. manual_end should always point
    // to nullptr in case of universal compaction
    if (m->manual_end == nullptr) {
      m->done = true;
    }
    if (!m->done) {
      // We only compacted part of the requested range.  Update *m
      // to the range that is left to be compacted.
      // Universal and FIFO compactions should always compact the whole range
      assert(m->cfd->ioptions()->compaction_style !=
                 kCompactionStyleUniversal ||
             m->cfd->ioptions()->num_levels > 1);
      assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO);
      m->tmp_storage = *m->manual_end;
      m->begin = &m->tmp_storage;
      m->incomplete = true;
    }
    m->in_progress = false;  // not being processed anymore
  }
  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
  return status;
}
|
|
|
|
bool DBImpl::HasPendingManualCompaction() {
|
|
return (!manual_compaction_dequeue_.empty());
|
|
}
|
|
|
|
// Registers the manual compaction `m` by appending it to the back of
// manual_compaction_dequeue_. Only the pointer is stored.
void DBImpl::AddManualCompaction(DBImpl::ManualCompactionState* m) {
  manual_compaction_dequeue_.push_back(m);
}
|
|
|
|
// Unregisters the manual compaction `m` from manual_compaction_dequeue_.
// Only the first entry equal to `m` is erased. Asserts (in debug builds) if
// `m` is not present in the queue.
void DBImpl::RemoveManualCompaction(DBImpl::ManualCompactionState* m) {
  for (auto pos = manual_compaction_dequeue_.begin();
       pos != manual_compaction_dequeue_.end(); ++pos) {
    if (*pos == m) {
      manual_compaction_dequeue_.erase(pos);
      return;
    }
  }
  // Reaching this point means `m` was never found in the queue.
  assert(false);
}
|
|
|
|
// Decides whether the manual compaction `m` has to wait. Returns true when:
//  * at least one IngestExternalFile() call is still running, or
//  * `m` is exclusive and any background (bottom) compaction is scheduled, or
//  * `m` is not exclusive and an overlapping manual compaction sits ahead of
//    it in the queue and has not started yet.
// Returns false otherwise.
bool DBImpl::ShouldntRunManualCompaction(ManualCompactionState* m) {
  // We need to wait for other IngestExternalFile() calls to finish
  // before running a manual compaction.
  if (num_running_ingest_file_ > 0) {
    return true;
  }

  if (m->exclusive) {
    const bool background_work_scheduled =
        bg_bottom_compaction_scheduled_ > 0 || bg_compaction_scheduled_ > 0;
    return background_work_scheduled;
  }

  // Becomes true once the walk has reached `m` itself; entries visited
  // afterwards are behind `m` in the queue and never block it.
  bool reached_self = false;
  for (ManualCompactionState* other : manual_compaction_dequeue_) {
    if (other == m) {
      reached_self = true;
      continue;
    }
    // The other manual compaction conflicts with `m` if it overlaps with `m`,
    // is ahead of `m` in the queue, and is not yet in progress.
    if (MCOverlap(m, other) && (!reached_self && !other->in_progress)) {
      return true;
    }
  }
  return false;
}
|
|
|
|
// Scans the queued manual compactions front to back and returns true as soon
// as it finds either:
//  * an exclusive manual compaction (for any column family), or
//  * a manual compaction for `cfd` that is neither in progress nor done.
// Returns false if no such entry exists.
bool DBImpl::HaveManualCompaction(ColumnFamilyData* cfd) {
  for (ManualCompactionState* queued : manual_compaction_dequeue_) {
    if (queued->exclusive) {
      return true;
    }
    // A manual compaction on `cfd` that is already in progress (or done) is
    // not reported, which allows automatic compaction while it runs.
    const bool same_cf = (queued->cfd == cfd);
    if (same_cf && !queued->in_progress && !queued->done) {
      return true;
    }
  }
  return false;
}
|
|
|
|
bool DBImpl::HasExclusiveManualCompaction() {
|
|
// Remove from priority queue
|
|
std::deque<ManualCompactionState*>::iterator it =
|
|
manual_compaction_dequeue_.begin();
|
|
while (it != manual_compaction_dequeue_.end()) {
|
|
if ((*it)->exclusive) {
|
|
return true;
|
|
}
|
|
it++;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
// Reports whether two manual compactions are treated as overlapping: true if
// either one is exclusive, or if both target the same column family.
// Key ranges are not compared here.
bool DBImpl::MCOverlap(ManualCompactionState* m, ManualCompactionState* m1) {
  const bool either_exclusive = m->exclusive || m1->exclusive;
  return either_exclusive || m->cfd == m1->cfd;
}
|
|
|
|
// JobContext gets created and destructed outside of the lock -- we use this
// conveniently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersionAndScheduleWork() gets called twice with the
// same job_context, we can't reuse the SuperVersion() that got malloced
// because the first call already used it. In that rare case, we take a hit and
// create a new SuperVersion() inside of the mutex. We do a similar thing for
// superversions_to_free.
|
|
// Installs a new SuperVersion for `cfd` using the one stored in
// job_context->new_superversion, then records the SuperVersion returned by
// the install in job_context->superversions_to_free.
// Asserts that mutex_ is held.
void DBImpl::InstallSuperVersionAndScheduleWorkWrapper(
    ColumnFamilyData* cfd, JobContext* job_context,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();

  SuperVersion* const replaced_sv = InstallSuperVersionAndScheduleWork(
      cfd, job_context->new_superversion, mutable_cf_options);

  // The stored SuperVersion has been handed to the install call above, so
  // clear the slot to keep it from being used a second time.
  job_context->new_superversion = nullptr;
  job_context->superversions_to_free.push_back(replaced_sv);
}
|
|
|
|
// Installs a SuperVersion on `cfd`, schedules any pending flush/compaction
// work, and updates max_total_in_memory_state_ for the new options.
//
// `new_sv` may be nullptr, in which case a SuperVersion is allocated here.
// Returns whatever cfd->InstallSuperVersion() returns; the wrapper above
// queues that value in superversions_to_free.
// Asserts that mutex_ is held.
SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
    ColumnFamilyData* cfd, SuperVersion* new_sv,
    const MutableCFOptions& mutable_cf_options) {
  mutex_.AssertHeld();

  // Capture the memtable budget of the currently installed SuperVersion (if
  // any) before it is replaced, so it can be subtracted from
  // max_total_in_memory_state_ below.
  size_t old_memtable_size = 0;
  if (auto* current_sv = cfd->GetSuperVersion()) {
    const auto& current_opts = current_sv->mutable_cf_options;
    old_memtable_size =
        current_opts.write_buffer_size * current_opts.max_write_buffer_number;
  }

  // Fall back to allocating a SuperVersion here when the caller did not
  // supply one.
  if (new_sv == nullptr) {
    new_sv = new SuperVersion();
  }
  auto* replaced_sv =
      cfd->InstallSuperVersion(new_sv, &mutex_, mutable_cf_options);

  // Whenever we install new SuperVersion, we might need to issue new flushes or
  // compactions.
  SchedulePendingFlush(cfd);
  SchedulePendingCompaction(cfd);
  MaybeScheduleFlushOrCompaction();

  // Update max_total_in_memory_state_: swap the old budget for the new one.
  const auto new_memtable_size = mutable_cf_options.write_buffer_size *
                                 mutable_cf_options.max_write_buffer_number;
  max_total_in_memory_state_ =
      max_total_in_memory_state_ - old_memtable_size + new_memtable_size;
  return replaced_sv;
}
|
|
} // namespace rocksdb
|