From 04251e1e3a60d44e1c4e70340c125606caa32857 Mon Sep 17 00:00:00 2001
From: Venkatesh Radhakrishnan
Date: Fri, 19 Jun 2015 16:08:31 -0700
Subject: [PATCH] Add WAL files to Checkpoint for multiple column families.

Summary: When there are multiple column families, the flushes done by
GetLiveFiles are not atomic, so the WAL files may contain entries that are
needed to recover a consistent RocksDB. We now add the WAL files to the
checkpoint.

Test Plan: CheckpointCF - This test forces more data to be written to the
other column families after the flush of the first column family but before
the second.

Reviewers: igor, yhchiang, IslamAbdelRahman, anthony, kradhakrishnan, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D40323
---
 Makefile                                |   4 +
 db/db_filesnapshot.cc                   |   2 +
 db/db_test.cc                           |  57 ----
 src.mk                                  |   1 +
 utilities/checkpoint/checkpoint.cc      |  46 ++++
 utilities/checkpoint/checkpoint_test.cc | 349 ++++++++++++++++++++++++
 6 files changed, 402 insertions(+), 57 deletions(-)
 create mode 100644 utilities/checkpoint/checkpoint_test.cc

diff --git a/Makefile b/Makefile
index 1f2a10fd4..4b4954091 100644
--- a/Makefile
+++ b/Makefile
@@ -231,6 +231,7 @@ TESTS = \
 	dynamic_bloom_test \
 	c_test \
 	cache_test \
+	checkpoint_test \
 	coding_test \
 	corruption_test \
 	crc32c_test \
@@ -695,6 +696,9 @@ prefix_test: db/prefix_test.o $(LIBOBJECTS) $(TESTHARNESS)
 backupable_db_test: utilities/backupable/backupable_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
+checkpoint_test: utilities/checkpoint/checkpoint_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 document_db_test: utilities/document/document_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc
index c72430301..e39ccf496 100644
--- a/db/db_filesnapshot.cc
+++ b/db/db_filesnapshot.cc
@@ -98,6 +98,8 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
       cfd->Ref();
       mutex_.Unlock();
       status = FlushMemTable(cfd, FlushOptions());
+      TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
+      TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
       mutex_.Lock();
       cfd->Unref();
       if (!status.ok()) {
diff --git a/db/db_test.cc b/db/db_test.cc
index b4412ff74..be7e69852 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -1863,63 +1863,6 @@ TEST_F(DBTest, GetSnapshot) {
   } while (ChangeOptions());
 }
 
-TEST_F(DBTest, GetSnapshotLink) {
-  do {
-    Options options;
-    const std::string snapshot_name = test::TmpDir(env_) + "/snapshot";
-    DB* snapshotDB;
-    ReadOptions roptions;
-    std::string result;
-    Checkpoint* checkpoint;
-
-    options = CurrentOptions(options);
-    delete db_;
-    db_ = nullptr;
-    ASSERT_OK(DestroyDB(dbname_, options));
-    ASSERT_OK(DestroyDB(snapshot_name, options));
-    env_->DeleteDir(snapshot_name);
-
-    // Create a database
-    Status s;
-    options.create_if_missing = true;
-    ASSERT_OK(DB::Open(options, dbname_, &db_));
-    std::string key = std::string("foo");
-    ASSERT_OK(Put(key, "v1"));
-    // Take a snapshot
-    ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
-    ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name));
-    ASSERT_OK(Put(key, "v2"));
-    ASSERT_EQ("v2", Get(key));
-    ASSERT_OK(Flush());
-    ASSERT_EQ("v2", Get(key));
-    // Open snapshot and verify contents while DB is running
-    options.create_if_missing = false;
-    ASSERT_OK(DB::Open(options, snapshot_name, &snapshotDB));
-    ASSERT_OK(snapshotDB->Get(roptions, key, &result));
-    ASSERT_EQ("v1", result);
-    delete snapshotDB;
-    snapshotDB = nullptr;
-    delete db_;
-    db_ = nullptr;
-
-    // Destroy original DB
-    ASSERT_OK(DestroyDB(dbname_, options));
-
-    // Open snapshot and verify contents
-    options.create_if_missing = false;
-    dbname_ = snapshot_name;
-    ASSERT_OK(DB::Open(options, dbname_, &db_));
-    ASSERT_EQ("v1", Get(key));
-    delete db_;
-    db_ = nullptr;
-    ASSERT_OK(DestroyDB(dbname_, options));
-    delete checkpoint;
-
-    // Restore DB name
-    dbname_ = test::TmpDir(env_) + "/db_test";
-  } while (ChangeOptions());
-}
-
 TEST_F(DBTest, GetLevel0Ordering) {
   do {
     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
diff --git a/src.mk b/src.mk
index ec61aa28e..f6267f5a5 100644
--- a/src.mk
+++ b/src.mk
@@ -211,6 +211,7 @@ TEST_BENCH_SOURCES = \
   util/filelock_test.cc \
   util/histogram_test.cc \
   utilities/backupable/backupable_db_test.cc \
+  utilities/checkpoint/checkpoint_test.cc \
   utilities/document/document_db_test.cc \
   utilities/document/json_document_test.cc \
   utilities/geodb/geodb_test.cc \
diff --git a/utilities/checkpoint/checkpoint.cc b/utilities/checkpoint/checkpoint.cc
index 760a6dbf4..f9683e6e7 100644
--- a/utilities/checkpoint/checkpoint.cc
+++ b/utilities/checkpoint/checkpoint.cc
@@ -19,8 +19,10 @@
 #include <algorithm>
 #include <string>
 #include "db/filename.h"
+#include "db/wal_manager.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
+#include "rocksdb/transaction_log.h"
 #include "util/file_util.h"
 
 namespace rocksdb {
@@ -60,6 +62,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
   uint64_t manifest_file_size = 0;
   uint64_t sequence_number = db_->GetLatestSequenceNumber();
   bool same_fs = true;
+  VectorLogPtr live_wal_files;
   if (db_->GetEnv()->FileExists(checkpoint_dir)) {
     return Status::InvalidArgument("Directory exists");
   }
@@ -70,11 +73,16 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
     // this will return live_files prefixed with "/"
     s = db_->GetLiveFiles(live_files, &manifest_file_size, true);
   }
+  // if we have more than one column family, we need to also get WAL files
+  if (s.ok()) {
+    s = db_->GetSortedWalFiles(live_wal_files);
+  }
   if (!s.ok()) {
     db_->EnableFileDeletions(false);
     return s;
   }
 
+  size_t wal_size = live_wal_files.size();
   Log(db_->GetOptions().info_log,
       "Started the snapshot process -- creating snapshot in directory %s",
       checkpoint_dir.c_str());
@@ -119,6 +127,44 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
                    (type == kDescriptorFile) ? manifest_file_size : 0);
       }
     }
+  Log(db_->GetOptions().info_log, "Number of log files %ld",
+      live_wal_files.size());
+
+  // Link WAL files. Copy exact size of last one because it is the only one
+  // that has changes after the last flush.
+  for (size_t i = 0; s.ok() && i < wal_size; ++i) {
+    if ((live_wal_files[i]->Type() == kAliveLogFile) &&
+        (live_wal_files[i]->StartSequence() >= sequence_number)) {
+      if (i + 1 == wal_size) {
+        Log(db_->GetOptions().info_log, "Copying %s",
+            live_wal_files[i]->PathName().c_str());
+        s = CopyFile(db_->GetEnv(),
+                     db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
+                     full_private_path + live_wal_files[i]->PathName(),
+                     live_wal_files[i]->SizeFileBytes());
+        break;
+      }
+      if (same_fs) {
+        // we only care about live log files
+        Log(db_->GetOptions().info_log, "Hard Linking %s",
+            live_wal_files[i]->PathName().c_str());
+        s = db_->GetEnv()->LinkFile(
+            db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
+            full_private_path + live_wal_files[i]->PathName());
+        if (s.IsNotSupported()) {
+          same_fs = false;
+          s = Status::OK();
+        }
+      }
+      if (!same_fs) {
+        Log(db_->GetOptions().info_log, "Copying %s",
+            live_wal_files[i]->PathName().c_str());
+        s = CopyFile(db_->GetEnv(),
+                     db_->GetOptions().wal_dir + live_wal_files[i]->PathName(),
+                     full_private_path + live_wal_files[i]->PathName(), 0);
+      }
+    }
+  }
 
   // we copied all the files, enable file deletions
   db_->EnableFileDeletions(false);
diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc
new file mode 100644
index 000000000..0cdf584ff
--- /dev/null
+++ b/utilities/checkpoint/checkpoint_test.cc
@@ -0,0 +1,349 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <iostream>
+#include <string>
+#include <thread>
+#include <vector>
+#include "db/db_impl.h"
+#include "port/stack_trace.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/utilities/checkpoint.h"
+#include "util/sync_point.h"
+#include "util/testharness.h"
+#include "util/xfunc.h"
+
+namespace rocksdb {
+class DBTest : public testing::Test {
+ protected:
+  // Sequence of option configurations to try
+  enum OptionConfig {
+    kDefault = 0,
+  };
+  int option_config_;
+
+ public:
+  std::string dbname_;
+  std::string alternative_wal_dir_;
+  Env* env_;
+  DB* db_;
+  Options last_options_;
+  std::vector<ColumnFamilyHandle*> handles_;
+
+  DBTest() : env_(Env::Default()) {
+    env_->SetBackgroundThreads(1, Env::LOW);
+    env_->SetBackgroundThreads(1, Env::HIGH);
+    dbname_ = test::TmpDir(env_) + "/db_test";
+    alternative_wal_dir_ = dbname_ + "/wal";
+    auto options = CurrentOptions();
+    auto delete_options = options;
+    delete_options.wal_dir = alternative_wal_dir_;
+    EXPECT_OK(DestroyDB(dbname_, delete_options));
+    // Destroy the DB again in case the alternative WAL dir was not used.
+    EXPECT_OK(DestroyDB(dbname_, options));
+    db_ = nullptr;
+    Reopen(options);
+  }
+
+  ~DBTest() {
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+    rocksdb::SyncPoint::GetInstance()->LoadDependency({});
+    rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+    Close();
+    Options options;
+    options.db_paths.emplace_back(dbname_, 0);
+    options.db_paths.emplace_back(dbname_ + "_2", 0);
+    options.db_paths.emplace_back(dbname_ + "_3", 0);
+    options.db_paths.emplace_back(dbname_ + "_4", 0);
+    EXPECT_OK(DestroyDB(dbname_, options));
+  }
+
+  // Return the current option configuration.
+  Options CurrentOptions() {
+    Options options;
+    options.env = env_;
+    options.create_if_missing = true;
+    return options;
+  }
+
+  void CreateColumnFamilies(const std::vector<std::string>& cfs,
+                            const Options& options) {
+    ColumnFamilyOptions cf_opts(options);
+    size_t cfi = handles_.size();
+    handles_.resize(cfi + cfs.size());
+    for (auto cf : cfs) {
+      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
+    }
+  }
+
+  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
+                             const Options& options) {
+    CreateColumnFamilies(cfs, options);
+    std::vector<std::string> cfs_plus_default = cfs;
+    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
+    ReopenWithColumnFamilies(cfs_plus_default, options);
+  }
+
+  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+                                const std::vector<Options>& options) {
+    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
+  }
+
+  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+                                const Options& options) {
+    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
+  }
+
+  Status TryReopenWithColumnFamilies(
+      const std::vector<std::string>& cfs,
+      const std::vector<Options>& options) {
+    Close();
+    EXPECT_EQ(cfs.size(), options.size());
+    std::vector<ColumnFamilyDescriptor> column_families;
+    for (size_t i = 0; i < cfs.size(); ++i) {
+      column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
+    }
+    DBOptions db_opts = DBOptions(options[0]);
+    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
+  }
+
+  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
+                                     const Options& options) {
+    Close();
+    std::vector<Options> v_opts(cfs.size(), options);
+    return TryReopenWithColumnFamilies(cfs, v_opts);
+  }
+
+  void Reopen(const Options& options) {
+    ASSERT_OK(TryReopen(options));
+  }
+
+  void Close() {
+    for (auto h : handles_) {
+      delete h;
+    }
+    handles_.clear();
+    delete db_;
+    db_ = nullptr;
+  }
+
+  void DestroyAndReopen(const Options& options) {
+    // Destroy using last options
+    Destroy(last_options_);
+    ASSERT_OK(TryReopen(options));
+  }
+
+  void Destroy(const Options& options) {
+    Close();
+    ASSERT_OK(DestroyDB(dbname_, options));
+  }
+
+  Status ReadOnlyReopen(const Options& options) {
+    return DB::OpenForReadOnly(options, dbname_, &db_);
+  }
+
+  Status TryReopen(const Options& options) {
+    Close();
+    last_options_ = options;
+    return DB::Open(options, dbname_, &db_);
+  }
+
+  Status Flush(int cf = 0) {
+    if (cf == 0) {
+      return db_->Flush(FlushOptions());
+    } else {
+      return db_->Flush(FlushOptions(), handles_[cf]);
+    }
+  }
+
+  Status Put(const Slice& k, const Slice& v,
+             WriteOptions wo = WriteOptions()) {
+    return db_->Put(wo, k, v);
+  }
+
+  Status Put(int cf, const Slice& k, const Slice& v,
+             WriteOptions wo = WriteOptions()) {
+    return db_->Put(wo, handles_[cf], k, v);
+  }
+
+  Status Delete(const std::string& k) {
+    return db_->Delete(WriteOptions(), k);
+  }
+
+  Status Delete(int cf, const std::string& k) {
+    return db_->Delete(WriteOptions(), handles_[cf], k);
+  }
+
+  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
+    ReadOptions options;
+    options.verify_checksums = true;
+    options.snapshot = snapshot;
+    std::string result;
+    Status s = db_->Get(options, k, &result);
+    if (s.IsNotFound()) {
+      result = "NOT_FOUND";
+    } else if (!s.ok()) {
+      result = s.ToString();
+    }
+    return result;
+  }
+
+  std::string Get(int cf, const std::string& k,
+                  const Snapshot* snapshot = nullptr) {
+    ReadOptions options;
+    options.verify_checksums = true;
+    options.snapshot = snapshot;
+    std::string result;
+    Status s = db_->Get(options, handles_[cf], k, &result);
+    if (s.IsNotFound()) {
+      result = "NOT_FOUND";
+    } else if (!s.ok()) {
+      result = s.ToString();
+    }
+    return result;
+  }
+};
+
+TEST_F(DBTest, GetSnapshotLink) {
+  Options options;
+  const std::string snapshot_name = test::TmpDir(env_) + "/snapshot";
+  DB* snapshotDB;
+  ReadOptions roptions;
+  std::string result;
+  Checkpoint* checkpoint;
+
+  options = CurrentOptions();
+  delete db_;
+  db_ = nullptr;
+  ASSERT_OK(DestroyDB(dbname_, options));
+  ASSERT_OK(DestroyDB(snapshot_name, options));
+  env_->DeleteDir(snapshot_name);
+
+  // Create a database
+  Status s;
+  options.create_if_missing = true;
+  ASSERT_OK(DB::Open(options, dbname_, &db_));
+  std::string key = std::string("foo");
+  ASSERT_OK(Put(key, "v1"));
+  // Take a snapshot
+  ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+  ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name));
+  ASSERT_OK(Put(key, "v2"));
+  ASSERT_EQ("v2", Get(key));
+  ASSERT_OK(Flush());
+  ASSERT_EQ("v2", Get(key));
+  // Open snapshot and verify contents while DB is running
+  options.create_if_missing = false;
+  ASSERT_OK(DB::Open(options, snapshot_name, &snapshotDB));
+  ASSERT_OK(snapshotDB->Get(roptions, key, &result));
+  ASSERT_EQ("v1", result);
+  delete snapshotDB;
+  snapshotDB = nullptr;
+  delete db_;
+  db_ = nullptr;
+
+  // Destroy original DB
+  ASSERT_OK(DestroyDB(dbname_, options));
+
+  // Open snapshot and verify contents
+  options.create_if_missing = false;
+  dbname_ = snapshot_name;
+  ASSERT_OK(DB::Open(options, dbname_, &db_));
+  ASSERT_EQ("v1", Get(key));
+  delete db_;
+  db_ = nullptr;
+  ASSERT_OK(DestroyDB(dbname_, options));
+  delete checkpoint;
+
+  // Restore DB name
+  dbname_ = test::TmpDir(env_) + "/db_test";
+}
+
+TEST_F(DBTest, CheckpointCF) {
+  Options options = CurrentOptions();
+  CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBTest::CheckpointCF:2", "DBImpl::GetLiveFiles:2"},
+       {"DBImpl::GetLiveFiles:1", "DBTest::CheckpointCF:1"}});
+
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(Put(0, "Default", "Default"));
+  ASSERT_OK(Put(1, "one", "one"));
+  ASSERT_OK(Put(2, "two", "two"));
+  ASSERT_OK(Put(3, "three", "three"));
+  ASSERT_OK(Put(4, "four", "four"));
+  ASSERT_OK(Put(5, "five", "five"));
+
+  const std::string snapshot_name = test::TmpDir(env_) + "/snapshot";
+  DB* snapshotDB;
+  ReadOptions roptions;
+  std::string result;
+  std::vector<ColumnFamilyHandle*> cphandles;
+
+  ASSERT_OK(DestroyDB(snapshot_name, options));
+  env_->DeleteDir(snapshot_name);
+
+  Status s;
+  // Take a snapshot
+  std::thread t([&]() {
+    Checkpoint* checkpoint;
+    ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
+    ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name));
+  });
+  TEST_SYNC_POINT("DBTest::CheckpointCF:1");
+  ASSERT_OK(Put(0, "Default", "Default1"));
+  ASSERT_OK(Put(1, "one", "eleven"));
+  ASSERT_OK(Put(2, "two", "twelve"));
+  ASSERT_OK(Put(3, "three", "thirteen"));
+  ASSERT_OK(Put(4, "four", "fourteen"));
"four", "fourteen")); + ASSERT_OK(Put(5, "five", "fifteen")); + TEST_SYNC_POINT("DBTest::CheckpointCF:2"); + t.join(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_OK(Put(1, "one", "twentyone")); + ASSERT_OK(Put(2, "two", "twentytwo")); + ASSERT_OK(Put(3, "three", "twentythree")); + ASSERT_OK(Put(4, "four", "twentyfour")); + ASSERT_OK(Put(5, "five", "twentyfive")); + ASSERT_OK(Flush()); + + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + std::vector cfs; + cfs= {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"}; + std::vector column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options)); + } + ASSERT_OK(DB::Open(options, snapshot_name, + column_families, &cphandles, &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result)); + ASSERT_EQ("Default1", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result)); + ASSERT_EQ("eleven", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result)); + for (auto h : cphandles) { + delete h; + } + cphandles.clear(); + delete snapshotDB; + snapshotDB = nullptr; + ASSERT_OK(DestroyDB(snapshot_name, options)); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}