Merge branch 'master' of github.com:facebook/rocksdb into ignore

This commit is contained in:
Yueh-Hsuan Chiang 2014-04-03 10:55:23 -07:00
commit f5469b1a61
13 changed files with 443 additions and 62 deletions

82
db/c.cc
View File

@ -25,12 +25,14 @@
#include "rocksdb/universal_compaction.h" #include "rocksdb/universal_compaction.h"
#include "rocksdb/statistics.h" #include "rocksdb/statistics.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
using rocksdb::Cache; using rocksdb::Cache;
using rocksdb::Comparator; using rocksdb::Comparator;
using rocksdb::CompressionType; using rocksdb::CompressionType;
using rocksdb::DB; using rocksdb::DB;
using rocksdb::Env; using rocksdb::Env;
using rocksdb::InfoLogLevel;
using rocksdb::FileLock; using rocksdb::FileLock;
using rocksdb::FilterPolicy; using rocksdb::FilterPolicy;
using rocksdb::FlushOptions; using rocksdb::FlushOptions;
@ -656,6 +658,11 @@ void rocksdb_options_set_info_log(rocksdb_options_t* opt, rocksdb_logger_t* l) {
} }
} }
void rocksdb_options_set_info_log_level(
rocksdb_options_t* opt, int v) {
opt->rep.info_log_level = static_cast<InfoLogLevel>(v);
}
void rocksdb_options_set_write_buffer_size(rocksdb_options_t* opt, size_t s) { void rocksdb_options_set_write_buffer_size(rocksdb_options_t* opt, size_t s) {
opt->rep.write_buffer_size = s; opt->rep.write_buffer_size = s;
} }
@ -714,6 +721,14 @@ void rocksdb_options_set_max_grandparent_overlap_factor(
opt->rep.max_grandparent_overlap_factor = n; opt->rep.max_grandparent_overlap_factor = n;
} }
void rocksdb_options_set_max_bytes_for_level_multiplier_additional(
rocksdb_options_t* opt, int* level_values, size_t num_levels) {
opt->rep.max_bytes_for_level_multiplier_additional.resize(num_levels);
for (size_t i = 0; i < num_levels; ++i) {
opt->rep.max_bytes_for_level_multiplier_additional[i] = level_values[i];
}
}
void rocksdb_options_enable_statistics(rocksdb_options_t* opt) { void rocksdb_options_enable_statistics(rocksdb_options_t* opt) {
opt->rep.statistics = rocksdb::CreateDBStatistics(); opt->rep.statistics = rocksdb::CreateDBStatistics();
} }
@ -857,6 +872,24 @@ void rocksdb_options_set_advise_random_on_open(
opt->rep.advise_random_on_open = v; opt->rep.advise_random_on_open = v;
} }
void rocksdb_options_set_access_hint_on_compaction_start(
rocksdb_options_t* opt, int v) {
switch(v) {
case 0:
opt->rep.access_hint_on_compaction_start = rocksdb::Options::NONE;
break;
case 1:
opt->rep.access_hint_on_compaction_start = rocksdb::Options::NORMAL;
break;
case 2:
opt->rep.access_hint_on_compaction_start = rocksdb::Options::SEQUENTIAL;
break;
case 3:
opt->rep.access_hint_on_compaction_start = rocksdb::Options::WILLNEED;
break;
}
}
void rocksdb_options_set_use_adaptive_mutex( void rocksdb_options_set_use_adaptive_mutex(
rocksdb_options_t* opt, unsigned char v) { rocksdb_options_t* opt, unsigned char v) {
opt->rep.use_adaptive_mutex = v; opt->rep.use_adaptive_mutex = v;
@ -867,6 +900,11 @@ void rocksdb_options_set_bytes_per_sync(
opt->rep.bytes_per_sync = v; opt->rep.bytes_per_sync = v;
} }
void rocksdb_options_set_verify_checksums_in_compaction(
rocksdb_options_t* opt, unsigned char v) {
opt->rep.verify_checksums_in_compaction = v;
}
void rocksdb_options_set_filter_deletes( void rocksdb_options_set_filter_deletes(
rocksdb_options_t* opt, unsigned char v) { rocksdb_options_t* opt, unsigned char v) {
opt->rep.filter_deletes = v; opt->rep.filter_deletes = v;
@ -1003,11 +1041,48 @@ void rocksdb_options_set_hash_link_list_rep(
opt->rep.memtable_factory.reset(factory); opt->rep.memtable_factory.reset(factory);
} }
void rocksdb_options_set_plain_table_factory(
rocksdb_options_t *opt, uint32_t user_key_len, int bloom_bits_per_key,
double hash_table_ratio, size_t index_sparseness) {
static rocksdb::TableFactory* factory = 0;
if (!factory) {
factory = rocksdb::NewPlainTableFactory(
user_key_len, bloom_bits_per_key,
hash_table_ratio, index_sparseness);
}
opt->rep.table_factory.reset(factory);
}
void rocksdb_options_set_max_successive_merges( void rocksdb_options_set_max_successive_merges(
rocksdb_options_t* opt, size_t v) { rocksdb_options_t* opt, size_t v) {
opt->rep.max_successive_merges = v; opt->rep.max_successive_merges = v;
} }
void rocksdb_options_set_min_partial_merge_operands(
rocksdb_options_t* opt, uint32_t v) {
opt->rep.min_partial_merge_operands = v;
}
void rocksdb_options_set_bloom_locality(
rocksdb_options_t* opt, uint32_t v) {
opt->rep.bloom_locality = v;
}
void rocksdb_options_set_allow_thread_local(
rocksdb_options_t* opt, unsigned char v) {
opt->rep.allow_thread_local = v;
}
void rocksdb_options_set_inplace_update_support(
rocksdb_options_t* opt, unsigned char v) {
opt->rep.inplace_update_support = v;
}
void rocksdb_options_set_inplace_update_num_locks(
rocksdb_options_t* opt, size_t v) {
opt->rep.inplace_update_num_locks = v;
}
void rocksdb_options_set_compaction_style(rocksdb_options_t *opt, int style) { void rocksdb_options_set_compaction_style(rocksdb_options_t *opt, int style) {
opt->rep.compaction_style = static_cast<rocksdb::CompactionStyle>(style); opt->rep.compaction_style = static_cast<rocksdb::CompactionStyle>(style);
} }
@ -1022,21 +1097,14 @@ DB::OpenForReadOnly
DB::MultiGet DB::MultiGet
DB::KeyMayExist DB::KeyMayExist
DB::GetOptions DB::GetOptions
DB::GetLiveFiles
DB::GetSortedWalFiles DB::GetSortedWalFiles
DB::GetLatestSequenceNumber DB::GetLatestSequenceNumber
DB::GetUpdatesSince DB::GetUpdatesSince
DB::DeleteFile
DB::GetDbIdentity DB::GetDbIdentity
DB::RunManualCompaction DB::RunManualCompaction
custom cache custom cache
compaction_filter compaction_filter
max_bytes_for_level_multiplier_additional
access_hint_on_compaction_start
table_factory
table_properties_collectors table_properties_collectors
inplace_update_support
inplace_update_num_locks
*/ */
rocksdb_comparator_t* rocksdb_comparator_create( rocksdb_comparator_t* rocksdb_comparator_create(

View File

@ -443,6 +443,7 @@ int main(int argc, char** argv) {
rocksdb_options_set_filter_policy(options, policy); rocksdb_options_set_filter_policy(options, policy);
rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3)); rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3));
rocksdb_options_set_hash_skip_list_rep(options, 50000, 4, 4); rocksdb_options_set_hash_skip_list_rep(options, 50000, 4, 4);
rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16);
db = rocksdb_open(options, dbname, &err); db = rocksdb_open(options, dbname, &err);
CheckNoError(err); CheckNoError(err);

View File

@ -587,7 +587,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
options_->level0_file_num_compaction_trigger; options_->level0_file_num_compaction_trigger;
if ((c = PickCompactionUniversalReadAmp( if ((c = PickCompactionUniversalReadAmp(
version, score, UINT_MAX, num_files, log_buffer)) != nullptr) { version, score, UINT_MAX, num_files, log_buffer)) != nullptr) {
Log(options_->info_log, "Universal: compacting for file num\n"); LogToBuffer(log_buffer, "Universal: compacting for file num\n");
} }
} }
} }

View File

@ -17,6 +17,7 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "port/port.h" #include "port/port.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/sync_point.h"
namespace rocksdb { namespace rocksdb {
@ -95,20 +96,55 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
} }
Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
// First get sorted files in archive dir, then append sorted files from main // First get sorted files in db dir, then get sorted files from archived
// dir to maintain sorted order // dir, to avoid a race condition where a log file is moved to archived
// dir in between.
// list wal files in archive dir.
Status s; Status s;
// list wal files in main db dir.
VectorLogPtr logs;
s = GetSortedWalsOfType(options_.wal_dir, logs, kAliveLogFile);
if (!s.ok()) {
return s;
}
// Reproduce the race condition where a log file is moved
// to archived dir, between these two sync points, used in
// (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:1");
TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:2");
files.clear();
// list wal files in archive dir.
std::string archivedir = ArchivalDirectory(options_.wal_dir); std::string archivedir = ArchivalDirectory(options_.wal_dir);
if (env_->FileExists(archivedir)) { if (env_->FileExists(archivedir)) {
s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile); s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
} }
// list wal files in main db dir.
return AppendSortedWalsOfType(options_.wal_dir, files, kAliveLogFile); uint64_t latest_archived_log_number = 0;
if (!files.empty()) {
latest_archived_log_number = files.back()->LogNumber();
Log(options_.info_log, "Latest Archived log: %lu",
latest_archived_log_number);
}
files.reserve(files.size() + logs.size());
for (auto& log : logs) {
if (log->LogNumber() > latest_archived_log_number) {
files.push_back(std::move(log));
} else {
// When the race condition happens, we could see the
// same log in both db dir and archived dir. Simply
// ignore the one in db dir. Note that, if we read
// archived dir first, we would have missed the log file.
Log(options_.info_log, "%s already moved to archive",
log->PathName().c_str());
}
}
return s;
} }
} }

View File

@ -64,6 +64,7 @@
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/perf_context_imp.h" #include "util/perf_context_imp.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/sync_point.h"
namespace rocksdb { namespace rocksdb {
@ -872,7 +873,11 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
if (type == kLogFile && if (type == kLogFile &&
(options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) { (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) {
auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number); auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:1");
Status s = env_->RenameFile(fname, archived_log_name); Status s = env_->RenameFile(fname, archived_log_name);
// The sync point below is used in (DBTest,TransactionLogIteratorRace)
TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:2");
Log(options_.info_log, Log(options_.info_log,
"Move log file %s to %s -- %s\n", "Move log file %s to %s -- %s\n",
fname.c_str(), archived_log_name.c_str(), s.ToString().c_str()); fname.c_str(), archived_log_name.c_str(), s.ToString().c_str());
@ -1020,7 +1025,7 @@ void DBImpl::PurgeObsoleteWALFiles() {
size_t files_del_num = log_files_num - files_keep_num; size_t files_del_num = log_files_num - files_keep_num;
VectorLogPtr archived_logs; VectorLogPtr archived_logs;
AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile);
if (files_del_num > archived_logs.size()) { if (files_del_num > archived_logs.size()) {
Log(options_.info_log, "Trying to delete more archived log files than " Log(options_.info_log, "Trying to delete more archived log files than "
@ -1455,7 +1460,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress,
s = imm_.InstallMemtableFlushResults( s = imm_.InstallMemtableFlushResults(
mems, versions_.get(), &mutex_, options_.info_log.get(), file_number, mems, versions_.get(), &mutex_, options_.info_log.get(), file_number,
pending_outputs_, &deletion_state.memtables_to_free, pending_outputs_, &deletion_state.memtables_to_free,
db_directory_.get()); db_directory_.get(), log_buffer);
} }
if (s.ok()) { if (s.ok()) {
@ -1791,20 +1796,14 @@ struct CompareLogByPointer {
} }
}; };
Status DBImpl::AppendSortedWalsOfType(const std::string& path, Status DBImpl::GetSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files, WalFileType log_type) { VectorLogPtr& log_files, WalFileType log_type) {
std::vector<std::string> all_files; std::vector<std::string> all_files;
const Status status = env_->GetChildren(path, &all_files); const Status status = env_->GetChildren(path, &all_files);
if (!status.ok()) { if (!status.ok()) {
return status; return status;
} }
log_files.reserve(log_files.size() + all_files.size()); log_files.reserve(all_files.size());
VectorLogPtr::iterator pos_start;
if (!log_files.empty()) {
pos_start = log_files.end() - 1;
} else {
pos_start = log_files.begin();
}
for (const auto& f : all_files) { for (const auto& f : all_files) {
uint64_t number; uint64_t number;
FileType type; FileType type;
@ -1830,7 +1829,7 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path,
} }
} }
CompareLogByPointer compare_log_files; CompareLogByPointer compare_log_files;
std::sort(pos_start, log_files.end(), compare_log_files); std::sort(log_files.begin(), log_files.end(), compare_log_files);
return status; return status;
} }
@ -2014,8 +2013,9 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
LogBuffer* log_buffer) { LogBuffer* log_buffer) {
Status stat; Status stat;
while (stat.ok() && imm_.IsFlushPending()) { while (stat.ok() && imm_.IsFlushPending()) {
Log(options_.info_log, LogToBuffer(log_buffer,
"BackgroundCallFlush doing FlushMemTableToOutputFile, flush slots available %d", "BackgroundCallFlush doing FlushMemTableToOutputFile, "
"flush slots available %d",
options_.max_background_flushes - bg_flush_scheduled_); options_.max_background_flushes - bg_flush_scheduled_);
stat = FlushMemTableToOutputFile(madeProgress, deletion_state, log_buffer); stat = FlushMemTableToOutputFile(madeProgress, deletion_state, log_buffer);
} }
@ -2462,7 +2462,8 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
} }
Status DBImpl::InstallCompactionResults(CompactionState* compact) { Status DBImpl::InstallCompactionResults(CompactionState* compact,
LogBuffer* log_buffer) {
mutex_.AssertHeld(); mutex_.AssertHeld();
// paranoia: verify that the files that we started with // paranoia: verify that the files that we started with
@ -2478,11 +2479,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
return Status::Corruption("Compaction input files inconsistent"); return Status::Corruption("Compaction input files inconsistent");
} }
Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes", LogToBuffer(
compact->compaction->num_input_files(0), log_buffer, "Compacted %d@%d + %d@%d files => %lld bytes",
compact->compaction->level(), compact->compaction->num_input_files(0), compact->compaction->level(),
compact->compaction->num_input_files(1), compact->compaction->num_input_files(1), compact->compaction->level() + 1,
compact->compaction->level() + 1,
static_cast<long long>(compact->total_bytes)); static_cast<long long>(compact->total_bytes));
// Add compaction outputs // Add compaction outputs
@ -2906,17 +2906,16 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
bool prefix_initialized = false; bool prefix_initialized = false;
int64_t imm_micros = 0; // Micros spent doing imm_ compactions int64_t imm_micros = 0; // Micros spent doing imm_ compactions
Log(options_.info_log, LogToBuffer(log_buffer,
"Compacting %d@%d + %d@%d files, score %.2f slots available %d", "Compacting %d@%d + %d@%d files, score %.2f slots available %d",
compact->compaction->num_input_files(0), compact->compaction->num_input_files(0),
compact->compaction->level(), compact->compaction->level(),
compact->compaction->num_input_files(1), compact->compaction->num_input_files(1),
compact->compaction->output_level(), compact->compaction->output_level(), compact->compaction->score(),
compact->compaction->score(),
options_.max_background_compactions - bg_compaction_scheduled_); options_.max_background_compactions - bg_compaction_scheduled_);
char scratch[2345]; char scratch[2345];
compact->compaction->Summary(scratch, sizeof(scratch)); compact->compaction->Summary(scratch, sizeof(scratch));
Log(options_.info_log, "Compaction start summary: %s\n", scratch); LogToBuffer(log_buffer, "Compaction start summary: %s\n", scratch);
assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0); assert(versions_->current()->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == nullptr); assert(compact->builder == nullptr);
@ -3174,11 +3173,12 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
ReleaseCompactionUnusedFileNumbers(compact); ReleaseCompactionUnusedFileNumbers(compact);
if (status.ok()) { if (status.ok()) {
status = InstallCompactionResults(compact); status = InstallCompactionResults(compact, log_buffer);
InstallSuperVersion(deletion_state); InstallSuperVersion(deletion_state);
} }
Version::LevelSummaryStorage tmp; Version::LevelSummaryStorage tmp;
Log(options_.info_log, LogToBuffer(
log_buffer,
"compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) " "compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s\n", "write-amplify(%.1f) %s\n",

View File

@ -388,13 +388,14 @@ class DBImpl : public DB {
Status OpenCompactionOutputFile(CompactionState* compact); Status OpenCompactionOutputFile(CompactionState* compact);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact); Status InstallCompactionResults(CompactionState* compact,
LogBuffer* log_buffer);
void AllocateCompactionOutputFileNumbers(CompactionState* compact); void AllocateCompactionOutputFileNumbers(CompactionState* compact);
void ReleaseCompactionUnusedFileNumbers(CompactionState* compact); void ReleaseCompactionUnusedFileNumbers(CompactionState* compact);
void PurgeObsoleteWALFiles(); void PurgeObsoleteWALFiles();
Status AppendSortedWalsOfType(const std::string& path, Status GetSortedWalsOfType(const std::string& path,
VectorLogPtr& log_files, VectorLogPtr& log_files,
WalFileType type); WalFileType type);

View File

@ -37,6 +37,7 @@
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/statistics.h" #include "util/statistics.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/sync_point.h"
#include "util/testutil.h" #include "util/testutil.h"
namespace rocksdb { namespace rocksdb {
@ -5189,6 +5190,51 @@ TEST(DBTest, TransactionLogIterator) {
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
TEST(DBTest, TransactionLogIteratorRace) {
// Setup sync point dependency to reproduce the race condition of
// a log file moved to archived dir, in the middle of GetSortedWalFiles
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{ { "DBImpl::GetSortedWalFiles:1", "DBImpl::PurgeObsoleteFiles:1" },
{ "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalFiles:2" },
});
do {
rocksdb::SyncPoint::GetInstance()->ClearTrace();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Options options = OptionsForLogIterTest();
DestroyAndReopen(&options);
Put("key1", DummyString(1024));
dbfull()->Flush(FlushOptions());
Put("key2", DummyString(1024));
dbfull()->Flush(FlushOptions());
Put("key3", DummyString(1024));
dbfull()->Flush(FlushOptions());
Put("key4", DummyString(1024));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
{
auto iter = OpenTransactionLogIter(0);
ExpectRecords(4, iter);
}
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// trigger async flush, and log move. Well, log move will
// wait until the GetSortedWalFiles:1 to reproduce the race
// condition
FlushOptions flush_options;
flush_options.wait = false;
dbfull()->Flush(flush_options);
// "key5" would be written in a new memtable and log
Put("key5", DummyString(1024));
{
// this iter would miss "key4" if not fixed
auto iter = OpenTransactionLogIter(0);
ExpectRecords(5, iter);
}
} while (ChangeCompactOptions());
}
TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) { TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) {
do { do {
Options options = OptionsForLogIterTest(); Options options = OptionsForLogIterTest();

View File

@ -11,6 +11,7 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/log_buffer.h"
namespace rocksdb { namespace rocksdb {
@ -140,10 +141,10 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
// Record a successful flush in the manifest file // Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults( Status MemTableList::InstallMemtableFlushResults(
const autovector<MemTable*>& mems, VersionSet* vset, const autovector<MemTable*>& mems, VersionSet* vset, port::Mutex* mu,
port::Mutex* mu, Logger* info_log, uint64_t file_number, Logger* info_log, uint64_t file_number, std::set<uint64_t>& pending_outputs,
std::set<uint64_t>& pending_outputs, autovector<MemTable*>* to_delete, autovector<MemTable*>* to_delete, Directory* db_directory,
Directory* db_directory) { LogBuffer* log_buffer) {
mu->AssertHeld(); mu->AssertHeld();
// flush was sucessful // flush was sucessful
@ -173,8 +174,7 @@ Status MemTableList::InstallMemtableFlushResults(
break; break;
} }
Log(info_log, LogToBuffer(log_buffer, "Level-0 commit table #%lu started",
"Level-0 commit table #%lu started",
(unsigned long)m->file_number_); (unsigned long)m->file_number_);
// this can release and reacquire the mutex. // this can release and reacquire the mutex.
@ -189,10 +189,8 @@ Status MemTableList::InstallMemtableFlushResults(
uint64_t mem_id = 1; // how many memtables has been flushed. uint64_t mem_id = 1; // how many memtables has been flushed.
do { do {
if (s.ok()) { // commit new state if (s.ok()) { // commit new state
Log(info_log, LogToBuffer(log_buffer, "Level-0 commit table #%lu: memtable #%lu done",
"Level-0 commit table #%lu: memtable #%lu done", (unsigned long)m->file_number_, (unsigned long)mem_id);
(unsigned long)m->file_number_,
(unsigned long)mem_id);
current_->Remove(m); current_->Remove(m);
assert(m->file_number_ > 0); assert(m->file_number_ > 0);

View File

@ -104,7 +104,8 @@ class MemTableList {
Logger* info_log, uint64_t file_number, Logger* info_log, uint64_t file_number,
std::set<uint64_t>& pending_outputs, std::set<uint64_t>& pending_outputs,
autovector<MemTable*>* to_delete, autovector<MemTable*>* to_delete,
Directory* db_directory); Directory* db_directory,
LogBuffer* log_buffer);
// New memtables are inserted at the front of the list. // New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add(). // Takes ownership of the referenced held on *m by the caller of Add().

View File

@ -243,6 +243,7 @@ extern void rocksdb_options_set_paranoid_checks(
rocksdb_options_t*, unsigned char); rocksdb_options_t*, unsigned char);
extern void rocksdb_options_set_env(rocksdb_options_t*, rocksdb_env_t*); extern void rocksdb_options_set_env(rocksdb_options_t*, rocksdb_env_t*);
extern void rocksdb_options_set_info_log(rocksdb_options_t*, rocksdb_logger_t*); extern void rocksdb_options_set_info_log(rocksdb_options_t*, rocksdb_logger_t*);
extern void rocksdb_options_set_info_log_level(rocksdb_options_t*, int);
extern void rocksdb_options_set_write_buffer_size(rocksdb_options_t*, size_t); extern void rocksdb_options_set_write_buffer_size(rocksdb_options_t*, size_t);
extern void rocksdb_options_set_max_open_files(rocksdb_options_t*, int); extern void rocksdb_options_set_max_open_files(rocksdb_options_t*, int);
extern void rocksdb_options_set_cache(rocksdb_options_t*, rocksdb_cache_t*); extern void rocksdb_options_set_cache(rocksdb_options_t*, rocksdb_cache_t*);
@ -275,6 +276,8 @@ extern void rocksdb_options_set_expanded_compaction_factor(
rocksdb_options_t*, int); rocksdb_options_t*, int);
extern void rocksdb_options_set_max_grandparent_overlap_factor( extern void rocksdb_options_set_max_grandparent_overlap_factor(
rocksdb_options_t*, int); rocksdb_options_t*, int);
extern void rocksdb_options_set_max_bytes_for_level_multiplier_additional(
rocksdb_options_t*, int* level_values, size_t num_levels);
extern void rocksdb_options_enable_statistics(rocksdb_options_t*); extern void rocksdb_options_enable_statistics(rocksdb_options_t*);
extern void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t*, int); extern void rocksdb_options_set_max_write_buffer_number(rocksdb_options_t*, int);
@ -330,10 +333,14 @@ extern void rocksdb_options_set_block_size_deviation(
rocksdb_options_t*, int); rocksdb_options_t*, int);
extern void rocksdb_options_set_advise_random_on_open( extern void rocksdb_options_set_advise_random_on_open(
rocksdb_options_t*, unsigned char); rocksdb_options_t*, unsigned char);
extern void rocksdb_options_set_access_hint_on_compaction_start(
rocksdb_options_t*, int);
extern void rocksdb_options_set_use_adaptive_mutex( extern void rocksdb_options_set_use_adaptive_mutex(
rocksdb_options_t*, unsigned char); rocksdb_options_t*, unsigned char);
extern void rocksdb_options_set_bytes_per_sync( extern void rocksdb_options_set_bytes_per_sync(
rocksdb_options_t*, uint64_t); rocksdb_options_t*, uint64_t);
extern void rocksdb_options_set_verify_checksums_in_compaction(
rocksdb_options_t*, unsigned char);
extern void rocksdb_options_set_filter_deletes( extern void rocksdb_options_set_filter_deletes(
rocksdb_options_t*, unsigned char); rocksdb_options_t*, unsigned char);
extern void rocksdb_options_set_max_sequential_skip_in_iterations( extern void rocksdb_options_set_max_sequential_skip_in_iterations(
@ -348,6 +355,7 @@ extern void rocksdb_options_prepare_for_bulk_load(rocksdb_options_t*);
extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*); extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*);
extern void rocksdb_options_set_hash_skip_list_rep(rocksdb_options_t*, size_t, int32_t, int32_t); extern void rocksdb_options_set_hash_skip_list_rep(rocksdb_options_t*, size_t, int32_t, int32_t);
extern void rocksdb_options_set_hash_link_list_rep(rocksdb_options_t*, size_t); extern void rocksdb_options_set_hash_link_list_rep(rocksdb_options_t*, size_t);
extern void rocksdb_options_set_plain_table_factory(rocksdb_options_t*, uint32_t, int, double, size_t);
extern void rocksdb_options_set_max_bytes_for_level_base(rocksdb_options_t* opt, uint64_t n); extern void rocksdb_options_set_max_bytes_for_level_base(rocksdb_options_t* opt, uint64_t n);
extern void rocksdb_options_set_stats_dump_period_sec(rocksdb_options_t* opt, unsigned int sec); extern void rocksdb_options_set_stats_dump_period_sec(rocksdb_options_t* opt, unsigned int sec);
@ -360,6 +368,16 @@ extern void rocksdb_options_set_memtable_prefix_bloom_probes(
rocksdb_options_t*, uint32_t); rocksdb_options_t*, uint32_t);
extern void rocksdb_options_set_max_successive_merges( extern void rocksdb_options_set_max_successive_merges(
rocksdb_options_t*, size_t); rocksdb_options_t*, size_t);
extern void rocksdb_options_set_min_partial_merge_operands(
rocksdb_options_t*, uint32_t);
extern void rocksdb_options_set_bloom_locality(
rocksdb_options_t*, uint32_t);
extern void rocksdb_options_set_allow_thread_local(
rocksdb_options_t*, unsigned char);
extern void rocksdb_options_set_inplace_update_support(
rocksdb_options_t*, unsigned char);
extern void rocksdb_options_set_inplace_update_num_locks(
rocksdb_options_t*, size_t);
enum { enum {
rocksdb_no_compression = 0, rocksdb_no_compression = 0,

71
tools/auto_sanity_test.sh Executable file
View File

@ -0,0 +1,71 @@
TMP_DIR="/tmp/rocksdb-sanity-test"
if [ "$#" -lt 2 ]; then
echo "usage: ./auto_sanity_test.sh [new_commit] [old_commit]"
echo "Missing either [new_commit] or [old_commit], perform sanity check with the latest and 10th latest commits."
recent_commits=`git log | grep -e "^commit [a-z0-9]\+$"| head -n10 | sed -e 's/commit //g'`
commit_new=`echo "$recent_commits" | head -n1`
commit_old=`echo "$recent_commits" | tail -n1`
echo "the most recent commits are:"
echo "$recent_commits"
else
commit_new=$1
commit_old=$2
fi
if [ ! -d $TMP_DIR ]; then
mkdir $TMP_DIR
fi
dir_new="${TMP_DIR}/${commit_new}"
dir_old="${TMP_DIR}/${commit_old}"
function makestuff() {
echo "make clean"
make clean > /dev/null
echo "make db_sanity_test -j32"
make db_sanity_test -j32 > /dev/null
if [ $? -ne 0 ]; then
echo "[ERROR] Failed to perform 'make db_sanity_test'"
exit 1
fi
}
rm -r -f $dir_new
rm -r -f $dir_old
echo "Running db sanity check with commits $commit_new and $commit_old."
echo "============================================================="
echo "Making build $commit_new"
makestuff
mv db_sanity_test new_db_sanity_test
echo "Creating db based on the new commit --- $commit_new"
./new_db_sanity_test $dir_new create
echo "============================================================="
echo "Making build $commit_old"
makestuff
mv db_sanity_test old_db_sanity_test
echo "Creating db based on the old commit --- $commit_old"
./old_db_sanity_test $dir_old create
echo "============================================================="
echo "Verifying new db $dir_new using the old commit --- $commit_old"
./old_db_sanity_test $dir_new verify
if [ $? -ne 0 ]; then
echo "[ERROR] Verification of $dir_new using commit $commit_old failed."
exit 2
fi
echo "============================================================="
echo "Verifying old db $dir_old using the new commit --- $commit_new"
./new_db_sanity_test $dir_old verify
if [ $? -ne 0 ]; then
echo "[ERROR] Verification of $dir_old using commit $commit_new failed."
exit 2
fi
rm old_db_sanity_test
rm new_db_sanity_test
echo "Auto sanity test passed!"

62
util/sync_point.cc Normal file
View File

@ -0,0 +1,62 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "util/sync_point.h"
namespace rocksdb {
SyncPoint* SyncPoint::GetInstance() {
static SyncPoint sync_point;
return &sync_point;
}
void SyncPoint::LoadDependency(const std::vector<Dependency>& dependencies) {
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
for (const auto& dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
}
bool SyncPoint::PredecessorsAllCleared(const std::string& point) {
for (const auto& pred : predecessors_[point]) {
if (cleared_points_.count(pred) == 0) {
return false;
}
}
return true;
}
void SyncPoint::EnableProcessing() {
std::unique_lock<std::mutex> lock(mutex_);
enabled_ = true;
}
void SyncPoint::DisableProcessing() {
std::unique_lock<std::mutex> lock(mutex_);
enabled_ = false;
}
void SyncPoint::ClearTrace() {
std::unique_lock<std::mutex> lock(mutex_);
cleared_points_.clear();
}
void SyncPoint::Process(const std::string& point) {
std::unique_lock<std::mutex> lock(mutex_);
if (!enabled_) return;
while (!PredecessorsAllCleared(point)) {
cv_.wait(lock);
}
cleared_points_.insert(point);
cv_.notify_all();
}
} // namespace rocksdb

79
util/sync_point.h Normal file
View File

@ -0,0 +1,79 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <condition_variable>
#include <mutex>
#include <string>
#include <unordered_set>
#include <unordered_map>
#include <vector>
namespace rocksdb {
// This class provides facility to reproduce race conditions deterministically
// in unit tests.
// Developer could specify sync points in the codebase via TEST_SYNC_POINT.
// Each sync point represents a position in the execution stream of a thread.
// In the unit test, 'Happens After' relationship among sync points could be
// setup via SyncPoint::LoadDependency, to reproduce a desired interleave of
// threads execution.
// Refer to (DBTest,TransactionLogIteratorRace), for an exmaple use case.
class SyncPoint {
public:
static SyncPoint* GetInstance();
struct Dependency {
std::string predecessor;
std::string successor;
};
// call once at the beginning of a test to setup the dependency between
// sync points
void LoadDependency(const std::vector<Dependency>& dependencies);
// enable sync point processing (disabled on startup)
void EnableProcessing();
// disable sync point processing
void DisableProcessing();
// remove the execution trace of all sync points
void ClearTrace();
// triggered by TEST_SYNC_POINT, blocking execution until all predecessors
// are executed.
void Process(const std::string& point);
// TODO: it might be useful to provide a function that blocks until all
// sync points are cleared.
private:
bool PredecessorsAllCleared(const std::string& point);
// successor/predecessor map loaded from LoadDependency
std::unordered_map<std::string, std::vector<std::string>> successors_;
std::unordered_map<std::string, std::vector<std::string>> predecessors_;
std::mutex mutex_;
std::condition_variable cv_;
// sync points that have been passed through
std::unordered_set<std::string> cleared_points_;
bool enabled_ = false;
};
} // namespace rocksdb
// Use TEST_SYNC_POINT to specify sync points inside code base.
// Sync points can have happens-after depedency on other sync points,
// configured at runtime via SyncPoint::LoadDependency. This could be
// utilized to re-produce race conditions between threads.
// See TransactionLogIteratorRace in db_test.cc for an example use case.
// TEST_SYNC_POINT is no op in release build.
#ifdef NDEBUG
#define TEST_SYNC_POINT(x)
#else
#define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x)
#endif