3c327ac2d0
Summary: Closes https://github.com/facebook/rocksdb/pull/2589 Differential Revision: D5431502 Pulled By: siying fbshipit-source-id: 8ebf8c87883daa9daa54b2303d11ce01ab1f6f75
1130 lines
40 KiB
C++
1130 lines
40 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
#include "db/db_impl.h"
|
|
|
|
#ifndef __STDC_FORMAT_MACROS
|
|
#define __STDC_FORMAT_MACROS
|
|
#endif
|
|
#include <inttypes.h>
|
|
|
|
#include "db/builder.h"
|
|
#include "options/options_helper.h"
|
|
#include "rocksdb/wal_filter.h"
|
|
#include "table/block_based_table_factory.h"
|
|
#include "util/rate_limiter.h"
|
|
#include "util/sst_file_manager_impl.h"
|
|
#include "util/sync_point.h"
|
|
|
|
namespace rocksdb {
|
|
Options SanitizeOptions(const std::string& dbname,
|
|
const Options& src) {
|
|
auto db_options = SanitizeOptions(dbname, DBOptions(src));
|
|
ImmutableDBOptions immutable_db_options(db_options);
|
|
auto cf_options =
|
|
SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
|
|
return Options(db_options, cf_options);
|
|
}
|
|
|
|
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
|
|
DBOptions result(src);
|
|
|
|
// result.max_open_files means an "infinite" open files.
|
|
if (result.max_open_files != -1) {
|
|
int max_max_open_files = port::GetMaxOpenFiles();
|
|
if (max_max_open_files == -1) {
|
|
max_max_open_files = 0x400000;
|
|
}
|
|
ClipToRange(&result.max_open_files, 20, max_max_open_files);
|
|
}
|
|
|
|
if (result.info_log == nullptr) {
|
|
Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
|
|
if (!s.ok()) {
|
|
// No place suitable for logging
|
|
result.info_log = nullptr;
|
|
}
|
|
}
|
|
|
|
if (!result.write_buffer_manager) {
|
|
result.write_buffer_manager.reset(
|
|
new WriteBufferManager(result.db_write_buffer_size));
|
|
}
|
|
auto bg_job_limits = DBImpl::GetBGJobLimits(result.max_background_flushes,
|
|
result.max_background_compactions,
|
|
result.max_background_jobs,
|
|
true /* parallelize_compactions */);
|
|
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
|
|
Env::Priority::LOW);
|
|
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
|
|
Env::Priority::HIGH);
|
|
|
|
if (result.rate_limiter.get() != nullptr) {
|
|
if (result.bytes_per_sync == 0) {
|
|
result.bytes_per_sync = 1024 * 1024;
|
|
}
|
|
}
|
|
|
|
if (result.delayed_write_rate == 0) {
|
|
if (result.rate_limiter.get() != nullptr) {
|
|
result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
|
|
}
|
|
if (result.delayed_write_rate == 0) {
|
|
result.delayed_write_rate = 16 * 1024 * 1024;
|
|
}
|
|
}
|
|
|
|
if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
|
|
result.recycle_log_file_num = false;
|
|
}
|
|
|
|
if (result.recycle_log_file_num &&
|
|
(result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
|
|
result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
|
|
// kPointInTimeRecovery is indistinguishable from
|
|
// kTolerateCorruptedTailRecords in recycle mode since we define
|
|
// the "end" of the log as the first corrupt record we encounter.
|
|
// kAbsoluteConsistency doesn't make sense because even a clean
|
|
// shutdown leaves old junk at the end of the log file.
|
|
result.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
|
|
}
|
|
|
|
if (result.wal_dir.empty()) {
|
|
// Use dbname as default
|
|
result.wal_dir = dbname;
|
|
}
|
|
if (result.wal_dir.back() == '/') {
|
|
result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
|
|
}
|
|
|
|
if (result.db_paths.size() == 0) {
|
|
result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
|
|
}
|
|
|
|
if (result.use_direct_io_for_flush_and_compaction &&
|
|
result.compaction_readahead_size == 0) {
|
|
TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
|
|
result.compaction_readahead_size = 1024 * 1024 * 2;
|
|
}
|
|
|
|
if (result.compaction_readahead_size > 0 ||
|
|
result.use_direct_io_for_flush_and_compaction) {
|
|
result.new_table_reader_for_compaction_inputs = true;
|
|
}
|
|
|
|
// Force flush on DB open if 2PC is enabled, since with 2PC we have no
|
|
// guarantee that consecutive log files have consecutive sequence id, which
|
|
// make recovery complicated.
|
|
if (result.allow_2pc) {
|
|
result.avoid_flush_during_recovery = false;
|
|
}
|
|
|
|
return result;
|
|
}
|
|
|
|
namespace {
|
|
|
|
Status SanitizeOptionsByTable(
|
|
const DBOptions& db_opts,
|
|
const std::vector<ColumnFamilyDescriptor>& column_families) {
|
|
Status s;
|
|
for (auto cf : column_families) {
|
|
s = cf.options.table_factory->SanitizeOptions(db_opts, cf.options);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
}
|
|
return Status::OK();
|
|
}
|
|
|
|
static Status ValidateOptions(
|
|
const DBOptions& db_options,
|
|
const std::vector<ColumnFamilyDescriptor>& column_families) {
|
|
Status s;
|
|
|
|
for (auto& cfd : column_families) {
|
|
s = CheckCompressionSupported(cfd.options);
|
|
if (s.ok() && db_options.allow_concurrent_memtable_write) {
|
|
s = CheckConcurrentWritesSupported(cfd.options);
|
|
}
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
if (db_options.db_paths.size() > 1) {
|
|
if ((cfd.options.compaction_style != kCompactionStyleUniversal) &&
|
|
(cfd.options.compaction_style != kCompactionStyleLevel)) {
|
|
return Status::NotSupported(
|
|
"More than one DB paths are only supported in "
|
|
"universal and level compaction styles. ");
|
|
}
|
|
}
|
|
if (cfd.options.compaction_options_fifo.ttl > 0) {
|
|
if (db_options.max_open_files != -1) {
|
|
return Status::NotSupported(
|
|
"FIFO Compaction with TTL is only supported when files are always "
|
|
"kept open (set max_open_files = -1). ");
|
|
}
|
|
if (cfd.options.table_factory->Name() !=
|
|
BlockBasedTableFactory().Name()) {
|
|
return Status::NotSupported(
|
|
"FIFO Compaction with TTL is only supported in "
|
|
"Block-Based Table format. ");
|
|
}
|
|
}
|
|
}
|
|
|
|
if (db_options.db_paths.size() > 4) {
|
|
return Status::NotSupported(
|
|
"More than four DB paths are not supported yet. ");
|
|
}
|
|
|
|
if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
|
|
// Protect against assert in PosixMMapReadableFile constructor
|
|
return Status::NotSupported(
|
|
"If memory mapped reads (allow_mmap_reads) are enabled "
|
|
"then direct I/O reads (use_direct_reads) must be disabled. ");
|
|
}
|
|
|
|
if (db_options.allow_mmap_writes &&
|
|
db_options.use_direct_io_for_flush_and_compaction) {
|
|
return Status::NotSupported(
|
|
"If memory mapped writes (allow_mmap_writes) are enabled "
|
|
"then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
|
|
"be disabled. ");
|
|
}
|
|
|
|
if (db_options.keep_log_file_num == 0) {
|
|
return Status::InvalidArgument("keep_log_file_num must be greater than 0");
|
|
}
|
|
|
|
return Status::OK();
|
|
}
|
|
} // namespace
|
|
Status DBImpl::NewDB() {
|
|
VersionEdit new_db;
|
|
new_db.SetLogNumber(0);
|
|
new_db.SetNextFile(2);
|
|
new_db.SetLastSequence(0);
|
|
|
|
Status s;
|
|
|
|
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
|
|
const std::string manifest = DescriptorFileName(dbname_, 1);
|
|
{
|
|
unique_ptr<WritableFile> file;
|
|
EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_);
|
|
s = NewWritableFile(env_, manifest, &file, env_options);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
file->SetPreallocationBlockSize(
|
|
immutable_db_options_.manifest_preallocation_size);
|
|
unique_ptr<WritableFileWriter> file_writer(
|
|
new WritableFileWriter(std::move(file), env_options));
|
|
log::Writer log(std::move(file_writer), 0, false);
|
|
std::string record;
|
|
new_db.EncodeTo(&record);
|
|
s = log.AddRecord(record);
|
|
if (s.ok()) {
|
|
s = SyncManifest(env_, &immutable_db_options_, log.file());
|
|
}
|
|
}
|
|
if (s.ok()) {
|
|
// Make "CURRENT" file that points to the new manifest file.
|
|
s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir());
|
|
} else {
|
|
env_->DeleteFile(manifest);
|
|
}
|
|
return s;
|
|
}
|
|
|
|
Status DBImpl::Directories::CreateAndNewDirectory(
|
|
Env* env, const std::string& dirname,
|
|
std::unique_ptr<Directory>* directory) const {
|
|
// We call CreateDirIfMissing() as the directory may already exist (if we
|
|
// are reopening a DB), when this happens we don't want creating the
|
|
// directory to cause an error. However, we need to check if creating the
|
|
// directory fails or else we may get an obscure message about the lock
|
|
// file not existing. One real-world example of this occurring is if
|
|
// env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
|
|
// when dbname_ is "dir/db" but when "dir" doesn't exist.
|
|
Status s = env->CreateDirIfMissing(dirname);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
return env->NewDirectory(dirname, directory);
|
|
}
|
|
|
|
Status DBImpl::Directories::SetDirectories(
|
|
Env* env, const std::string& dbname, const std::string& wal_dir,
|
|
const std::vector<DbPath>& data_paths) {
|
|
Status s = CreateAndNewDirectory(env, dbname, &db_dir_);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
if (!wal_dir.empty() && dbname != wal_dir) {
|
|
s = CreateAndNewDirectory(env, wal_dir, &wal_dir_);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
}
|
|
|
|
data_dirs_.clear();
|
|
for (auto& p : data_paths) {
|
|
const std::string db_path = p.path;
|
|
if (db_path == dbname) {
|
|
data_dirs_.emplace_back(nullptr);
|
|
} else {
|
|
std::unique_ptr<Directory> path_directory;
|
|
s = CreateAndNewDirectory(env, db_path, &path_directory);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
data_dirs_.emplace_back(path_directory.release());
|
|
}
|
|
}
|
|
assert(data_dirs_.size() == data_paths.size());
|
|
return Status::OK();
|
|
}
|
|
|
|
Status DBImpl::Recover(
|
|
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
|
|
bool error_if_log_file_exist, bool error_if_data_exists_in_logs) {
|
|
mutex_.AssertHeld();
|
|
|
|
bool is_new_db = false;
|
|
assert(db_lock_ == nullptr);
|
|
if (!read_only) {
|
|
Status s = directories_.SetDirectories(env_, dbname_,
|
|
immutable_db_options_.wal_dir,
|
|
immutable_db_options_.db_paths);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
|
|
s = env_->LockFile(LockFileName(dbname_), &db_lock_);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
|
|
s = env_->FileExists(CurrentFileName(dbname_));
|
|
if (s.IsNotFound()) {
|
|
if (immutable_db_options_.create_if_missing) {
|
|
s = NewDB();
|
|
is_new_db = true;
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
} else {
|
|
return Status::InvalidArgument(
|
|
dbname_, "does not exist (create_if_missing is false)");
|
|
}
|
|
} else if (s.ok()) {
|
|
if (immutable_db_options_.error_if_exists) {
|
|
return Status::InvalidArgument(
|
|
dbname_, "exists (error_if_exists is true)");
|
|
}
|
|
} else {
|
|
// Unexpected error reading file
|
|
assert(s.IsIOError());
|
|
return s;
|
|
}
|
|
// Check for the IDENTITY file and create it if not there
|
|
s = env_->FileExists(IdentityFileName(dbname_));
|
|
if (s.IsNotFound()) {
|
|
s = SetIdentityFile(env_, dbname_);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
} else if (!s.ok()) {
|
|
assert(s.IsIOError());
|
|
return s;
|
|
}
|
|
}
|
|
|
|
Status s = versions_->Recover(column_families, read_only);
|
|
if (immutable_db_options_.paranoid_checks && s.ok()) {
|
|
s = CheckConsistency();
|
|
}
|
|
if (s.ok()) {
|
|
SequenceNumber next_sequence(kMaxSequenceNumber);
|
|
default_cf_handle_ = new ColumnFamilyHandleImpl(
|
|
versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
|
|
default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
|
|
single_column_family_mode_ =
|
|
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
|
|
|
|
// Recover from all newer log files than the ones named in the
|
|
// descriptor (new log files may have been added by the previous
|
|
// incarnation without registering them in the descriptor).
|
|
//
|
|
// Note that prev_log_number() is no longer used, but we pay
|
|
// attention to it in case we are recovering a database
|
|
// produced by an older version of rocksdb.
|
|
std::vector<std::string> filenames;
|
|
s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
|
|
std::vector<uint64_t> logs;
|
|
for (size_t i = 0; i < filenames.size(); i++) {
|
|
uint64_t number;
|
|
FileType type;
|
|
if (ParseFileName(filenames[i], &number, &type) && type == kLogFile) {
|
|
if (is_new_db) {
|
|
return Status::Corruption(
|
|
"While creating a new Db, wal_dir contains "
|
|
"existing log file: ",
|
|
filenames[i]);
|
|
} else {
|
|
logs.push_back(number);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (logs.size() > 0) {
|
|
if (error_if_log_file_exist) {
|
|
return Status::Corruption(
|
|
"The db was opened in readonly mode with error_if_log_file_exist"
|
|
"flag but a log file already exists");
|
|
} else if (error_if_data_exists_in_logs) {
|
|
for (auto& log : logs) {
|
|
std::string fname = LogFileName(immutable_db_options_.wal_dir, log);
|
|
uint64_t bytes;
|
|
s = env_->GetFileSize(fname, &bytes);
|
|
if (s.ok()) {
|
|
if (bytes > 0) {
|
|
return Status::Corruption(
|
|
"error_if_data_exists_in_logs is set but there are data "
|
|
" in log files.");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!logs.empty()) {
|
|
// Recover in the order in which the logs were generated
|
|
std::sort(logs.begin(), logs.end());
|
|
s = RecoverLogFiles(logs, &next_sequence, read_only);
|
|
if (!s.ok()) {
|
|
// Clear memtables if recovery failed
|
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
|
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
|
|
kMaxSequenceNumber);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Initial value
|
|
max_total_in_memory_state_ = 0;
|
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
|
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
|
|
max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
|
|
mutable_cf_options->max_write_buffer_number;
|
|
}
|
|
|
|
return s;
|
|
}
|
|
|
|
// REQUIRES: log_numbers are sorted in ascending order
|
|
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
|
|
SequenceNumber* next_sequence, bool read_only) {
|
|
struct LogReporter : public log::Reader::Reporter {
|
|
Env* env;
|
|
Logger* info_log;
|
|
const char* fname;
|
|
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
|
|
virtual void Corruption(size_t bytes, const Status& s) override {
|
|
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
|
|
(this->status == nullptr ? "(ignoring error) " : ""),
|
|
fname, static_cast<int>(bytes), s.ToString().c_str());
|
|
if (this->status != nullptr && this->status->ok()) {
|
|
*this->status = s;
|
|
}
|
|
}
|
|
};
|
|
|
|
mutex_.AssertHeld();
|
|
Status status;
|
|
std::unordered_map<int, VersionEdit> version_edits;
|
|
// no need to refcount because iteration is under mutex
|
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
|
VersionEdit edit;
|
|
edit.SetColumnFamily(cfd->GetID());
|
|
version_edits.insert({cfd->GetID(), edit});
|
|
}
|
|
int job_id = next_job_id_.fetch_add(1);
|
|
{
|
|
auto stream = event_logger_.Log();
|
|
stream << "job" << job_id << "event"
|
|
<< "recovery_started";
|
|
stream << "log_files";
|
|
stream.StartArray();
|
|
for (auto log_number : log_numbers) {
|
|
stream << log_number;
|
|
}
|
|
stream.EndArray();
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
if (immutable_db_options_.wal_filter != nullptr) {
|
|
std::map<std::string, uint32_t> cf_name_id_map;
|
|
std::map<uint32_t, uint64_t> cf_lognumber_map;
|
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
|
cf_name_id_map.insert(
|
|
std::make_pair(cfd->GetName(), cfd->GetID()));
|
|
cf_lognumber_map.insert(
|
|
std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
|
|
}
|
|
|
|
immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
|
|
cf_name_id_map);
|
|
}
|
|
#endif
|
|
|
|
bool stop_replay_by_wal_filter = false;
|
|
bool stop_replay_for_corruption = false;
|
|
bool flushed = false;
|
|
for (auto log_number : log_numbers) {
|
|
// The previous incarnation may not have written any MANIFEST
|
|
// records after allocating this log number. So we manually
|
|
// update the file number allocation counter in VersionSet.
|
|
versions_->MarkFileNumberUsedDuringRecovery(log_number);
|
|
// Open the log file
|
|
std::string fname = LogFileName(immutable_db_options_.wal_dir, log_number);
|
|
|
|
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
|
"Recovering log #%" PRIu64 " mode %d", log_number,
|
|
immutable_db_options_.wal_recovery_mode);
|
|
auto logFileDropped = [this, &fname]() {
|
|
uint64_t bytes;
|
|
if (env_->GetFileSize(fname, &bytes).ok()) {
|
|
auto info_log = immutable_db_options_.info_log.get();
|
|
ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
|
|
static_cast<int>(bytes));
|
|
}
|
|
};
|
|
if (stop_replay_by_wal_filter) {
|
|
logFileDropped();
|
|
continue;
|
|
}
|
|
|
|
unique_ptr<SequentialFileReader> file_reader;
|
|
{
|
|
unique_ptr<SequentialFile> file;
|
|
status = env_->NewSequentialFile(fname, &file,
|
|
env_->OptimizeForLogRead(env_options_));
|
|
if (!status.ok()) {
|
|
MaybeIgnoreError(&status);
|
|
if (!status.ok()) {
|
|
return status;
|
|
} else {
|
|
// Fail with one log file, but that's ok.
|
|
// Try next one.
|
|
continue;
|
|
}
|
|
}
|
|
file_reader.reset(new SequentialFileReader(std::move(file)));
|
|
}
|
|
|
|
// Create the log reader.
|
|
LogReporter reporter;
|
|
reporter.env = env_;
|
|
reporter.info_log = immutable_db_options_.info_log.get();
|
|
reporter.fname = fname.c_str();
|
|
if (!immutable_db_options_.paranoid_checks ||
|
|
immutable_db_options_.wal_recovery_mode ==
|
|
WALRecoveryMode::kSkipAnyCorruptedRecords) {
|
|
reporter.status = nullptr;
|
|
} else {
|
|
reporter.status = &status;
|
|
}
|
|
// We intentially make log::Reader do checksumming even if
|
|
// paranoid_checks==false so that corruptions cause entire commits
|
|
// to be skipped instead of propagating bad information (like overly
|
|
// large sequence numbers).
|
|
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
|
|
&reporter, true /*checksum*/, 0 /*initial_offset*/,
|
|
log_number);
|
|
|
|
// Determine if we should tolerate incomplete records at the tail end of the
|
|
// Read all the records and add to a memtable
|
|
std::string scratch;
|
|
Slice record;
|
|
WriteBatch batch;
|
|
|
|
while (!stop_replay_by_wal_filter &&
|
|
reader.ReadRecord(&record, &scratch,
|
|
immutable_db_options_.wal_recovery_mode) &&
|
|
status.ok()) {
|
|
if (record.size() < WriteBatchInternal::kHeader) {
|
|
reporter.Corruption(record.size(),
|
|
Status::Corruption("log record too small"));
|
|
continue;
|
|
}
|
|
WriteBatchInternal::SetContents(&batch, record);
|
|
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
|
|
|
|
if (immutable_db_options_.wal_recovery_mode ==
|
|
WALRecoveryMode::kPointInTimeRecovery) {
|
|
// In point-in-time recovery mode, if sequence id of log files are
|
|
// consecutive, we continue recovery despite corruption. This could
|
|
// happen when we open and write to a corrupted DB, where sequence id
|
|
// will start from the last sequence id we recovered.
|
|
if (sequence == *next_sequence) {
|
|
stop_replay_for_corruption = false;
|
|
}
|
|
if (stop_replay_for_corruption) {
|
|
logFileDropped();
|
|
break;
|
|
}
|
|
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
if (immutable_db_options_.wal_filter != nullptr) {
|
|
WriteBatch new_batch;
|
|
bool batch_changed = false;
|
|
|
|
WalFilter::WalProcessingOption wal_processing_option =
|
|
immutable_db_options_.wal_filter->LogRecordFound(
|
|
log_number, fname, batch, &new_batch, &batch_changed);
|
|
|
|
switch (wal_processing_option) {
|
|
case WalFilter::WalProcessingOption::kContinueProcessing:
|
|
// do nothing, proceeed normally
|
|
break;
|
|
case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
|
|
// skip current record
|
|
continue;
|
|
case WalFilter::WalProcessingOption::kStopReplay:
|
|
// skip current record and stop replay
|
|
stop_replay_by_wal_filter = true;
|
|
continue;
|
|
case WalFilter::WalProcessingOption::kCorruptedRecord: {
|
|
status =
|
|
Status::Corruption("Corruption reported by Wal Filter ",
|
|
immutable_db_options_.wal_filter->Name());
|
|
MaybeIgnoreError(&status);
|
|
if (!status.ok()) {
|
|
reporter.Corruption(record.size(), status);
|
|
continue;
|
|
}
|
|
break;
|
|
}
|
|
default: {
|
|
assert(false); // unhandled case
|
|
status = Status::NotSupported(
|
|
"Unknown WalProcessingOption returned"
|
|
" by Wal Filter ",
|
|
immutable_db_options_.wal_filter->Name());
|
|
MaybeIgnoreError(&status);
|
|
if (!status.ok()) {
|
|
return status;
|
|
} else {
|
|
// Ignore the error with current record processing.
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (batch_changed) {
|
|
// Make sure that the count in the new batch is
|
|
// within the orignal count.
|
|
int new_count = WriteBatchInternal::Count(&new_batch);
|
|
int original_count = WriteBatchInternal::Count(&batch);
|
|
if (new_count > original_count) {
|
|
ROCKS_LOG_FATAL(
|
|
immutable_db_options_.info_log,
|
|
"Recovering log #%" PRIu64
|
|
" mode %d log filter %s returned "
|
|
"more records (%d) than original (%d) which is not allowed. "
|
|
"Aborting recovery.",
|
|
log_number, immutable_db_options_.wal_recovery_mode,
|
|
immutable_db_options_.wal_filter->Name(), new_count,
|
|
original_count);
|
|
status = Status::NotSupported(
|
|
"More than original # of records "
|
|
"returned by Wal Filter ",
|
|
immutable_db_options_.wal_filter->Name());
|
|
return status;
|
|
}
|
|
// Set the same sequence number in the new_batch
|
|
// as the original batch.
|
|
WriteBatchInternal::SetSequence(&new_batch,
|
|
WriteBatchInternal::Sequence(&batch));
|
|
batch = new_batch;
|
|
}
|
|
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// If column family was not found, it might mean that the WAL write
|
|
// batch references to the column family that was dropped after the
|
|
// insert. We don't want to fail the whole write batch in that case --
|
|
// we just ignore the update.
|
|
// That's why we set ignore missing column families to true
|
|
bool has_valid_writes = false;
|
|
status = WriteBatchInternal::InsertInto(
|
|
&batch, column_family_memtables_.get(), &flush_scheduler_, true,
|
|
log_number, this, false /* concurrent_memtable_writes */,
|
|
next_sequence, &has_valid_writes);
|
|
MaybeIgnoreError(&status);
|
|
if (!status.ok()) {
|
|
// We are treating this as a failure while reading since we read valid
|
|
// blocks that do not form coherent data
|
|
reporter.Corruption(record.size(), status);
|
|
continue;
|
|
}
|
|
|
|
if (has_valid_writes && !read_only) {
|
|
// we can do this because this is called before client has access to the
|
|
// DB and there is only a single thread operating on DB
|
|
ColumnFamilyData* cfd;
|
|
|
|
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
|
|
cfd->Unref();
|
|
// If this asserts, it means that InsertInto failed in
|
|
// filtering updates to already-flushed column families
|
|
assert(cfd->GetLogNumber() <= log_number);
|
|
auto iter = version_edits.find(cfd->GetID());
|
|
assert(iter != version_edits.end());
|
|
VersionEdit* edit = &iter->second;
|
|
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
|
|
if (!status.ok()) {
|
|
// Reflect errors immediately so that conditions like full
|
|
// file-systems cause the DB::Open() to fail.
|
|
return status;
|
|
}
|
|
flushed = true;
|
|
|
|
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
|
|
*next_sequence);
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!status.ok()) {
|
|
if (immutable_db_options_.wal_recovery_mode ==
|
|
WALRecoveryMode::kSkipAnyCorruptedRecords) {
|
|
// We should ignore all errors unconditionally
|
|
status = Status::OK();
|
|
} else if (immutable_db_options_.wal_recovery_mode ==
|
|
WALRecoveryMode::kPointInTimeRecovery) {
|
|
// We should ignore the error but not continue replaying
|
|
status = Status::OK();
|
|
stop_replay_for_corruption = true;
|
|
ROCKS_LOG_INFO(immutable_db_options_.info_log,
|
|
"Point in time recovered to log #%" PRIu64
|
|
" seq #%" PRIu64,
|
|
log_number, *next_sequence);
|
|
} else {
|
|
assert(immutable_db_options_.wal_recovery_mode ==
|
|
WALRecoveryMode::kTolerateCorruptedTailRecords ||
|
|
immutable_db_options_.wal_recovery_mode ==
|
|
WALRecoveryMode::kAbsoluteConsistency);
|
|
return status;
|
|
}
|
|
}
|
|
|
|
flush_scheduler_.Clear();
|
|
auto last_sequence = *next_sequence - 1;
|
|
if ((*next_sequence != kMaxSequenceNumber) &&
|
|
(versions_->LastSequence() <= last_sequence)) {
|
|
versions_->SetLastToBeWrittenSequence(last_sequence);
|
|
versions_->SetLastSequence(last_sequence);
|
|
}
|
|
}
|
|
|
|
// True if there's any data in the WALs; if not, we can skip re-processing
|
|
// them later
|
|
bool data_seen = false;
|
|
if (!read_only) {
|
|
// no need to refcount since client still doesn't have access
|
|
// to the DB and can not drop column families while we iterate
|
|
auto max_log_number = log_numbers.back();
|
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
|
auto iter = version_edits.find(cfd->GetID());
|
|
assert(iter != version_edits.end());
|
|
VersionEdit* edit = &iter->second;
|
|
|
|
if (cfd->GetLogNumber() > max_log_number) {
|
|
// Column family cfd has already flushed the data
|
|
// from all logs. Memtable has to be empty because
|
|
// we filter the updates based on log_number
|
|
// (in WriteBatch::InsertInto)
|
|
assert(cfd->mem()->GetFirstSequenceNumber() == 0);
|
|
assert(edit->NumEntries() == 0);
|
|
continue;
|
|
}
|
|
|
|
// flush the final memtable (if non-empty)
|
|
if (cfd->mem()->GetFirstSequenceNumber() != 0) {
|
|
// If flush happened in the middle of recovery (e.g. due to memtable
|
|
// being full), we flush at the end. Otherwise we'll need to record
|
|
// where we were on last flush, which make the logic complicated.
|
|
if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
|
|
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
|
|
if (!status.ok()) {
|
|
// Recovery failed
|
|
break;
|
|
}
|
|
flushed = true;
|
|
|
|
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
|
|
versions_->LastSequence());
|
|
}
|
|
data_seen = true;
|
|
}
|
|
|
|
// write MANIFEST with update
|
|
// writing log_number in the manifest means that any log file
|
|
// with number strongly less than (log_number + 1) is already
|
|
// recovered and should be ignored on next reincarnation.
|
|
// Since we already recovered max_log_number, we want all logs
|
|
// with numbers `<= max_log_number` (includes this one) to be ignored
|
|
if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
|
|
edit->SetLogNumber(max_log_number + 1);
|
|
}
|
|
// we must mark the next log number as used, even though it's
|
|
// not actually used. that is because VersionSet assumes
|
|
// VersionSet::next_file_number_ always to be strictly greater than any
|
|
// log number
|
|
versions_->MarkFileNumberUsedDuringRecovery(max_log_number + 1);
|
|
status = versions_->LogAndApply(
|
|
cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
|
|
if (!status.ok()) {
|
|
// Recovery failed
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (data_seen && !flushed) {
|
|
// Mark these as alive so they'll be considered for deletion later by
|
|
// FindObsoleteFiles()
|
|
if (concurrent_prepare_) {
|
|
log_write_mutex_.Lock();
|
|
}
|
|
for (auto log_number : log_numbers) {
|
|
alive_log_files_.push_back(LogFileNumberSize(log_number));
|
|
}
|
|
if (concurrent_prepare_) {
|
|
log_write_mutex_.Unlock();
|
|
}
|
|
}
|
|
|
|
event_logger_.Log() << "job" << job_id << "event"
|
|
<< "recovery_finished";
|
|
|
|
return status;
|
|
}
|
|
|
|
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
|
MemTable* mem, VersionEdit* edit) {
|
|
mutex_.AssertHeld();
|
|
const uint64_t start_micros = env_->NowMicros();
|
|
FileMetaData meta;
|
|
auto pending_outputs_inserted_elem =
|
|
CaptureCurrentFileNumberInPendingOutputs();
|
|
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
|
|
ReadOptions ro;
|
|
ro.total_order_seek = true;
|
|
Arena arena;
|
|
Status s;
|
|
TableProperties table_properties;
|
|
{
|
|
ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
|
|
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
|
"[%s] [WriteLevel0TableForRecovery]"
|
|
" Level-0 table #%" PRIu64 ": started",
|
|
cfd->GetName().c_str(), meta.fd.GetNumber());
|
|
|
|
// Get the latest mutable cf options while the mutex is still locked
|
|
const MutableCFOptions mutable_cf_options =
|
|
*cfd->GetLatestMutableCFOptions();
|
|
bool paranoid_file_checks =
|
|
cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
|
|
|
|
int64_t _current_time = 0;
|
|
env_->GetCurrentTime(&_current_time); // ignore error
|
|
const uint64_t current_time = static_cast<uint64_t>(_current_time);
|
|
|
|
{
|
|
mutex_.Unlock();
|
|
|
|
SequenceNumber earliest_write_conflict_snapshot;
|
|
std::vector<SequenceNumber> snapshot_seqs =
|
|
snapshots_.GetAll(&earliest_write_conflict_snapshot);
|
|
|
|
EnvOptions optimized_env_options =
|
|
env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
|
|
s = BuildTable(
|
|
dbname_, env_, *cfd->ioptions(), mutable_cf_options,
|
|
optimized_env_options, cfd->table_cache(), iter.get(),
|
|
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
|
|
&meta, cfd->internal_comparator(),
|
|
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
|
|
snapshot_seqs, earliest_write_conflict_snapshot,
|
|
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
|
|
cfd->ioptions()->compression_opts, paranoid_file_checks,
|
|
cfd->internal_stats(), TableFileCreationReason::kRecovery,
|
|
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
|
|
-1 /* level */, current_time);
|
|
LogFlush(immutable_db_options_.info_log);
|
|
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
|
|
"[%s] [WriteLevel0TableForRecovery]"
|
|
" Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
|
|
cfd->GetName().c_str(), meta.fd.GetNumber(),
|
|
meta.fd.GetFileSize(), s.ToString().c_str());
|
|
mutex_.Lock();
|
|
}
|
|
}
|
|
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
|
|
|
|
// Note that if file_size is zero, the file has been deleted and
|
|
// should not be added to the manifest.
|
|
int level = 0;
|
|
if (s.ok() && meta.fd.GetFileSize() > 0) {
|
|
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
|
|
meta.fd.GetFileSize(), meta.smallest, meta.largest,
|
|
meta.smallest_seqno, meta.largest_seqno,
|
|
meta.marked_for_compaction);
|
|
}
|
|
|
|
InternalStats::CompactionStats stats(1);
|
|
stats.micros = env_->NowMicros() - start_micros;
|
|
stats.bytes_written = meta.fd.GetFileSize();
|
|
stats.num_output_files = 1;
|
|
cfd->internal_stats()->AddCompactionStats(level, stats);
|
|
cfd->internal_stats()->AddCFStats(
|
|
InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());
|
|
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
|
|
return s;
|
|
}
|
|
|
|
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
|
|
DBOptions db_options(options);
|
|
ColumnFamilyOptions cf_options(options);
|
|
std::vector<ColumnFamilyDescriptor> column_families;
|
|
column_families.push_back(
|
|
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
|
|
std::vector<ColumnFamilyHandle*> handles;
|
|
Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
|
|
if (s.ok()) {
|
|
assert(handles.size() == 1);
|
|
// i can delete the handle since DBImpl is always holding a reference to
|
|
// default column family
|
|
delete handles[0];
|
|
}
|
|
return s;
|
|
}
|
|
|
|
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
|
const std::vector<ColumnFamilyDescriptor>& column_families,
|
|
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
|
|
Status s = SanitizeOptionsByTable(db_options, column_families);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
|
|
s = ValidateOptions(db_options, column_families);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
|
|
*dbptr = nullptr;
|
|
handles->clear();
|
|
|
|
size_t max_write_buffer_size = 0;
|
|
for (auto cf : column_families) {
|
|
max_write_buffer_size =
|
|
std::max(max_write_buffer_size, cf.options.write_buffer_size);
|
|
}
|
|
|
|
DBImpl* impl = new DBImpl(db_options, dbname);
|
|
s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
|
|
if (s.ok()) {
|
|
for (auto db_path : impl->immutable_db_options_.db_paths) {
|
|
s = impl->env_->CreateDirIfMissing(db_path.path);
|
|
if (!s.ok()) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!s.ok()) {
|
|
delete impl;
|
|
return s;
|
|
}
|
|
|
|
s = impl->CreateArchivalDirectory();
|
|
if (!s.ok()) {
|
|
delete impl;
|
|
return s;
|
|
}
|
|
impl->mutex_.Lock();
|
|
// Handles create_if_missing, error_if_exists
|
|
s = impl->Recover(column_families);
|
|
if (s.ok()) {
|
|
uint64_t new_log_number = impl->versions_->NewFileNumber();
|
|
unique_ptr<WritableFile> lfile;
|
|
EnvOptions soptions(db_options);
|
|
EnvOptions opt_env_options =
|
|
impl->immutable_db_options_.env->OptimizeForLogWrite(
|
|
soptions, BuildDBOptions(impl->immutable_db_options_,
|
|
impl->mutable_db_options_));
|
|
s = NewWritableFile(
|
|
impl->immutable_db_options_.env,
|
|
LogFileName(impl->immutable_db_options_.wal_dir, new_log_number),
|
|
&lfile, opt_env_options);
|
|
if (s.ok()) {
|
|
lfile->SetPreallocationBlockSize(
|
|
impl->GetWalPreallocateBlockSize(max_write_buffer_size));
|
|
{
|
|
InstrumentedMutexLock wl(&impl->log_write_mutex_);
|
|
impl->logfile_number_ = new_log_number;
|
|
unique_ptr<WritableFileWriter> file_writer(
|
|
new WritableFileWriter(std::move(lfile), opt_env_options));
|
|
impl->logs_.emplace_back(
|
|
new_log_number,
|
|
new log::Writer(
|
|
std::move(file_writer), new_log_number,
|
|
impl->immutable_db_options_.recycle_log_file_num > 0));
|
|
}
|
|
|
|
// set column family handles
|
|
for (auto cf : column_families) {
|
|
auto cfd =
|
|
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
|
|
if (cfd != nullptr) {
|
|
handles->push_back(
|
|
new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
|
|
impl->NewThreadStatusCfInfo(cfd);
|
|
} else {
|
|
if (db_options.create_missing_column_families) {
|
|
// missing column family, create it
|
|
ColumnFamilyHandle* handle;
|
|
impl->mutex_.Unlock();
|
|
s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
|
|
impl->mutex_.Lock();
|
|
if (s.ok()) {
|
|
handles->push_back(handle);
|
|
} else {
|
|
break;
|
|
}
|
|
} else {
|
|
s = Status::InvalidArgument("Column family not found: ", cf.name);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (s.ok()) {
|
|
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
|
|
delete impl->InstallSuperVersionAndScheduleWork(
|
|
cfd, nullptr, *cfd->GetLatestMutableCFOptions());
|
|
}
|
|
if (impl->concurrent_prepare_) {
|
|
impl->log_write_mutex_.Lock();
|
|
}
|
|
impl->alive_log_files_.push_back(
|
|
DBImpl::LogFileNumberSize(impl->logfile_number_));
|
|
if (impl->concurrent_prepare_) {
|
|
impl->log_write_mutex_.Unlock();
|
|
}
|
|
impl->DeleteObsoleteFiles();
|
|
s = impl->directories_.GetDbDir()->Fsync();
|
|
}
|
|
}
|
|
|
|
if (s.ok()) {
|
|
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
|
|
if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
|
|
auto* vstorage = cfd->current()->storage_info();
|
|
for (int i = 1; i < vstorage->num_levels(); ++i) {
|
|
int num_files = vstorage->NumLevelFiles(i);
|
|
if (num_files > 0) {
|
|
s = Status::InvalidArgument(
|
|
"Not all files are at level 0. Cannot "
|
|
"open with FIFO compaction style.");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (!cfd->mem()->IsSnapshotSupported()) {
|
|
impl->is_snapshot_supported_ = false;
|
|
}
|
|
if (cfd->ioptions()->merge_operator != nullptr &&
|
|
!cfd->mem()->IsMergeOperatorSupported()) {
|
|
s = Status::InvalidArgument(
|
|
"The memtable of column family %s does not support merge operator "
|
|
"its options.merge_operator is non-null", cfd->GetName().c_str());
|
|
}
|
|
if (!s.ok()) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
TEST_SYNC_POINT("DBImpl::Open:Opened");
|
|
Status persist_options_status;
|
|
if (s.ok()) {
|
|
// Persist RocksDB Options before scheduling the compaction.
|
|
// The WriteOptionsFile() will release and lock the mutex internally.
|
|
persist_options_status = impl->WriteOptionsFile(
|
|
false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
|
|
|
|
*dbptr = impl;
|
|
impl->opened_successfully_ = true;
|
|
impl->MaybeScheduleFlushOrCompaction();
|
|
}
|
|
impl->mutex_.Unlock();
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
auto sfm = static_cast<SstFileManagerImpl*>(
|
|
impl->immutable_db_options_.sst_file_manager.get());
|
|
if (s.ok() && sfm) {
|
|
// Notify SstFileManager about all sst files that already exist in
|
|
// db_paths[0] when the DB is opened.
|
|
auto& db_path = impl->immutable_db_options_.db_paths[0];
|
|
std::vector<std::string> existing_files;
|
|
impl->immutable_db_options_.env->GetChildren(db_path.path, &existing_files);
|
|
for (auto& file_name : existing_files) {
|
|
uint64_t file_number;
|
|
FileType file_type;
|
|
std::string file_path = db_path.path + "/" + file_name;
|
|
if (ParseFileName(file_name, &file_number, &file_type) &&
|
|
file_type == kTableFile) {
|
|
sfm->OnAddFile(file_path);
|
|
}
|
|
}
|
|
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
if (s.ok()) {
|
|
ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
|
|
LogFlush(impl->immutable_db_options_.info_log);
|
|
if (!persist_options_status.ok()) {
|
|
s = Status::IOError(
|
|
"DB::Open() failed --- Unable to persist Options file",
|
|
persist_options_status.ToString());
|
|
}
|
|
}
|
|
if (!s.ok()) {
|
|
for (auto* h : *handles) {
|
|
delete h;
|
|
}
|
|
handles->clear();
|
|
delete impl;
|
|
*dbptr = nullptr;
|
|
}
|
|
return s;
|
|
}
|
|
} // namespace rocksdb
|