Support for single-primary, multi-secondary instances ()

Summary:
This PR allows RocksDB to run in single-primary, multi-secondary process mode.
The writer is a regular RocksDB (e.g. an `DBImpl`) instance playing the role of a primary.
Multiple `DBImplSecondary` processes (secondaries) share the same set of SST files, MANIFEST, WAL files with the primary. Secondaries tail the MANIFEST of the primary and apply updates to their own in-memory state of the file system, e.g. `VersionStorageInfo`.

This PR has several components:
1. (Originally in ). Add a `PathNotFound` subcode to `IOError` to denote the failure when a secondary tries to open a file which has been deleted by the primary.

2. (Similar to ). Add `FragmentBufferedReader` to handle partially-read, trailing record at the end of a log from where future read can continue.

3. (Originally in  and ). Add implementation of the secondary, i.e. `DBImplSecondary`.
3.1 Tail the primary's MANIFEST during recovery.
3.2 Tail the primary's MANIFEST during normal processing by calling `ReadAndApply`.
3.3 Tailing WAL will be in a future PR.

4. Add an example in 'examples/multi_processes_example.cc' to demonstrate the usage of secondary RocksDB instance in a multi-process setting. Instructions to run the example can be found at the beginning of the source code.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4899

Differential Revision: D14510945

Pulled By: riversand963

fbshipit-source-id: 4ac1c5693e6012ad23f7b4b42d3c374fecbe8886
This commit is contained in:
Yanqin Jin 2019-03-26 16:41:31 -07:00 committed by Facebook Github Bot
parent 2a5463ae84
commit 9358178edc
32 changed files with 2597 additions and 283 deletions

@ -489,6 +489,7 @@ set(SOURCES
db/db_impl_debug.cc
db/db_impl_experimental.cc
db/db_impl_readonly.cc
db/db_impl_secondary.cc
db/db_info_dumper.cc
db/db_iter.cc
db/dbformat.cc
@ -873,6 +874,7 @@ if(WITH_TESTS)
db/db_options_test.cc
db/db_properties_test.cc
db/db_range_del_test.cc
db/db_secondary_test.cc
db/db_sst_test.cc
db/db_statistics_test.cc
db/db_table_properties_test.cc

@ -2,24 +2,17 @@
## Unreleased
### New Features
* Introduce two more stats levels, kExceptHistogramOrTimers and kExceptTimers.
* Added a feature to perform data-block sampling for compressibility, and report stats to user.
* Add support for trace filtering.
### Public API Change
* Remove bundled fbson library.
* statistics.stats_level_ becomes atomic. It is preferred to use statistics.set_stats_level() and statistics.get_stats_level() to access it.
* Introduce a new IOError subcode, PathNotFound, to indicate trying to open a nonexistent file or directory for read.
* Add initial support for multiple db instances sharing the same data in single-writer, multi-reader mode.
### Bug Fixes
* Fix JEMALLOC_CXX_THROW macro missing from older Jemalloc versions, causing build failures on some platforms.
* Fix SstFileReader not able to open file ingested with write_glbal_seqno=true.
## Unreleased
### New Features
* Added a feature to perform data-block sampling for compressibility, and report stats to user.
### Public API Change
### Bug fixes
## 6.0.0 (2/19/2019)
### New Features
* Enabled checkpoint on readonly db (DBImplReadOnly).

@ -443,6 +443,7 @@ TESTS = \
db_merge_operator_test \
db_options_test \
db_range_del_test \
db_secondary_test \
db_sst_test \
db_tailing_iter_test \
db_io_failure_test \
@ -547,6 +548,7 @@ TESTS = \
range_tombstone_fragmenter_test \
range_del_aggregator_test \
sst_file_reader_test \
db_secondary_test \
PARALLEL_TEST = \
backupable_db_test \
@ -1571,6 +1573,9 @@ range_tombstone_fragmenter_test: db/range_tombstone_fragmenter_test.o db/db_test
sst_file_reader_test: table/sst_file_reader_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_secondary_test: db/db_secondary_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
#-------------------------------------------------
# make install related stuff
INSTALL_PATH ?= /usr/local

@ -98,6 +98,7 @@ cpp_library(
"db/db_impl_files.cc",
"db/db_impl_open.cc",
"db/db_impl_readonly.cc",
"db/db_impl_secondary.cc",
"db/db_impl_write.cc",
"db/db_info_dumper.cc",
"db/db_iter.cc",
@ -605,6 +606,11 @@ ROCKS_TESTS = [
"db/db_range_del_test.cc",
"serial",
],
[
"db_secondary_test",
"db/db_secondary_test.cc",
"serial",
],
[
"db_sst_test",
"db/db_sst_test.cc",

@ -148,18 +148,21 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
immutable_db_options_(initial_db_options_),
mutable_db_options_(initial_db_options_),
stats_(immutable_db_options_.statistics.get()),
db_lock_(nullptr),
mutex_(stats_, env_, DB_MUTEX_WAIT_MICROS,
immutable_db_options_.use_adaptive_mutex),
default_cf_handle_(nullptr),
max_total_in_memory_state_(0),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
env_options_, immutable_db_options_)),
db_lock_(nullptr),
shutting_down_(false),
bg_cv_(&mutex_),
logfile_number_(0),
log_dir_synced_(false),
log_empty_(true),
default_cf_handle_(nullptr),
log_sync_cv_(&mutex_),
total_log_size_(0),
max_total_in_memory_state_(0),
is_snapshot_supported_(true),
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
write_thread_(immutable_db_options_),
@ -186,9 +189,6 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
next_job_id_(1),
has_unpersisted_data_(false),
unable_to_release_oldest_log_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
env_options_, immutable_db_options_)),
num_running_ingest_file_(0),
#ifndef ROCKSDB_LITE
wal_manager_(immutable_db_options_, env_options_, seq_per_batch),

@ -758,6 +758,29 @@ class DBImpl : public DB {
std::unique_ptr<Tracer> tracer_;
InstrumentedMutex trace_mutex_;
// State below is protected by mutex_
// With two_write_queues enabled, some of the variables that accessed during
// WriteToWAL need different synchronization: log_empty_, alive_log_files_,
// logs_, logfile_number_. Refer to the definition of each variable below for
// more description.
mutable InstrumentedMutex mutex_;
ColumnFamilyHandleImpl* default_cf_handle_;
InternalStats* default_cf_internal_stats_;
// only used for dynamically adjusting max_total_wal_size. it is a sum of
// [write_buffer_size * max_write_buffer_number] over all column families
uint64_t max_total_in_memory_state_;
// If true, we have only one (default) column family. We use this to optimize
// some code-paths
bool single_column_family_mode_;
// The options to access storage files
const EnvOptions env_options_;
// Additonal options for compaction and flush
EnvOptions env_options_for_compaction_;
// Except in DB::Open(), WriteOptionsFile can only be called when:
// Persist options to options file.
// If need_mutex_lock = false, the method will lock DB mutex.
@ -845,6 +868,14 @@ class DBImpl : public DB {
// Actual implementation of Close()
Status CloseImpl();
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
virtual Status Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);
private:
friend class DB;
friend class ErrorHandler;
@ -893,13 +924,6 @@ class DBImpl : public DB {
struct PrepickedCompaction;
struct PurgeFileInfo;
// Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit.
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);
Status ResumeImpl();
void MaybeIgnoreError(Status* s) const;
@ -1216,12 +1240,6 @@ class DBImpl : public DB {
// and log_empty_. Refer to the definition of each variable below for more
// details.
InstrumentedMutex log_write_mutex_;
// State below is protected by mutex_
// With two_write_queues enabled, some of the variables that accessed during
// WriteToWAL need different synchronization: log_empty_, alive_log_files_,
// logs_, logfile_number_. Refer to the definition of each variable below for
// more description.
mutable InstrumentedMutex mutex_;
std::atomic<bool> shutting_down_;
// This condition variable is signaled on these conditions:
@ -1253,8 +1271,7 @@ class DBImpl : public DB {
// read and writes are protected by log_write_mutex_ instead. This is to avoid
// expesnive mutex_ lock during WAL write, which update log_empty_.
bool log_empty_;
ColumnFamilyHandleImpl* default_cf_handle_;
InternalStats* default_cf_internal_stats_;
std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number)
@ -1321,12 +1338,7 @@ class DBImpl : public DB {
WriteBatch cached_recoverable_state_;
std::atomic<bool> cached_recoverable_state_empty_ = {true};
std::atomic<uint64_t> total_log_size_;
// only used for dynamically adjusting max_total_wal_size. it is a sum of
// [write_buffer_size * max_write_buffer_number] over all column families
uint64_t max_total_in_memory_state_;
// If true, we have only one (default) column family. We use this to optimize
// some code-paths
bool single_column_family_mode_;
// If this is non-empty, we need to delete these log files in background
// threads. Protected by db mutex.
autovector<log::Writer*> logs_to_free_;
@ -1545,12 +1557,6 @@ class DBImpl : public DB {
std::string db_absolute_path_;
// The options to access storage files
const EnvOptions env_options_;
// Additonal options for compaction and flush
EnvOptions env_options_for_compaction_;
// Number of running IngestExternalFile() calls.
// REQUIRES: mutex held
int num_running_ingest_file_;

@ -629,8 +629,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// to be skipped instead of propagating bad information (like overly
// large sequence numbers).
log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
&reporter, true /*checksum*/, log_number,
false /* retry_after_eof */);
&reporter, true /*checksum*/, log_number);
// Determine if we should tolerate incomplete records at the tail end of the
// Read all the records and add to a memtable

356
db/db_impl_secondary.cc Normal file

@ -0,0 +1,356 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_impl_secondary.h"
#include "db/db_iter.h"
#include "db/merge_context.h"
#include "monitoring/perf_context_imp.h"
#include "util/auto_roll_logger.h"
namespace rocksdb {
#ifndef ROCKSDB_LITE
DBImplSecondary::DBImplSecondary(const DBOptions& db_options,
const std::string& dbname)
: DBImpl(db_options, dbname) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Opening the db in secondary mode");
LogFlush(immutable_db_options_.info_log);
}
DBImplSecondary::~DBImplSecondary() {}
Status DBImplSecondary::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool /*readonly*/, bool /*error_if_log_file_exist*/,
bool /*error_if_data_exists_in_logs*/) {
mutex_.AssertHeld();
Status s;
s = static_cast<ReactiveVersionSet*>(versions_.get())
->Recover(column_families, &manifest_reader_, &manifest_reporter_,
&manifest_reader_status_);
if (!s.ok()) {
return s;
}
if (immutable_db_options_.paranoid_checks && s.ok()) {
s = CheckConsistency();
}
// Initial max_total_in_memory_state_ before recovery logs.
max_total_in_memory_state_ = 0;
for (auto cfd : *versions_->GetColumnFamilySet()) {
auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
mutable_cf_options->max_write_buffer_number;
}
if (s.ok()) {
default_cf_handle_ = new ColumnFamilyHandleImpl(
versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
single_column_family_mode_ =
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
}
// TODO: attempt to recover from WAL files.
return s;
}
// Implementation of the DB interface
Status DBImplSecondary::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
return GetImpl(read_options, column_family, key, value);
}
Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {
assert(pinnable_val != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, env_);
StopWatch sw(env_, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
ColumnFamilyData* cfd = cfh->cfd();
if (tracer_) {
InstrumentedMutexLock lock(&trace_mutex_);
if (tracer_) {
tracer_->Get(column_family, key);
}
}
// Acquire SuperVersion
SuperVersion* super_version = GetAndRefSuperVersion(cfd);
SequenceNumber snapshot = versions_->LastSequence();
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq = 0;
Status s;
LookupKey lkey(key, snapshot);
PERF_TIMER_STOP(get_snapshot_time);
bool done = false;
if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), &s, &merge_context,
&max_covering_tombstone_seq, read_options)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
} else if ((s.ok() || s.IsMergeInProgress()) &&
super_version->imm->Get(
lkey, pinnable_val->GetSelf(), &s, &merge_context,
&max_covering_tombstone_seq, read_options)) {
done = true;
pinnable_val->PinSelf();
RecordTick(stats_, MEMTABLE_HIT);
}
if (!done && !s.ok() && !s.IsMergeInProgress()) {
ReturnAndCleanupSuperVersion(cfd, super_version);
return s;
}
if (!done) {
PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->Get(read_options, lkey, pinnable_val, &s,
&merge_context, &max_covering_tombstone_seq);
RecordTick(stats_, MEMTABLE_MISS);
}
{
PERF_TIMER_GUARD(get_post_process_time);
ReturnAndCleanupSuperVersion(cfd, super_version);
RecordTick(stats_, NUMBER_KEYS_READ);
size_t size = pinnable_val->size();
RecordTick(stats_, BYTES_READ, size);
RecordTimeToHistogram(stats_, BYTES_PER_READ, size);
PERF_COUNTER_ADD(get_read_bytes, size);
}
return s;
}
Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options,
ColumnFamilyHandle* column_family) {
if (read_options.managed) {
return NewErrorIterator(
Status::NotSupported("Managed iterator is not supported anymore."));
}
if (read_options.read_tier == kPersistedTier) {
return NewErrorIterator(Status::NotSupported(
"ReadTier::kPersistedData is not yet supported in iterators."));
}
Iterator* result = nullptr;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
ReadCallback* read_callback = nullptr; // No read callback provided.
if (read_options.tailing) {
return NewErrorIterator(Status::NotSupported(
"tailing iterator not supported in secondary mode"));
} else if (read_options.snapshot != nullptr) {
// TODO (yanqin) support snapshot.
return NewErrorIterator(
Status::NotSupported("snapshot not supported in secondary mode"));
} else {
auto snapshot = versions_->LastSequence();
result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
}
return result;
}
ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl(
const ReadOptions& read_options, ColumnFamilyData* cfd,
SequenceNumber snapshot, ReadCallback* read_callback) {
assert(nullptr != cfd);
SuperVersion* super_version = cfd->GetReferencedSuperVersion(&mutex_);
auto db_iter = NewArenaWrappedDbIterator(
env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options,
snapshot,
super_version->mutable_cf_options.max_sequential_skip_in_iterations,
super_version->version_number, read_callback);
auto internal_iter =
NewInternalIterator(read_options, cfd, super_version, db_iter->GetArena(),
db_iter->GetRangeDelAggregator(), snapshot);
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}
Status DBImplSecondary::NewIterators(
const ReadOptions& read_options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) {
if (read_options.managed) {
return Status::NotSupported("Managed iterator is not supported anymore.");
}
if (read_options.read_tier == kPersistedTier) {
return Status::NotSupported(
"ReadTier::kPersistedData is not yet supported in iterators.");
}
ReadCallback* read_callback = nullptr; // No read callback provided.
if (iterators == nullptr) {
return Status::InvalidArgument("iterators not allowed to be nullptr");
}
iterators->clear();
iterators->reserve(column_families.size());
if (read_options.tailing) {
return Status::NotSupported(
"tailing iterator not supported in secondary mode");
} else if (read_options.snapshot != nullptr) {
// TODO (yanqin) support snapshot.
return Status::NotSupported("snapshot not supported in secondary mode");
} else {
SequenceNumber read_seq = versions_->LastSequence();
for (auto cfh : column_families) {
ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
iterators->push_back(
NewIteratorImpl(read_options, cfd, read_seq, read_callback));
}
}
return Status::OK();
}
Status DBImplSecondary::TryCatchUpWithPrimary() {
assert(versions_.get() != nullptr);
assert(manifest_reader_.get() != nullptr);
Status s;
std::unordered_set<ColumnFamilyData*> cfds_changed;
InstrumentedMutexLock lock_guard(&mutex_);
s = static_cast<ReactiveVersionSet*>(versions_.get())
->ReadAndApply(&mutex_, &manifest_reader_, &cfds_changed);
if (s.ok()) {
SuperVersionContext sv_context(true /* create_superversion */);
for (auto cfd : cfds_changed) {
sv_context.NewSuperVersion();
cfd->InstallSuperVersion(&sv_context, &mutex_);
}
sv_context.Clean();
}
return s;
}
Status DB::OpenAsSecondary(const Options& options, const std::string& dbname,
const std::string& secondary_path, DB** dbptr) {
*dbptr = nullptr;
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
std::vector<ColumnFamilyHandle*> handles;
Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path,
column_families, &handles, dbptr);
if (s.ok()) {
assert(handles.size() == 1);
delete handles[0];
}
return s;
}
Status DB::OpenAsSecondary(
const DBOptions& db_options, const std::string& dbname,
const std::string& secondary_path,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
*dbptr = nullptr;
if (db_options.max_open_files != -1) {
// TODO (yanqin) maybe support max_open_files != -1 by creating hard links
// on SST files so that db secondary can still have access to old SSTs
// while primary instance may delete original.
return Status::InvalidArgument("require max_open_files to be -1");
}
DBOptions tmp_opts(db_options);
if (nullptr == tmp_opts.info_log) {
Env* env = tmp_opts.env;
assert(env != nullptr);
std::string secondary_abs_path;
env->GetAbsolutePath(secondary_path, &secondary_abs_path);
std::string fname = InfoLogFileName(secondary_path, secondary_abs_path,
tmp_opts.db_log_dir);
env->CreateDirIfMissing(secondary_path);
if (tmp_opts.log_file_time_to_roll > 0 || tmp_opts.max_log_file_size > 0) {
AutoRollLogger* result = new AutoRollLogger(
env, secondary_path, tmp_opts.db_log_dir, tmp_opts.max_log_file_size,
tmp_opts.log_file_time_to_roll, tmp_opts.info_log_level);
Status s = result->GetStatus();
if (!s.ok()) {
delete result;
} else {
tmp_opts.info_log.reset(result);
}
}
if (nullptr == tmp_opts.info_log) {
env->RenameFile(
fname, OldInfoLogFileName(secondary_path, env->NowMicros(),
secondary_abs_path, tmp_opts.db_log_dir));
Status s = env->NewLogger(fname, &(tmp_opts.info_log));
if (tmp_opts.info_log != nullptr) {
tmp_opts.info_log->SetInfoLogLevel(tmp_opts.info_log_level);
}
}
}
assert(tmp_opts.info_log != nullptr);
handles->clear();
DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname);
impl->versions_.reset(new ReactiveVersionSet(
dbname, &impl->immutable_db_options_, impl->env_options_,
impl->table_cache_.get(), impl->write_buffer_manager_,
&impl->write_controller_));
impl->column_family_memtables_.reset(
new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet()));
impl->mutex_.Lock();
Status s = impl->Recover(column_families, true, false, false);
if (s.ok()) {
for (auto cf : column_families) {
auto cfd =
impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
if (nullptr == cfd) {
s = Status::InvalidArgument("Column family not found: ", cf.name);
break;
}
handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
}
}
SuperVersionContext sv_context(true /* create_superversion */);
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
sv_context.NewSuperVersion();
cfd->InstallSuperVersion(&sv_context, &impl->mutex_);
}
}
impl->mutex_.Unlock();
sv_context.Clean();
if (s.ok()) {
*dbptr = impl;
for (auto h : *handles) {
impl->NewThreadStatusCfInfo(
reinterpret_cast<ColumnFamilyHandleImpl*>(h)->cfd());
}
} else {
for (auto h : *handles) {
delete h;
}
handles->clear();
delete impl;
}
return s;
}
#else // !ROCKSDB_LITE
Status DB::OpenAsSecondary(const Options& /*options*/,
const std::string& /*name*/,
const std::string& /*secondary_path*/,
DB** /*dbptr*/) {
return Status::NotSupported("Not supported in ROCKSDB_LITE.");
}
Status DB::OpenAsSecondary(
const DBOptions& /*db_options*/, const std::string& /*dbname*/,
const std::string& /*secondary_path*/,
const std::vector<ColumnFamilyDescriptor>& /*column_families*/,
std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) {
return Status::NotSupported("Not supported in ROCKSDB_LITE.");
}
#endif // !ROCKSDB_LITE
} // namespace rocksdb

151
db/db_impl_secondary.h Normal file

@ -0,0 +1,151 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
#include "db/db_impl.h"
namespace rocksdb {
class DBImplSecondary : public DBImpl {
public:
DBImplSecondary(const DBOptions& options, const std::string& dbname);
~DBImplSecondary() override;
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only, bool error_if_log_file_exist,
bool error_if_data_exists_in_logs) override;
// Implementations of the DB interface
using DB::Get;
Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value) override;
Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* value);
using DBImpl::NewIterator;
Iterator* NewIterator(const ReadOptions&,
ColumnFamilyHandle* column_family) override;
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
ColumnFamilyData* cfd,
SequenceNumber snapshot,
ReadCallback* read_callback);
Status NewIterators(const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families,
std::vector<Iterator*>* iterators) override;
using DBImpl::Put;
Status Put(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
const Slice& /*value*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::Merge;
Status Merge(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
const Slice& /*value*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::Delete;
Status Delete(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::SingleDelete;
Status SingleDelete(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
Status Write(const WriteOptions& /*options*/,
WriteBatch* /*updates*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::CompactRange;
Status CompactRange(const CompactRangeOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/,
const Slice* /*begin*/, const Slice* /*end*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::CompactFiles;
Status CompactFiles(
const CompactionOptions& /*compact_options*/,
ColumnFamilyHandle* /*column_family*/,
const std::vector<std::string>& /*input_file_names*/,
const int /*output_level*/, const int /*output_path_id*/ = -1,
std::vector<std::string>* const /*output_file_names*/ = nullptr,
CompactionJobInfo* /*compaction_job_info*/ = nullptr) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
Status DisableFileDeletions() override {
return Status::NotSupported("Not supported operation in read only mode.");
}
Status EnableFileDeletions(bool /*force*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
Status GetLiveFiles(std::vector<std::string>&,
uint64_t* /*manifest_file_size*/,
bool /*flush_memtable*/ = true) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::Flush;
Status Flush(const FlushOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::SyncWAL;
Status SyncWAL() override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DB::IngestExternalFile;
Status IngestExternalFile(
ColumnFamilyHandle* /*column_family*/,
const std::vector<std::string>& /*external_files*/,
const IngestExternalFileOptions& /*ingestion_options*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
// Try to catch up with the primary by reading as much as possible from the
// log files until there is nothing more to read or encounters an error. If
// the amount of information in the log files to process is huge, this
// method can take long time due to all the I/O and CPU costs.
Status TryCatchUpWithPrimary() override;
private:
friend class DB;
// No copying allowed
DBImplSecondary(const DBImplSecondary&);
void operator=(const DBImplSecondary&);
using DBImpl::Recover;
std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
std::unique_ptr<Status> manifest_reader_status_;
};
} // namespace rocksdb
#endif // !ROCKSDB_LITE

480
db/db_secondary_test.cc Normal file

@ -0,0 +1,480 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl_secondary.h"
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/fault_injection_test_env.h"
#include "util/sync_point.h"
namespace rocksdb {
#ifndef ROCKSDB_LITE
class DBSecondaryTest : public DBTestBase {
public:
DBSecondaryTest()
: DBTestBase("/db_secondary_test"),
secondary_path_(),
handles_secondary_(),
db_secondary_(nullptr) {
secondary_path_ =
test::PerThreadDBPath(env_, "/db_secondary_test_secondary");
}
~DBSecondaryTest() override {
CloseSecondary();
if (getenv("KEEP_DB") != nullptr) {
fprintf(stdout, "Secondary DB is still at %s\n", secondary_path_.c_str());
} else {
Options options;
options.env = env_;
EXPECT_OK(DestroyDB(secondary_path_, options));
}
}
protected:
Status ReopenAsSecondary(const Options& options) {
return DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_);
}
void OpenSecondary(const Options& options);
void OpenSecondaryWithColumnFamilies(
const std::vector<std::string>& column_families, const Options& options);
void CloseSecondary() {
for (auto h : handles_secondary_) {
db_secondary_->DestroyColumnFamilyHandle(h);
}
handles_secondary_.clear();
delete db_secondary_;
db_secondary_ = nullptr;
}
DBImplSecondary* db_secondary_full() {
return static_cast<DBImplSecondary*>(db_secondary_);
}
void CheckFileTypeCounts(const std::string& dir, int expected_log,
int expected_sst, int expected_manifest) const;
std::string secondary_path_;
std::vector<ColumnFamilyHandle*> handles_secondary_;
DB* db_secondary_;
};
void DBSecondaryTest::OpenSecondary(const Options& options) {
Status s =
DB::OpenAsSecondary(options, dbname_, secondary_path_, &db_secondary_);
ASSERT_OK(s);
}
void DBSecondaryTest::OpenSecondaryWithColumnFamilies(
const std::vector<std::string>& column_families, const Options& options) {
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options);
for (const auto& cf_name : column_families) {
cf_descs.emplace_back(cf_name, options);
}
Status s = DB::OpenAsSecondary(options, dbname_, secondary_path_, cf_descs,
&handles_secondary_, &db_secondary_);
ASSERT_OK(s);
}
void DBSecondaryTest::CheckFileTypeCounts(const std::string& dir,
int expected_log, int expected_sst,
int expected_manifest) const {
std::vector<std::string> filenames;
env_->GetChildren(dir, &filenames);
int log_cnt = 0, sst_cnt = 0, manifest_cnt = 0;
for (auto file : filenames) {
uint64_t number;
FileType type;
if (ParseFileName(file, &number, &type)) {
log_cnt += (type == kLogFile);
sst_cnt += (type == kTableFile);
manifest_cnt += (type == kDescriptorFile);
}
}
ASSERT_EQ(expected_log, log_cnt);
ASSERT_EQ(expected_sst, sst_cnt);
ASSERT_EQ(expected_manifest, manifest_cnt);
}
TEST_F(DBSecondaryTest, ReopenAsSecondary) {
Options options;
options.env = env_;
Reopen(options);
ASSERT_OK(Put("foo", "foo_value"));
ASSERT_OK(Put("bar", "bar_value"));
ASSERT_OK(dbfull()->Flush(FlushOptions()));
Close();
ASSERT_OK(ReopenAsSecondary(options));
ASSERT_EQ("foo_value", Get("foo"));
ASSERT_EQ("bar_value", Get("bar"));
ReadOptions ropts;
ropts.verify_checksums = true;
auto db1 = static_cast<DBImplSecondary*>(db_);
ASSERT_NE(nullptr, db1);
Iterator* iter = db1->NewIterator(ropts);
ASSERT_NE(nullptr, iter);
size_t count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
if (0 == count) {
ASSERT_EQ("bar", iter->key().ToString());
ASSERT_EQ("bar_value", iter->value().ToString());
} else if (1 == count) {
ASSERT_EQ("foo", iter->key().ToString());
ASSERT_EQ("foo_value", iter->value().ToString());
}
++count;
}
delete iter;
ASSERT_EQ(2, count);
}
TEST_F(DBSecondaryTest, OpenAsSecondary) {
Options options;
options.env = env_;
options.level0_file_num_compaction_trigger = 4;
Reopen(options);
for (int i = 0; i < 3; ++i) {
ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
ASSERT_OK(Flush());
}
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondary(options1);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ReadOptions ropts;
ropts.verify_checksums = true;
const auto verify_db_func = [&](const std::string& foo_val,
const std::string& bar_val) {
std::string value;
ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
ASSERT_EQ(foo_val, value);
ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
ASSERT_EQ(bar_val, value);
Iterator* iter = db_secondary_->NewIterator(ropts);
ASSERT_NE(nullptr, iter);
iter->Seek("foo");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("foo", iter->key().ToString());
ASSERT_EQ(foo_val, iter->value().ToString());
iter->Seek("bar");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bar", iter->key().ToString());
ASSERT_EQ(bar_val, iter->value().ToString());
size_t count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
++count;
}
ASSERT_EQ(2, count);
delete iter;
};
verify_db_func("foo_value2", "bar_value2");
ASSERT_OK(Put("foo", "new_foo_value"));
ASSERT_OK(Put("bar", "new_bar_value"));
ASSERT_OK(Flush());
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
verify_db_func("new_foo_value", "new_bar_value");
}
TEST_F(DBSecondaryTest, OpenWithNonExistColumnFamily) {
Options options;
options.env = env_;
CreateAndReopenWithCF({"pikachu"}, options);
Options options1;
options1.env = env_;
options1.max_open_files = -1;
std::vector<ColumnFamilyDescriptor> cf_descs;
cf_descs.emplace_back(kDefaultColumnFamilyName, options1);
cf_descs.emplace_back("pikachu", options1);
cf_descs.emplace_back("eevee", options1);
Status s = DB::OpenAsSecondary(options1, dbname_, secondary_path_, cf_descs,
&handles_secondary_, &db_secondary_);
ASSERT_NOK(s);
}
TEST_F(DBSecondaryTest, OpenWithSubsetOfColumnFamilies) {
Options options;
options.env = env_;
CreateAndReopenWithCF({"pikachu"}, options);
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondary(options1);
ASSERT_EQ(0, handles_secondary_.size());
ASSERT_NE(nullptr, db_secondary_);
ASSERT_OK(Put(0 /*cf*/, "foo", "foo_value"));
ASSERT_OK(Put(1 /*cf*/, "foo", "foo_value"));
ASSERT_OK(Flush(0 /*cf*/));
ASSERT_OK(Flush(1 /*cf*/));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
ReadOptions ropts;
ropts.verify_checksums = true;
std::string value;
ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
ASSERT_EQ("foo_value", value);
}
TEST_F(DBSecondaryTest, SwitchToNewManifestDuringOpen) {
Options options;
options.env = env_;
Reopen(options);
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->LoadDependency(
{{"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:0",
"VersionSet::ProcessManifestWrites:BeforeNewManifest"},
{"VersionSet::ProcessManifestWrites:AfterNewManifest",
"ReactiveVersionSet::MaybeSwitchManifest:AfterGetCurrentManifestPath:"
"1"}});
SyncPoint::GetInstance()->EnableProcessing();
// Make sure db calls RecoverLogFiles so as to trigger a manifest write,
// which causes the db to switch to a new MANIFEST upon start.
port::Thread ro_db_thread([&]() {
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondary(options1);
CloseSecondary();
});
Reopen(options);
ro_db_thread.join();
}
TEST_F(DBSecondaryTest, MissingTableFileDuringOpen) {
Options options;
options.env = env_;
options.level0_file_num_compaction_trigger = 4;
Reopen(options);
for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
ASSERT_OK(dbfull()->Flush(FlushOptions()));
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondary(options1);
ReadOptions ropts;
ropts.verify_checksums = true;
std::string value;
ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
ASSERT_EQ("foo_value" +
std::to_string(options.level0_file_num_compaction_trigger - 1),
value);
ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
ASSERT_EQ("bar_value" +
std::to_string(options.level0_file_num_compaction_trigger - 1),
value);
Iterator* iter = db_secondary_->NewIterator(ropts);
ASSERT_NE(nullptr, iter);
iter->Seek("bar");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bar", iter->key().ToString());
ASSERT_EQ("bar_value" +
std::to_string(options.level0_file_num_compaction_trigger - 1),
iter->value().ToString());
iter->Seek("foo");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("foo", iter->key().ToString());
ASSERT_EQ("foo_value" +
std::to_string(options.level0_file_num_compaction_trigger - 1),
iter->value().ToString());
size_t count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
++count;
}
ASSERT_EQ(2, count);
delete iter;
}
TEST_F(DBSecondaryTest, MissingTableFile) {
int table_files_not_exist = 0;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers",
[&](void* arg) {
Status s = *reinterpret_cast<Status*>(arg);
if (s.IsPathNotFound()) {
++table_files_not_exist;
} else if (!s.ok()) {
assert(false); // Should not reach here
}
});
SyncPoint::GetInstance()->EnableProcessing();
Options options;
options.env = env_;
options.level0_file_num_compaction_trigger = 4;
Reopen(options);
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondary(options1);
for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
ASSERT_OK(Put("foo", "foo_value" + std::to_string(i)));
ASSERT_OK(Put("bar", "bar_value" + std::to_string(i)));
ASSERT_OK(dbfull()->Flush(FlushOptions()));
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_NE(nullptr, db_secondary_full());
ReadOptions ropts;
ropts.verify_checksums = true;
std::string value;
ASSERT_NOK(db_secondary_->Get(ropts, "foo", &value));
ASSERT_NOK(db_secondary_->Get(ropts, "bar", &value));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
ASSERT_EQ(options.level0_file_num_compaction_trigger, table_files_not_exist);
ASSERT_OK(db_secondary_->Get(ropts, "foo", &value));
ASSERT_EQ("foo_value" +
std::to_string(options.level0_file_num_compaction_trigger - 1),
value);
ASSERT_OK(db_secondary_->Get(ropts, "bar", &value));
ASSERT_EQ("bar_value" +
std::to_string(options.level0_file_num_compaction_trigger - 1),
value);
Iterator* iter = db_secondary_->NewIterator(ropts);
ASSERT_NE(nullptr, iter);
iter->Seek("bar");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bar", iter->key().ToString());
ASSERT_EQ("bar_value" +
std::to_string(options.level0_file_num_compaction_trigger - 1),
iter->value().ToString());
iter->Seek("foo");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("foo", iter->key().ToString());
ASSERT_EQ("foo_value" +
std::to_string(options.level0_file_num_compaction_trigger - 1),
iter->value().ToString());
size_t count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
++count;
}
ASSERT_EQ(2, count);
delete iter;
}
TEST_F(DBSecondaryTest, PrimaryDropColumnFamily) {
Options options;
options.env = env_;
const std::string kCfName1 = "pikachu";
CreateAndReopenWithCF({kCfName1}, options);
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondaryWithColumnFamilies({kCfName1}, options1);
ASSERT_EQ(2, handles_secondary_.size());
ASSERT_OK(Put(1 /*cf*/, "foo", "foo_val_1"));
ASSERT_OK(Flush(1 /*cf*/));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
ReadOptions ropts;
ropts.verify_checksums = true;
std::string value;
ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
ASSERT_EQ("foo_val_1", value);
ASSERT_OK(dbfull()->DropColumnFamily(handles_[1]));
Close();
CheckFileTypeCounts(dbname_, 1, 0, 1);
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
value.clear();
ASSERT_OK(db_secondary_->Get(ropts, handles_secondary_[1], "foo", &value));
ASSERT_EQ("foo_val_1", value);
}
TEST_F(DBSecondaryTest, SwitchManifest) {
Options options;
options.env = env_;
options.level0_file_num_compaction_trigger = 4;
Reopen(options);
Options options1;
options1.env = env_;
options1.max_open_files = -1;
OpenSecondary(options1);
const int kNumFiles = options.level0_file_num_compaction_trigger - 1;
// Keep it smaller than 10 so that key0, key1, ..., key9 are sorted as 0, 1,
// ..., 9.
const int kNumKeys = 10;
// Create two sst
for (int i = 0; i != kNumFiles; ++i) {
for (int j = 0; j != kNumKeys; ++j) {
ASSERT_OK(Put("key" + std::to_string(j), "value_" + std::to_string(i)));
}
ASSERT_OK(Flush());
}
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
const auto& range_scan_db = [&]() {
ReadOptions tmp_ropts;
tmp_ropts.total_order_seek = true;
tmp_ropts.verify_checksums = true;
std::unique_ptr<Iterator> iter(db_secondary_->NewIterator(tmp_ropts));
int cnt = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++cnt) {
ASSERT_EQ("key" + std::to_string(cnt), iter->key().ToString());
ASSERT_EQ("value_" + std::to_string(kNumFiles - 1),
iter->value().ToString());
}
};
range_scan_db();
// While secondary instance still keeps old MANIFEST open, we close primary,
// restart primary, performs full compaction, close again, restart again so
// that next time secondary tries to catch up with primary, the secondary
// will skip the MANIFEST in middle.
Reopen(options);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
Reopen(options);
ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
ASSERT_OK(db_secondary_->TryCatchUpWithPrimary());
range_scan_db();
}
#endif //! ROCKSDB_LITE
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -24,8 +24,7 @@ Reader::Reporter::~Reporter() {
Reader::Reader(std::shared_ptr<Logger> info_log,
std::unique_ptr<SequentialFileReader>&& _file,
Reporter* reporter, bool checksum, uint64_t log_num,
bool retry_after_eof)
Reporter* reporter, bool checksum, uint64_t log_num)
: info_log_(info_log),
file_(std::move(_file)),
reporter_(reporter),
@ -38,8 +37,7 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
last_record_offset_(0),
end_of_buffer_offset_(0),
log_number_(log_num),
recycled_(false),
retry_after_eof_(retry_after_eof) {}
recycled_(false) {}
Reader::~Reader() {
delete[] backing_store_;
@ -207,14 +205,14 @@ void Reader::UnmarkEOF() {
if (read_error_) {
return;
}
eof_ = false;
// If retry_after_eof_ is true, we have to proceed to read anyway.
if (!retry_after_eof_ && eof_offset_ == 0) {
if (eof_offset_ == 0) {
return;
}
UnmarkEOFInternal();
}
void Reader::UnmarkEOFInternal() {
// If the EOF was in the middle of a block (a partial block was read) we have
// to read the rest of the block as ReadPhysicalRecord can only read full
// blocks and expects the file position indicator to be aligned to the start
@ -292,12 +290,8 @@ bool Reader::ReadMore(size_t* drop_size, int *error) {
} else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
eof_ = true;
eof_offset_ = buffer_.size();
TEST_SYNC_POINT("LogReader::ReadMore:FirstEOF");
}
return true;
} else if (retry_after_eof_ && !read_error_) {
UnmarkEOF();
return !read_error_;
} else {
// Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the
@ -355,24 +349,16 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
}
}
if (header_size + length > buffer_.size()) {
if (!retry_after_eof_) {
*drop_size = buffer_.size();
buffer_.clear();
if (!eof_) {
return kBadRecordLen;
}
// If the end of the file has been reached without reading |length|
// bytes of payload, assume the writer died in the middle of writing the
// record. Don't report a corruption unless requested.
if (*drop_size) {
return kBadHeader;
}
} else {
int r = kEof;
if (!ReadMore(drop_size, &r)) {
return r;
}
continue;
*drop_size = buffer_.size();
buffer_.clear();
if (!eof_) {
return kBadRecordLen;
}
// If the end of the file has been reached without reading |length|
// bytes of payload, assume the writer died in the middle of writing the
// record. Don't report a corruption unless requested.
if (*drop_size) {
return kBadHeader;
}
return kEof;
}
@ -409,5 +395,229 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
}
}
bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
WALRecoveryMode /*unused*/) {
assert(record != nullptr);
assert(scratch != nullptr);
record->clear();
scratch->clear();
uint64_t prospective_record_offset = 0;
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
size_t drop_size = 0;
unsigned int fragment_type_or_err = 0; // Initialize to make compiler happy
Slice fragment;
while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) {
switch (fragment_type_or_err) {
case kFullType:
case kRecyclableFullType:
if (in_fragmented_record_ && !fragments_.empty()) {
ReportCorruption(fragments_.size(), "partial record without end(1)");
}
fragments_.clear();
*record = fragment;
prospective_record_offset = physical_record_offset;
last_record_offset_ = prospective_record_offset;
in_fragmented_record_ = false;
return true;
case kFirstType:
case kRecyclableFirstType:
if (in_fragmented_record_ || !fragments_.empty()) {
ReportCorruption(fragments_.size(), "partial record without end(2)");
}
prospective_record_offset = physical_record_offset;
fragments_.assign(fragment.data(), fragment.size());
in_fragmented_record_ = true;
break;
case kMiddleType:
case kRecyclableMiddleType:
if (!in_fragmented_record_) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
} else {
fragments_.append(fragment.data(), fragment.size());
}
break;
case kLastType:
case kRecyclableLastType:
if (!in_fragmented_record_) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
} else {
fragments_.append(fragment.data(), fragment.size());
scratch->assign(fragments_.data(), fragments_.size());
fragments_.clear();
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
in_fragmented_record_ = false;
return true;
}
break;
case kBadHeader:
case kBadRecord:
case kEof:
case kOldRecord:
if (in_fragmented_record_) {
ReportCorruption(fragments_.size(), "error in middle of record");
in_fragmented_record_ = false;
fragments_.clear();
}
break;
case kBadRecordChecksum:
if (recycled_) {
fragments_.clear();
return false;
}
ReportCorruption(drop_size, "checksum mismatch");
if (in_fragmented_record_) {
ReportCorruption(fragments_.size(), "error in middle of record");
in_fragmented_record_ = false;
fragments_.clear();
}
break;
default: {
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u",
fragment_type_or_err);
ReportCorruption(
fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0),
buf);
in_fragmented_record_ = false;
fragments_.clear();
break;
}
}
}
return false;
}
void FragmentBufferedReader::UnmarkEOF() {
if (read_error_) {
return;
}
eof_ = false;
UnmarkEOFInternal();
}
bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
if (!eof_ && !read_error_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
read_error_ = true;
*error = kEof;
return false;
} else if (buffer_.size() < static_cast<size_t>(kBlockSize)) {
eof_ = true;
eof_offset_ = buffer_.size();
TEST_SYNC_POINT_CALLBACK(
"FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr);
}
return true;
} else if (!read_error_) {
UnmarkEOF();
}
if (!read_error_) {
return true;
}
*error = kEof;
*drop_size = buffer_.size();
if (buffer_.size() > 0) {
*error = kBadHeader;
}
buffer_.clear();
return false;
}
// return true if the caller should process the fragment_type_or_err.
bool FragmentBufferedReader::TryReadFragment(
Slice* fragment, size_t* drop_size, unsigned int* fragment_type_or_err) {
assert(fragment != nullptr);
assert(drop_size != nullptr);
assert(fragment_type_or_err != nullptr);
while (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
size_t old_size = buffer_.size();
int error = kEof;
if (!TryReadMore(drop_size, &error)) {
*fragment_type_or_err = error;
return false;
} else if (old_size == buffer_.size()) {
return false;
}
}
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);
int header_size = kHeaderSize;
if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
if (end_of_buffer_offset_ - buffer_.size() == 0) {
recycled_ = true;
}
header_size = kRecyclableHeaderSize;
while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) {
size_t old_size = buffer_.size();
int error = kEof;
if (!TryReadMore(drop_size, &error)) {
*fragment_type_or_err = error;
return false;
} else if (old_size == buffer_.size()) {
return false;
}
}
const uint32_t log_num = DecodeFixed32(header + 7);
if (log_num != log_number_) {
*fragment_type_or_err = kOldRecord;
return true;
}
}
while (header_size + length > buffer_.size()) {
size_t old_size = buffer_.size();
int error = kEof;
if (!TryReadMore(drop_size, &error)) {
*fragment_type_or_err = error;
return false;
} else if (old_size == buffer_.size()) {
return false;
}
}
if (type == kZeroType && length == 0) {
buffer_.clear();
*fragment_type_or_err = kBadRecord;
return true;
}
if (checksum_) {
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6);
if (actual_crc != expected_crc) {
*drop_size = buffer_.size();
buffer_.clear();
*fragment_type_or_err = kBadRecordChecksum;
return true;
}
}
buffer_.remove_prefix(header_size + length);
*fragment = Slice(header + header_size, length);
*fragment_type_or_err = type;
return true;
}
} // namespace log
} // namespace rocksdb

@ -53,18 +53,18 @@ class Reader {
Reader(std::shared_ptr<Logger> info_log,
// @lint-ignore TXT2 T25377293 Grandfathered in
std::unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
bool checksum, uint64_t log_num, bool retry_after_eof);
bool checksum, uint64_t log_num);
~Reader();
virtual ~Reader();
// Read the next record into *record. Returns true if read
// successfully, false if we hit end of the input. May use
// "*scratch" as temporary storage. The contents filled in *record
// will only be valid until the next mutating operation on this
// reader or the next mutation to *scratch.
bool ReadRecord(Slice* record, std::string* scratch,
WALRecoveryMode wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords);
virtual bool ReadRecord(Slice* record, std::string* scratch,
WALRecoveryMode wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords);
// Returns the physical offset of the last record returned by ReadRecord.
//
@ -76,21 +76,28 @@ class Reader {
return eof_;
}
// returns true if the reader has encountered read error.
bool hasReadError() const { return read_error_; }
// when we know more data has been written to the file. we can use this
// function to force the reader to look again in the file.
// Also aligns the file position indicator to the start of the next block
// by reading the rest of the data from the EOF position to the end of the
// block that was partially read.
void UnmarkEOF();
virtual void UnmarkEOF();
SequentialFileReader* file() { return file_.get(); }
private:
Reporter* GetReporter() const { return reporter_; }
protected:
std::shared_ptr<Logger> info_log_;
const std::unique_ptr<SequentialFileReader> file_;
Reporter* const reporter_;
bool const checksum_;
char* const backing_store_;
// Internal state variables used for reading records
Slice buffer_;
bool eof_; // Last Read() indicated EOF by returning < kBlockSize
bool read_error_; // Error occurred while reading from file
@ -110,11 +117,6 @@ class Reader {
// Whether this is a recycled log file
bool recycled_;
// Whether retry after encountering EOF
// TODO (yanqin) add support for retry policy, e.g. sleep, max retry limit,
// etc.
const bool retry_after_eof_;
// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,
@ -139,15 +141,47 @@ class Reader {
// Read some more
bool ReadMore(size_t* drop_size, int *error);
void UnmarkEOFInternal();
// Reports dropped bytes to the reporter.
// buffer_ must be updated to remove the dropped bytes prior to invocation.
void ReportCorruption(size_t bytes, const char* reason);
void ReportDrop(size_t bytes, const Status& reason);
private:
// No copying allowed
Reader(const Reader&);
void operator=(const Reader&);
};
class FragmentBufferedReader : public Reader {
public:
FragmentBufferedReader(std::shared_ptr<Logger> info_log,
// @lint-ignore TXT2 T25377293 Grandfathered in
std::unique_ptr<SequentialFileReader>&& _file,
Reporter* reporter, bool checksum, uint64_t log_num)
: Reader(info_log, std::move(_file), reporter, checksum, log_num),
fragments_(),
in_fragmented_record_(false) {}
~FragmentBufferedReader() override {}
bool ReadRecord(Slice* record, std::string* scratch,
WALRecoveryMode wal_recovery_mode =
WALRecoveryMode::kTolerateCorruptedTailRecords) override;
void UnmarkEOF() override;
private:
std::string fragments_;
bool in_fragmented_record_;
bool TryReadFragment(Slice* result, size_t* drop_size,
unsigned int* fragment_type_or_err);
bool TryReadMore(size_t* drop_size, int* error);
// No copy allowed
FragmentBufferedReader(const FragmentBufferedReader&);
void operator=(const FragmentBufferedReader&);
};
} // namespace log
} // namespace rocksdb

@ -43,7 +43,10 @@ static std::string RandomSkewedString(int i, Random* rnd) {
return BigString(NumberString(i), rnd->Skewed(17));
}
class LogTest : public ::testing::TestWithParam<int> {
// Param type is tuple<int, bool>
// get<0>(tuple): non-zero if recycling log, zero if regular log
// get<1>(tuple): true if allow retry after read EOF, false otherwise
class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
private:
class StringSource : public SequentialFile {
public:
@ -53,16 +56,20 @@ class LogTest : public ::testing::TestWithParam<int> {
bool force_eof_;
size_t force_eof_position_;
bool returned_partial_;
explicit StringSource(Slice& contents) :
contents_(contents),
force_error_(false),
force_error_position_(0),
force_eof_(false),
force_eof_position_(0),
returned_partial_(false) { }
bool fail_after_read_partial_;
explicit StringSource(Slice& contents, bool fail_after_read_partial)
: contents_(contents),
force_error_(false),
force_error_position_(0),
force_eof_(false),
force_eof_position_(0),
returned_partial_(false),
fail_after_read_partial_(fail_after_read_partial) {}
Status Read(size_t n, Slice* result, char* scratch) override {
EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
if (fail_after_read_partial_) {
EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
}
if (force_error_) {
if (force_error_position_ >= n) {
@ -139,7 +146,7 @@ class LogTest : public ::testing::TestWithParam<int> {
}
void reset_source_contents() {
auto src = dynamic_cast<StringSource*>(reader_.file()->file());
auto src = dynamic_cast<StringSource*>(reader_->file()->file());
assert(src);
src->contents_ = dest_contents();
}
@ -149,11 +156,10 @@ class LogTest : public ::testing::TestWithParam<int> {
std::unique_ptr<SequentialFileReader> source_holder_;
ReportCollector report_;
Writer writer_;
Reader reader_;
std::unique_ptr<Reader> reader_;
// Record metadata for testing initial offset functionality
static size_t initial_offset_record_sizes_[];
uint64_t initial_offset_last_record_offsets_[4];
protected:
bool allow_retry_read_;
public:
LogTest()
@ -161,18 +167,18 @@ class LogTest : public ::testing::TestWithParam<int> {
dest_holder_(test::GetWritableFileWriter(
new test::StringSink(&reader_contents_), "" /* don't care */)),
source_holder_(test::GetSequentialFileReader(
new StringSource(reader_contents_), "" /* file name */)),
writer_(std::move(dest_holder_), 123, GetParam()),
reader_(nullptr, std::move(source_holder_), &report_,
true /* checksum */, 123 /* log_number */,
false /* retry_after_eof */) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
initial_offset_last_record_offsets_[0] = 0;
initial_offset_last_record_offsets_[1] = header_size + 10000;
initial_offset_last_record_offsets_[2] = 2 * (header_size + 10000);
initial_offset_last_record_offsets_[3] = 2 * (header_size + 10000) +
(2 * log::kBlockSize - 1000) +
3 * header_size;
new StringSource(reader_contents_, !std::get<1>(GetParam())),
"" /* file name */)),
writer_(std::move(dest_holder_), 123, std::get<0>(GetParam())),
allow_retry_read_(std::get<1>(GetParam())) {
if (allow_retry_read_) {
reader_.reset(new FragmentBufferedReader(
nullptr, std::move(source_holder_), &report_, true /* checksum */,
123 /* log_number */));
} else {
reader_.reset(new Reader(nullptr, std::move(source_holder_), &report_,
true /* checksum */, 123 /* log_number */));
}
}
Slice* get_reader_contents() { return &reader_contents_; }
@ -189,7 +195,9 @@ class LogTest : public ::testing::TestWithParam<int> {
WALRecoveryMode::kTolerateCorruptedTailRecords) {
std::string scratch;
Slice record;
if (reader_.ReadRecord(&record, &scratch, wal_recovery_mode)) {
bool ret = false;
ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode);
if (ret) {
return record.ToString();
} else {
return "EOF";
@ -221,7 +229,7 @@ class LogTest : public ::testing::TestWithParam<int> {
}
void ForceError(size_t position = 0) {
auto src = dynamic_cast<StringSource*>(reader_.file()->file());
auto src = dynamic_cast<StringSource*>(reader_->file()->file());
src->force_error_ = true;
src->force_error_position_ = position;
}
@ -235,20 +243,18 @@ class LogTest : public ::testing::TestWithParam<int> {
}
void ForceEOF(size_t position = 0) {
auto src = dynamic_cast<StringSource*>(reader_.file()->file());
auto src = dynamic_cast<StringSource*>(reader_->file()->file());
src->force_eof_ = true;
src->force_eof_position_ = position;
}
void UnmarkEOF() {
auto src = dynamic_cast<StringSource*>(reader_.file()->file());
auto src = dynamic_cast<StringSource*>(reader_->file()->file());
src->returned_partial_ = false;
reader_.UnmarkEOF();
reader_->UnmarkEOF();
}
bool IsEOF() {
return reader_.IsEOF();
}
bool IsEOF() { return reader_->IsEOF(); }
// Returns OK iff recorded error message contains "msg"
std::string MatchError(const std::string& msg) const {
@ -258,23 +264,8 @@ class LogTest : public ::testing::TestWithParam<int> {
return "OK";
}
}
void WriteInitialOffsetLog() {
for (int i = 0; i < 4; i++) {
std::string record(initial_offset_record_sizes_[i],
static_cast<char>('a' + i));
Write(record);
}
}
};
size_t LogTest::initial_offset_record_sizes_[] =
{10000, // Two sizable records in first block
10000,
2 * log::kBlockSize - 1000, // Span three blocks
1};
TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
TEST_P(LogTest, ReadWrite) {
@ -312,7 +303,8 @@ TEST_P(LogTest, Fragmentation) {
TEST_P(LogTest, MarginalTrailer) {
// Make a trailer that is exactly the same length as an empty record.
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
int header_size =
std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
@ -326,7 +318,8 @@ TEST_P(LogTest, MarginalTrailer) {
TEST_P(LogTest, MarginalTrailer2) {
// Make a trailer that is exactly the same length as an empty record.
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
int header_size =
std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size), WrittenBytes());
@ -339,7 +332,8 @@ TEST_P(LogTest, MarginalTrailer2) {
}
TEST_P(LogTest, ShortTrailer) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
int header_size =
std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size + 4;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
@ -352,7 +346,8 @@ TEST_P(LogTest, ShortTrailer) {
}
TEST_P(LogTest, AlignedEof) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
int header_size =
std::get<0>(GetParam()) ? kRecyclableHeaderSize : kHeaderSize;
const int n = kBlockSize - 2 * header_size + 4;
Write(BigString("foo", n));
ASSERT_EQ((unsigned int)(kBlockSize - header_size + 4), WrittenBytes());
@ -403,6 +398,11 @@ TEST_P(LogTest, TruncatedTrailingRecordIsIgnored) {
}
TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
if (allow_retry_read_) {
// If read retry is allowed, then truncated trailing record should not
// raise an error.
return;
}
Write("foo");
ShrinkSize(4); // Drop all payload as well as a header byte
ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
@ -412,13 +412,20 @@ TEST_P(LogTest, TruncatedTrailingRecordIsNotIgnored) {
}
TEST_P(LogTest, BadLength) {
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
if (allow_retry_read_) {
// If read retry is allowed, then we should not raise an error when the
// record length specified in header is longer than data currently
// available. It's possible that the body of the record is not written yet.
return;
}
bool recyclable_log = (std::get<0>(GetParam()) != 0);
int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
const int kPayloadSize = kBlockSize - header_size;
Write(BigString("bar", kPayloadSize));
Write("foo");
// Least significant size byte is stored in header[4].
IncrementByte(4, 1);
if (!GetParam()) {
if (!recyclable_log) {
ASSERT_EQ("foo", Read());
ASSERT_EQ(kBlockSize, DroppedBytes());
ASSERT_EQ("OK", MatchError("bad record length"));
@ -428,6 +435,12 @@ TEST_P(LogTest, BadLength) {
}
TEST_P(LogTest, BadLengthAtEndIsIgnored) {
if (allow_retry_read_) {
// If read retry is allowed, then we should not raise an error when the
// record length specified in header is longer than data currently
// available. It's possible that the body of the record is not written yet.
return;
}
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read());
@ -436,6 +449,12 @@ TEST_P(LogTest, BadLengthAtEndIsIgnored) {
}
TEST_P(LogTest, BadLengthAtEndIsNotIgnored) {
if (allow_retry_read_) {
// If read retry is allowed, then we should not raise an error when the
// record length specified in header is longer than data currently
// available. It's possible that the body of the record is not written yet.
return;
}
Write("foo");
ShrinkSize(1);
ASSERT_EQ("EOF", Read(WALRecoveryMode::kAbsoluteConsistency));
@ -447,7 +466,8 @@ TEST_P(LogTest, ChecksumMismatch) {
Write("foooooo");
IncrementByte(0, 14);
ASSERT_EQ("EOF", Read());
if (!GetParam()) {
bool recyclable_log = (std::get<0>(GetParam()) != 0);
if (!recyclable_log) {
ASSERT_EQ(14U, DroppedBytes());
ASSERT_EQ("OK", MatchError("checksum mismatch"));
} else {
@ -458,8 +478,10 @@ TEST_P(LogTest, ChecksumMismatch) {
TEST_P(LogTest, UnexpectedMiddleType) {
Write("foo");
SetByte(6, static_cast<char>(GetParam() ? kRecyclableMiddleType : kMiddleType));
FixChecksum(0, 3, !!GetParam());
bool recyclable_log = (std::get<0>(GetParam()) != 0);
SetByte(6, static_cast<char>(recyclable_log ? kRecyclableMiddleType
: kMiddleType));
FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("missing start"));
@ -467,8 +489,10 @@ TEST_P(LogTest, UnexpectedMiddleType) {
TEST_P(LogTest, UnexpectedLastType) {
Write("foo");
SetByte(6, static_cast<char>(GetParam() ? kRecyclableLastType : kLastType));
FixChecksum(0, 3, !!GetParam());
bool recyclable_log = (std::get<0>(GetParam()) != 0);
SetByte(6,
static_cast<char>(recyclable_log ? kRecyclableLastType : kLastType));
FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
ASSERT_EQ("OK", MatchError("missing start"));
@ -477,8 +501,10 @@ TEST_P(LogTest, UnexpectedLastType) {
TEST_P(LogTest, UnexpectedFullType) {
Write("foo");
Write("bar");
SetByte(6, static_cast<char>(GetParam() ? kRecyclableFirstType : kFirstType));
FixChecksum(0, 3, !!GetParam());
bool recyclable_log = (std::get<0>(GetParam()) != 0);
SetByte(
6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ("bar", Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
@ -488,8 +514,10 @@ TEST_P(LogTest, UnexpectedFullType) {
TEST_P(LogTest, UnexpectedFirstType) {
Write("foo");
Write(BigString("bar", 100000));
SetByte(6, static_cast<char>(GetParam() ? kRecyclableFirstType : kFirstType));
FixChecksum(0, 3, !!GetParam());
bool recyclable_log = (std::get<0>(GetParam()) != 0);
SetByte(
6, static_cast<char>(recyclable_log ? kRecyclableFirstType : kFirstType));
FixChecksum(0, 3, !!recyclable_log);
ASSERT_EQ(BigString("bar", 100000), Read());
ASSERT_EQ("EOF", Read());
ASSERT_EQ(3U, DroppedBytes());
@ -506,6 +534,11 @@ TEST_P(LogTest, MissingLastIsIgnored) {
}
TEST_P(LogTest, MissingLastIsNotIgnored) {
if (allow_retry_read_) {
// If read retry is allowed, then truncated trailing record should not
// raise an error.
return;
}
Write(BigString("bar", kBlockSize));
// Remove the LAST block, including header.
ShrinkSize(14);
@ -524,6 +557,11 @@ TEST_P(LogTest, PartialLastIsIgnored) {
}
TEST_P(LogTest, PartialLastIsNotIgnored) {
if (allow_retry_read_) {
// If read retry is allowed, then truncated trailing record should not
// raise an error.
return;
}
Write(BigString("bar", kBlockSize));
// Cause a bad record length in the LAST block.
ShrinkSize(1);
@ -550,7 +588,8 @@ TEST_P(LogTest, ErrorJoinsRecords) {
SetByte(offset, 'x');
}
if (!GetParam()) {
bool recyclable_log = (std::get<0>(GetParam()) != 0);
if (!recyclable_log) {
ASSERT_EQ("correct", Read());
ASSERT_EQ("EOF", Read());
size_t dropped = DroppedBytes();
@ -564,7 +603,8 @@ TEST_P(LogTest, ErrorJoinsRecords) {
TEST_P(LogTest, ClearEofSingleBlock) {
Write("foo");
Write("bar");
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
bool recyclable_log = (std::get<0>(GetParam()) != 0);
int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
ForceEOF(3 + header_size + 2);
ASSERT_EQ("foo", Read());
UnmarkEOF();
@ -579,7 +619,8 @@ TEST_P(LogTest, ClearEofSingleBlock) {
TEST_P(LogTest, ClearEofMultiBlock) {
size_t num_full_blocks = 5;
int header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
bool recyclable_log = (std::get<0>(GetParam()) != 0);
int header_size = recyclable_log ? kRecyclableHeaderSize : kHeaderSize;
size_t n = (kBlockSize - header_size) * num_full_blocks + 25;
Write(BigString("foo", n));
Write(BigString("bar", n));
@ -628,7 +669,8 @@ TEST_P(LogTest, ClearEofError2) {
}
TEST_P(LogTest, Recycle) {
if (!GetParam()) {
bool recyclable_log = (std::get<0>(GetParam()) != 0);
if (!recyclable_log) {
return; // test is only valid for recycled logs
}
Write("foo");
@ -651,7 +693,11 @@ TEST_P(LogTest, Recycle) {
ASSERT_EQ("EOF", Read());
}
INSTANTIATE_TEST_CASE_P(bool, LogTest, ::testing::Values(0, 2));
INSTANTIATE_TEST_CASE_P(bool, LogTest,
::testing::Values(std::make_tuple(0, false),
std::make_tuple(0, true),
std::make_tuple(1, false),
std::make_tuple(1, true)));
class RetriableLogTest : public ::testing::TestWithParam<int> {
private:
@ -677,7 +723,7 @@ class RetriableLogTest : public ::testing::TestWithParam<int> {
std::unique_ptr<WritableFileWriter> writer_;
std::unique_ptr<SequentialFileReader> reader_;
ReportCollector report_;
std::unique_ptr<Reader> log_reader_;
std::unique_ptr<FragmentBufferedReader> log_reader_;
public:
RetriableLogTest()
@ -716,9 +762,9 @@ class RetriableLogTest : public ::testing::TestWithParam<int> {
if (s.ok()) {
reader_.reset(new SequentialFileReader(std::move(seq_file), log_file_));
assert(reader_ != nullptr);
log_reader_.reset(new Reader(nullptr, std::move(reader_), &report_,
true /* checksum */, 123 /* log_number */,
true /* retry_after_eof */));
log_reader_.reset(new FragmentBufferedReader(
nullptr, std::move(reader_), &report_, true /* checksum */,
123 /* log_number */));
assert(log_reader_ != nullptr);
}
return s;
@ -738,14 +784,17 @@ class RetriableLogTest : public ::testing::TestWithParam<int> {
writer_->Sync(true);
}
std::string Read() {
auto wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
bool TryRead(std::string* result) {
assert(result != nullptr);
result->clear();
std::string scratch;
Slice record;
if (log_reader_->ReadRecord(&record, &scratch, wal_recovery_mode)) {
return record.ToString();
bool r = log_reader_->ReadRecord(&record, &scratch);
if (r) {
result->assign(record.data(), record.size());
return true;
} else {
return "Read error";
return false;
}
}
};
@ -754,12 +803,17 @@ TEST_P(RetriableLogTest, TailLog_PartialHeader) {
ASSERT_OK(SetupTestEnv());
std::vector<int> remaining_bytes_in_last_record;
size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
bool eof = false;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"RetriableLogTest::TailLog:AfterPart1",
"RetriableLogTest::TailLog:BeforeReadRecord"},
{"LogReader::ReadMore:FirstEOF",
{"FragmentBufferedLogReader::TryReadMore:FirstEOF",
"RetriableLogTest::TailLog:BeforePart2"}});
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"FragmentBufferedLogReader::TryReadMore:FirstEOF",
[&](void* /*arg*/) { eof = true; });
SyncPoint::GetInstance()->EnableProcessing();
size_t delta = header_size - 1;
@ -779,23 +833,30 @@ TEST_P(RetriableLogTest, TailLog_PartialHeader) {
std::string record;
port::Thread log_reader_thread([&]() {
TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
record = Read();
while (!TryRead(&record)) {
}
});
log_reader_thread.join();
log_writer_thread.join();
ASSERT_EQ("foo", record);
ASSERT_TRUE(eof);
}
TEST_P(RetriableLogTest, TailLog_FullHeader) {
ASSERT_OK(SetupTestEnv());
std::vector<int> remaining_bytes_in_last_record;
size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
bool eof = false;
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"RetriableLogTest::TailLog:AfterPart1",
"RetriableLogTest::TailLog:BeforeReadRecord"},
{"LogReader::ReadMore:FirstEOF",
{"FragmentBufferedLogReader::TryReadMore:FirstEOF",
"RetriableLogTest::TailLog:BeforePart2"}});
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"FragmentBufferedLogReader::TryReadMore:FirstEOF",
[&](void* /*arg*/) { eof = true; });
SyncPoint::GetInstance()->EnableProcessing();
size_t delta = header_size + 1;
@ -810,18 +871,45 @@ TEST_P(RetriableLogTest, TailLog_FullHeader) {
TEST_SYNC_POINT("RetriableLogTest::TailLog:AfterPart1");
TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforePart2");
Write(Slice(part2));
ASSERT_TRUE(eof);
});
std::string record;
port::Thread log_reader_thread([&]() {
TEST_SYNC_POINT("RetriableLogTest::TailLog:BeforeReadRecord");
record = Read();
while (!TryRead(&record)) {
}
});
log_reader_thread.join();
log_writer_thread.join();
ASSERT_EQ("foo", record);
}
TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
// Clear all sync point callbacks even if this test does not use sync point.
// It is necessary, otherwise the execute of this test may hit a sync point
// with which a callback is registered. The registered callback may access
// some dead variable, causing segfault.
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
ASSERT_OK(SetupTestEnv());
size_t header_size = GetParam() ? kRecyclableHeaderSize : kHeaderSize;
size_t delta = header_size - 1;
size_t old_sz = contents().size();
Encode("foo-bar");
size_t new_sz = contents().size();
std::string part1 = contents().substr(old_sz, delta);
std::string part2 =
contents().substr(old_sz + delta, new_sz - old_sz - delta);
Write(Slice(part1));
std::string record;
ASSERT_FALSE(TryRead(&record));
ASSERT_TRUE(record.empty());
Write(Slice(part2));
ASSERT_TRUE(TryRead(&record));
ASSERT_EQ("foo-bar", record);
}
INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
} // namespace log

@ -364,8 +364,7 @@ class Repairer {
// propagating bad information (like overly large sequence
// numbers).
log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
true /*enable checksum*/, log,
false /* retry_after_eof */);
true /*enable checksum*/, log);
// Initialize per-column family memtables
for (auto* cfd : *vset_.GetColumnFamilySet()) {

@ -315,8 +315,7 @@ Status TransactionLogIteratorImpl::OpenLogReader(const LogFile* logFile) {
assert(file);
currentLogReader_.reset(
new log::Reader(options_->info_log, std::move(file), &reporter_,
read_options_.verify_checksums_, logFile->LogNumber(),
false /* retry_after_eof */));
read_options_.verify_checksums_, logFile->LogNumber()));
return Status::OK();
}
} // namespace rocksdb

@ -364,10 +364,10 @@ class VersionBuilder::Rep {
CheckConsistency(vstorage);
}
void LoadTableHandlers(InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const SliceTransform* prefix_extractor) {
Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const SliceTransform* prefix_extractor) {
assert(table_cache_ != nullptr);
size_t table_cache_capacity = table_cache_->get_cache()->GetCapacity();
@ -394,7 +394,8 @@ class VersionBuilder::Rep {
size_t table_cache_usage = table_cache_->get_cache()->GetUsage();
if (table_cache_usage >= load_limit) {
return;
// TODO (yanqin) find a suitable status code.
return Status::OK();
} else {
max_load = load_limit - table_cache_usage;
}
@ -402,11 +403,15 @@ class VersionBuilder::Rep {
// <file metadata, level>
std::vector<std::pair<FileMetaData*, int>> files_meta;
std::vector<Status> statuses;
for (int level = 0; level < num_levels_; level++) {
for (auto& file_meta_pair : levels_[level].added_files) {
auto* file_meta = file_meta_pair.second;
assert(!file_meta->table_reader_handle);
files_meta.emplace_back(file_meta, level);
// If the file has been opened before, just skip it.
if (!file_meta->table_reader_handle) {
files_meta.emplace_back(file_meta, level);
statuses.emplace_back(Status::OK());
}
if (files_meta.size() >= max_load) {
break;
}
@ -426,7 +431,7 @@ class VersionBuilder::Rep {
auto* file_meta = files_meta[file_idx].first;
int level = files_meta[file_idx].second;
table_cache_->FindTable(
statuses[file_idx] = table_cache_->FindTable(
env_options_, *(base_vstorage_->InternalComparator()),
file_meta->fd, &file_meta->table_reader_handle, prefix_extractor,
false /*no_io */, true /* record_read_stats */,
@ -448,6 +453,12 @@ class VersionBuilder::Rep {
for (auto& t : threads) {
t.join();
}
for (const auto& s : statuses) {
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
@ -487,14 +498,13 @@ void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
rep_->SaveTo(vstorage);
}
void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats,
int max_threads,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const SliceTransform* prefix_extractor) {
rep_->LoadTableHandlers(internal_stats, max_threads,
prefetch_index_and_filter_in_cache, is_initial_load,
prefix_extractor);
Status VersionBuilder::LoadTableHandlers(
InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache, bool is_initial_load,
const SliceTransform* prefix_extractor) {
return rep_->LoadTableHandlers(internal_stats, max_threads,
prefetch_index_and_filter_in_cache,
is_initial_load, prefix_extractor);
}
void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,

@ -33,10 +33,10 @@ class VersionBuilder {
bool CheckConsistencyForNumLevels();
void Apply(VersionEdit* edit);
void SaveTo(VersionStorageInfo* vstorage);
void LoadTableHandlers(InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const SliceTransform* prefix_extractor);
Status LoadTableHandlers(InternalStats* internal_stats, int max_threads,
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const SliceTransform* prefix_extractor);
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f);
private:

@ -313,6 +313,7 @@ class VersionEdit {
std::string DebugJSON(int edit_num, bool hex_key = false) const;
private:
friend class ReactiveVersionSet;
friend class VersionSet;
friend class Version;

@ -712,6 +712,7 @@ void LevelIterator::InitFileIterator(size_t new_file_index) {
}
}
}
} // anonymous namespace
// A wrapper of version builder which references the current version in
// constructor and unref it in the destructor.
@ -726,16 +727,14 @@ class BaseReferencedVersionBuilder {
version_->Ref();
}
~BaseReferencedVersionBuilder() {
delete version_builder_;
version_->Unref();
}
VersionBuilder* version_builder() { return version_builder_; }
VersionBuilder* version_builder() { return version_builder_.get(); }
private:
VersionBuilder* version_builder_;
std::unique_ptr<VersionBuilder> version_builder_;
Version* version_;
};
} // anonymous namespace
Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
const FileMetaData* file_meta,
@ -2936,7 +2935,7 @@ Status VersionSet::ProcessManifestWrites(
} else if (group_start != std::numeric_limits<size_t>::max()) {
group_start = std::numeric_limits<size_t>::max();
}
LogAndApplyHelper(last_writer->cfd, builder, version, e, mu);
LogAndApplyHelper(last_writer->cfd, builder, e, mu);
batch_edits.push_back(e);
}
}
@ -2990,6 +2989,7 @@ Status VersionSet::ProcessManifestWrites(
assert(pending_manifest_file_number_ == 0);
if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size) {
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
pending_manifest_file_number_ = NewFileNumber();
batch_edits.back()->SetNextFile(next_file_number_.load());
new_descriptor_log = true;
@ -3098,6 +3098,7 @@ Status VersionSet::ProcessManifestWrites(
if (s.ok() && new_descriptor_log) {
s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
db_directory);
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
}
if (s.ok()) {
@ -3225,7 +3226,7 @@ Status VersionSet::ProcessManifestWrites(
return s;
}
// 'datas' is gramatically incorrect. We still use this notation is to indicate
// 'datas' is gramatically incorrect. We still use this notation to indicate
// that this variable represents a collection of column_family_data.
Status VersionSet::LogAndApply(
const autovector<ColumnFamilyData*>& column_family_datas,
@ -3325,8 +3326,8 @@ void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
}
void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
VersionBuilder* builder, Version* /*v*/,
VersionEdit* edit, InstrumentedMutex* mu) {
VersionBuilder* builder, VersionEdit* edit,
InstrumentedMutex* mu) {
#ifdef NDEBUG
(void)cfd;
#endif
@ -3353,16 +3354,16 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
builder->Apply(edit);
}
Status VersionSet::ApplyOneVersionEdit(
Status VersionSet::ApplyOneVersionEditToBuilder(
VersionEdit& edit,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
bool* have_log_number, uint64_t* /* log_number */,
bool* have_prev_log_number, uint64_t* previous_log_number,
bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
uint32_t* max_column_family) {
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
builders,
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
bool* have_last_sequence, SequenceNumber* last_sequence,
uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
// Not found means that user didn't supply that column
// family option AND we encountered column family add
// record. Once we encounter column family drop record,
@ -3392,14 +3393,14 @@ Status VersionSet::ApplyOneVersionEdit(
} else {
cfd = CreateColumnFamily(cf_options->second, &edit);
cfd->set_initialized();
builders.insert(
{edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
builders.insert(std::make_pair(
edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
new BaseReferencedVersionBuilder(cfd))));
}
} else if (edit.is_column_family_drop_) {
if (cf_in_builders) {
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
delete builder->second;
builders.erase(builder);
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
assert(cfd != nullptr);
@ -3433,7 +3434,18 @@ Status VersionSet::ApplyOneVersionEdit(
assert(builder != builders.end());
builder->second->version_builder()->Apply(&edit);
}
return ExtractInfoFromVersionEdit(
cfd, edit, have_log_number, log_number, have_prev_log_number,
previous_log_number, have_next_file, next_file, have_last_sequence,
last_sequence, min_log_number_to_keep, max_column_family);
}
Status VersionSet::ExtractInfoFromVersionEdit(
ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number,
uint64_t* log_number, bool* have_prev_log_number,
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
bool* have_last_sequence, SequenceNumber* last_sequence,
uint64_t* min_log_number_to_keep, uint32_t* max_column_family) {
if (cfd != nullptr) {
if (edit.has_log_number_) {
if (cfd->GetLogNumber() > edit.log_number_) {
@ -3444,6 +3456,7 @@ Status VersionSet::ApplyOneVersionEdit(
} else {
cfd->SetLogNumber(edit.log_number_);
*have_log_number = true;
*log_number = edit.log_number_;
}
}
if (edit.has_comparator_ &&
@ -3480,6 +3493,31 @@ Status VersionSet::ApplyOneVersionEdit(
return Status::OK();
}
Status VersionSet::GetCurrentManifestPath(std::string* manifest_path) {
assert(manifest_path != nullptr);
std::string fname;
Status s = ReadFileToString(env_, CurrentFileName(dbname_), &fname);
if (!s.ok()) {
return s;
}
if (fname.empty() || fname.back() != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
// remove the trailing '\n'
fname.resize(fname.size() - 1);
FileType type;
bool parse_ok = ParseFileName(fname, &manifest_file_number_, &type);
if (!parse_ok || type != kDescriptorFile) {
return Status::Corruption("CURRENT file corrupted");
}
*manifest_path = dbname_;
if (dbname_.back() != '/') {
manifest_path->push_back('/');
}
*manifest_path += fname;
return Status::OK();
}
Status VersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
bool read_only) {
@ -3493,43 +3531,28 @@ Status VersionSet::Recover(
std::unordered_map<int, std::string> column_families_not_found;
// Read "CURRENT" file, which contains a pointer to the current manifest file
std::string manifest_filename;
Status s = ReadFileToString(
env_, CurrentFileName(dbname_), &manifest_filename
);
std::string manifest_path;
Status s = GetCurrentManifestPath(&manifest_path);
if (!s.ok()) {
return s;
}
if (manifest_filename.empty() ||
manifest_filename.back() != '\n') {
return Status::Corruption("CURRENT file does not end with newline");
}
// remove the trailing '\n'
manifest_filename.resize(manifest_filename.size() - 1);
FileType type;
bool parse_ok =
ParseFileName(manifest_filename, &manifest_file_number_, &type);
if (!parse_ok || type != kDescriptorFile) {
return Status::Corruption("CURRENT file corrupted");
}
ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
manifest_filename.c_str());
manifest_path.c_str());
manifest_filename = dbname_ + "/" + manifest_filename;
std::unique_ptr<SequentialFileReader> manifest_file_reader;
{
std::unique_ptr<SequentialFile> manifest_file;
s = env_->NewSequentialFile(manifest_filename, &manifest_file,
s = env_->NewSequentialFile(manifest_path, &manifest_file,
env_->OptimizeForManifestRead(env_options_));
if (!s.ok()) {
return s;
}
manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_filename));
new SequentialFileReader(std::move(manifest_file), manifest_path));
}
uint64_t current_manifest_file_size;
s = env_->GetFileSize(manifest_filename, &current_manifest_file_size);
s = env_->GetFileSize(manifest_path, &current_manifest_file_size);
if (!s.ok()) {
return s;
}
@ -3544,7 +3567,8 @@ Status VersionSet::Recover(
uint64_t previous_log_number = 0;
uint32_t max_column_family = 0;
uint64_t min_log_number_to_keep = 0;
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
// add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
@ -3559,14 +3583,15 @@ Status VersionSet::Recover(
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
new BaseReferencedVersionBuilder(default_cfd))));
{
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
true /* checksum */, 0 /* log_number */,
false /* retry_after_eof */);
true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
std::vector<VersionEdit> replay_buffer;
@ -3597,7 +3622,7 @@ Status VersionSet::Recover(
TEST_SYNC_POINT_CALLBACK("VersionSet::Recover:LastInAtomicGroup",
&edit);
for (auto& e : replay_buffer) {
s = ApplyOneVersionEdit(
s = ApplyOneVersionEditToBuilder(
e, cf_name_to_options, column_families_not_found, builders,
&have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
@ -3618,7 +3643,7 @@ Status VersionSet::Recover(
s = Status::Corruption("corrupted atomic group");
break;
}
s = ApplyOneVersionEdit(
s = ApplyOneVersionEditToBuilder(
edit, cf_name_to_options, column_families_not_found, builders,
&have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
@ -3689,7 +3714,7 @@ Status VersionSet::Recover(
assert(cfd->initialized());
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
auto* builder = builders_iter->second->version_builder();
auto builder = builders_iter->second->version_builder();
// unlimited table cache. Pre-load table handle now.
// Need to do it out of the mutex.
@ -3725,7 +3750,7 @@ Status VersionSet::Recover(
"prev_log_number is %lu,"
"max_column_family is %u,"
"min_log_number_to_keep is %lu\n",
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
manifest_path.c_str(), (unsigned long)manifest_file_number_,
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
(unsigned long)log_number, (unsigned long)prev_log_number_,
column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
@ -3740,10 +3765,6 @@ Status VersionSet::Recover(
}
}
for (auto& builder : builders) {
delete builder.second;
}
return s;
}
@ -3781,8 +3802,7 @@ Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
true /* checksum */, 0 /* log_number */,
false /* retry_after_eof */);
true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -3928,7 +3948,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
uint64_t previous_log_number = 0;
int count = 0;
std::unordered_map<uint32_t, std::string> comparators;
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
// add default column family
VersionEdit default_cf_edit;
@ -3936,14 +3957,15 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd =
CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
new BaseReferencedVersionBuilder(default_cfd))));
{
VersionSet::LogReporter reporter;
reporter.status = &s;
log::Reader reader(nullptr, std::move(file_reader), &reporter,
true /* checksum */, 0 /* log_number */,
false /* retry_after_eof */);
true /* checksum */, 0 /* log_number */);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
@ -3978,8 +4000,9 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
}
cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
cfd->set_initialized();
builders.insert(
{edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
builders.insert(std::make_pair(
edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
new BaseReferencedVersionBuilder(cfd))));
} else if (edit.is_column_family_drop_) {
if (!cf_in_builders) {
s = Status::Corruption(
@ -3987,7 +4010,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
break;
}
auto builder_iter = builders.find(edit.column_family_);
delete builder_iter->second;
builders.erase(builder_iter);
comparators.erase(edit.column_family_);
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
@ -4087,11 +4109,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
delete v;
}
// Free builders
for (auto& builder : builders) {
delete builder.second;
}
next_file_number_.store(next_file + 1);
last_allocated_sequence_ = last_sequence;
last_published_sequence_ = last_sequence;
@ -4583,4 +4600,405 @@ uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
return total_files_size;
}
ReactiveVersionSet::ReactiveVersionSet(const std::string& dbname,
const ImmutableDBOptions* _db_options,
const EnvOptions& _env_options,
Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller)
: VersionSet(dbname, _db_options, _env_options, table_cache,
write_buffer_manager, write_controller) {}
ReactiveVersionSet::~ReactiveVersionSet() {}
Status ReactiveVersionSet::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
std::unique_ptr<Status>* manifest_reader_status) {
assert(manifest_reader != nullptr);
assert(manifest_reporter != nullptr);
assert(manifest_reader_status != nullptr);
std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
for (const auto& cf : column_families) {
cf_name_to_options.insert({cf.name, cf.options});
}
// add default column family
auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
if (default_cf_iter == cf_name_to_options.end()) {
return Status::InvalidArgument("Default column family not specified");
}
VersionEdit default_cf_edit;
default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd =
CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t previous_log_number = 0;
uint32_t max_column_family = 0;
uint64_t min_log_number_to_keep = 0;
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
builders;
std::unordered_map<int, std::string> column_families_not_found;
builders.insert(
std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
new BaseReferencedVersionBuilder(default_cfd))));
manifest_reader_status->reset(new Status());
manifest_reporter->reset(new LogReporter());
static_cast<LogReporter*>(manifest_reporter->get())->status =
manifest_reader_status->get();
Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
log::Reader* reader = manifest_reader->get();
int retry = 0;
while (s.ok() && retry < 1) {
assert(reader != nullptr);
Slice record;
std::string scratch;
while (s.ok() && reader->ReadRecord(&record, &scratch)) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
s = ApplyOneVersionEditToBuilder(
edit, cf_name_to_options, column_families_not_found, builders,
&have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
&have_last_sequence, &last_sequence, &min_log_number_to_keep,
&max_column_family);
}
if (s.ok()) {
bool enough = have_next_file && have_log_number && have_last_sequence;
if (enough) {
for (const auto& cf : column_families) {
auto cfd = column_family_set_->GetColumnFamily(cf.name);
if (cfd == nullptr) {
enough = false;
break;
}
}
}
if (enough) {
for (const auto& cf : column_families) {
auto cfd = column_family_set_->GetColumnFamily(cf.name);
assert(cfd != nullptr);
if (!cfd->IsDropped()) {
auto builder_iter = builders.find(cfd->GetID());
assert(builder_iter != builders.end());
auto builder = builder_iter->second->version_builder();
assert(builder != nullptr);
s = builder->LoadTableHandlers(
cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */,
true /* is_initial_load */,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
if (!s.ok()) {
enough = false;
if (s.IsPathNotFound()) {
s = Status::OK();
}
break;
}
}
}
}
if (enough) {
break;
}
}
++retry;
}
if (s.ok()) {
if (!have_prev_log_number) {
previous_log_number = 0;
}
column_family_set_->UpdateMaxColumnFamily(max_column_family);
MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
MarkFileNumberUsed(previous_log_number);
MarkFileNumberUsed(log_number);
for (auto cfd : *column_family_set_) {
assert(builders.count(cfd->GetID()) > 0);
auto builder = builders[cfd->GetID()]->version_builder();
if (!builder->CheckConsistencyForNumLevels()) {
s = Status::InvalidArgument(
"db has more levels than options.num_levels");
break;
}
}
}
if (s.ok()) {
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
assert(cfd->initialized());
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
auto* builder = builders_iter->second->version_builder();
Version* v = new Version(cfd, this, env_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(v->storage_info());
// Install recovered version
v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
!(db_options_->skip_stats_update_on_db_open));
AppendVersion(cfd, v);
}
next_file_number_.store(next_file + 1);
last_allocated_sequence_ = last_sequence;
last_published_sequence_ = last_sequence;
last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number;
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
ROCKS_LOG_INFO(db_options_->info_log,
"Column family [%s] (ID %u), log number is %" PRIu64 "\n",
cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
}
}
return s;
}
Status ReactiveVersionSet::ReadAndApply(
InstrumentedMutex* mu,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
std::unordered_set<ColumnFamilyData*>* cfds_changed) {
assert(manifest_reader != nullptr);
assert(cfds_changed != nullptr);
mu->AssertHeld();
Status s;
bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t previous_log_number = 0;
uint32_t max_column_family = 0;
uint64_t min_log_number_to_keep = 0;
while (s.ok()) {
Slice record;
std::string scratch;
log::Reader* reader = manifest_reader->get();
std::string old_manifest_path = reader->file()->file_name();
while (reader->ReadRecord(&record, &scratch)) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (!s.ok()) {
break;
}
ColumnFamilyData* cfd =
column_family_set_->GetColumnFamily(edit.column_family_);
// If we cannot find this column family in our column family set, then it
// may be a new column family created by the primary after the secondary
// starts. Ignore it for now.
if (nullptr == cfd) {
continue;
}
if (active_version_builders_.find(edit.column_family_) ==
active_version_builders_.end()) {
std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
new BaseReferencedVersionBuilder(cfd));
active_version_builders_.insert(
std::make_pair(edit.column_family_, std::move(builder_guard)));
}
s = ApplyOneVersionEditToBuilder(
edit, &have_log_number, &log_number, &have_prev_log_number,
&previous_log_number, &have_next_file, &next_file,
&have_last_sequence, &last_sequence, &min_log_number_to_keep,
&max_column_family);
if (!s.ok()) {
break;
}
auto builder_iter = active_version_builders_.find(edit.column_family_);
assert(builder_iter != active_version_builders_.end());
auto builder = builder_iter->second->version_builder();
assert(builder != nullptr);
s = builder->LoadTableHandlers(
cfd->internal_stats(), db_options_->max_file_opening_threads,
false /* prefetch_index_and_filter_in_cache */,
false /* is_initial_load */,
cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
TEST_SYNC_POINT_CALLBACK(
"ReactiveVersionSet::ReadAndApply:AfterLoadTableHandlers", &s);
if (!s.ok() && !s.IsPathNotFound()) {
break;
} else if (s.IsPathNotFound()) {
s = Status::OK();
} else { // s.ok() == true
auto version = new Version(cfd, this, env_options_,
*cfd->GetLatestMutableCFOptions(),
current_version_number_++);
builder->SaveTo(version->storage_info());
version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
AppendVersion(cfd, version);
active_version_builders_.erase(builder_iter);
if (cfds_changed->count(cfd) == 0) {
cfds_changed->insert(cfd);
}
}
if (have_next_file) {
next_file_number_.store(next_file + 1);
}
if (have_last_sequence) {
last_allocated_sequence_ = last_sequence;
last_published_sequence_ = last_sequence;
last_sequence_ = last_sequence;
}
if (have_prev_log_number) {
prev_log_number_ = previous_log_number;
MarkFileNumberUsed(previous_log_number);
}
if (have_log_number) {
MarkFileNumberUsed(log_number);
}
column_family_set_->UpdateMaxColumnFamily(max_column_family);
MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
}
// It's possible that:
// 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
// 2) we have finished reading the current MANIFEST.
// 3) we have encountered an IOError reading the current MANIFEST.
// We need to look for the next MANIFEST and start from there. If we cannot
// find the next MANIFEST, we should exit the loop.
s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
reader = manifest_reader->get();
if (s.ok() && reader->file()->file_name() == old_manifest_path) {
break;
}
}
if (s.ok()) {
for (auto cfd : *column_family_set_) {
auto builder_iter = active_version_builders_.find(cfd->GetID());
if (builder_iter == active_version_builders_.end()) {
continue;
}
auto builder = builder_iter->second->version_builder();
if (!builder->CheckConsistencyForNumLevels()) {
s = Status::InvalidArgument(
"db has more levels than options.num_levels");
break;
}
}
}
return s;
}
Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
VersionEdit& edit, bool* have_log_number, uint64_t* log_number,
bool* have_prev_log_number, uint64_t* previous_log_number,
bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
uint32_t* max_column_family) {
ColumnFamilyData* cfd = nullptr;
Status status;
if (edit.is_column_family_add_) {
// TODO (yanqin) for now the secondary ignores column families created
// after Open. This also simplifies handling of switching to a new MANIFEST
// and processing the snapshot of the system at the beginning of the
// MANIFEST.
return Status::OK();
} else if (edit.is_column_family_drop_) {
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// Drop a CF created by primary after secondary starts? Then ignore
if (cfd == nullptr) {
return Status::OK();
}
// Drop the column family by setting it to be 'dropped' without destroying
// the column family handle.
cfd->SetDropped();
if (cfd->Unref()) {
delete cfd;
cfd = nullptr;
}
} else {
cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// Operation on a CF created after Open? Then ignore
if (cfd == nullptr) {
return Status::OK();
}
auto builder_iter = active_version_builders_.find(edit.column_family_);
assert(builder_iter != active_version_builders_.end());
auto builder = builder_iter->second->version_builder();
assert(builder != nullptr);
builder->Apply(&edit);
}
return ExtractInfoFromVersionEdit(
cfd, edit, have_log_number, log_number, have_prev_log_number,
previous_log_number, have_next_file, next_file, have_last_sequence,
last_sequence, min_log_number_to_keep, max_column_family);
}
Status ReactiveVersionSet::MaybeSwitchManifest(
log::Reader::Reporter* reporter,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
assert(manifest_reader != nullptr);
Status s;
do {
std::string manifest_path;
s = GetCurrentManifestPath(&manifest_path);
std::unique_ptr<SequentialFile> manifest_file;
if (s.ok()) {
if (nullptr == manifest_reader->get() ||
manifest_reader->get()->file()->file_name() != manifest_path) {
TEST_SYNC_POINT(
"ReactiveVersionSet::MaybeSwitchManifest:"
"AfterGetCurrentManifestPath:0");
TEST_SYNC_POINT(
"ReactiveVersionSet::MaybeSwitchManifest:"
"AfterGetCurrentManifestPath:1");
s = env_->NewSequentialFile(
manifest_path, &manifest_file,
env_->OptimizeForManifestRead(env_options_));
} else {
// No need to switch manifest.
break;
}
}
std::unique_ptr<SequentialFileReader> manifest_file_reader;
if (s.ok()) {
manifest_file_reader.reset(
new SequentialFileReader(std::move(manifest_file), manifest_path));
manifest_reader->reset(new log::FragmentBufferedReader(
nullptr, std::move(manifest_file_reader), reporter,
true /* checksum */, 0 /* log_number */));
ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
manifest_path.c_str());
// TODO (yanqin) every time we switch to a new MANIFEST, we clear the
// active_version_builders_ map because we choose to construct the
// versions from scratch, thanks to the first part of each MANIFEST
// written by VersionSet::WriteSnapshot. This is not necessary, but we
// choose this at present for the sake of simplicity.
active_version_builders_.clear();
}
} while (s.IsPathNotFound());
return s;
}
} // namespace rocksdb

@ -648,6 +648,7 @@ class Version {
private:
Env* env_;
friend class ReactiveVersionSet;
friend class VersionSet;
const InternalKeyComparator* internal_comparator() const {
@ -739,9 +740,7 @@ struct ObsoleteFileInfo {
}
};
namespace {
class BaseReferencedVersionBuilder;
}
class VersionSet {
public:
@ -749,7 +748,7 @@ class VersionSet {
const EnvOptions& env_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller);
~VersionSet();
virtual ~VersionSet();
// Apply *edit to the current version to form a new descriptor that
// is both saved to persistent state and installed as the new
@ -795,7 +794,7 @@ class VersionSet {
// The across-multi-cf batch version. If edit_lists contain more than
// 1 version edits, caller must ensure that no edit in the []list is column
// family manipulation.
Status LogAndApply(
virtual Status LogAndApply(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<autovector<VersionEdit*>>& edit_lists,
@ -803,6 +802,8 @@ class VersionSet {
bool new_descriptor_log = false,
const ColumnFamilyOptions* new_cf_options = nullptr);
Status GetCurrentManifestPath(std::string* manifest_filename);
// Recover the last saved descriptor from persistent storage.
// If read_only == true, Recover() will not complain if some column families
// are not opened
@ -983,11 +984,12 @@ class VersionSet {
static uint64_t GetTotalSstFilesSize(Version* dummy_versions);
private:
protected:
struct ManifestWriter;
friend class Version;
friend class DBImpl;
friend class DBImplReadOnly;
struct LogReporter : public log::Reader::Reporter {
Status* status;
@ -1011,20 +1013,24 @@ class VersionSet {
ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options,
VersionEdit* edit);
Status ApplyOneVersionEdit(
// REQUIRES db mutex
Status ApplyOneVersionEditToBuilder(
VersionEdit& edit,
const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_opts,
std::unordered_map<int, std::string>& column_families_not_found,
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*>& builders,
std::unordered_map<
uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>& builders,
bool* have_log_number, uint64_t* log_number, bool* have_prev_log_number,
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
bool* have_last_sequence, SequenceNumber* last_sequence,
uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,
InstrumentedMutex* mu, Directory* db_directory,
bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options);
Status ExtractInfoFromVersionEdit(
ColumnFamilyData* cfd, const VersionEdit& edit, bool* have_log_number,
uint64_t* log_number, bool* have_prev_log_number,
uint64_t* previous_log_number, bool* have_next_file, uint64_t* next_file,
bool* have_last_sequence, SequenceNumber* last_sequence,
uint64_t* min_log_number_to_keep, uint32_t* max_column_family);
std::unique_ptr<ColumnFamilySet> column_family_set_;
@ -1074,13 +1080,77 @@ class VersionSet {
// env options for all reads and writes except compactions
EnvOptions env_options_;
private:
// No copying allowed
VersionSet(const VersionSet&);
void operator=(const VersionSet&);
// REQUIRES db mutex at beginning. may release and re-acquire db mutex
Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,
InstrumentedMutex* mu, Directory* db_directory,
bool new_descriptor_log,
const ColumnFamilyOptions* new_cf_options);
void LogAndApplyCFHelper(VersionEdit* edit);
void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, Version* v,
void LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b,
VersionEdit* edit, InstrumentedMutex* mu);
};
class ReactiveVersionSet : public VersionSet {
public:
ReactiveVersionSet(const std::string& dbname,
const ImmutableDBOptions* _db_options,
const EnvOptions& _env_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
WriteController* write_controller);
~ReactiveVersionSet() override;
Status ReadAndApply(
InstrumentedMutex* mu,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
std::unordered_set<ColumnFamilyData*>* cfds_changed);
Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
std::unique_ptr<Status>* manifest_reader_status);
protected:
using VersionSet::ApplyOneVersionEditToBuilder;
// REQUIRES db mutex
Status ApplyOneVersionEditToBuilder(
VersionEdit& edit, bool* have_log_number, uint64_t* log_number,
bool* have_prev_log_number, uint64_t* previous_log_number,
bool* have_next_file, uint64_t* next_file, bool* have_last_sequence,
SequenceNumber* last_sequence, uint64_t* min_log_number_to_keep,
uint32_t* max_column_family);
Status MaybeSwitchManifest(
log::Reader::Reporter* reporter,
std::unique_ptr<log::FragmentBufferedReader>* manifest_reader);
private:
std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>
active_version_builders_;
using VersionSet::LogAndApply;
using VersionSet::Recover;
Status LogAndApply(
const autovector<ColumnFamilyData*>& /*cfds*/,
const autovector<const MutableCFOptions*>& /*mutable_cf_options_list*/,
const autovector<autovector<VersionEdit*>>& /*edit_lists*/,
InstrumentedMutex* /*mu*/, Directory* /*db_directory*/,
bool /*new_descriptor_log*/,
const ColumnFamilyOptions* /*new_cf_option*/) override {
return Status::NotSupported("not supported in reactive mode");
}
// No copy allowed
ReactiveVersionSet(const ReactiveVersionSet&);
ReactiveVersionSet& operator=(const ReactiveVersionSet&);
};
} // namespace rocksdb

@ -457,7 +457,7 @@ Status WalManager::ReadFirstLine(const std::string& fname,
reporter.status = &status;
reporter.ignore_error = !db_options_.paranoid_checks;
log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
true /*checksum*/, number, false /* retry_after_eof */);
true /*checksum*/, number);
std::string scratch;
Slice record;

8
env/env_hdfs.cc vendored

@ -36,9 +36,11 @@ namespace {
// Log error message
static Status IOError(const std::string& context, int err_number) {
return (err_number == ENOSPC) ?
Status::NoSpace(context, strerror(err_number)) :
Status::IOError(context, strerror(err_number));
return (err_number == ENOSPC)
? Status::NoSpace(context, strerror(err_number))
: (err_number == ENOENT)
? Status::PathNotFound(context, strerror(err_number))
: Status::IOError(context, strerror(err_number));
}
// assume that there is one global logger for now. It is not thread-safe,

3
env/io_posix.h vendored

@ -41,6 +41,9 @@ static Status IOError(const std::string& context, const std::string& file_name,
strerror(err_number));
case ESTALE:
return Status::IOError(Status::kStaleFile);
case ENOENT:
return Status::PathNotFound(IOErrorMsg(context, file_name),
strerror(err_number));
default:
return Status::IOError(IOErrorMsg(context, file_name),
strerror(err_number));

1
examples/.gitignore vendored

@ -2,6 +2,7 @@ c_simple_example
column_families_example
compact_files_example
compaction_filter_example
multi_processes_example
optimistic_transaction_example
options_file_example
simple_example

@ -43,8 +43,11 @@ transaction_example: librocksdb transaction_example.cc
options_file_example: librocksdb options_file_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
multi_processes_example: librocksdb multi_processes_example.cc
$(CXX) $(CXXFLAGS) $@.cc -o$@ ../librocksdb.a -I../include -O2 -std=c++11 $(PLATFORM_LDFLAGS) $(PLATFORM_CXXFLAGS) $(EXEC_LDFLAGS)
clean:
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example
rm -rf ./simple_example ./column_families_example ./compact_files_example ./compaction_filter_example ./c_simple_example c_simple_example.o ./optimistic_transaction_example ./transaction_example ./options_file_example ./multi_processes_example
librocksdb:
cd .. && $(MAKE) static_lib

@ -0,0 +1,395 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// How to use this example
// Open two terminals, in one of them, run `./multi_processes_example 0` to
// start a process running the primary instance. This will create a new DB in
// kDBPath. The process will run for a while inserting keys to the normal
// RocksDB database.
// Next, go to the other terminal and run `./multi_processes_example 1` to
// start a process running the secondary instance. This will create a secondary
// instance following the aforementioned primary instance. This process will
// run for a while, tailing the logs of the primary. After process with primary
// instance exits, this process will keep running until you hit 'CTRL+C'.
#include <inttypes.h>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <string>
#include <thread>
#include <vector>
#if defined(OS_LINUX)
#include <dirent.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#endif // !OS_LINUX
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
using rocksdb::ColumnFamilyDescriptor;
using rocksdb::ColumnFamilyHandle;
using rocksdb::ColumnFamilyOptions;
using rocksdb::DB;
using rocksdb::FlushOptions;
using rocksdb::Iterator;
using rocksdb::Options;
using rocksdb::ReadOptions;
using rocksdb::Slice;
using rocksdb::Status;
using rocksdb::WriteOptions;
const std::string kDBPath = "/tmp/rocksdb_multi_processes_example";
const std::string kPrimaryStatusFile =
"/tmp/rocksdb_multi_processes_example_primary_status";
const uint64_t kMaxKey = 600000;
const size_t kMaxValueLength = 256;
const size_t kNumKeysPerFlush = 1000;
const std::vector<std::string>& GetColumnFamilyNames() {
static std::vector<std::string> column_family_names = {
rocksdb::kDefaultColumnFamilyName, "pikachu"};
return column_family_names;
}
inline bool IsLittleEndian() {
uint32_t x = 1;
return *reinterpret_cast<char*>(&x) != 0;
}
static std::atomic<int>& ShouldSecondaryWait() {
static std::atomic<int> should_secondary_wait{1};
return should_secondary_wait;
}
static std::string Key(uint64_t k) {
std::string ret;
if (IsLittleEndian()) {
ret.append(reinterpret_cast<char*>(&k), sizeof(k));
} else {
char buf[sizeof(k)];
buf[0] = k & 0xff;
buf[1] = (k >> 8) & 0xff;
buf[2] = (k >> 16) & 0xff;
buf[3] = (k >> 24) & 0xff;
buf[4] = (k >> 32) & 0xff;
buf[5] = (k >> 40) & 0xff;
buf[6] = (k >> 48) & 0xff;
buf[7] = (k >> 56) & 0xff;
ret.append(buf, sizeof(k));
}
size_t i = 0, j = ret.size() - 1;
while (i < j) {
char tmp = ret[i];
ret[i] = ret[j];
ret[j] = tmp;
++i;
--j;
}
return ret;
}
static uint64_t Key(std::string key) {
assert(key.size() == sizeof(uint64_t));
size_t i = 0, j = key.size() - 1;
while (i < j) {
char tmp = key[i];
key[i] = key[j];
key[j] = tmp;
++i;
--j;
}
uint64_t ret = 0;
if (IsLittleEndian()) {
memcpy(&ret, key.c_str(), sizeof(uint64_t));
} else {
const char* buf = key.c_str();
ret |= static_cast<uint64_t>(buf[0]);
ret |= (static_cast<uint64_t>(buf[1]) << 8);
ret |= (static_cast<uint64_t>(buf[2]) << 16);
ret |= (static_cast<uint64_t>(buf[3]) << 24);
ret |= (static_cast<uint64_t>(buf[4]) << 32);
ret |= (static_cast<uint64_t>(buf[5]) << 40);
ret |= (static_cast<uint64_t>(buf[6]) << 48);
ret |= (static_cast<uint64_t>(buf[7]) << 56);
}
return ret;
}
static Slice GenerateRandomValue(const size_t max_length, char scratch[]) {
size_t sz = 1 + (std::rand() % max_length);
int rnd = std::rand();
for (size_t i = 0; i != sz; ++i) {
scratch[i] = static_cast<char>(rnd ^ i);
}
return Slice(scratch, sz);
}
static bool ShouldCloseDB() { return true; }
// TODO: port this example to other systems. It should be straightforward for
// POSIX-compliant systems.
#if defined(OS_LINUX)
void CreateDB() {
long my_pid = static_cast<long>(getpid());
Options options;
Status s = rocksdb::DestroyDB(kDBPath, options);
if (!s.ok()) {
fprintf(stderr, "[process %ld] Failed to destroy DB: %s\n", my_pid,
s.ToString().c_str());
assert(false);
}
options.create_if_missing = true;
DB* db = nullptr;
s = DB::Open(options, kDBPath, &db);
if (!s.ok()) {
fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
s.ToString().c_str());
assert(false);
}
std::vector<ColumnFamilyHandle*> handles;
ColumnFamilyOptions cf_opts(options);
for (const auto& cf_name : GetColumnFamilyNames()) {
if (rocksdb::kDefaultColumnFamilyName != cf_name) {
ColumnFamilyHandle* handle = nullptr;
s = db->CreateColumnFamily(cf_opts, cf_name, &handle);
if (!s.ok()) {
fprintf(stderr, "[process %ld] Failed to create CF %s: %s\n", my_pid,
cf_name.c_str(), s.ToString().c_str());
assert(false);
}
handles.push_back(handle);
}
}
fprintf(stdout, "[process %ld] Column families created\n", my_pid);
for (auto h : handles) {
delete h;
}
handles.clear();
delete db;
}
void RunPrimary() {
long my_pid = static_cast<long>(getpid());
fprintf(stdout, "[process %ld] Primary instance starts\n", my_pid);
CreateDB();
std::srand(time(nullptr));
DB* db = nullptr;
Options options;
options.create_if_missing = false;
std::vector<ColumnFamilyDescriptor> column_families;
for (const auto& cf_name : GetColumnFamilyNames()) {
column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
}
std::vector<ColumnFamilyHandle*> handles;
WriteOptions write_opts;
char val_buf[kMaxValueLength] = {0};
uint64_t curr_key = 0;
while (curr_key < kMaxKey) {
Status s;
if (nullptr == db) {
s = DB::Open(options, kDBPath, column_families, &handles, &db);
if (!s.ok()) {
fprintf(stderr, "[process %ld] Failed to open DB: %s\n", my_pid,
s.ToString().c_str());
assert(false);
}
}
assert(nullptr != db);
assert(handles.size() == GetColumnFamilyNames().size());
for (auto h : handles) {
assert(nullptr != h);
for (size_t i = 0; i != kNumKeysPerFlush; ++i) {
Slice key = Key(curr_key + static_cast<uint64_t>(i));
Slice value = GenerateRandomValue(kMaxValueLength, val_buf);
s = db->Put(write_opts, h, key, value);
if (!s.ok()) {
fprintf(stderr, "[process %ld] Failed to insert\n", my_pid);
assert(false);
}
}
s = db->Flush(FlushOptions(), h);
if (!s.ok()) {
fprintf(stderr, "[process %ld] Failed to flush\n", my_pid);
assert(false);
}
}
curr_key += static_cast<uint64_t>(kNumKeysPerFlush);
if (ShouldCloseDB()) {
for (auto h : handles) {
delete h;
}
handles.clear();
delete db;
db = nullptr;
}
}
if (nullptr != db) {
for (auto h : handles) {
delete h;
}
handles.clear();
delete db;
db = nullptr;
}
fprintf(stdout, "[process %ld] Finished adding keys\n", my_pid);
}
void secondary_instance_sigint_handler(int signal) {
ShouldSecondaryWait().store(0, std::memory_order_relaxed);
fprintf(stdout, "\n");
fflush(stdout);
};
void RunSecondary() {
::signal(SIGINT, secondary_instance_sigint_handler);
long my_pid = static_cast<long>(getpid());
const std::string kSecondaryPath =
"/tmp/rocksdb_multi_processes_example_secondary";
// Create directory if necessary
if (nullptr == opendir(kSecondaryPath.c_str())) {
int ret =
mkdir(kSecondaryPath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
if (ret < 0) {
perror("failed to create directory for secondary instance");
exit(0);
}
}
DB* db = nullptr;
Options options;
options.create_if_missing = false;
options.max_open_files = -1;
Status s = DB::OpenAsSecondary(options, kDBPath, kSecondaryPath, &db);
if (!s.ok()) {
fprintf(stderr, "[process %ld] Failed to open in secondary mode: %s\n",
my_pid, s.ToString().c_str());
assert(false);
} else {
fprintf(stdout, "[process %ld] Secondary instance starts\n", my_pid);
}
ReadOptions ropts;
ropts.verify_checksums = true;
ropts.total_order_seek = true;
std::vector<std::thread> test_threads;
test_threads.emplace_back([&]() {
while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
iter->SeekToFirst();
size_t count = 0;
for (; iter->Valid(); iter->Next()) {
++count;
}
}
fprintf(stdout, "[process %ld] Range_scan thread finished\n", my_pid);
});
test_threads.emplace_back([&]() {
std::srand(time(nullptr));
while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
Slice key = Key(std::rand() % kMaxKey);
std::string value;
db->Get(ropts, key, &value);
}
fprintf(stdout, "[process %ld] Point lookup thread finished\n");
});
uint64_t curr_key = 0;
while (1 == ShouldSecondaryWait().load(std::memory_order_relaxed)) {
s = db->TryCatchUpWithPrimary();
if (!s.ok()) {
fprintf(stderr,
"[process %ld] error while trying to catch up with "
"primary %s\n",
my_pid, s.ToString().c_str());
assert(false);
}
{
std::unique_ptr<Iterator> iter(db->NewIterator(ropts));
if (!iter) {
fprintf(stderr, "[process %ld] Failed to create iterator\n", my_pid);
assert(false);
}
iter->SeekToLast();
if (iter->Valid()) {
uint64_t curr_max_key = Key(iter->key().ToString());
if (curr_max_key != curr_key) {
fprintf(stdout, "[process %ld] Observed key %" PRIu64 "\n", my_pid,
curr_key);
curr_key = curr_max_key;
}
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
s = db->TryCatchUpWithPrimary();
if (!s.ok()) {
fprintf(stderr,
"[process %ld] error while trying to catch up with "
"primary %s\n",
my_pid, s.ToString().c_str());
assert(false);
}
std::vector<ColumnFamilyDescriptor> column_families;
for (const auto& cf_name : GetColumnFamilyNames()) {
column_families.push_back(ColumnFamilyDescriptor(cf_name, options));
}
std::vector<ColumnFamilyHandle*> handles;
DB* verification_db = nullptr;
s = DB::OpenForReadOnly(options, kDBPath, column_families, &handles,
&verification_db);
assert(s.ok());
Iterator* iter1 = verification_db->NewIterator(ropts);
iter1->SeekToFirst();
Iterator* iter = db->NewIterator(ropts);
iter->SeekToFirst();
for (; iter->Valid() && iter1->Valid(); iter->Next(), iter1->Next()) {
if (iter->key().ToString() != iter1->key().ToString()) {
fprintf(stderr, "%" PRIu64 "!= %" PRIu64 "\n",
Key(iter->key().ToString()), Key(iter1->key().ToString()));
assert(false);
} else if (iter->value().ToString() != iter1->value().ToString()) {
fprintf(stderr, "Value mismatch\n");
assert(false);
}
}
fprintf(stdout, "[process %ld] Verification succeeded\n", my_pid);
for (auto& thr : test_threads) {
thr.join();
}
delete iter;
delete iter1;
delete db;
delete verification_db;
}
int main(int argc, char** argv) {
if (argc < 2) {
fprintf(stderr, "%s <0 for primary, 1 for secondary>\n", argv[0]);
return 0;
}
if (atoi(argv[1]) == 0) {
RunPrimary();
} else {
RunSecondary();
}
return 0;
}
#else // OS_LINUX
int main() {
fpritnf(stderr, "Not implemented.\n");
return 0;
}
#endif // !OS_LINUX

@ -162,6 +162,54 @@ class DB {
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
bool error_if_log_file_exist = false);
// The following OpenAsSecondary functions create a secondary instance that
// can dynamically tail the MANIFEST of a primary that must have already been
// created. User can call TryCatchUpWithPrimary to make the secondary
// instance catch up with primary (WAL tailing is NOT supported now) whenever
// the user feels necessary. Column families created by the primary after the
// secondary instance starts are currently ignored by the secondary instance.
// Column families opened by secondary and dropped by the primary will be
// dropped by secondary as well. However the user of the secondary instance
// can still access the data of such dropped column family as long as they
// do not destroy the corresponding column family handle.
// WAL tailing is not supported at present, but will arrive soon.
//
// The options argument specifies the options to open the secondary instance.
// The name argument specifies the name of the primary db that you have used
// to open the primary instance.
// The secondary_path argument points to a directory where the secondary
// instance stores its info log.
// The dbptr is an out-arg corresponding to the opened secondary instance.
// The pointer points to a heap-allocated database, and the user should
// delete it after use.
// Open DB as secondary instance with only the default column family.
// Return OK on success, non-OK on failures.
static Status OpenAsSecondary(const Options& options, const std::string& name,
const std::string& secondary_path, DB** dbptr);
// Open DB as secondary instance with column families. You can open a subset
// of column families in secondary mode.
// The db_options specify the database specific options.
// The name argument specifies the name of the primary db that you have used
// to open the primary instance.
// The secondary_path argument points to a directory where the secondary
// instance stores its info log.
// The column_families argument specifieds a list of column families to open.
// If any of the column families does not exist, the function returns non-OK
// status.
// The handles is an out-arg corresponding to the opened database column
// familiy handles.
// The dbptr is an out-arg corresponding to the opened secondary instance.
// The pointer points to a heap-allocated database, and the caller should
// delete it after use. Before deleting the dbptr, the user should also
// delete the pointers stored in handles vector.
// Return OK on success, on-OK on failures.
static Status OpenAsSecondary(
const DBOptions& db_options, const std::string& name,
const std::string& secondary_path,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
// Open DB with column families.
// db_options specify database specific options
// column_families is the vector of all column families in the database,
@ -1235,6 +1283,23 @@ class DB {
return Status::NotSupported("GetStatsHistory() is not implemented.");
}
#ifndef ROCKSDB_LITE
// Make the secondary instance catch up with the primary by tailing and
// replaying the MANIFEST and WAL of the primary.
// Column families created by the primary after the secondary instance starts
// will be ignored unless the secondary instance closes and restarts with the
// newly created column families.
// Column families that exist before secondary instance starts and dropped by
// the primary afterwards will be marked as dropped. However, as long as the
// secondary instance does not delete the corresponding column family
// handles, the data of the column family is still accessible to the
// secondary.
// TODO: we will support WAL tailing soon.
virtual Status TryCatchUpWithPrimary() {
return Status::NotSupported("Supported only by secondary instance");
}
#endif // !ROCKSDB_LITE
private:
// No copying allowed
DB(const DB&);

@ -73,6 +73,7 @@ class Status {
kStaleFile = 6,
kMemoryLimit = 7,
kSpaceLimit = 8,
kPathNotFound = 9,
kMaxSubCode
};
@ -198,6 +199,11 @@ class Status {
return Status(kIOError, kSpaceLimit, msg, msg2);
}
static Status PathNotFound() { return Status(kIOError, kPathNotFound); }
static Status PathNotFound(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIOError, kPathNotFound, msg, msg2);
}
// Returns true iff the status indicates success.
bool ok() const { return code() == kOk; }
@ -266,6 +272,14 @@ class Status {
return (code() == kAborted) && (subcode() == kMemoryLimit);
}
// Returns true iff the status indicates a PathNotFound error
// This is caused by an I/O error returning the specific "no such file or
// directory" error condition. A PathNotFound error is an I/O error with
// a specific subcode, enabling users to take appropriate action if necessary
bool IsPathNotFound() const {
return (code() == kIOError) && (subcode() == kPathNotFound);
}
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;

@ -27,7 +27,9 @@ std::string GetWindowsErrSz(DWORD err);
inline Status IOErrorFromWindowsError(const std::string& context, DWORD err) {
return ((err == ERROR_HANDLE_DISK_FULL) || (err == ERROR_DISK_FULL))
? Status::NoSpace(context, GetWindowsErrSz(err))
: Status::IOError(context, GetWindowsErrSz(err));
: ((err == ERROR_FILE_NOT_FOUND) || (err == ERROR_PATH_NOT_FOUND))
? Status::PathNotFound(context, GetWindowsErrSz(err))
: Status::IOError(context, GetWindowsErrSz(err));
}
inline Status IOErrorFromLastWindowsError(const std::string& context) {
@ -37,7 +39,9 @@ inline Status IOErrorFromLastWindowsError(const std::string& context) {
inline Status IOError(const std::string& context, int err_number) {
return (err_number == ENOSPC)
? Status::NoSpace(context, strerror(err_number))
: Status::IOError(context, strerror(err_number));
: (err_number == ENOENT)
? Status::PathNotFound(context, strerror(err_number))
: Status::IOError(context, strerror(err_number));
}
class WinFileData;
@ -426,9 +430,7 @@ public:
class WinDirectory : public Directory {
HANDLE handle_;
public:
explicit
WinDirectory(HANDLE h) noexcept :
handle_(h) {
explicit WinDirectory(HANDLE h) noexcept : handle_(h) {
assert(handle_ != INVALID_HANDLE_VALUE);
}
~WinDirectory() {

2
src.mk

@ -22,6 +22,7 @@ LIB_SOURCES = \
db/db_impl_files.cc \
db/db_impl_open.cc \
db/db_impl_readonly.cc \
db/db_impl_secondary.cc \
db/db_impl_write.cc \
db/db_info_dumper.cc \
db/db_iter.cc \
@ -279,6 +280,7 @@ MAIN_SOURCES = \
db/db_options_test.cc \
db/db_properties_test.cc \
db/db_range_del_test.cc \
db/db_secondary_test.cc \
db/db_sst_test.cc \
db/db_statistics_test.cc \
db/db_table_properties_test.cc \

@ -2014,8 +2014,7 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
log_number = 0;
}
log::Reader reader(options.info_log, std::move(wal_file_reader), &reporter,
true /* checksum */, log_number,
false /* retry_after_eof */);
true /* checksum */, log_number);
std::string scratch;
WriteBatch batch;
Slice record;

@ -41,7 +41,8 @@ static const char* msgs[static_cast<int>(Status::kMaxSubCode)] = {
"Deadlock", // kDeadlock
"Stale file handle", // kStaleFile
"Memory limit reached", // kMemoryLimit
"Space limit reached" // kSpaceLimit
"Space limit reached", // kSpaceLimit
"No such file or directory", // kPathNotFound
};
Status::Status(Code _code, SubCode _subcode, const Slice& msg,