FlushProcess

Summary:
Abstract out FlushProcess and take it out of DBImpl.
This also includes taking DeletionState outside of DBImpl.

Currently this diff is only doing the refactoring. Future work includes:
1. Decoupling flush_process.cc, make it depend on less state
2. Write flush_process_test, which will mock out everything that FlushProcess depends on and test it in isolation

Test Plan: make check

Reviewers: rven, yhchiang, sdong, ljin

Reviewed By: ljin

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D27561
This commit is contained in:
Igor Canadi 2014-10-28 11:54:33 -07:00
parent efa2fb33b0
commit a39e931e50
13 changed files with 689 additions and 431 deletions

View File

@ -143,7 +143,8 @@ TESTS = \
cuckoo_table_builder_test \
cuckoo_table_reader_test \
cuckoo_table_db_test \
write_batch_with_index_test
write_batch_with_index_test \
flush_job_test
TOOLS = \
sst_dump \
@ -412,6 +413,9 @@ ttl_test: utilities/ttl/ttl_test.o $(LIBOBJECTS) $(TESTHARNESS)
write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
flush_job_test: db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
dbformat_test: db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/dbformat_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

View File

@ -20,6 +20,7 @@
#include <limits>
#include "db/db_impl.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/internal_stats.h"
#include "db/compaction_picker.h"
@ -71,15 +72,15 @@ ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(ColumnFamilyData* cfd,
ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() {
if (cfd_ != nullptr) {
DBImpl::DeletionState deletion_state;
JobContext job_context;
mutex_->Lock();
if (cfd_->Unref()) {
delete cfd_;
}
db_->FindObsoleteFiles(deletion_state, false, true);
db_->FindObsoleteFiles(&job_context, false, true);
mutex_->Unlock();
if (deletion_state.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(deletion_state);
if (job_context.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(job_context);
}
}
}

View File

@ -42,7 +42,7 @@ Status DBImpl::DisableFileDeletions() {
}
Status DBImpl::EnableFileDeletions(bool force) {
DeletionState deletion_state;
JobContext job_context;
bool should_purge_files = false;
{
MutexLock l(&mutex_);
@ -55,7 +55,7 @@ Status DBImpl::EnableFileDeletions(bool force) {
if (disable_delete_obsolete_files_ == 0) {
Log(db_options_.info_log, "File Deletions Enabled");
should_purge_files = true;
FindObsoleteFiles(deletion_state, true);
FindObsoleteFiles(&job_context, true);
} else {
Log(db_options_.info_log,
"File Deletions Enable, but not really enabled. Counter: %d",
@ -63,7 +63,7 @@ Status DBImpl::EnableFileDeletions(bool force) {
}
}
if (should_purge_files) {
PurgeObsoleteFiles(deletion_state);
PurgeObsoleteFiles(job_context);
}
LogFlush(db_options_.info_log);
return Status::OK();

View File

@ -27,6 +27,7 @@
#include <vector>
#include "db/builder.h"
#include "db/flush_job.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
@ -412,12 +413,12 @@ DBImpl::~DBImpl() {
// result, all "live" files can get deleted by accident. However, corrupted
// manifest is recoverable by RepairDB().
if (opened_successfully_) {
DeletionState deletion_state;
FindObsoleteFiles(deletion_state, true);
JobContext job_context;
FindObsoleteFiles(&job_context, true);
// manifest number starting from 2
deletion_state.manifest_file_number = 1;
if (deletion_state.HaveSomethingToDelete()) {
PurgeObsoleteFiles(deletion_state);
job_context.manifest_file_number = 1;
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
}
@ -531,8 +532,7 @@ void DBImpl::MaybeDumpStats() {
// force = false -- don't force the full scan, except every
// db_options_.delete_obsolete_files_period_micros
// force = true -- force the full scan
void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
bool force,
void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
bool no_full_scan) {
mutex_.AssertHeld();
@ -558,16 +558,16 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
}
// get obsolete files
versions_->GetObsoleteFiles(&deletion_state.sst_delete_files);
versions_->GetObsoleteFiles(&job_context->sst_delete_files);
// store the current filenum, lognum, etc
deletion_state.manifest_file_number = versions_->ManifestFileNumber();
deletion_state.pending_manifest_file_number =
job_context->manifest_file_number = versions_->ManifestFileNumber();
job_context->pending_manifest_file_number =
versions_->PendingManifestFileNumber();
deletion_state.log_number = versions_->MinLogNumber();
deletion_state.prev_log_number = versions_->PrevLogNumber();
job_context->log_number = versions_->MinLogNumber();
job_context->prev_log_number = versions_->PrevLogNumber();
if (!doing_the_full_scan && !deletion_state.HaveSomethingToDelete()) {
if (!doing_the_full_scan && !job_context->HaveSomethingToDelete()) {
// avoid filling up sst_live if we're sure that we
// are not going to do the full scan and that we don't have
// anything to delete at the moment
@ -576,11 +576,9 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
// don't delete live files
for (auto pair : pending_outputs_) {
deletion_state.sst_live.emplace_back(pair.first, pair.second, 0);
job_context->sst_live.emplace_back(pair.first, pair.second, 0);
}
/* deletion_state.sst_live.insert(pending_outputs_.begin(),
pending_outputs_.end());*/
versions_->AddLiveFiles(&deletion_state.sst_live);
versions_->AddLiveFiles(&job_context->sst_live);
if (doing_the_full_scan) {
for (uint32_t path_id = 0;
@ -592,7 +590,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
&files); // Ignore errors
for (std::string file : files) {
// TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
deletion_state.candidate_files.emplace_back("/" + file, path_id);
job_context->candidate_files.emplace_back("/" + file, path_id);
}
}
@ -601,7 +599,7 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
std::vector<std::string> log_files;
env_->GetChildren(db_options_.wal_dir, &log_files); // Ignore errors
for (std::string log_file : log_files) {
deletion_state.candidate_files.emplace_back(log_file, 0);
job_context->candidate_files.emplace_back(log_file, 0);
}
}
// Add info log files in db_log_dir
@ -610,15 +608,15 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
// Ignore errors
env_->GetChildren(db_options_.db_log_dir, &info_log_files);
for (std::string log_file : info_log_files) {
deletion_state.candidate_files.emplace_back(log_file, 0);
job_context->candidate_files.emplace_back(log_file, 0);
}
}
}
}
namespace {
bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first,
const rocksdb::DBImpl::CandidateFileInfo& second) {
bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
const JobContext::CandidateFileInfo& second) {
if (first.file_name > second.file_name) {
return true;
} else if (first.file_name < second.file_name) {
@ -633,7 +631,7 @@ bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first,
// belong to live files are posibly removed. Also, removes all the
// files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
// we'd better have sth to delete
assert(state.HaveSomethingToDelete());
@ -647,13 +645,12 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
// Now, convert live list to an unordered map, WITHOUT mutex held;
// set is slow.
std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
for (FileDescriptor& fd : state.sst_live) {
for (const FileDescriptor& fd : state.sst_live) {
sst_live_map[fd.GetNumber()] = &fd;
}
auto& candidate_files = state.candidate_files;
candidate_files.reserve(
candidate_files.size() +
auto candidate_files = state.candidate_files;
candidate_files.reserve(candidate_files.size() +
state.sst_delete_files.size() +
state.log_delete_files.size());
// We may ignore the dbname when generating the file names.
@ -784,10 +781,10 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
void DBImpl::DeleteObsoleteFiles() {
mutex_.AssertHeld();
DeletionState deletion_state;
FindObsoleteFiles(deletion_state, true);
if (deletion_state.HaveSomethingToDelete()) {
PurgeObsoleteFiles(deletion_state);
JobContext job_context;
FindObsoleteFiles(&job_context, true);
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
}
@ -1480,159 +1477,23 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
return s;
}
Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
*filenumber = meta.fd.GetNumber();
pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file.
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable =
mems[0]->GetFirstSequenceNumber();
Version* base = cfd->current();
base->Ref(); // it is likely that we do not need this reference
Status s;
{
mutex_.Unlock();
log_buffer->FlushBufferToLog();
std::vector<Iterator*> memtables;
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
for (MemTable* m : mems) {
Log(db_options_.info_log,
"[%s] Flushing memtable with next log file: %" PRIu64 "\n",
cfd->GetName().c_str(), m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ro, &arena));
}
{
ScopedArenaIterator iter(NewMergingIterator(&cfd->internal_comparator(),
&memtables[0],
memtables.size(), &arena));
Log(db_options_.info_log,
"[%s] Level-0 flush table #%" PRIu64 ": started",
cfd->GetName().c_str(), meta.fd.GetNumber());
s = BuildTable(
dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
iter.get(), &meta, cfd->internal_comparator(), newest_snapshot,
earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()),
cfd->ioptions()->compression_opts, Env::IO_HIGH);
LogFlush(db_options_.info_log);
}
Log(db_options_.info_log,
"[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
s.ToString().c_str());
if (!db_options_.disableDataSync) {
db_directory_->Fsync();
}
mutex_.Lock();
}
base->Unref();
// re-acquire the most current version
base = cfd->current();
// There could be multiple threads writing to its own level-0 file.
// The pending_outputs cannot be cleared here, otherwise this newly
// created file might not be considered as a live-file by another
// compaction thread that is concurrently deleting obselete files.
// The pending_outputs can be cleared only after the new version is
// committed so that other threads can recognize this file as a
// valid one.
// pending_outputs_.erase(meta.number);
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
int level = 0;
if (s.ok() && meta.fd.GetFileSize() > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
// if we have more than 1 background thread, then we cannot
// insert files directly into higher levels because some other
// threads could be concurrently producing compacted files for
// that key range.
if (base != nullptr && db_options_.max_background_compactions <= 1 &&
db_options_.max_background_flushes == 0 &&
cfd->ioptions()->compaction_style == kCompactionStyleLevel) {
level = base->PickLevelForMemTableOutput(
mutable_cf_options, min_user_key, max_user_key);
}
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.smallest_seqno, meta.largest_seqno);
}
InternalStats::CompactionStats stats(1);
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.fd.GetFileSize();
cfd->internal_stats()->AddCompactionStats(level, stats);
cfd->internal_stats()->AddCFStats(
InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
return s;
}
Status DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer) {
bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer) {
mutex_.AssertHeld();
assert(cfd->imm()->size() != 0);
assert(cfd->imm()->IsFlushPending());
// Save the contents of the earliest memtable as a new Table
uint64_t file_number;
autovector<MemTable*> mems;
cfd->imm()->PickMemtablesToFlush(&mems);
if (mems.empty()) {
LogToBuffer(log_buffer, "[%s] Nothing in memtable to flush",
cfd->GetName().c_str());
return Status::OK();
}
FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
env_options_, versions_.get(), &mutex_, &shutting_down_,
&pending_outputs_, snapshots_.GetNewest(), job_context,
log_buffer, db_directory_.get(),
GetCompressionFlush(*cfd->ioptions()), stats_);
// record the logfile_number_ before we release the mutex
// entries mems are (implicitly) sorted in ascending order by their created
// time. We will use the first memtable's `edit` to keep the meta info for
// this flush.
MemTable* m = mems[0];
VersionEdit* edit = m->GetEdits();
edit->SetPrevLogNumber(0);
// SetLogNumber(log_num) indicates logs with number smaller than log_num
// will no longer be picked up for recovery.
edit->SetLogNumber(mems.back()->GetNextLogNumber());
edit->SetColumnFamily(cfd->GetID());
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(cfd, mutable_cf_options, mems, edit,
&file_number, log_buffer);
if (s.ok() &&
(shutting_down_.load(std::memory_order_acquire) || cfd->IsDropped())) {
s = Status::ShutdownInProgress(
"Database shutdown or Column family drop during flush");
}
if (!s.ok()) {
cfd->imm()->RollbackMemtableFlush(mems, file_number, &pending_outputs_);
} else {
// Replace immutable memtable with the generated Table
s = cfd->imm()->InstallMemtableFlushResults(
cfd, mutable_cf_options, mems, versions_.get(), &mutex_,
db_options_.info_log.get(), file_number, &pending_outputs_,
&deletion_state.memtables_to_free, db_directory_.get(), log_buffer);
}
Status s = flush_job.Run();
if (s.ok()) {
InstallSuperVersion(cfd, deletion_state, mutable_cf_options);
InstallSuperVersionBackground(cfd, job_context, mutable_cf_options);
if (madeProgress) {
*madeProgress = 1;
}
@ -1645,7 +1506,7 @@ Status DBImpl::FlushMemTableToOutputFile(
while (alive_log_files_.size() &&
alive_log_files_.begin()->number < versions_->MinLogNumber()) {
const auto& earliest = *alive_log_files_.begin();
deletion_state.log_delete_files.push_back(earliest.number);
job_context->log_delete_files.push_back(earliest.number);
total_log_size_ -= earliest.size;
alive_log_files_.pop_front();
}
@ -2082,8 +1943,7 @@ void DBImpl::BGWorkCompaction(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
}
Status DBImpl::BackgroundFlush(bool* madeProgress,
DeletionState& deletion_state,
Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer) {
mutex_.AssertHeld();
@ -2109,7 +1969,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
cfd->GetName().c_str(),
db_options_.max_background_flushes - bg_flush_scheduled_);
flush_status = FlushMemTableToOutputFile(
cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer);
cfd, mutable_cf_options, madeProgress, job_context, log_buffer);
}
if (call_status.ok() && !flush_status.ok()) {
call_status = flush_status;
@ -2122,7 +1982,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
void DBImpl::BackgroundCallFlush() {
bool madeProgress = false;
DeletionState deletion_state(true);
JobContext job_context(true);
assert(bg_flush_scheduled_);
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
@ -2131,7 +1991,7 @@ void DBImpl::BackgroundCallFlush() {
Status s;
if (!shutting_down_.load(std::memory_order_acquire)) {
s = BackgroundFlush(&madeProgress, deletion_state, &log_buffer);
s = BackgroundFlush(&madeProgress, &job_context, &log_buffer);
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
@ -2154,9 +2014,9 @@ void DBImpl::BackgroundCallFlush() {
// If !s.ok(), this means that Flush failed. In that case, we want
// to delete all obsolete files and we force FindObsoleteFiles()
FindObsoleteFiles(deletion_state, !s.ok());
FindObsoleteFiles(&job_context, !s.ok());
// delete unnecessary files if any, this is done outside the mutex
if (deletion_state.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
// Have to flush the info logs before bg_flush_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
@ -2164,8 +2024,8 @@ void DBImpl::BackgroundCallFlush() {
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (deletion_state.HaveSomethingToDelete()) {
PurgeObsoleteFiles(deletion_state);
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
mutex_.Lock();
}
@ -2189,7 +2049,7 @@ void DBImpl::BackgroundCallFlush() {
void DBImpl::BackgroundCallCompaction() {
bool madeProgress = false;
DeletionState deletion_state(true);
JobContext job_context(true);
MaybeDumpStats();
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
@ -2198,7 +2058,7 @@ void DBImpl::BackgroundCallCompaction() {
assert(bg_compaction_scheduled_);
Status s;
if (!shutting_down_.load(std::memory_order_acquire)) {
s = BackgroundCompaction(&madeProgress, deletion_state, &log_buffer);
s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer);
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
@ -2221,12 +2081,12 @@ void DBImpl::BackgroundCallCompaction() {
// If !s.ok(), this means that Compaction failed. In that case, we want
// to delete all obsolete files we might have created and we force
// FindObsoleteFiles(). This is because deletion_state does not catch
// all created files if compaction failed.
FindObsoleteFiles(deletion_state, !s.ok());
// FindObsoleteFiles(). This is because job_context does not
// catch all created files if compaction failed.
FindObsoleteFiles(&job_context, !s.ok());
// delete unnecessary files if any, this is done outside the mutex
if (deletion_state.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock();
// Have to flush the info logs before bg_compaction_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is
@ -2234,8 +2094,8 @@ void DBImpl::BackgroundCallCompaction() {
// states of DB so info_log might not be available after that point.
// It also applies to access other states that DB owns.
log_buffer.FlushBufferToLog();
if (deletion_state.HaveSomethingToDelete()) {
PurgeObsoleteFiles(deletion_state);
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
mutex_.Lock();
}
@ -2271,8 +2131,7 @@ void DBImpl::BackgroundCallCompaction() {
}
}
Status DBImpl::BackgroundCompaction(bool* madeProgress,
DeletionState& deletion_state,
Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer) {
*madeProgress = false;
mutex_.AssertHeld();
@ -2312,7 +2171,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
db_options_.max_background_compactions - bg_compaction_scheduled_);
cfd->Ref();
flush_stat = FlushMemTableToOutputFile(
cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer);
cfd, mutable_cf_options, madeProgress, job_context, log_buffer);
cfd->Unref();
if (!flush_stat.ok()) {
if (is_manual) {
@ -2388,7 +2247,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
status = versions_->LogAndApply(
c->column_family_data(), *c->mutable_cf_options(), c->edit(),
&mutex_, db_directory_.get());
InstallSuperVersion(c->column_family_data(), deletion_state,
InstallSuperVersionBackground(c->column_family_data(), job_context,
*c->mutable_cf_options());
LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
@ -2407,7 +2266,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
*c->mutable_cf_options(),
c->edit(), &mutex_, db_directory_.get());
// Use latest MutableCFOptions
InstallSuperVersion(c->column_family_data(), deletion_state,
InstallSuperVersionBackground(c->column_family_data(), job_context,
*c->mutable_cf_options());
Version::LevelSummaryStorage tmp;
@ -2423,8 +2282,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
} else {
MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
CompactionState* compact = new CompactionState(c.get());
status = DoCompactionWork(compact, *c->mutable_cf_options(),
deletion_state, log_buffer);
status = DoCompactionWork(compact, *c->mutable_cf_options(), job_context,
log_buffer);
CleanupCompaction(compact, status);
c->ReleaseCompactionFiles(status);
c->ReleaseInputs();
@ -2694,9 +2553,9 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
return 0;
}
uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state,
LogBuffer* log_buffer) {
uint64_t DBImpl::CallFlushDuringCompaction(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
JobContext* job_context, LogBuffer* log_buffer) {
if (db_options_.max_background_flushes > 0) {
// flush thread will take care of this
return 0;
@ -2706,8 +2565,8 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
mutex_.Lock();
if (cfd->imm()->IsFlushPending()) {
cfd->Ref();
FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr,
deletion_state, log_buffer);
FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr, job_context,
log_buffer);
cfd->Unref();
bg_cv_.SignalAll(); // Wakeup DelayWrite() if necessary
}
@ -2719,18 +2578,11 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
}
Status DBImpl::ProcessKeyValueCompaction(
const MutableCFOptions& mutable_cf_options,
bool is_snapshot_supported,
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
SequenceNumber latest_snapshot,
DeletionState& deletion_state,
bool bottommost_level,
int64_t& imm_micros,
Iterator* input,
CompactionState* compact,
bool is_compaction_v2,
int* num_output_records,
const MutableCFOptions& mutable_cf_options, bool is_snapshot_supported,
SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot,
SequenceNumber latest_snapshot, JobContext* job_context,
bool bottommost_level, int64_t* imm_micros, Iterator* input,
CompactionState* compact, bool is_compaction_v2, int* num_output_records,
LogBuffer* log_buffer) {
assert(num_output_records != nullptr);
@ -2786,8 +2638,8 @@ Status DBImpl::ProcessKeyValueCompaction(
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
imm_micros += CallFlushDuringCompaction(
cfd, mutable_cf_options, deletion_state, log_buffer);
(*imm_micros) += CallFlushDuringCompaction(cfd, mutable_cf_options,
job_context, log_buffer);
Slice key;
Slice value;
@ -3127,7 +2979,7 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact,
Status DBImpl::DoCompactionWork(CompactionState* compact,
const MutableCFOptions& mutable_cf_options,
DeletionState& deletion_state,
JobContext* job_context,
LogBuffer* log_buffer) {
assert(compact);
compact->CleanupBatchBuffer();
@ -3198,18 +3050,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
if (!compaction_filter_v2) {
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
false,
&num_output_records,
mutable_cf_options, is_snapshot_supported, visible_at_tip,
earliest_snapshot, latest_snapshot, job_context, bottommost_level,
&imm_micros, input.get(), compact, false, &num_output_records,
log_buffer);
} else {
// temp_backup_input always point to the start of the current buffer
@ -3231,7 +3074,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// compacting column family. we should also check if flush is necessary on
// other column families, too
imm_micros += CallFlushDuringCompaction(cfd, mutable_cf_options,
deletion_state, log_buffer);
job_context, log_buffer);
Slice key = backup_input->key();
Slice value = backup_input->value();
@ -3281,18 +3124,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// Done buffering for the current prefix. Spit it out to disk
// Now just iterate through all the kv-pairs
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
true,
&num_output_records,
mutable_cf_options, is_snapshot_supported, visible_at_tip,
earliest_snapshot, latest_snapshot, job_context, bottommost_level,
&imm_micros, input.get(), compact, true, &num_output_records,
log_buffer);
if (!status.ok()) {
@ -3319,18 +3153,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
true,
&num_output_records,
mutable_cf_options, is_snapshot_supported, visible_at_tip,
earliest_snapshot, latest_snapshot, job_context, bottommost_level,
&imm_micros, input.get(), compact, true, &num_output_records,
log_buffer);
compact->CleanupBatchBuffer();
@ -3343,18 +3168,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
latest_snapshot,
deletion_state,
bottommost_level,
imm_micros,
input.get(),
compact,
true,
&num_output_records,
mutable_cf_options, is_snapshot_supported, visible_at_tip,
earliest_snapshot, latest_snapshot, job_context, bottommost_level,
&imm_micros, input.get(), compact, true, &num_output_records,
log_buffer);
} // checking for compaction filter v2
@ -3421,7 +3237,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
if (status.ok()) {
status = InstallCompactionResults(compact, mutable_cf_options, log_buffer);
InstallSuperVersion(cfd, deletion_state, mutable_cf_options);
InstallSuperVersionBackground(cfd, job_context, mutable_cf_options);
}
Version::LevelSummaryStorage tmp;
LogToBuffer(
@ -3461,16 +3277,16 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
if (state->super_version->Unref()) {
DBImpl::DeletionState deletion_state;
JobContext job_context;
state->mu->Lock();
state->super_version->Cleanup();
state->db->FindObsoleteFiles(deletion_state, false, true);
state->db->FindObsoleteFiles(&job_context, false, true);
state->mu->Unlock();
delete state->super_version;
if (deletion_state.HaveSomethingToDelete()) {
state->db->PurgeObsoleteFiles(deletion_state);
if (job_context.HaveSomethingToDelete()) {
state->db->PurgeObsoleteFiles(job_context);
}
}
@ -3511,25 +3327,27 @@ Status DBImpl::Get(const ReadOptions& read_options,
return GetImpl(read_options, column_family, key, value);
}
// DeletionState gets created and destructed outside of the lock -- we
// JobContext gets created and destructed outside of the lock --
// we
// use this convinently to:
// * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free
//
// However, if InstallSuperVersion() gets called twice with the same,
// deletion_state, we can't reuse the SuperVersion() that got malloced because
// However, if InstallSuperVersion() gets called twice with the same
// job_context, we can't reuse the SuperVersion() that got
// malloced
// because
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free
void DBImpl::InstallSuperVersion(
ColumnFamilyData* cfd, DeletionState& deletion_state,
void DBImpl::InstallSuperVersionBackground(
ColumnFamilyData* cfd, JobContext* job_context,
const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld();
SuperVersion* old_superversion =
InstallSuperVersion(cfd, deletion_state.new_superversion,
mutable_cf_options);
deletion_state.new_superversion = nullptr;
deletion_state.superversions_to_free.push_back(old_superversion);
SuperVersion* old_superversion = InstallSuperVersion(
cfd, job_context->new_superversion, mutable_cf_options);
job_context->new_superversion = nullptr;
job_context->superversions_to_free.push_back(old_superversion);
}
SuperVersion* DBImpl::InstallSuperVersion(
@ -4529,7 +4347,7 @@ Status DBImpl::DeleteFile(std::string name) {
FileMetaData* metadata;
ColumnFamilyData* cfd;
VersionEdit edit;
DeletionState deletion_state(true);
JobContext job_context(true);
{
MutexLock l(&mutex_);
status = versions_->GetMetadataForFile(number, &level, &metadata, &cfd);
@ -4567,15 +4385,15 @@ Status DBImpl::DeleteFile(std::string name) {
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, db_directory_.get());
if (status.ok()) {
InstallSuperVersion(cfd, deletion_state,
InstallSuperVersionBackground(cfd, &job_context,
*cfd->GetLatestMutableCFOptions());
}
FindObsoleteFiles(deletion_state, false);
FindObsoleteFiles(&job_context, false);
} // lock released here
LogFlush(db_options_.info_log);
// remove files outside the db-lock
if (deletion_state.HaveSomethingToDelete()) {
PurgeObsoleteFiles(deletion_state);
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
{
MutexLock l(&mutex_);

View File

@ -35,6 +35,7 @@
#include "db/write_controller.h"
#include "db/flush_scheduler.h"
#include "db/write_thread.h"
#include "db/job_context.h"
namespace rocksdb {
@ -223,88 +224,19 @@ class DBImpl : public DB {
void TEST_EndWrite(void* w);
#endif // NDEBUG
// Structure to store information for candidate files to delete.
struct CandidateFileInfo {
std::string file_name;
uint32_t path_id;
CandidateFileInfo(std::string name, uint32_t path)
: file_name(name), path_id(path) {}
bool operator==(const CandidateFileInfo& other) const {
return file_name == other.file_name && path_id == other.path_id;
}
};
// needed for CleanupIteratorState
struct DeletionState {
inline bool HaveSomethingToDelete() const {
return candidate_files.size() ||
sst_delete_files.size() ||
log_delete_files.size();
}
// a list of all files that we'll consider deleting
// (every once in a while this is filled up with all files
// in the DB directory)
std::vector<CandidateFileInfo> candidate_files;
// the list of all live sst files that cannot be deleted
std::vector<FileDescriptor> sst_live;
// a list of sst files that we need to delete
std::vector<FileMetaData*> sst_delete_files;
// a list of log files that we need to delete
std::vector<uint64_t> log_delete_files;
// a list of memtables to be free
autovector<MemTable*> memtables_to_free;
autovector<SuperVersion*> superversions_to_free;
SuperVersion* new_superversion; // if nullptr no new superversion
// the current manifest_file_number, log_number and prev_log_number
// that corresponds to the set of files in 'live'.
uint64_t manifest_file_number, pending_manifest_file_number, log_number,
prev_log_number;
explicit DeletionState(bool create_superversion = false) {
manifest_file_number = 0;
pending_manifest_file_number = 0;
log_number = 0;
prev_log_number = 0;
new_superversion = create_superversion ? new SuperVersion() : nullptr;
}
~DeletionState() {
// free pending memtables
for (auto m : memtables_to_free) {
delete m;
}
// free superversions
for (auto s : superversions_to_free) {
delete s;
}
// if new_superversion was not used, it will be non-nullptr and needs
// to be freed here
delete new_superversion;
}
};
// Returns the list of live files in 'live' and the list
// of all files in the filesystem in 'candidate_files'.
// If force == false and the last call was less than
// db_options_.delete_obsolete_files_period_micros microseconds ago,
// it will not fill up the deletion_state
void FindObsoleteFiles(DeletionState& deletion_state,
bool force,
// it will not fill up the job_context
void FindObsoleteFiles(JobContext* job_context, bool force,
bool no_full_scan = false);
// Diffs the files listed in filenames and those that do not
// belong to live files are posibly removed. Also, removes all the
// files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method.
void PurgeObsoleteFiles(DeletionState& deletion_state);
void PurgeObsoleteFiles(const JobContext& background_contet);
ColumnFamilyHandle* DefaultColumnFamily() const;
@ -347,9 +279,10 @@ class DBImpl : public DB {
// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer);
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer);
// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
@ -362,11 +295,6 @@ class DBImpl : public DB {
// concurrent flush memtables to storage.
Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
VersionEdit* edit);
Status WriteLevel0Table(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer);
Status DelayWrite(uint64_t expiration_time);
Status ScheduleFlushes(WriteContext* context);
@ -388,38 +316,31 @@ class DBImpl : public DB {
static void BGWorkFlush(void* db);
void BackgroundCallCompaction();
void BackgroundCallFlush();
Status BackgroundCompaction(bool* madeProgress, DeletionState& deletion_state,
Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer);
Status BackgroundFlush(bool* madeProgress, DeletionState& deletion_state,
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer);
void CleanupCompaction(CompactionState* compact, Status status);
Status DoCompactionWork(CompactionState* compact,
const MutableCFOptions& mutable_cf_options,
DeletionState& deletion_state,
LogBuffer* log_buffer);
JobContext* job_context, LogBuffer* log_buffer);
// This function is called as part of compaction. It enables Flush process to
// preempt compaction, since it's higher prioirty
// Returns: micros spent executing
uint64_t CallFlushDuringCompaction(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state,
const MutableCFOptions& mutable_cf_options,
JobContext* job_context,
LogBuffer* log_buffer);
// Call compaction filter if is_compaction_v2 is not true. Then iterate
// through input and compact the kv-pairs
Status ProcessKeyValueCompaction(
const MutableCFOptions& mutable_cf_options,
bool is_snapshot_supported,
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
SequenceNumber latest_snapshot,
DeletionState& deletion_state,
bool bottommost_level,
int64_t& imm_micros,
Iterator* input,
CompactionState* compact,
bool is_compaction_v2,
int* num_output_records,
const MutableCFOptions& mutable_cf_options, bool is_snapshot_supported,
SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot,
SequenceNumber latest_snapshot, JobContext* job_context,
bool bottommost_level, int64_t* imm_micros, Iterator* input,
CompactionState* compact, bool is_compaction_v2, int* num_output_records,
LogBuffer* log_buffer);
// Call compaction_filter_v2->Filter() on kv-pairs in compact
@ -624,10 +545,11 @@ class DBImpl : public DB {
SequenceNumber* prev_snapshot);
// Background threads call this function, which is just a wrapper around
// the cfd->InstallSuperVersion() function. Background threads carry
// deletion_state which can have new_superversion already allocated.
void InstallSuperVersion(ColumnFamilyData* cfd,
DeletionState& deletion_state,
// the InstallSuperVersion() function. Background threads carry
// job_context which can have new_superversion already
// allocated.
void InstallSuperVersionBackground(
ColumnFamilyData* cfd, JobContext* job_context,
const MutableCFOptions& mutable_cf_options);
SuperVersion* InstallSuperVersion(

223
db/flush_job.cc Normal file
View File

@ -0,0 +1,223 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/flush_job.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <algorithm>
#include <vector>
#include "db/builder.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/version_set.h"
#include "port/port.h"
#include "port/likely.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/merger.h"
#include "table/table_builder.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/log_buffer.h"
#include "util/mutexlock.h"
#include "util/perf_context_imp.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"
namespace rocksdb {
FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
FileNumToPathIdMap* pending_outputs,
SequenceNumber newest_snapshot, JobContext* job_context,
LogBuffer* log_buffer, Directory* db_directory,
CompressionType output_compression, Statistics* stats)
: dbname_(dbname),
cfd_(cfd),
db_options_(db_options),
mutable_cf_options_(mutable_cf_options),
env_options_(env_options),
versions_(versions),
db_mutex_(db_mutex),
shutting_down_(shutting_down),
pending_outputs_(pending_outputs),
newest_snapshot_(newest_snapshot),
job_context_(job_context),
log_buffer_(log_buffer),
db_directory_(db_directory),
output_compression_(output_compression),
stats_(stats) {}
Status FlushJob::Run() {
// Save the contents of the earliest memtable as a new Table
uint64_t file_number;
autovector<MemTable*> mems;
cfd_->imm()->PickMemtablesToFlush(&mems);
if (mems.empty()) {
LogToBuffer(log_buffer_, "[%s] Nothing in memtable to flush",
cfd_->GetName().c_str());
return Status::OK();
}
// entries mems are (implicitly) sorted in ascending order by their created
// time. We will use the first memtable's `edit` to keep the meta info for
// this flush.
MemTable* m = mems[0];
VersionEdit* edit = m->GetEdits();
edit->SetPrevLogNumber(0);
// SetLogNumber(log_num) indicates logs with number smaller than log_num
// will no longer be picked up for recovery.
edit->SetLogNumber(mems.back()->GetNextLogNumber());
edit->SetColumnFamily(cfd_->GetID());
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(mems, edit, &file_number);
if (s.ok() &&
(shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
s = Status::ShutdownInProgress(
"Database shutdown or Column family drop during flush");
}
if (!s.ok()) {
cfd_->imm()->RollbackMemtableFlush(mems, file_number, pending_outputs_);
} else {
// Replace immutable memtable with the generated Table
s = cfd_->imm()->InstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems, versions_, db_mutex_, file_number,
pending_outputs_, &job_context_->memtables_to_free, db_directory_,
log_buffer_);
}
return s;
}
Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber) {
db_mutex_->AssertHeld();
const uint64_t start_micros = db_options_.env->NowMicros();
FileMetaData meta;
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
*filenumber = meta.fd.GetNumber();
// path 0 for level 0 file.
pending_outputs_->insert({meta.fd.GetNumber(), 0});
const SequenceNumber earliest_seqno_in_memtable =
mems[0]->GetFirstSequenceNumber();
Version* base = cfd_->current();
base->Ref(); // it is likely that we do not need this reference
Status s;
{
db_mutex_->Unlock();
if (log_buffer_) {
log_buffer_->FlushBufferToLog();
}
std::vector<Iterator*> memtables;
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
for (MemTable* m : mems) {
Log(db_options_.info_log,
"[%s] Flushing memtable with next log file: %" PRIu64 "\n",
cfd_->GetName().c_str(), m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ro, &arena));
}
{
ScopedArenaIterator iter(NewMergingIterator(&cfd_->internal_comparator(),
&memtables[0],
memtables.size(), &arena));
Log(db_options_.info_log,
"[%s] Level-0 flush table #%" PRIu64 ": started",
cfd_->GetName().c_str(), meta.fd.GetNumber());
s = BuildTable(dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), &meta,
cfd_->internal_comparator(), newest_snapshot_,
earliest_seqno_in_memtable, output_compression_,
cfd_->ioptions()->compression_opts, Env::IO_HIGH);
LogFlush(db_options_.info_log);
}
Log(db_options_.info_log,
"[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s",
cfd_->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(),
s.ToString().c_str());
if (!db_options_.disableDataSync && db_directory_ != nullptr) {
db_directory_->Fsync();
}
db_mutex_->Lock();
}
base->Unref();
// re-acquire the most current version
base = cfd_->current();
// There could be multiple threads writing to its own level-0 file.
// The pending_outputs cannot be cleared here, otherwise this newly
// created file might not be considered as a live-file by another
// compaction thread that is concurrently deleting obselete files.
// The pending_outputs can be cleared only after the new version is
// committed so that other threads can recognize this file as a
// valid one.
// pending_outputs_.erase(meta.number);
// Note that if file_size is zero, the file has been deleted and
// should not be added to the manifest.
int level = 0;
if (s.ok() && meta.fd.GetFileSize() > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
// if we have more than 1 background thread, then we cannot
// insert files directly into higher levels because some other
// threads could be concurrently producing compacted files for
// that key range.
if (base != nullptr && db_options_.max_background_compactions <= 1 &&
db_options_.max_background_flushes == 0 &&
cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
level = base->PickLevelForMemTableOutput(mutable_cf_options_,
min_user_key, max_user_key);
}
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.smallest_seqno, meta.largest_seqno);
}
InternalStats::CompactionStats stats(1);
stats.micros = db_options_.env->NowMicros() - start_micros;
stats.bytes_written = meta.fd.GetFileSize();
cfd_->internal_stats()->AddCompactionStats(level, stats);
cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
meta.fd.GetFileSize());
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
return s;
}
} // namespace rocksdb

86
db/flush_job.h Normal file
View File

@ -0,0 +1,86 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <deque>
#include <limits>
#include <set>
#include <utility>
#include <vector>
#include <string>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "db/column_family.h"
#include "db/version_edit.h"
#include "db/memtable_list.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/transaction_log.h"
#include "util/autovector.h"
#include "util/stop_watch.h"
#include "util/thread_local.h"
#include "util/scoped_arena_iterator.h"
#include "db/internal_stats.h"
#include "db/write_controller.h"
#include "db/flush_scheduler.h"
#include "db/write_thread.h"
#include "db/job_context.h"
namespace rocksdb {
class MemTable;
class TableCache;
class Version;
class VersionEdit;
class VersionSet;
class Arena;
class FlushJob {
public:
// TODO(icanadi) make effort to reduce number of parameters here
// IMPORTANT: mutable_cf_options needs to be alive while FlushJob is alive
FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
const DBOptions& db_options,
const MutableCFOptions& mutable_cf_options,
const EnvOptions& env_options, VersionSet* versions,
port::Mutex* db_mutex, std::atomic<bool>* shutting_down,
FileNumToPathIdMap* pending_outputs, SequenceNumber newest_snapshot,
JobContext* job_context, LogBuffer* log_buffer,
Directory* db_directory, CompressionType output_compression,
Statistics* stats);
~FlushJob() {}
Status Run();
private:
Status WriteLevel0Table(const autovector<MemTable*>& mems, VersionEdit* edit,
uint64_t* filenumber);
const std::string& dbname_;
ColumnFamilyData* cfd_;
const DBOptions& db_options_;
const MutableCFOptions& mutable_cf_options_;
const EnvOptions& env_options_;
VersionSet* versions_;
port::Mutex* db_mutex_;
std::atomic<bool>* shutting_down_;
FileNumToPathIdMap* pending_outputs_;
SequenceNumber newest_snapshot_;
JobContext* job_context_;
LogBuffer* log_buffer_;
Directory* db_directory_;
CompressionType output_compression_;
Statistics* stats_;
};
} // namespace rocksdb

113
db/flush_job_test.cc Normal file
View File

@ -0,0 +1,113 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/flush_job.h"
#include "db/column_family.h"
#include "db/version_set.h"
#include "rocksdb/cache.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
// TODO(icanadi) Mock out everything else:
// 1. VersionSet
// 2. TableBuilder
// 3. Memtable
class FlushJobTest {
public:
FlushJobTest()
: env_(Env::Default()),
dbname_(test::TmpDir() + "/flush_job_test"),
table_cache_(NewLRUCache(50000, 16, 8)),
versions_(new VersionSet(dbname_, &db_options_, env_options_,
table_cache_.get(), &write_controller_)),
shutting_down_(false) {
ASSERT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
// TODO(icanadi) Remove this once we mock out VersionSet
NewDB();
std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back();
ASSERT_OK(versions_->Recover(column_families, false));
}
void NewDB() {
VersionEdit new_db;
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
new_db.SetLastSequence(0);
const std::string manifest = DescriptorFileName(dbname_, 1);
unique_ptr<WritableFile> file;
Status s = env_->NewWritableFile(
manifest, &file, env_->OptimizeForManifestWrite(env_options_));
ASSERT_OK(s);
{
log::Writer log(std::move(file));
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
}
ASSERT_OK(s);
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1, nullptr);
}
Env* env_;
std::string dbname_;
EnvOptions env_options_;
std::shared_ptr<Cache> table_cache_;
WriteController write_controller_;
DBOptions db_options_;
ColumnFamilyOptions cf_options_;
std::unique_ptr<VersionSet> versions_;
port::Mutex mutex_;
std::atomic<bool> shutting_down_;
FileNumToPathIdMap pending_outputs_;
};
TEST(FlushJobTest, Empty) {
JobContext job_context;
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_,
&pending_outputs_, SequenceNumber(), &job_context, nullptr,
nullptr, kNoCompression, nullptr);
ASSERT_OK(flush_job.Run());
}
TEST(FlushJobTest, NonEmpty) {
JobContext job_context;
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
auto new_mem = new MemTable(cfd->internal_comparator(), *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions());
new_mem->Ref();
for (int i = 1; i < 10000; ++i) {
std::string key(std::to_string(i));
std::string value("value" + std::to_string(i));
new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
}
cfd->imm()->Add(new_mem);
FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
db_options_, *cfd->GetLatestMutableCFOptions(),
env_options_, versions_.get(), &mutex_, &shutting_down_,
&pending_outputs_, SequenceNumber(), &job_context, nullptr,
nullptr, kNoCompression, nullptr);
mutex_.Lock();
ASSERT_OK(flush_job.Run());
mutex_.Unlock();
// TODO(icanadi) once you have TableMock, verify that key-values are as
// expected
}
} // namespace rocksdb
int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }

View File

@ -10,6 +10,7 @@
#include <string>
#include <utility>
#include "db/job_context.h"
#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/column_family.h"
@ -155,14 +156,14 @@ void ForwardIterator::Cleanup(bool release_sv) {
if (release_sv) {
if (sv_ != nullptr && sv_->Unref()) {
DBImpl::DeletionState deletion_state;
JobContext job_context;
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(deletion_state, false, true);
db_->FindObsoleteFiles(&job_context, false, true);
db_->mutex_.Unlock();
delete sv_;
if (deletion_state.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(deletion_state);
if (job_context.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(job_context);
}
}
}

87
db/job_context.h Normal file
View File

@ -0,0 +1,87 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <string>
#include <vector>
#include "db/column_family.h"
namespace rocksdb {
class MemTable;
struct JobContext {
inline bool HaveSomethingToDelete() const {
return candidate_files.size() || sst_delete_files.size() ||
log_delete_files.size();
}
// Structure to store information for candidate files to delete.
struct CandidateFileInfo {
std::string file_name;
uint32_t path_id;
CandidateFileInfo(std::string name, uint32_t path)
: file_name(std::move(name)), path_id(path) {}
bool operator==(const CandidateFileInfo& other) const {
return file_name == other.file_name && path_id == other.path_id;
}
};
// a list of all files that we'll consider deleting
// (every once in a while this is filled up with all files
// in the DB directory)
std::vector<CandidateFileInfo> candidate_files;
// the list of all live sst files that cannot be deleted
std::vector<FileDescriptor> sst_live;
// a list of sst files that we need to delete
std::vector<FileMetaData*> sst_delete_files;
// a list of log files that we need to delete
std::vector<uint64_t> log_delete_files;
// a list of memtables to be free
autovector<MemTable*> memtables_to_free;
autovector<SuperVersion*> superversions_to_free;
SuperVersion* new_superversion; // if nullptr no new superversion
// the current manifest_file_number, log_number and prev_log_number
// that corresponds to the set of files in 'live'.
uint64_t manifest_file_number, pending_manifest_file_number, log_number,
prev_log_number;
explicit JobContext(bool create_superversion = false) {
manifest_file_number = 0;
pending_manifest_file_number = 0;
log_number = 0;
prev_log_number = 0;
new_superversion = create_superversion ? new SuperVersion() : nullptr;
}
~JobContext() {
// free pending memtables
for (auto m : memtables_to_free) {
delete m;
}
// free superversions
for (auto s : superversions_to_free) {
delete s;
}
// if new_superversion was not used, it will be non-nullptr and needs
// to be freed here
delete new_superversion;
}
};
} // namespace rocksdb

View File

@ -5,6 +5,11 @@
//
#include "db/memtable_list.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <string>
#include "rocksdb/db.h"
#include "db/memtable.h"
@ -161,10 +166,10 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems, VersionSet* vset,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer) {
const autovector<MemTable*>& mems, VersionSet* vset, port::Mutex* mu,
uint64_t file_number, FileNumToPathIdMap* pending_outputs,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer) {
mu->AssertHeld();
// flush was sucessful
@ -194,8 +199,8 @@ Status MemTableList::InstallMemtableFlushResults(
break;
}
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%lu started",
cfd->GetName().c_str(), (unsigned long)m->file_number_);
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_);
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, &m->edit_, mu, db_directory);
@ -209,10 +214,9 @@ Status MemTableList::InstallMemtableFlushResults(
uint64_t mem_id = 1; // how many memtables has been flushed.
do {
if (s.ok()) { // commit new state
LogToBuffer(log_buffer,
"[%s] Level-0 commit table #%lu: memtable #%lu done",
cfd->GetName().c_str(), (unsigned long)m->file_number_,
(unsigned long)mem_id);
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " done",
cfd->GetName().c_str(), m->file_number_, mem_id);
current_->Remove(m);
assert(m->file_number_ > 0);
@ -226,10 +230,9 @@ Status MemTableList::InstallMemtableFlushResults(
}
} else {
//commit failed. setup state so that we can flush again.
Log(info_log,
"Level-0 commit table #%lu: memtable #%lu failed",
(unsigned long)m->file_number_,
(unsigned long)mem_id);
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);
m->flush_completed_ = false;
m->flush_in_progress_ = false;
m->edit_.Clear();

View File

@ -114,10 +114,10 @@ class MemTableList {
// Commit a successful flush in the manifest file
Status InstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& m, VersionSet* vset,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer);
const autovector<MemTable*>& m, VersionSet* vset, port::Mutex* mu,
uint64_t file_number, FileNumToPathIdMap* pending_outputs,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().

View File

@ -74,7 +74,7 @@ void PrintStackTraceLine(const char* symbol, void* frame) {
// out source to atos, for the address translation
const int kLineMax = 256;
char cmd[kLineMax];
snprintf(cmd, kLineMax, "xcrun atos %p -p %d 2>&1", frame, pid);
snprintf(cmd, kLineMax, "xcrun atos -d %p -p %d 2>&1", frame, pid);
auto f = popen(cmd, "r");
if (f) {
char line[kLineMax];