Divide db/db_impl.cc

Summary: db_impl.cc is too large to manage. Divide db_impl.cc into db/db_impl.cc, db/db_impl_compaction_flush.cc, db/db_impl_files.cc, db/db_impl_open.cc and db/db_impl_write.cc.

Closes https://github.com/facebook/rocksdb/pull/2095
Differential Revision: D4838188
Pulled By: siying
fbshipit-source-id: c5f3059

Parent: 02799ad77a
Commit: ce64b8b719
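Of the new files, two have their full contents shown below: db/db_impl_files.cc carries the obsolete-file bookkeeping (the 2PC prep-log tracking, MinLogNumberToKeep, FindObsoleteFiles, PurgeObsoleteFiles and DeleteObsoleteFiles), and db/db_impl_write.cc carries the write path (the Put/Delete/Merge convenience methods, WriteImpl, PreprocessWrite, WriteToWAL, HandleWALFull, HandleWriteBufferFull, DelayWrite, ScheduleFlushes and SwitchMemtable). The diffs for db/db_impl.cc, db/db_impl_compaction_flush.cc and db/db_impl_open.cc are suppressed by the viewer because of their size.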
@@ -293,6 +293,10 @@ set(SOURCES
        db/dbformat.cc
        db/db_filesnapshot.cc
        db/db_impl.cc
        db/db_impl_write.cc
        db/db_impl_compaction_flush.cc
        db/db_impl_files.cc
        db/db_impl_open.cc
        db/db_impl_debug.cc
        db/db_impl_experimental.cc
        db/db_impl_readonly.cc
db/db_impl.cc
File diff suppressed because it is too large.
@@ -38,6 +38,7 @@
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/status.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"

@@ -1149,6 +1150,10 @@ extern Options SanitizeOptions(const std::string& db,

extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src);

extern CompressionType GetCompressionFlush(
    const ImmutableCFOptions& ioptions,
    const MutableCFOptions& mutable_cf_options);

// Fix user-supplied options to be reasonable
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
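The ClipToRange helper declared at the end of the hunk above is what the option-sanitizing code uses to clamp user-supplied values into a sane range. A minimal stand-alone sketch of how such a clamp works follows; the function body and the option value used here are illustrative assumptions, since the hunk cuts off before the body.

#include <cstddef>
#include <iostream>

// Illustrative reconstruction of a ClipToRange-style clamp; not taken
// verbatim from the commit.
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) *ptr = static_cast<T>(maxvalue);
  if (static_cast<V>(*ptr) < minvalue) *ptr = static_cast<T>(minvalue);
}

int main() {
  size_t write_buffer_size = 1024;  // hypothetical user-supplied option value
  // Clamp into [64 KB, 64 GB], the kind of range option sanitization enforces.
  ClipToRange(&write_buffer_size, static_cast<size_t>(64) << 10,
              static_cast<size_t>(64) << 30);
  std::cout << write_buffer_size << std::endl;  // prints 65536
  return 0;
}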
db/db_impl_compaction_flush.cc (new file, 1731 lines)
File diff suppressed because it is too large.
db/db_impl_files.cc (new file, 535 lines)
@@ -0,0 +1,535 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
#include "db/db_impl.h"
|
||||
|
||||
#ifndef __STDC_FORMAT_MACROS
|
||||
#define __STDC_FORMAT_MACROS
|
||||
#endif
|
||||
#include <inttypes.h>
|
||||
#include "db/event_helpers.h"
|
||||
#include "util/file_util.h"
|
||||
#include "util/sst_file_manager_impl.h"
|
||||
|
||||
|
||||
namespace rocksdb {
|
||||
uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
|
||||
if (!allow_2pc()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint64_t min_log = 0;
|
||||
|
||||
// we must look through the memtables for two phase transactions
|
||||
// that have been committed but not yet flushed
|
||||
for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (loop_cfd->IsDropped()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto log = loop_cfd->imm()->GetMinLogContainingPrepSection();
|
||||
|
||||
if (log > 0 && (min_log == 0 || log < min_log)) {
|
||||
min_log = log;
|
||||
}
|
||||
|
||||
log = loop_cfd->mem()->GetMinLogContainingPrepSection();
|
||||
|
||||
if (log > 0 && (min_log == 0 || log < min_log)) {
|
||||
min_log = log;
|
||||
}
|
||||
}
|
||||
|
||||
return min_log;
|
||||
}
|
||||
|
||||
void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
|
||||
assert(log != 0);
|
||||
std::lock_guard<std::mutex> lock(prep_heap_mutex_);
|
||||
auto it = prepared_section_completed_.find(log);
|
||||
assert(it != prepared_section_completed_.end());
|
||||
it->second += 1;
|
||||
}
|
||||
|
||||
void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
|
||||
assert(log != 0);
|
||||
std::lock_guard<std::mutex> lock(prep_heap_mutex_);
|
||||
min_log_with_prep_.push(log);
|
||||
auto it = prepared_section_completed_.find(log);
|
||||
if (it == prepared_section_completed_.end()) {
|
||||
prepared_section_completed_[log] = 0;
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
|
||||
|
||||
if (!allow_2pc()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(prep_heap_mutex_);
|
||||
uint64_t min_log = 0;
|
||||
|
||||
// first we look in the prepared heap where we keep
|
||||
// track of transactions that have been prepared (written to WAL)
|
||||
// but not yet committed.
|
||||
while (!min_log_with_prep_.empty()) {
|
||||
min_log = min_log_with_prep_.top();
|
||||
|
||||
auto it = prepared_section_completed_.find(min_log);
|
||||
|
||||
// value was marked as 'deleted' from heap
|
||||
if (it != prepared_section_completed_.end() && it->second > 0) {
|
||||
it->second -= 1;
|
||||
min_log_with_prep_.pop();
|
||||
|
||||
// back to square one...
|
||||
min_log = 0;
|
||||
continue;
|
||||
} else {
|
||||
// found a valid value
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return min_log;
|
||||
}
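// The three functions above form a lazy-deletion min-heap:
// MarkLogAsHavingPrepSectionFlushed() never removes anything from
// min_log_with_prep_; it only bumps a counter in prepared_section_completed_,
// and FindMinLogContainingOutstandingPrep() discards dead entries when they
// surface at the top of the heap. A minimal stand-alone sketch of that
// pattern (illustrative only, not part of this commit):

#include <cstdint>
#include <functional>
#include <queue>
#include <unordered_map>
#include <vector>

class LazyMinHeap {
 public:
  void Push(uint64_t v) { heap_.push(v); }
  // Record that one previously pushed copy of v is no longer live.
  void MarkDeleted(uint64_t v) { deleted_[v] += 1; }
  // Return the smallest live value, or 0 if nothing live remains.
  uint64_t Min() {
    while (!heap_.empty()) {
      uint64_t top = heap_.top();
      auto it = deleted_.find(top);
      if (it != deleted_.end() && it->second > 0) {
        it->second -= 1;  // consume one pending deletion
        heap_.pop();      // drop the dead entry and keep searching
      } else {
        return top;       // smallest entry that is still live
      }
    }
    return 0;
  }

 private:
  std::priority_queue<uint64_t, std::vector<uint64_t>,
                      std::greater<uint64_t>> heap_;
  std::unordered_map<uint64_t, uint64_t> deleted_;
};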
|
||||
|
||||
uint64_t DBImpl::MinLogNumberToKeep() {
|
||||
uint64_t log_number = versions_->MinLogNumber();
|
||||
|
||||
if (allow_2pc()) {
|
||||
// if we are using 2pc, we must consider logs containing prepared
|
||||
// sections of outstanding transactions.
|
||||
//
|
||||
// We must check min logs with outstanding prep before we check
|
||||
// logs referenced by memtables because a log referenced by the
|
||||
// first data structure could transition to the second under us.
|
||||
//
|
||||
// TODO(horuff): iterating over all column families under db mutex.
// Should find a more optimal solution.
|
||||
auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
|
||||
|
||||
if (min_log_in_prep_heap != 0 && min_log_in_prep_heap < log_number) {
|
||||
log_number = min_log_in_prep_heap;
|
||||
}
|
||||
|
||||
auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
|
||||
|
||||
if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < log_number) {
|
||||
log_number = min_log_refed_by_mem;
|
||||
}
|
||||
}
|
||||
return log_number;
|
||||
}
|
||||
|
||||
// * Returns the list of live files in 'sst_live'
|
||||
// If it's doing full scan:
|
||||
// * Returns the list of all files in the filesystem in
|
||||
// 'full_scan_candidate_files'.
|
||||
// Otherwise, gets obsolete files from VersionSet.
|
||||
// no_full_scan = true -- never do the full scan using GetChildren()
|
||||
// force = false -- don't force the full scan, except every
|
||||
// mutable_db_options_.delete_obsolete_files_period_micros
|
||||
// force = true -- force the full scan
|
||||
void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
||||
bool no_full_scan) {
|
||||
mutex_.AssertHeld();
|
||||
|
||||
// if deletion is disabled, do nothing
|
||||
if (disable_delete_obsolete_files_ > 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
bool doing_the_full_scan = false;
|
||||
|
||||
// logic for figuring out if we're doing the full scan
|
||||
if (no_full_scan) {
|
||||
doing_the_full_scan = false;
|
||||
} else if (force ||
|
||||
mutable_db_options_.delete_obsolete_files_period_micros == 0) {
|
||||
doing_the_full_scan = true;
|
||||
} else {
|
||||
const uint64_t now_micros = env_->NowMicros();
|
||||
if ((delete_obsolete_files_last_run_ +
|
||||
mutable_db_options_.delete_obsolete_files_period_micros) <
|
||||
now_micros) {
|
||||
doing_the_full_scan = true;
|
||||
delete_obsolete_files_last_run_ = now_micros;
|
||||
}
|
||||
}
|
||||
|
||||
// don't delete files that might be currently written to from compaction
|
||||
// threads
|
||||
// Since job_context->min_pending_output is set, until file scan finishes,
|
||||
// mutex_ cannot be released. Otherwise, we might see no min_pending_output
|
||||
// here but later find newer generated unfinalized files while scanning.
|
||||
if (!pending_outputs_.empty()) {
|
||||
job_context->min_pending_output = *pending_outputs_.begin();
|
||||
} else {
|
||||
// delete all of them
|
||||
job_context->min_pending_output = std::numeric_limits<uint64_t>::max();
|
||||
}
|
||||
|
||||
// Get obsolete files. This function will also update the list of
|
||||
// pending files in VersionSet().
|
||||
versions_->GetObsoleteFiles(&job_context->sst_delete_files,
|
||||
&job_context->manifest_delete_files,
|
||||
job_context->min_pending_output);
|
||||
|
||||
// store the current filenum, lognum, etc
|
||||
job_context->manifest_file_number = versions_->manifest_file_number();
|
||||
job_context->pending_manifest_file_number =
|
||||
versions_->pending_manifest_file_number();
|
||||
job_context->log_number = MinLogNumberToKeep();
|
||||
|
||||
job_context->prev_log_number = versions_->prev_log_number();
|
||||
|
||||
versions_->AddLiveFiles(&job_context->sst_live);
|
||||
if (doing_the_full_scan) {
|
||||
for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
|
||||
path_id++) {
|
||||
// set of all files in the directory. We'll exclude files that are still
|
||||
// alive in the subsequent processing.
|
||||
std::vector<std::string> files;
|
||||
env_->GetChildren(immutable_db_options_.db_paths[path_id].path,
|
||||
&files); // Ignore errors
|
||||
for (std::string file : files) {
|
||||
// TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
|
||||
job_context->full_scan_candidate_files.emplace_back(
|
||||
"/" + file, static_cast<uint32_t>(path_id));
|
||||
}
|
||||
}
|
||||
|
||||
// Add log files in wal_dir
|
||||
if (immutable_db_options_.wal_dir != dbname_) {
|
||||
std::vector<std::string> log_files;
|
||||
env_->GetChildren(immutable_db_options_.wal_dir,
|
||||
&log_files); // Ignore errors
|
||||
for (std::string log_file : log_files) {
|
||||
job_context->full_scan_candidate_files.emplace_back(log_file, 0);
|
||||
}
|
||||
}
|
||||
// Add info log files in db_log_dir
|
||||
if (!immutable_db_options_.db_log_dir.empty() &&
|
||||
immutable_db_options_.db_log_dir != dbname_) {
|
||||
std::vector<std::string> info_log_files;
|
||||
// Ignore errors
|
||||
env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
|
||||
for (std::string log_file : info_log_files) {
|
||||
job_context->full_scan_candidate_files.emplace_back(log_file, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// logs_ is empty when called during recovery, in which case there can't yet
|
||||
// be any tracked obsolete logs
|
||||
if (!alive_log_files_.empty() && !logs_.empty()) {
|
||||
uint64_t min_log_number = job_context->log_number;
|
||||
size_t num_alive_log_files = alive_log_files_.size();
|
||||
// find newly obsoleted log files
|
||||
while (alive_log_files_.begin()->number < min_log_number) {
|
||||
auto& earliest = *alive_log_files_.begin();
|
||||
if (immutable_db_options_.recycle_log_file_num >
|
||||
log_recycle_files.size()) {
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
"adding log %" PRIu64 " to recycle list\n",
|
||||
earliest.number);
|
||||
log_recycle_files.push_back(earliest.number);
|
||||
} else {
|
||||
job_context->log_delete_files.push_back(earliest.number);
|
||||
}
|
||||
if (job_context->size_log_to_delete == 0) {
|
||||
job_context->prev_total_log_size = total_log_size_;
|
||||
job_context->num_alive_log_files = num_alive_log_files;
|
||||
}
|
||||
job_context->size_log_to_delete += earliest.size;
|
||||
total_log_size_ -= earliest.size;
|
||||
alive_log_files_.pop_front();
|
||||
// Current log should always stay alive since it can't have
|
||||
// number < MinLogNumber().
|
||||
assert(alive_log_files_.size());
|
||||
}
|
||||
while (!logs_.empty() && logs_.front().number < min_log_number) {
|
||||
auto& log = logs_.front();
|
||||
if (log.getting_synced) {
|
||||
log_sync_cv_.Wait();
|
||||
// logs_ could have changed while we were waiting.
|
||||
continue;
|
||||
}
|
||||
logs_to_free_.push_back(log.ReleaseWriter());
|
||||
logs_.pop_front();
|
||||
}
|
||||
// Current log cannot be obsolete.
|
||||
assert(!logs_.empty());
|
||||
}
|
||||
|
||||
// We're just cleaning up for DB::Write().
|
||||
assert(job_context->logs_to_free.empty());
|
||||
job_context->logs_to_free = logs_to_free_;
|
||||
job_context->log_recycle_files.assign(log_recycle_files.begin(),
|
||||
log_recycle_files.end());
|
||||
logs_to_free_.clear();
|
||||
}
|
||||
|
||||
namespace {
|
||||
bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
|
||||
const JobContext::CandidateFileInfo& second) {
|
||||
if (first.file_name > second.file_name) {
|
||||
return true;
|
||||
} else if (first.file_name < second.file_name) {
|
||||
return false;
|
||||
} else {
|
||||
return (first.path_id > second.path_id);
|
||||
}
|
||||
}
|
||||
}; // namespace
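// CompareCandidateFile above is the strict ordering that PurgeObsoleteFiles()
// uses together with std::sort and std::unique to deduplicate its candidate
// list. A stand-alone illustration of that sort-then-unique idiom
// (illustrative only, not part of this commit):

#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

struct Candidate {
  std::string file_name;
  uint32_t path_id;
  bool operator==(const Candidate& other) const {
    return file_name == other.file_name && path_id == other.path_id;
  }
};

static bool CompareCandidate(const Candidate& a, const Candidate& b) {
  if (a.file_name > b.file_name) return true;
  if (a.file_name < b.file_name) return false;
  return a.path_id > b.path_id;
}

int main() {
  std::vector<Candidate> files = {{"/000007.sst", 0},
                                  {"/000005.log", 0},
                                  {"/000007.sst", 0}};
  std::sort(files.begin(), files.end(), CompareCandidate);
  // std::unique relies on operator== to collapse adjacent duplicates.
  files.erase(std::unique(files.begin(), files.end()), files.end());
  // files now holds each candidate exactly once.
  return 0;
}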
|
||||
|
||||
// Delete obsolete files and log status and information of file deletion
|
||||
void DBImpl::DeleteObsoleteFileImpl(Status file_deletion_status, int job_id,
|
||||
const std::string& fname, FileType type,
|
||||
uint64_t number, uint32_t path_id) {
|
||||
if (type == kTableFile) {
|
||||
file_deletion_status =
|
||||
DeleteSSTFile(&immutable_db_options_, fname, path_id);
|
||||
} else {
|
||||
file_deletion_status = env_->DeleteFile(fname);
|
||||
}
|
||||
if (file_deletion_status.ok()) {
|
||||
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
||||
"[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
|
||||
fname.c_str(), type, number,
|
||||
file_deletion_status.ToString().c_str());
|
||||
} else if (env_->FileExists(fname).IsNotFound()) {
|
||||
ROCKS_LOG_INFO(
|
||||
immutable_db_options_.info_log,
|
||||
"[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
|
||||
" -- %s\n",
|
||||
job_id, fname.c_str(), type, number,
|
||||
file_deletion_status.ToString().c_str());
|
||||
} else {
|
||||
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
|
||||
"[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n",
|
||||
job_id, fname.c_str(), type, number,
|
||||
file_deletion_status.ToString().c_str());
|
||||
}
|
||||
if (type == kTableFile) {
|
||||
EventHelpers::LogAndNotifyTableFileDeletion(
|
||||
&event_logger_, job_id, number, fname, file_deletion_status, GetName(),
|
||||
immutable_db_options_.listeners);
|
||||
}
|
||||
}
|
||||
|
||||
// Diffs the files listed in filenames against the live files; those that do
// not belong to the live set are possibly removed. Also removes all the
// files in sst_delete_files and log_delete_files.
|
||||
// It is not necessary to hold the mutex when invoking this method.
|
||||
void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
|
||||
// we'd better have something to delete
|
||||
assert(state.HaveSomethingToDelete());
|
||||
|
||||
// this checks if FindObsoleteFiles() was run before. If not, don't do
|
||||
// PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also
|
||||
// run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
|
||||
if (state.manifest_file_number == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Now, convert live list to an unordered map, WITHOUT mutex held;
|
||||
// set is slow.
|
||||
std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
|
||||
for (const FileDescriptor& fd : state.sst_live) {
|
||||
sst_live_map[fd.GetNumber()] = &fd;
|
||||
}
|
||||
std::unordered_set<uint64_t> log_recycle_files_set(
|
||||
state.log_recycle_files.begin(), state.log_recycle_files.end());
|
||||
|
||||
auto candidate_files = state.full_scan_candidate_files;
|
||||
candidate_files.reserve(
|
||||
candidate_files.size() + state.sst_delete_files.size() +
|
||||
state.log_delete_files.size() + state.manifest_delete_files.size());
|
||||
// We may ignore the dbname when generating the file names.
|
||||
const char* kDumbDbName = "";
|
||||
for (auto file : state.sst_delete_files) {
|
||||
candidate_files.emplace_back(
|
||||
MakeTableFileName(kDumbDbName, file->fd.GetNumber()),
|
||||
file->fd.GetPathId());
|
||||
delete file;
|
||||
}
|
||||
|
||||
for (auto file_num : state.log_delete_files) {
|
||||
if (file_num > 0) {
|
||||
candidate_files.emplace_back(LogFileName(kDumbDbName, file_num), 0);
|
||||
}
|
||||
}
|
||||
for (const auto& filename : state.manifest_delete_files) {
|
||||
candidate_files.emplace_back(filename, 0);
|
||||
}
|
||||
|
||||
// dedup state.candidate_files so we don't try to delete the same
|
||||
// file twice
|
||||
std::sort(candidate_files.begin(), candidate_files.end(),
|
||||
CompareCandidateFile);
|
||||
candidate_files.erase(
|
||||
std::unique(candidate_files.begin(), candidate_files.end()),
|
||||
candidate_files.end());
|
||||
|
||||
if (state.prev_total_log_size > 0) {
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
"[JOB %d] Try to delete WAL files size %" PRIu64
|
||||
", prev total WAL file size %" PRIu64
|
||||
", number of live WAL files %" ROCKSDB_PRIszt ".\n",
|
||||
state.job_id, state.size_log_to_delete,
|
||||
state.prev_total_log_size, state.num_alive_log_files);
|
||||
}
|
||||
|
||||
std::vector<std::string> old_info_log_files;
|
||||
InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
|
||||
dbname_);
|
||||
for (const auto& candidate_file : candidate_files) {
|
||||
std::string to_delete = candidate_file.file_name;
|
||||
uint32_t path_id = candidate_file.path_id;
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
// Ignore file if we cannot recognize it.
|
||||
if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
bool keep = true;
|
||||
switch (type) {
|
||||
case kLogFile:
|
||||
keep = ((number >= state.log_number) ||
|
||||
(number == state.prev_log_number) ||
|
||||
(log_recycle_files_set.find(number) !=
|
||||
log_recycle_files_set.end()));
|
||||
break;
|
||||
case kDescriptorFile:
|
||||
// Keep my manifest file, and any newer incarnations'
|
||||
// (can happen during manifest roll)
|
||||
keep = (number >= state.manifest_file_number);
|
||||
break;
|
||||
case kTableFile:
|
||||
// If the second condition is not there, this makes
|
||||
// DontDeletePendingOutputs fail
|
||||
keep = (sst_live_map.find(number) != sst_live_map.end()) ||
|
||||
number >= state.min_pending_output;
|
||||
break;
|
||||
case kTempFile:
|
||||
// Any temp files that are currently being written to must
|
||||
// be recorded in pending_outputs_, which is inserted into "live".
|
||||
// Also, SetCurrentFile creates a temp file when writing out new
|
||||
// manifest, which is equal to state.pending_manifest_file_number. We
|
||||
// should not delete that file
|
||||
//
|
||||
// TODO(yhchiang): carefully modify the third condition to safely
|
||||
// remove the temp options files.
|
||||
keep = (sst_live_map.find(number) != sst_live_map.end()) ||
|
||||
(number == state.pending_manifest_file_number) ||
|
||||
(to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
|
||||
break;
|
||||
case kInfoLogFile:
|
||||
keep = true;
|
||||
if (number != 0) {
|
||||
old_info_log_files.push_back(to_delete);
|
||||
}
|
||||
break;
|
||||
case kCurrentFile:
|
||||
case kDBLockFile:
|
||||
case kIdentityFile:
|
||||
case kMetaDatabase:
|
||||
case kOptionsFile:
|
||||
keep = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (keep) {
|
||||
continue;
|
||||
}
|
||||
|
||||
std::string fname;
|
||||
if (type == kTableFile) {
|
||||
// evict from cache
|
||||
TableCache::Evict(table_cache_.get(), number);
|
||||
fname = TableFileName(immutable_db_options_.db_paths, number, path_id);
|
||||
} else {
|
||||
fname = ((type == kLogFile) ? immutable_db_options_.wal_dir : dbname_) +
|
||||
"/" + to_delete;
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
if (type == kLogFile && (immutable_db_options_.wal_ttl_seconds > 0 ||
|
||||
immutable_db_options_.wal_size_limit_mb > 0)) {
|
||||
wal_manager_.ArchiveWALFile(fname, number);
|
||||
continue;
|
||||
}
|
||||
#endif // !ROCKSDB_LITE
|
||||
|
||||
Status file_deletion_status;
|
||||
if (schedule_only) {
|
||||
InstrumentedMutexLock guard_lock(&mutex_);
|
||||
SchedulePendingPurge(fname, type, number, path_id, state.job_id);
|
||||
} else {
|
||||
DeleteObsoleteFileImpl(file_deletion_status, state.job_id, fname, type,
|
||||
number, path_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Delete old info log files.
|
||||
size_t old_info_log_file_count = old_info_log_files.size();
|
||||
if (old_info_log_file_count != 0 &&
|
||||
old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
|
||||
std::sort(old_info_log_files.begin(), old_info_log_files.end());
|
||||
size_t end =
|
||||
old_info_log_file_count - immutable_db_options_.keep_log_file_num;
|
||||
for (unsigned int i = 0; i <= end; i++) {
|
||||
std::string& to_delete = old_info_log_files.at(i);
|
||||
std::string full_path_to_delete =
|
||||
(immutable_db_options_.db_log_dir.empty()
|
||||
? dbname_
|
||||
: immutable_db_options_.db_log_dir) +
|
||||
"/" + to_delete;
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
"[JOB %d] Delete info log file %s\n", state.job_id,
|
||||
full_path_to_delete.c_str());
|
||||
Status s = env_->DeleteFile(full_path_to_delete);
|
||||
if (!s.ok()) {
|
||||
if (env_->FileExists(full_path_to_delete).IsNotFound()) {
|
||||
ROCKS_LOG_INFO(
|
||||
immutable_db_options_.info_log,
|
||||
"[JOB %d] Tried to delete non-existing info log file %s FAILED "
|
||||
"-- %s\n",
|
||||
state.job_id, to_delete.c_str(), s.ToString().c_str());
|
||||
} else {
|
||||
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
|
||||
"[JOB %d] Delete info log file %s FAILED -- %s\n",
|
||||
state.job_id, to_delete.c_str(),
|
||||
s.ToString().c_str());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
#ifndef ROCKSDB_LITE
|
||||
wal_manager_.PurgeObsoleteWALFiles();
|
||||
#endif // ROCKSDB_LITE
|
||||
LogFlush(immutable_db_options_.info_log);
|
||||
}
|
||||
|
||||
void DBImpl::DeleteObsoleteFiles() {
|
||||
mutex_.AssertHeld();
|
||||
JobContext job_context(next_job_id_.fetch_add(1));
|
||||
FindObsoleteFiles(&job_context, true);
|
||||
|
||||
mutex_.Unlock();
|
||||
if (job_context.HaveSomethingToDelete()) {
|
||||
PurgeObsoleteFiles(job_context);
|
||||
}
|
||||
job_context.Clean();
|
||||
mutex_.Lock();
|
||||
}
|
||||
} // namespace rocksdb
|
db/db_impl_open.cc (new file, 1081 lines)
File diff suppressed because it is too large.
db/db_impl_write.cc (new file, 819 lines)
@@ -0,0 +1,819 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
#include "db/db_impl.h"
|
||||
|
||||
#ifndef __STDC_FORMAT_MACROS
|
||||
#define __STDC_FORMAT_MACROS
|
||||
#endif
|
||||
#include <inttypes.h>
|
||||
#include "util/options_helper.h"
|
||||
#include "util/perf_context_imp.h"
|
||||
#include "util/sync_point.h"
|
||||
|
||||
namespace rocksdb {
|
||||
// Convenience methods
|
||||
Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
|
||||
const Slice& key, const Slice& val) {
|
||||
return DB::Put(o, column_family, key, val);
|
||||
}
|
||||
|
||||
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
|
||||
const Slice& key, const Slice& val) {
|
||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
|
||||
if (!cfh->cfd()->ioptions()->merge_operator) {
|
||||
return Status::NotSupported("Provide a merge_operator when opening DB");
|
||||
} else {
|
||||
return DB::Merge(o, column_family, key, val);
|
||||
}
|
||||
}
|
||||
|
||||
Status DBImpl::Delete(const WriteOptions& write_options,
|
||||
ColumnFamilyHandle* column_family, const Slice& key) {
|
||||
return DB::Delete(write_options, column_family, key);
|
||||
}
|
||||
|
||||
Status DBImpl::SingleDelete(const WriteOptions& write_options,
|
||||
ColumnFamilyHandle* column_family,
|
||||
const Slice& key) {
|
||||
return DB::SingleDelete(write_options, column_family, key);
|
||||
}
|
||||
|
||||
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
|
||||
return WriteImpl(write_options, my_batch, nullptr, nullptr);
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
|
||||
WriteBatch* my_batch,
|
||||
WriteCallback* callback) {
|
||||
return WriteImpl(write_options, my_batch, callback, nullptr);
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
Status DBImpl::WriteImpl(const WriteOptions& write_options,
|
||||
WriteBatch* my_batch, WriteCallback* callback,
|
||||
uint64_t* log_used, uint64_t log_ref,
|
||||
bool disable_memtable) {
|
||||
if (my_batch == nullptr) {
|
||||
return Status::Corruption("Batch is nullptr!");
|
||||
}
|
||||
|
||||
Status status;
|
||||
|
||||
PERF_TIMER_GUARD(write_pre_and_post_process_time);
|
||||
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
|
||||
disable_memtable);
|
||||
|
||||
if (!write_options.disableWAL) {
|
||||
RecordTick(stats_, WRITE_WITH_WAL);
|
||||
}
|
||||
|
||||
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
|
||||
|
||||
write_thread_.JoinBatchGroup(&w);
|
||||
if (w.state == WriteThread::STATE_PARALLEL_FOLLOWER) {
|
||||
// we are a non-leader in a parallel group
|
||||
PERF_TIMER_GUARD(write_memtable_time);
|
||||
|
||||
if (w.ShouldWriteToMemtable()) {
|
||||
ColumnFamilyMemTablesImpl column_family_memtables(
|
||||
versions_->GetColumnFamilySet());
|
||||
WriteBatchInternal::SetSequence(w.batch, w.sequence);
|
||||
w.status = WriteBatchInternal::InsertInto(
|
||||
&w, &column_family_memtables, &flush_scheduler_,
|
||||
write_options.ignore_missing_column_families, 0 /*log_number*/, this,
|
||||
true /*concurrent_memtable_writes*/);
|
||||
}
|
||||
|
||||
if (write_thread_.CompleteParallelWorker(&w)) {
|
||||
// we're responsible for early exit
|
||||
auto last_sequence = w.parallel_group->last_sequence;
|
||||
versions_->SetLastSequence(last_sequence);
|
||||
write_thread_.EarlyExitParallelGroup(&w);
|
||||
}
|
||||
assert(w.state == WriteThread::STATE_COMPLETED);
|
||||
// STATE_COMPLETED conditional below handles exit
|
||||
|
||||
status = w.FinalStatus();
|
||||
}
|
||||
if (w.state == WriteThread::STATE_COMPLETED) {
|
||||
if (log_used != nullptr) {
|
||||
*log_used = w.log_used;
|
||||
}
|
||||
// write is complete and leader has updated sequence
|
||||
return w.FinalStatus();
|
||||
}
|
||||
// else we are the leader of the write batch group
|
||||
assert(w.state == WriteThread::STATE_GROUP_LEADER);
|
||||
|
||||
// Once it reaches this point, the current writer "w" will try to do its
// write job. It may also pick up some of the remaining writers in the
// "writers_" queue when it finds it suitable, and finish them in the same
// write batch. This is how one writer's job may end up being done by
// another writer.
|
||||
WriteContext write_context;
|
||||
WriteThread::Writer* last_writer = &w;
|
||||
autovector<WriteThread::Writer*> write_group;
|
||||
bool logs_getting_synced = false;
|
||||
|
||||
mutex_.Lock();
|
||||
|
||||
bool need_log_sync = !write_options.disableWAL && write_options.sync;
|
||||
bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
|
||||
status = PreprocessWrite(write_options, need_log_sync, &logs_getting_synced,
|
||||
&write_context);
|
||||
uint64_t last_sequence = versions_->LastSequence();
|
||||
log::Writer* cur_log_writer = logs_.back().writer;
|
||||
|
||||
mutex_.Unlock();
|
||||
|
||||
// Add to log and apply to memtable. We can release the lock
|
||||
// during this phase since &w is currently responsible for logging
|
||||
// and protects against concurrent loggers and concurrent writes
|
||||
// into memtables
|
||||
|
||||
bool exit_completed_early = false;
|
||||
last_batch_group_size_ =
|
||||
write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group);
|
||||
|
||||
if (status.ok()) {
|
||||
// Rules for when we can update the memtable concurrently
|
||||
// 1. supported by memtable
|
||||
// 2. Puts are not okay if inplace_update_support
|
||||
// 3. Deletes or SingleDeletes are not okay if filtering deletes
|
||||
// (controlled by both batch and memtable setting)
|
||||
// 4. Merges are not okay
|
||||
//
|
||||
// Rules 1..3 are enforced by checking the options
|
||||
// during startup (CheckConcurrentWritesSupported), so if
|
||||
// options.allow_concurrent_memtable_write is true then they can be
|
||||
// assumed to be true. Rule 4 is checked for each batch. We could
|
||||
// relax rules 2 and 3 if we could prevent write batches from referring
|
||||
// more than once to a particular key.
|
||||
bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
|
||||
write_group.size() > 1;
|
||||
int total_count = 0;
|
||||
uint64_t total_byte_size = 0;
|
||||
for (auto writer : write_group) {
|
||||
if (writer->CheckCallback(this)) {
|
||||
if (writer->ShouldWriteToMemtable()) {
|
||||
total_count += WriteBatchInternal::Count(writer->batch);
|
||||
parallel = parallel && !writer->batch->HasMerge();
|
||||
}
|
||||
|
||||
if (writer->ShouldWriteToWAL()) {
|
||||
total_byte_size = WriteBatchInternal::AppendedByteSize(
|
||||
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const SequenceNumber current_sequence = last_sequence + 1;
|
||||
last_sequence += total_count;
|
||||
|
||||
// Update stats while we are an exclusive group leader, so we know
|
||||
// that nobody else can be writing to these particular stats.
|
||||
// We're optimistic, updating the stats before we successfully
|
||||
// commit. That lets us release our leader status early in
|
||||
// some cases.
|
||||
auto stats = default_cf_internal_stats_;
|
||||
stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count);
|
||||
RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
|
||||
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
|
||||
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
|
||||
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
|
||||
RecordTick(stats_, WRITE_DONE_BY_SELF);
|
||||
auto write_done_by_other = write_group.size() - 1;
|
||||
if (write_done_by_other > 0) {
|
||||
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER,
|
||||
write_done_by_other);
|
||||
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
|
||||
}
|
||||
MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
|
||||
|
||||
if (write_options.disableWAL) {
|
||||
has_unpersisted_data_.store(true, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
PERF_TIMER_STOP(write_pre_and_post_process_time);
|
||||
|
||||
if (status.ok() && !write_options.disableWAL) {
|
||||
PERF_TIMER_GUARD(write_wal_time);
|
||||
status = WriteToWAL(write_group, cur_log_writer, need_log_sync,
|
||||
need_log_dir_sync, current_sequence);
|
||||
if (log_used != nullptr) {
|
||||
*log_used = logfile_number_;
|
||||
}
|
||||
}
|
||||
|
||||
if (status.ok()) {
|
||||
PERF_TIMER_GUARD(write_memtable_time);
|
||||
|
||||
if (!parallel) {
|
||||
status = WriteBatchInternal::InsertInto(
|
||||
write_group, current_sequence, column_family_memtables_.get(),
|
||||
&flush_scheduler_, write_options.ignore_missing_column_families,
|
||||
0 /*log_number*/, this);
|
||||
|
||||
if (status.ok()) {
|
||||
// There were no write failures. Set leader's status
|
||||
// in case the write callback returned a non-ok status.
|
||||
status = w.FinalStatus();
|
||||
}
|
||||
|
||||
} else {
|
||||
WriteThread::ParallelGroup pg;
|
||||
pg.leader = &w;
|
||||
pg.last_writer = last_writer;
|
||||
pg.last_sequence = last_sequence;
|
||||
pg.early_exit_allowed = !need_log_sync;
|
||||
pg.running.store(static_cast<uint32_t>(write_group.size()),
|
||||
std::memory_order_relaxed);
|
||||
write_thread_.LaunchParallelFollowers(&pg, current_sequence);
|
||||
|
||||
if (w.ShouldWriteToMemtable()) {
|
||||
// do leader write
|
||||
ColumnFamilyMemTablesImpl column_family_memtables(
|
||||
versions_->GetColumnFamilySet());
|
||||
assert(w.sequence == current_sequence);
|
||||
WriteBatchInternal::SetSequence(w.batch, w.sequence);
|
||||
w.status = WriteBatchInternal::InsertInto(
|
||||
&w, &column_family_memtables, &flush_scheduler_,
|
||||
write_options.ignore_missing_column_families, 0 /*log_number*/,
|
||||
this, true /*concurrent_memtable_writes*/);
|
||||
}
|
||||
|
||||
// CompleteParallelWorker returns true if this thread should
|
||||
// handle exit, false means somebody else did
|
||||
exit_completed_early = !write_thread_.CompleteParallelWorker(&w);
|
||||
status = w.FinalStatus();
|
||||
}
|
||||
|
||||
if (!exit_completed_early && w.status.ok()) {
|
||||
versions_->SetLastSequence(last_sequence);
|
||||
if (!need_log_sync) {
|
||||
write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
|
||||
exit_completed_early = true;
|
||||
}
|
||||
}
|
||||
|
||||
// A non-OK status here indicates that the state implied by the
|
||||
// WAL has diverged from the in-memory state. This could be
|
||||
// because of a corrupt write_batch (very bad), or because the
|
||||
// client specified an invalid column family and didn't specify
|
||||
// ignore_missing_column_families.
|
||||
//
|
||||
// Is setting bg_error_ enough here? This will at least stop
|
||||
// compaction and fail any further writes.
|
||||
if (!status.ok() && bg_error_.ok() && !w.CallbackFailed()) {
|
||||
bg_error_ = status;
|
||||
}
|
||||
}
|
||||
}
|
||||
PERF_TIMER_START(write_pre_and_post_process_time);
|
||||
|
||||
if (immutable_db_options_.paranoid_checks && !status.ok() &&
|
||||
!w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) {
|
||||
mutex_.Lock();
|
||||
if (bg_error_.ok()) {
|
||||
bg_error_ = status; // stop compaction & fail any further writes
|
||||
}
|
||||
mutex_.Unlock();
|
||||
}
|
||||
|
||||
if (logs_getting_synced) {
|
||||
mutex_.Lock();
|
||||
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
|
||||
mutex_.Unlock();
|
||||
}
|
||||
|
||||
if (!exit_completed_early) {
|
||||
write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status);
|
||||
}
|
||||
|
||||
return status;
|
||||
}
|
||||
|
||||
Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
|
||||
bool need_log_sync, bool* logs_getting_synced,
|
||||
WriteContext* write_context) {
|
||||
mutex_.AssertHeld();
|
||||
assert(write_context != nullptr && logs_getting_synced != nullptr);
|
||||
Status status;
|
||||
|
||||
assert(!single_column_family_mode_ ||
|
||||
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1);
|
||||
if (UNLIKELY(status.ok() && !single_column_family_mode_ &&
|
||||
total_log_size_ > GetMaxTotalWalSize())) {
|
||||
status = HandleWALFull(write_context);
|
||||
}
|
||||
|
||||
if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) {
|
||||
// Before a new memtable is added in SwitchMemtable(),
|
||||
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
|
||||
// thread is writing to another DB with the same write buffer, they may also
|
||||
// be flushed. We may end up with flushing much more DBs than needed. It's
|
||||
// suboptimal but still correct.
|
||||
status = HandleWriteBufferFull(write_context);
|
||||
}
|
||||
|
||||
if (UNLIKELY(status.ok() && !bg_error_.ok())) {
|
||||
status = bg_error_;
|
||||
}
|
||||
|
||||
if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
|
||||
status = ScheduleFlushes(write_context);
|
||||
}
|
||||
|
||||
if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
|
||||
write_controller_.NeedsDelay()))) {
|
||||
PERF_TIMER_GUARD(write_delay_time);
|
||||
// We don't know the size of the current batch, so we always use the size
// of the previous one. This might create a fairness issue: expiration
// might happen for smaller writes while larger writes go through.
// We can optimize this if it becomes an issue.
|
||||
status = DelayWrite(last_batch_group_size_, write_options);
|
||||
}
|
||||
|
||||
if (status.ok() && need_log_sync) {
|
||||
while (logs_.front().getting_synced) {
|
||||
log_sync_cv_.Wait();
|
||||
}
|
||||
for (auto& log : logs_) {
|
||||
assert(!log.getting_synced);
|
||||
log.getting_synced = true;
|
||||
}
|
||||
*logs_getting_synced = true;
|
||||
}
|
||||
|
||||
return status;
|
||||
}
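// PreprocessWrite() above chains its checks (WAL size limit, write buffer
// limit, background error, pending flushes, write stall), and each check only
// runs while the accumulated status is still OK. A tiny stand-alone sketch of
// that short-circuit shape (illustrative only, not part of this commit):

#include <functional>
#include <vector>

struct SketchStatus {
  bool ok_ = true;
  bool ok() const { return ok_; }
};

static SketchStatus RunPreprocessSteps(
    const std::vector<std::function<SketchStatus()>>& steps) {
  SketchStatus status;
  for (const auto& step : steps) {
    if (!status.ok()) {
      break;  // the first failing step wins; later steps are skipped
    }
    status = step();
  }
  return status;
}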
|
||||
|
||||
Status DBImpl::WriteToWAL(const autovector<WriteThread::Writer*>& write_group,
|
||||
log::Writer* log_writer, bool need_log_sync,
|
||||
bool need_log_dir_sync, SequenceNumber sequence) {
|
||||
Status status;
|
||||
|
||||
WriteBatch* merged_batch = nullptr;
|
||||
size_t write_with_wal = 0;
|
||||
if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL() &&
|
||||
write_group[0]->batch->GetWalTerminationPoint().is_cleared()) {
|
||||
// We simply write the first WriteBatch to the WAL if the group only
// contains one batch, that batch should be written to the WAL,
// and the batch does not need to be truncated.
|
||||
merged_batch = write_group[0]->batch;
|
||||
write_group[0]->log_used = logfile_number_;
|
||||
write_with_wal = 1;
|
||||
} else {
|
||||
// WAL needs all of the batches flattened into a single batch.
|
||||
// We could avoid copying here with an iov-like AddRecord
|
||||
// interface
|
||||
merged_batch = &tmp_batch_;
|
||||
for (auto writer : write_group) {
|
||||
if (writer->ShouldWriteToWAL()) {
|
||||
WriteBatchInternal::Append(merged_batch, writer->batch,
|
||||
/*WAL_only*/ true);
|
||||
write_with_wal++;
|
||||
}
|
||||
writer->log_used = logfile_number_;
|
||||
}
|
||||
}
|
||||
|
||||
WriteBatchInternal::SetSequence(merged_batch, sequence);
|
||||
|
||||
Slice log_entry = WriteBatchInternal::Contents(merged_batch);
|
||||
status = log_writer->AddRecord(log_entry);
|
||||
total_log_size_ += log_entry.size();
|
||||
alive_log_files_.back().AddSize(log_entry.size());
|
||||
log_empty_ = false;
|
||||
uint64_t log_size = log_entry.size();
|
||||
|
||||
if (status.ok() && need_log_sync) {
|
||||
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
|
||||
// It's safe to access logs_ with unlocked mutex_ here because:
|
||||
// - we've set getting_synced=true for all logs,
|
||||
// so other threads won't pop from logs_ while we're here,
|
||||
// - only writer thread can push to logs_, and we're in
|
||||
// writer thread, so no one will push to logs_,
|
||||
// - as long as other threads don't modify it, it's safe to read
|
||||
// from std::deque from multiple threads concurrently.
|
||||
for (auto& log : logs_) {
|
||||
status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
|
||||
if (!status.ok()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (status.ok() && need_log_dir_sync) {
|
||||
// We only sync WAL directory the first time WAL syncing is
|
||||
// requested, so that in case users never turn on WAL sync,
|
||||
// we can avoid the disk I/O in the write code path.
|
||||
status = directories_.GetWalDir()->Fsync();
|
||||
}
|
||||
}
|
||||
|
||||
if (merged_batch == &tmp_batch_) {
|
||||
tmp_batch_.Clear();
|
||||
}
|
||||
if (status.ok()) {
|
||||
auto stats = default_cf_internal_stats_;
|
||||
if (need_log_sync) {
|
||||
stats->AddDBStats(InternalStats::WAL_FILE_SYNCED, 1);
|
||||
RecordTick(stats_, WAL_FILE_SYNCED);
|
||||
}
|
||||
stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
|
||||
RecordTick(stats_, WAL_FILE_BYTES, log_size);
|
||||
stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal);
|
||||
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
|
||||
}
|
||||
return status;
|
||||
}
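// A note on WriteToWAL() above: it has a fast path (a single batch that
// should be written to the WAL and has no WAL termination point is appended
// as-is) and a slow path that flattens the whole group into tmp_batch_ before
// appending one record. When need_log_sync is set, it then syncs every writer
// in logs_; the getting_synced flags set earlier in PreprocessWrite() are what
// make it safe to walk logs_ here without holding mutex_.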
|
||||
|
||||
Status DBImpl::HandleWALFull(WriteContext* write_context) {
|
||||
mutex_.AssertHeld();
|
||||
assert(write_context != nullptr);
|
||||
Status status;
|
||||
|
||||
if (alive_log_files_.begin()->getting_flushed) {
|
||||
return status;
|
||||
}
|
||||
|
||||
auto oldest_alive_log = alive_log_files_.begin()->number;
|
||||
auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep();
|
||||
|
||||
if (allow_2pc() &&
|
||||
oldest_log_with_uncommited_prep > 0 &&
|
||||
oldest_log_with_uncommited_prep <= oldest_alive_log) {
|
||||
if (unable_to_flush_oldest_log_) {
|
||||
// We already attempted to flush all column families dependent on
// the oldest alive log, but that log STILL contains uncommitted
// transactions, so there is still nothing that we can do.
|
||||
return status;
|
||||
} else {
|
||||
ROCKS_LOG_WARN(
|
||||
immutable_db_options_.info_log,
|
||||
"Unable to release oldest log due to uncommited transaction");
|
||||
unable_to_flush_oldest_log_ = true;
|
||||
}
|
||||
} else {
|
||||
// We only mark this log as getting flushed if we have successfully
// flushed all data in this log. If this log contains outstanding prepared
// transactions, we cannot flush it until those transactions are committed.
|
||||
unable_to_flush_oldest_log_ = false;
|
||||
alive_log_files_.begin()->getting_flushed = true;
|
||||
}
|
||||
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
"Flushing all column families with data in WAL number %" PRIu64
|
||||
". Total log size is %" PRIu64
|
||||
" while max_total_wal_size is %" PRIu64,
|
||||
oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
|
||||
// no need to refcount because drop is happening in write thread, so can't
|
||||
// happen while we're in the write thread
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (cfd->IsDropped()) {
|
||||
continue;
|
||||
}
|
||||
if (cfd->OldestLogToKeep() <= oldest_alive_log) {
|
||||
status = SwitchMemtable(cfd, write_context);
|
||||
if (!status.ok()) {
|
||||
break;
|
||||
}
|
||||
cfd->imm()->FlushRequested();
|
||||
SchedulePendingFlush(cfd);
|
||||
}
|
||||
}
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
return status;
|
||||
}
|
||||
|
||||
Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
|
||||
mutex_.AssertHeld();
|
||||
assert(write_context != nullptr);
|
||||
Status status;
|
||||
|
||||
// Before a new memtable is added in SwitchMemtable(),
|
||||
// write_buffer_manager_->ShouldFlush() will keep returning true. If another
|
||||
// thread is writing to another DB with the same write buffer, they may also
|
||||
// be flushed. We may end up with flushing much more DBs than needed. It's
|
||||
// suboptimal but still correct.
|
||||
ROCKS_LOG_INFO(
|
||||
immutable_db_options_.info_log,
|
||||
"Flushing column family with largest mem table size. Write buffer is "
|
||||
"using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
|
||||
write_buffer_manager_->memory_usage(),
|
||||
write_buffer_manager_->buffer_size());
|
||||
// no need to refcount because drop is happening in write thread, so can't
|
||||
// happen while we're in the write thread
|
||||
ColumnFamilyData* cfd_picked = nullptr;
|
||||
SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
|
||||
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
if (cfd->IsDropped()) {
|
||||
continue;
|
||||
}
|
||||
if (!cfd->mem()->IsEmpty()) {
|
||||
// We only consider active mem table, hoping immutable memtable is
|
||||
// already in the process of flushing.
|
||||
uint64_t seq = cfd->mem()->GetCreationSeq();
|
||||
if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
|
||||
cfd_picked = cfd;
|
||||
seq_num_for_cf_picked = seq;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (cfd_picked != nullptr) {
|
||||
status = SwitchMemtable(cfd_picked, write_context);
|
||||
if (status.ok()) {
|
||||
cfd_picked->imm()->FlushRequested();
|
||||
SchedulePendingFlush(cfd_picked);
|
||||
MaybeScheduleFlushOrCompaction();
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
uint64_t DBImpl::GetMaxTotalWalSize() const {
|
||||
mutex_.AssertHeld();
|
||||
return mutable_db_options_.max_total_wal_size == 0
|
||||
? 4 * max_total_in_memory_state_
|
||||
: mutable_db_options_.max_total_wal_size;
|
||||
}
|
||||
|
||||
// REQUIRES: mutex_ is held
|
||||
// REQUIRES: this thread is currently at the front of the writer queue
|
||||
Status DBImpl::DelayWrite(uint64_t num_bytes,
|
||||
const WriteOptions& write_options) {
|
||||
uint64_t time_delayed = 0;
|
||||
bool delayed = false;
|
||||
{
|
||||
StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
|
||||
uint64_t delay = write_controller_.GetDelay(env_, num_bytes);
|
||||
if (delay > 0) {
|
||||
if (write_options.no_slowdown) {
|
||||
return Status::Incomplete();
|
||||
}
|
||||
TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
|
||||
|
||||
mutex_.Unlock();
|
||||
// We will delay the write until we have slept for `delay` microseconds
// or we don't need a delay anymore.
|
||||
const uint64_t kDelayInterval = 1000;
|
||||
uint64_t stall_end = sw.start_time() + delay;
|
||||
while (write_controller_.NeedsDelay()) {
|
||||
if (env_->NowMicros() >= stall_end) {
|
||||
// We already delayed this write `delay` microseconds
|
||||
break;
|
||||
}
|
||||
|
||||
delayed = true;
|
||||
// Sleep for 0.001 seconds
|
||||
env_->SleepForMicroseconds(kDelayInterval);
|
||||
}
|
||||
mutex_.Lock();
|
||||
}
|
||||
|
||||
while (bg_error_.ok() && write_controller_.IsStopped()) {
|
||||
if (write_options.no_slowdown) {
|
||||
return Status::Incomplete();
|
||||
}
|
||||
delayed = true;
|
||||
TEST_SYNC_POINT("DBImpl::DelayWrite:Wait");
|
||||
bg_cv_.Wait();
|
||||
}
|
||||
}
|
||||
assert(!delayed || !write_options.no_slowdown);
|
||||
if (delayed) {
|
||||
default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_STALL_MICROS,
|
||||
time_delayed);
|
||||
RecordTick(stats_, STALL_MICROS, time_delayed);
|
||||
}
|
||||
|
||||
return bg_error_;
|
||||
}
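// A note on DelayWrite() above: a soft delay is served by sleeping in
// kDelayInterval (1000 microsecond) slices until either the computed delay
// expires or the write controller stops asking for a delay, while a hard stop
// blocks on bg_cv_ until the controller is released; in both cases
// write_options.no_slowdown turns the wait into an immediate
// Status::Incomplete().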
|
||||
|
||||
Status DBImpl::ScheduleFlushes(WriteContext* context) {
|
||||
ColumnFamilyData* cfd;
|
||||
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
|
||||
auto status = SwitchMemtable(cfd, context);
|
||||
if (cfd->Unref()) {
|
||||
delete cfd;
|
||||
}
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* cfd,
|
||||
const MemTableInfo& mem_table_info) {
|
||||
if (immutable_db_options_.listeners.size() == 0U) {
|
||||
return;
|
||||
}
|
||||
if (shutting_down_.load(std::memory_order_acquire)) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto listener : immutable_db_options_.listeners) {
|
||||
listener->OnMemTableSealed(mem_table_info);
|
||||
}
|
||||
}
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
// REQUIRES: mutex_ is held
|
||||
// REQUIRES: this thread is currently at the front of the writer queue
|
||||
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
|
||||
mutex_.AssertHeld();
|
||||
unique_ptr<WritableFile> lfile;
|
||||
log::Writer* new_log = nullptr;
|
||||
MemTable* new_mem = nullptr;
|
||||
|
||||
// Attempt to switch to a new memtable and trigger flush of old.
|
||||
// Do this without holding the dbmutex lock.
|
||||
assert(versions_->prev_log_number() == 0);
|
||||
bool creating_new_log = !log_empty_;
|
||||
uint64_t recycle_log_number = 0;
|
||||
if (creating_new_log && immutable_db_options_.recycle_log_file_num &&
|
||||
!log_recycle_files.empty()) {
|
||||
recycle_log_number = log_recycle_files.front();
|
||||
log_recycle_files.pop_front();
|
||||
}
|
||||
uint64_t new_log_number =
|
||||
creating_new_log ? versions_->NewFileNumber() : logfile_number_;
|
||||
SuperVersion* new_superversion = nullptr;
|
||||
const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions();
|
||||
|
||||
// Set memtable_info for the memtable-sealed callback
|
||||
#ifndef ROCKSDB_LITE
|
||||
MemTableInfo memtable_info;
|
||||
memtable_info.cf_name = cfd->GetName();
|
||||
memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber();
|
||||
memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber();
|
||||
memtable_info.num_entries = cfd->mem()->num_entries();
|
||||
memtable_info.num_deletes = cfd->mem()->num_deletes();
|
||||
#endif // ROCKSDB_LITE
|
||||
// Log this later after lock release. It may be outdated, e.g., if background
|
||||
// flush happens before logging, but that should be ok.
|
||||
int num_imm_unflushed = cfd->imm()->NumNotFlushed();
|
||||
DBOptions db_options =
|
||||
BuildDBOptions(immutable_db_options_, mutable_db_options_);
|
||||
const auto preallocate_block_size =
|
||||
GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
|
||||
mutex_.Unlock();
|
||||
Status s;
|
||||
{
|
||||
if (creating_new_log) {
|
||||
EnvOptions opt_env_opt =
|
||||
env_->OptimizeForLogWrite(env_options_, db_options);
|
||||
if (recycle_log_number) {
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
"reusing log %" PRIu64 " from recycle list\n",
|
||||
recycle_log_number);
|
||||
s = env_->ReuseWritableFile(
|
||||
LogFileName(immutable_db_options_.wal_dir, new_log_number),
|
||||
LogFileName(immutable_db_options_.wal_dir, recycle_log_number),
|
||||
&lfile, opt_env_opt);
|
||||
} else {
|
||||
s = NewWritableFile(
|
||||
env_, LogFileName(immutable_db_options_.wal_dir, new_log_number),
|
||||
&lfile, opt_env_opt);
|
||||
}
|
||||
if (s.ok()) {
|
||||
// Our final size should be less than write_buffer_size
|
||||
// (compression, etc) but err on the side of caution.
|
||||
|
||||
// use preallocate_block_size instead
|
||||
// of calling GetWalPreallocateBlockSize()
|
||||
lfile->SetPreallocationBlockSize(preallocate_block_size);
|
||||
unique_ptr<WritableFileWriter> file_writer(
|
||||
new WritableFileWriter(std::move(lfile), opt_env_opt));
|
||||
new_log =
|
||||
new log::Writer(std::move(file_writer), new_log_number,
|
||||
immutable_db_options_.recycle_log_file_num > 0);
|
||||
}
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
SequenceNumber seq = versions_->LastSequence();
|
||||
new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
|
||||
new_superversion = new SuperVersion();
|
||||
}
|
||||
|
||||
#ifndef ROCKSDB_LITE
|
||||
// PLEASE NOTE: We assume that there are no failable operations
|
||||
// after lock is acquired below since we are already notifying
|
||||
// client about mem table becoming immutable.
|
||||
NotifyOnMemTableSealed(cfd, memtable_info);
|
||||
#endif //ROCKSDB_LITE
|
||||
}
|
||||
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
||||
"[%s] New memtable created with log file: #%" PRIu64
|
||||
". Immutable memtables: %d.\n",
|
||||
cfd->GetName().c_str(), new_log_number, num_imm_unflushed);
|
||||
mutex_.Lock();
|
||||
if (!s.ok()) {
|
||||
// how do we fail if we're not creating new log?
|
||||
assert(creating_new_log);
|
||||
assert(!new_mem);
|
||||
assert(!new_log);
|
||||
return s;
|
||||
}
|
||||
if (creating_new_log) {
|
||||
logfile_number_ = new_log_number;
|
||||
assert(new_log != nullptr);
|
||||
log_empty_ = true;
|
||||
log_dir_synced_ = false;
|
||||
logs_.emplace_back(logfile_number_, new_log);
|
||||
alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
|
||||
}
|
||||
for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
|
||||
// all this is just optimization to delete logs that
|
||||
// are no longer needed -- if CF is empty, that means it
|
||||
// doesn't need that particular log to stay alive, so we just
|
||||
// advance the log number. no need to persist this in the manifest
|
||||
if (loop_cfd->mem()->GetFirstSequenceNumber() == 0 &&
|
||||
loop_cfd->imm()->NumNotFlushed() == 0) {
|
||||
if (creating_new_log) {
|
||||
loop_cfd->SetLogNumber(logfile_number_);
|
||||
}
|
||||
loop_cfd->mem()->SetCreationSeq(versions_->LastSequence());
|
||||
}
|
||||
}
|
||||
|
||||
cfd->mem()->SetNextLogNumber(logfile_number_);
|
||||
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
|
||||
new_mem->Ref();
|
||||
cfd->SetMemtable(new_mem);
|
||||
context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork(
|
||||
cfd, new_superversion, mutable_cf_options));
|
||||
return s;
|
||||
}
|
||||
|
||||
size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
|
||||
mutex_.AssertHeld();
|
||||
size_t bsize = write_buffer_size / 10 + write_buffer_size;
|
||||
// Some users might set very high write_buffer_size and rely on
|
||||
// max_total_wal_size or other parameters to control the WAL size.
|
||||
if (mutable_db_options_.max_total_wal_size > 0) {
|
||||
bsize = std::min<size_t>(bsize, mutable_db_options_.max_total_wal_size);
|
||||
}
|
||||
if (immutable_db_options_.db_write_buffer_size > 0) {
|
||||
bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
|
||||
}
|
||||
if (immutable_db_options_.write_buffer_manager &&
|
||||
immutable_db_options_.write_buffer_manager->enabled()) {
|
||||
bsize = std::min<size_t>(
|
||||
bsize, immutable_db_options_.write_buffer_manager->buffer_size());
|
||||
}
|
||||
|
||||
return bsize;
|
||||
}
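// A worked example of the sizing above, with made-up numbers (illustrative
// only, not part of this commit): GetWalPreallocateBlockSize() starts from
// write_buffer_size plus 10% headroom, so a 64 MB write buffer yields roughly
// 70.4 MB; if max_total_wal_size were set to 32 MB, the preallocation block
// size would then be clamped down to 32 MB by the first std::min.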
|
||||
|
||||
// Default implementations of convenience methods that subclasses of DB
|
||||
// can call if they wish
|
||||
Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
|
||||
const Slice& key, const Slice& value) {
|
||||
// Pre-allocate size of write batch conservatively.
|
||||
// 8 bytes are taken by header, 4 bytes for count, 1 byte for type,
|
||||
// and we allocate 11 extra bytes for key length, as well as value length.
|
||||
WriteBatch batch(key.size() + value.size() + 24);
|
||||
batch.Put(column_family, key, value);
|
||||
return Write(opt, &batch);
|
||||
}
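// The 24-byte constant above matches the comment: 8 bytes of header, 4 bytes
// for the entry count, 1 byte for the record type, and room for the
// varint32-encoded key and value lengths (reading the remaining 11 bytes as
// length headroom is an interpretation, not spelled out in the commit).
// A tiny worked example of the reservation (illustrative only, not part of
// this commit):

#include <cassert>
#include <cstddef>
#include <string>

static size_t ReservedBatchSize(const std::string& key,
                                const std::string& value) {
  // Mirrors the arithmetic in DB::Put above.
  return key.size() + value.size() + 24;
}

int main() {
  // For a 3-byte key and a 5-byte value, 32 bytes are reserved.
  assert(ReservedBatchSize("abc", "hello") == 3 + 5 + 24);
  return 0;
}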
|
||||
|
||||
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
|
||||
const Slice& key) {
|
||||
WriteBatch batch;
|
||||
batch.Delete(column_family, key);
|
||||
return Write(opt, &batch);
|
||||
}
|
||||
|
||||
Status DB::SingleDelete(const WriteOptions& opt,
|
||||
ColumnFamilyHandle* column_family, const Slice& key) {
|
||||
WriteBatch batch;
|
||||
batch.SingleDelete(column_family, key);
|
||||
return Write(opt, &batch);
|
||||
}
|
||||
|
||||
Status DB::DeleteRange(const WriteOptions& opt,
|
||||
ColumnFamilyHandle* column_family,
|
||||
const Slice& begin_key, const Slice& end_key) {
|
||||
WriteBatch batch;
|
||||
batch.DeleteRange(column_family, begin_key, end_key);
|
||||
return Write(opt, &batch);
|
||||
}
|
||||
|
||||
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
|
||||
const Slice& key, const Slice& value) {
|
||||
WriteBatch batch;
|
||||
batch.Merge(column_family, key, value);
|
||||
return Write(opt, &batch);
|
||||
}
|
||||
} // namespace rocksdb