Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
	db/db_test.cc
This commit is contained in:
Igor Canadi 2014-03-20 14:41:37 -07:00
commit ac328a86b9
12 changed files with 320 additions and 105 deletions

View File

@ -19,6 +19,8 @@
* Changed Options.prefix_extractor from raw pointer to shared_ptr (take ownership)
Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly)
* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools
* Added a command "checkconsistency" in ldb tool, which checks
if file system state matches DB state (file existence and file sizes)
### New Features
* If we find one truncated record at the end of the MANIFEST or WAL files,

View File

@ -376,6 +376,39 @@ TEST(CorruptionTest, UnrelatedKeys) {
ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
}
// Checks that re-opening a DB with paranoid_checks enabled reports
// Corruption once the file system no longer matches the live-file metadata:
//   iter 0 - a live file is rewritten with different contents,
//   iter 1 - a live file is deleted.
TEST(CorruptionTest, FileSystemStateCorrupted) {
  for (int iter = 0; iter < 2; ++iter) {
    Options options;
    options.paranoid_checks = true;
    options.create_if_missing = true;
    Reopen(&options);
    Build(10);
    ASSERT_OK(db_->Flush(FlushOptions()));
    DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
    std::vector<LiveFileMetaData> metadata;
    dbi->GetLiveFilesMetaData(&metadata);
    // Compare against an unsigned zero to avoid a signed/unsigned comparison.
    ASSERT_GT(metadata.size(), size_t(0));
    std::string filename = dbname_ + metadata[0].name;

    // Close the DB before tampering with its files.
    delete db_;
    db_ = nullptr;

    if (iter == 0) {  // corrupt file size
      unique_ptr<WritableFile> file;
      // Fail loudly if the file cannot be opened instead of dereferencing a
      // null handle on the next line.
      ASSERT_OK(env_.NewWritableFile(filename, &file, EnvOptions()));
      ASSERT_OK(file->Append(Slice("corrupted sst")));
      file.reset();
    } else {  // delete the file
      // If the delete silently failed the test would not exercise the
      // "missing file" path at all.
      ASSERT_OK(env_.DeleteFile(filename));
    }

    Status x = TryReopen(&options);
    ASSERT_TRUE(x.IsCorruption());

    // Start the next iteration (and leave the fixture) with a fresh DB.
    DestroyDB(dbname_, options_);
    Reopen(&options);
  }
}
} // namespace rocksdb
int main(int argc, char** argv) {

View File

@ -826,6 +826,9 @@ Status DBImpl::Recover(
}
Status s = versions_->Recover(column_families);
if (options_.paranoid_checks && s.ok()) {
s = CheckConsistency();
}
if (s.ok()) {
SequenceNumber max_sequence(0);
default_cf_handle_ = new ColumnFamilyHandleImpl(
@ -1211,13 +1214,13 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
if (!s.ok()) {
cfd->imm()->RollbackMemtableFlush(mems, file_number, &pending_outputs_);
return s;
}
} else {
// Replace immutable memtable with the generated Table
s = cfd->imm()->InstallMemtableFlushResults(
cfd, mems, versions_.get(), &mutex_, options_.info_log.get(), file_number,
pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get());
cfd, mems, versions_.get(), &mutex_, options_.info_log.get(),
file_number, pending_outputs_, &deletion_state.memtables_to_free,
db_directory_.get());
}
if (s.ok()) {
InstallSuperVersion(cfd, deletion_state);
@ -1236,6 +1239,13 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
}
}
}
if (!s.ok() && !s.IsShutdownInProgress() && options_.paranoid_checks &&
bg_error_.ok()) {
// if a bad error happened (not ShutdownInProgress) and paranoid_checks is
// true, mark DB read-only
bg_error_ = s;
}
return s;
}
@ -3955,6 +3965,33 @@ void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
versions_->GetLiveFilesMetaData(metadata);
}
// Compares every live file reported by versions_ against the file system.
// Each file that cannot be stat'ed, or whose on-disk size differs from the
// recorded size, contributes one line to the error message. Returns
// Status::OK() when no problem was found, otherwise Status::Corruption()
// carrying all collected lines. Must be called with mutex_ held.
Status DBImpl::CheckConsistency() {
  mutex_.AssertHeld();

  std::vector<LiveFileMetaData> live_files;
  versions_->GetLiveFilesMetaData(&live_files);

  std::string problems;
  for (const auto& file : live_files) {
    uint64_t actual_size = 0;
    const Status access = env_->GetFileSize(dbname_ + file.name, &actual_size);
    if (!access.ok()) {
      problems += "Can't access " + file.name + ": " + access.ToString() + "\n";
      continue;
    }
    if (actual_size != file.size) {
      problems += "Sst file size mismatch: " + file.name +
                  ". Size recorded in manifest " + std::to_string(file.size) +
                  ", actual size " + std::to_string(actual_size) + "\n";
    }
  }

  if (!problems.empty()) {
    return Status::Corruption(problems);
  }
  return Status::OK();
}
void DBImpl::TEST_GetFilesMetaData(
ColumnFamilyHandle* column_family,
std::vector<std::vector<FileMetaData>>* metadata) {

View File

@ -130,6 +130,10 @@ class DBImpl : public DB {
virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);
// checks if all live files exist on file system and that their file sizes
// match to our in-memory records
virtual Status CheckConsistency();
virtual Status GetDbIdentity(std::string& identity);
Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,

View File

@ -114,4 +114,4 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
return s;
}
}
} // namespace rocksdb

View File

@ -2531,6 +2531,97 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
}
// This is a static filter used for filtering
// kvs during the compaction process.
static int cfilter_count;
static std::string NEW_VALUE = "NewValue";
class KeepFilter : public CompactionFilter {
public:
virtual bool Filter(int level, const Slice& key, const Slice& value,
std::string* new_value, bool* value_changed) const
override {
cfilter_count++;
return false;
}
virtual const char* Name() const override { return "KeepFilter"; }
};
class DeleteFilter : public CompactionFilter {
public:
virtual bool Filter(int level, const Slice& key, const Slice& value,
std::string* new_value, bool* value_changed) const
override {
cfilter_count++;
return true;
}
virtual const char* Name() const override { return "DeleteFilter"; }
};
class ChangeFilter : public CompactionFilter {
public:
explicit ChangeFilter() {}
virtual bool Filter(int level, const Slice& key, const Slice& value,
std::string* new_value, bool* value_changed) const
override {
assert(new_value != nullptr);
*new_value = NEW_VALUE;
*value_changed = true;
return false;
}
virtual const char* Name() const override { return "ChangeFilter"; }
};
// Factory producing KeepFilter instances. When constructed with
// check_context == true, every CreateCompactionFilter() call also asserts
// that the compaction context matches the expectations currently stored in
// expect_full_compaction_ / expect_manual_compaction_.
class KeepFilterFactory : public CompactionFilterFactory {
 public:
  // Both expectation flags start out as false. Before C++20 a
  // default-constructed std::atomic_bool holds an indeterminate value, so
  // they are initialised explicitly; tests overwrite them with store().
  explicit KeepFilterFactory(bool check_context = false)
      : check_context_(check_context),
        expect_full_compaction_(false),
        expect_manual_compaction_(false) {}

  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override {
    if (check_context_) {
      ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
      ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
    }
    return std::unique_ptr<CompactionFilter>(new KeepFilter());
  }

  virtual const char* Name() const override { return "KeepFilterFactory"; }

  // Whether CreateCompactionFilter() validates the context at all.
  bool check_context_;
  // Expected values of context.is_full_compaction / is_manual_compaction;
  // atomic because the test body updates them with store() while the factory
  // reads them with load().
  std::atomic_bool expect_full_compaction_;
  std::atomic_bool expect_manual_compaction_;
};
class DeleteFilterFactory : public CompactionFilterFactory {
public:
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
if (context.is_manual_compaction) {
return std::unique_ptr<CompactionFilter>(new DeleteFilter());
} else {
return std::unique_ptr<CompactionFilter>(nullptr);
}
}
virtual const char* Name() const override { return "DeleteFilterFactory"; }
};
class ChangeFilterFactory : public CompactionFilterFactory {
public:
explicit ChangeFilterFactory() {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(new ChangeFilter());
}
virtual const char* Name() const override { return "ChangeFilterFactory"; }
};
// TODO(kailiu) The tests on UniversalCompaction has some issues:
// 1. A lot of magic numbers ("11" or "12").
// 2. Made assumption on the memtable flush conditions, which may change from
@ -2541,11 +2632,16 @@ TEST(DBTest, UniversalCompactionTrigger) {
options.write_buffer_size = 100<<10; //100KB
// trigger compaction if there are >= 4 files
options.level0_file_num_compaction_trigger = 4;
KeepFilterFactory* filter = new KeepFilterFactory(true);
filter->expect_manual_compaction_.store(false);
options.compaction_filter_factory.reset(filter);
CreateAndReopenWithCF({"pikachu"}, &options);
Random rnd(301);
int key_idx = 0;
filter->expect_full_compaction_.store(true);
// Stage 1:
// Generate a set of files at level 0, but don't trigger level-0
// compaction.
@ -2582,6 +2678,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// First, clean up memtable before inserting new data. This will generate
// a level-0 file, with size around 0.4 (according to previously written
// data amount).
filter->expect_full_compaction_.store(false);
ASSERT_OK(Flush(1));
for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
num++) {
@ -2653,6 +2750,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Stage 5:
// Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
// a new file of size 1.
filter->expect_full_compaction_.store(true);
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
@ -3381,100 +3479,6 @@ TEST(DBTest, InPlaceUpdateCallbackNoAction) {
} while (ChangeCompactOptions());
}
// This is a static filter used for filtering
// kvs during the compaction process.
static int cfilter_count;
static std::string NEW_VALUE = "NewValue";
// Test compaction filter: counts every call in cfilter_count and always
// returns false (presumably "keep the entry", per the class name -- confirm
// against CompactionFilter::Filter).
class KeepFilter : public CompactionFilter {
public:
// Increments the file-level counter; none of the arguments are inspected.
virtual bool Filter(int level, const Slice& key,
const Slice& value, std::string* new_value,
bool* value_changed) const override {
cfilter_count++;
return false;
}
// Identifies this filter by name.
virtual const char* Name() const override {
return "KeepFilter";
}
};
// Test compaction filter: counts every call in cfilter_count and always
// returns true (presumably "drop the entry", per the class name -- confirm
// against CompactionFilter::Filter).
class DeleteFilter : public CompactionFilter {
public:
// Increments the file-level counter; none of the arguments are inspected.
virtual bool Filter(int level, const Slice& key,
const Slice& value, std::string* new_value,
bool* value_changed) const override {
cfilter_count++;
return true;
}
// Identifies this filter by name.
virtual const char* Name() const override {
return "DeleteFilter";
}
};
// Test compaction filter that replaces every value with NEW_VALUE.
class ChangeFilter : public CompactionFilter {
public:
explicit ChangeFilter() {}
// Writes NEW_VALUE into *new_value, sets *value_changed and returns false.
// Does not update cfilter_count.
virtual bool Filter(int level, const Slice& key,
const Slice& value, std::string* new_value,
bool* value_changed) const override {
assert(new_value != nullptr);
*new_value = NEW_VALUE;
*value_changed = true;
return false;
}
// Identifies this filter by name.
virtual const char* Name() const override {
return "ChangeFilter";
}
};
// Factory that returns a new KeepFilter for every compaction; the context
// argument is ignored.
class KeepFilterFactory : public CompactionFilterFactory {
public:
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter(const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(new KeepFilter());
}
// Identifies this factory by name.
virtual const char* Name() const override {
return "KeepFilterFactory";
}
};
// Factory that only supplies a filter for manual compactions.
class DeleteFilterFactory : public CompactionFilterFactory {
public:
// Returns a DeleteFilter when context.is_manual_compaction is set, and a
// null filter otherwise.
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter(const CompactionFilter::Context& context) override {
if (context.is_manual_compaction) {
return std::unique_ptr<CompactionFilter>(new DeleteFilter());
} else {
return std::unique_ptr<CompactionFilter>(nullptr);
}
}
// Identifies this factory by name.
virtual const char* Name() const override {
return "DeleteFilterFactory";
}
};
// Factory that returns a new ChangeFilter for every compaction; the context
// argument is ignored.
class ChangeFilterFactory : public CompactionFilterFactory {
public:
explicit ChangeFilterFactory() {}
virtual std::unique_ptr<CompactionFilter>
CreateCompactionFilter(const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(new ChangeFilter());
}
// Identifies this factory by name.
virtual const char* Name() const override {
return "ChangeFilterFactory";
}
};
TEST(DBTest, CompactionFilter) {
Options options = CurrentOptions();
options.num_levels = 3;
@ -3665,6 +3669,60 @@ TEST(DBTest, CompactionFilterWithValueChange) {
} while (ChangeCompactOptions());
}
// Writes three files of decreasing size under universal compaction, forces a
// manual compaction with a KeepFilterFactory installed, and verifies that
// the filter saw every key and that all keys survive in a single level-0
// file.
TEST(DBTest, CompactionFilterContextManual) {
  KeepFilterFactory* filter = new KeepFilterFactory();
  Options options = CurrentOptions();
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_filter_factory.reset(filter);
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 8;
  Reopen(&options);

  int num_keys_per_file = 400;
  for (int j = 0; j < 3; j++) {
    // Write several keys.
    const std::string value(10, 'x');
    for (int i = 0; i < num_keys_per_file; i++) {
      char key[100];
      snprintf(key, sizeof(key), "B%08d%02d", i, j);
      // A failed write would invalidate the key counts asserted below.
      ASSERT_OK(Put(key, value));
    }
    ASSERT_OK(dbfull()->TEST_FlushMemTable());
    // Make sure next file is much smaller so automatic compaction will not
    // be triggered.
    num_keys_per_file /= 2;
  }

  // Force a manual compaction
  cfilter_count = 0;
  // NOTE(review): the factory above was built with the default
  // check_context = false, so these two expectations are stored but never
  // asserted by CreateCompactionFilter() in this test.
  filter->expect_manual_compaction_.store(true);
  filter->expect_full_compaction_.store(false);  // Manual compaction always
                                                 // set this flag.
  ASSERT_OK(dbfull()->CompactRange(nullptr, nullptr));
  // 400 + 200 + 100 keys were written across the three files.
  ASSERT_EQ(cfilter_count, 700);
  ASSERT_EQ(NumTableFilesAtLevel(0), 1);

  // Verify total number of keys is correct after manual compaction.
  int count = 0;
  int total = 0;
  Iterator* iter = dbfull()->TEST_NewInternalIterator();
  iter->SeekToFirst();
  ASSERT_OK(iter->status());
  while (iter->Valid()) {
    ParsedInternalKey ikey(Slice(), 0, kTypeValue);
    ikey.sequence = -1;
    ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true);
    total++;
    // Count the entries whose sequence number is non-zero.
    if (ikey.sequence != 0) {
      count++;
    }
    iter->Next();
  }
  ASSERT_EQ(total, 700);
  ASSERT_EQ(count, 1);
  delete iter;
}
TEST(DBTest, SparseMerge) {
do {
Options options = CurrentOptions();

View File

@ -8,9 +8,10 @@ import getopt
import logging
import tempfile
import subprocess
import shutil
# This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in Rocksdb.
# in case of unsafe crashes in RocksDB.
def main(argv):
try:
@ -59,6 +60,8 @@ def main(argv):
+ str(ops_per_thread) + "\nwrite_buffer_size="
+ str(write_buf_size) + "\n")
dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_')
while time.time() < exit_time:
run_had_errors = False
killtime = time.time() + interval
@ -99,7 +102,7 @@ def main(argv):
""" % (ops_per_thread,
threads,
write_buf_size,
tempfile.mkdtemp(),
dbname,
random.randint(0, 1),
random.randint(0, 1),
random.randint(0, 1)))
@ -140,5 +143,8 @@ def main(argv):
time.sleep(1) # time to stabilize before the next run
# we need to clean up after ourselves -- only do this on test success
shutil.rmtree(dbname, True)
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))

View File

@ -8,6 +8,7 @@ import getopt
import logging
import tempfile
import subprocess
import shutil
# This python script runs db_stress multiple times. Some runs with
# kill_random_test that causes rocksdb to crash at various points in code.
@ -78,6 +79,7 @@ def main(argv):
# normal run
additional_opts = "--ops_per_thread=" + str(ops_per_thread)
dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_')
cmd = re.sub('\s+', ' ', """
./db_stress
--test_batches_snapshots=%s
@ -114,7 +116,7 @@ def main(argv):
""" % (random.randint(0, 1),
threads,
write_buf_size,
tempfile.mkdtemp(),
dbname,
random.randint(0, 1),
random.randint(0, 1),
random.randint(0, 1),
@ -155,6 +157,8 @@ def main(argv):
if (stdoutdata.find('fail') >= 0):
print "TEST FAILED. Output has 'fail'!!!\n"
sys.exit(2)
# we need to clean up after ourselves -- only do this on test success
shutil.rmtree(dbname, True)
check_mode = (check_mode + 1) % total_check_mode

View File

@ -129,6 +129,8 @@ class LDBTestCase(unittest.TestCase):
# It is weird that GET and SCAN raise exception for
# non-existent key, while delete does not
self.assertRunOK("checkconsistency", "OK")
# Runs "./ldb dump <params>" with its output redirected into dumpFile and
# returns True when run_err_null reports 0 for that command line.
def dumpDb(self, params, dumpFile):
return 0 == run_err_null("./ldb dump %s > %s" % (params, dumpFile))
@ -201,6 +203,7 @@ class LDBTestCase(unittest.TestCase):
self.assertRunOK("scan", "a1 : b1\na2 : b2\na3 : b3\na4 : b4")
self.assertRunOK("delete --hex 0x6133", "OK")
self.assertRunOK("scan", "a1 : b1\na2 : b2\na4 : b4")
self.assertRunOK("checkconsistency", "OK")
def testTtlPutGet(self):
print "Running testTtlPutGet..."
@ -215,6 +218,7 @@ class LDBTestCase(unittest.TestCase):
self.assertRunOK("put a3 b3 --create_if_missing", "OK")
# fails because timestamp's length is greater than value's
self.assertRunFAIL("get --ttl a3")
self.assertRunOK("checkconsistency", "OK")
def testInvalidCmdLines(self):
print "Running testInvalidCmdLines..."
@ -354,5 +358,26 @@ class LDBTestCase(unittest.TestCase):
origDbPath, os.path.join(origDbPath, "LOG"))))
self.assertRunOK("scan", "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")
# Exercises the ldb "checkconsistency" command: it must report OK on an
# untouched DB and fail once an .sst file has been overwritten, and again
# once that file has been removed.
def testCheckConsistency(self):
print "Running testCheckConsistency..."
dbPath = os.path.join(self.TMP_DIR, self.DB_NAME)
# Populate the DB and confirm the baseline is consistent.
self.assertRunOK("put x1 y1 --create_if_missing", "OK")
self.assertRunOK("put x2 y2", "OK")
self.assertRunOK("get x1", "y1")
self.assertRunOK("checkconsistency", "OK")
# Locate the .sst file(s) via a shell glob; presumably my_check_output
# returns the command's stdout -- confirm against its definition.
sstFilePath = my_check_output("ls %s" % os.path.join(dbPath, "*.sst"),
shell=True)
# Modify the file
my_check_output("echo 'evil' > %s" % sstFilePath, shell=True)
self.assertRunFAIL("checkconsistency")
# Delete the file
my_check_output("rm -f %s" % sstFilePath, shell=True)
self.assertRunFAIL("checkconsistency")
if __name__ == "__main__":
unittest.main()

View File

@ -157,6 +157,8 @@ LDBCommand* LDBCommand::SelectCommand(
return new ListColumnFamiliesCommand(cmdParams, option_map, flags);
} else if (cmd == InternalDumpCommand::Name()) {
return new InternalDumpCommand(cmdParams, option_map, flags);
} else if (cmd == CheckConsistencyCommand::Name()) {
return new CheckConsistencyCommand(cmdParams, option_map, flags);
}
return nullptr;
}
@ -1799,5 +1801,32 @@ void DBQuerierCommand::DoCommand() {
}
}
// Builds the "checkconsistency" command. It accepts no command-specific
// options (an empty list is passed to BuildCmdLineOptions) and does not use
// the positional params.
CheckConsistencyCommand::CheckConsistencyCommand(
    const vector<string>& params, const map<string, string>& options,
    const vector<string>& flags)
    : LDBCommand(options, flags, false, BuildCmdLineOptions({})) {}
// Appends this command's usage line (its name followed by a newline) to ret.
void CheckConsistencyCommand::Help(string& ret) {
  ret += " ";
  ret += CheckConsistencyCommand::Name();
  ret += "\n";
}
// Opens the DB read-only with paranoid_checks forced on and reports the
// outcome: prints "OK" on success, otherwise records the open status as the
// command's failure result. The consistency check itself is expected to run
// as part of the open (paranoid_checks) -- this function only inspects the
// returned Status.
void CheckConsistencyCommand::DoCommand() {
  Options opt = PrepareOptionsForOpenDB();
  opt.paranoid_checks = true;
  // Bail out if option preparation (or anything earlier) already produced a
  // result for this command.
  if (!exec_state_.IsNotStarted()) {
    return;
  }
  // Initialise the handle so the unconditional delete below is well-defined
  // even if OpenForReadOnly fails without assigning it.
  DB* db = nullptr;
  Status st = DB::OpenForReadOnly(opt, db_path_, &db, false);
  delete db;
  if (st.ok()) {
    fprintf(stdout, "OK\n");
  } else {
    exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
  }
}
} // namespace rocksdb

View File

@ -703,4 +703,20 @@ private:
static const char* DELETE_CMD;
};
// ldb command "checkconsistency".
class CheckConsistencyCommand : public LDBCommand {
 public:
  CheckConsistencyCommand(const vector<string>& params,
                          const map<string, string>& options,
                          const vector<string>& flags);

  // Name used to select this command on the ldb command line.
  static string Name() { return "checkconsistency"; }

  // Appends this command's usage text to ret.
  static void Help(string& ret);

  virtual void DoCommand();

  // Always true for this command.
  virtual bool NoDBOpen() { return true; }
};
} // namespace rocksdb

View File

@ -53,6 +53,7 @@ public:
DeleteCommand::Help(ret);
DBQuerierCommand::Help(ret);
ApproxSizeCommand::Help(ret);
CheckConsistencyCommand::Help(ret);
ret.append("\n\n");
ret.append("Admin Commands:\n");