diff --git a/db/db_impl.cc b/db/db_impl.cc index 56a96a67d..215e7d941 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2334,6 +2334,24 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { return s; } +Status DBImpl::EnableAutoCompaction( + const std::vector& column_family_handles) { + Status s; + for (auto cf_ptr : column_family_handles) { + // check options here, enable only if didn't initially disable + if (s.ok()) { + s = this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}}); + } + } + + if (s.ok()) { + InstrumentedMutexLock guard_lock(&mutex_); + MaybeScheduleFlushOrCompaction(); + } + + return s; +} + void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); if (!opened_successfully_) { @@ -5007,6 +5025,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } TEST_SYNC_POINT("DBImpl::Open:Opened"); if (s.ok()) { + *dbptr = impl; impl->opened_successfully_ = true; impl->MaybeScheduleFlushOrCompaction(); } @@ -5029,9 +5048,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, persist_options_status.ToString().c_str()); } } - if (s.ok()) { - *dbptr = impl; - } else { + if (!s.ok()) { for (auto* h : *handles) { delete h; } diff --git a/db/db_impl.h b/db/db_impl.h index f16c6d8dc..1ef96be14 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -144,6 +144,9 @@ class DBImpl : public DB { virtual Status PauseBackgroundWork() override; virtual Status ContinueBackgroundWork() override; + virtual Status EnableAutoCompaction( + const std::vector& column_family_handles) override; + using DB::SetOptions; Status SetOptions( ColumnFamilyHandle* column_family, diff --git a/db/db_test.cc b/db/db_test.cc index 6f47eee9a..8a495500c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5850,6 +5850,11 @@ class ModelDB: public DB { return Status::NotSupported("Not supported operation."); } + Status EnableAutoCompaction( + const std::vector& column_family_handles) override { + return Status::NotSupported("Not supported operation."); + } + using DB::NumberLevels; virtual int NumberLevels(ColumnFamilyHandle* column_family) override { return 1; diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 35df65f5d..2fb1c51b5 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -558,6 +558,12 @@ class DB { virtual Status PauseBackgroundWork() = 0; virtual Status ContinueBackgroundWork() = 0; + // This function will enable automatic compactions for the given column + // families if they were previously disabled via the disable_auto_compactions + // option. + virtual Status EnableAutoCompaction( + const std::vector& column_family_handles) = 0; + // Number of levels used for this DB. virtual int NumberLevels(ColumnFamilyHandle* column_family) = 0; virtual int NumberLevels() { return NumberLevels(DefaultColumnFamily()); } diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 3bfa0e2a5..542f6c654 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -183,6 +183,11 @@ class StackableDB : public DB { return db_->ContinueBackgroundWork(); } + virtual Status EnableAutoCompaction( + const std::vector& column_family_handles) override { + return db_->EnableAutoCompaction(column_family_handles); + } + using DB::NumberLevels; virtual int NumberLevels(ColumnFamilyHandle* column_family) override { return db_->NumberLevels(column_family); @@ -274,9 +279,10 @@ class StackableDB : public DB { } using DB::SetOptions; - virtual Status SetOptions( - const std::unordered_map& new_options) override { - return db_->SetOptions(new_options); + virtual Status SetOptions(ColumnFamilyHandle* column_family_handle, + const std::unordered_map& + new_options) override { + return db_->SetOptions(column_family_handle, new_options); } using DB::GetPropertiesOfAllTables; diff --git a/utilities/transactions/transaction_db_impl.cc b/utilities/transactions/transaction_db_impl.cc index 42ded1576..f8a47b948 100644 --- a/utilities/transactions/transaction_db_impl.cc +++ b/utilities/transactions/transaction_db_impl.cc @@ -8,6 +8,7 @@ #include "utilities/transactions/transaction_db_impl.h" #include +#include #include #include "db/db_impl.h" @@ -77,28 +78,45 @@ Status TransactionDB::Open( DB* db; std::vector column_families_copy = column_families; + std::vector compaction_enabled_cf_indices; // Enable MemTable History if not already enabled - for (auto& column_family : column_families_copy) { - ColumnFamilyOptions* options = &column_family.options; + for (size_t i = 0; i < column_families_copy.size(); i++) { + ColumnFamilyOptions* options = &column_families_copy[i].options; if (options->max_write_buffer_number_to_maintain == 0) { // Setting to -1 will set the History size to max_write_buffer_number. options->max_write_buffer_number_to_maintain = -1; } + + if (!options->disable_auto_compactions) { + // Disable compactions momentarily to prevent race with DB::Open + options->disable_auto_compactions = true; + compaction_enabled_cf_indices.push_back(i); + } } - s = DB::Open(db_options, dbname, column_families, handles, &db); + s = DB::Open(db_options, dbname, column_families_copy, handles, &db); if (s.ok()) { TransactionDBImpl* txn_db = new TransactionDBImpl( db, TransactionDBImpl::ValidateTxnDBOptions(txn_db_options)); + *dbptr = txn_db; for (auto cf_ptr : *handles) { txn_db->AddColumnFamily(cf_ptr); } - *dbptr = txn_db; + // Re-enable compaction for the column families that initially had + // compaction enabled. + assert(column_families_copy.size() == (*handles).size()); + std::vector compaction_enabled_cf_handles; + compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size()); + for (auto index : compaction_enabled_cf_indices) { + compaction_enabled_cf_handles.push_back((*handles)[index]); + } + + s = txn_db->EnableAutoCompaction(compaction_enabled_cf_handles); } return s; diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index 2d8fb2044..0d6647ae5 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -7,7 +7,9 @@ #include +#include "db/db_impl.h" #include "rocksdb/db.h" +#include "rocksdb/options.h" #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "util/logging.h" @@ -2208,6 +2210,65 @@ TEST_F(TransactionTest, ClearSnapshotTest) { delete txn; } +TEST_F(TransactionTest, ToggleAutoCompactionTest) { + Status s; + + TransactionOptions txn_options; + ColumnFamilyHandle *cfa, *cfb; + ColumnFamilyOptions cf_options; + + // Create 2 new column families + s = db->CreateColumnFamily(cf_options, "CFA", &cfa); + ASSERT_OK(s); + s = db->CreateColumnFamily(cf_options, "CFB", &cfb); + ASSERT_OK(s); + + delete cfa; + delete cfb; + delete db; + + // open DB with three column families + std::vector column_families; + // have to open default column family + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, ColumnFamilyOptions())); + // open the new column families + column_families.push_back( + ColumnFamilyDescriptor("CFA", ColumnFamilyOptions())); + column_families.push_back( + ColumnFamilyDescriptor("CFB", ColumnFamilyOptions())); + + ColumnFamilyOptions* cf_opt_default = &column_families[0].options; + ColumnFamilyOptions* cf_opt_cfa = &column_families[1].options; + ColumnFamilyOptions* cf_opt_cfb = &column_families[2].options; + cf_opt_default->disable_auto_compactions = false; + cf_opt_cfa->disable_auto_compactions = true; + cf_opt_cfb->disable_auto_compactions = false; + + std::vector handles; + + s = TransactionDB::Open(options, txn_db_options, dbname, column_families, + &handles, &db); + ASSERT_OK(s); + + auto cfh_default = reinterpret_cast(handles[0]); + auto opt_default = *cfh_default->cfd()->GetLatestMutableCFOptions(); + + auto cfh_a = reinterpret_cast(handles[1]); + auto opt_a = *cfh_a->cfd()->GetLatestMutableCFOptions(); + + auto cfh_b = reinterpret_cast(handles[2]); + auto opt_b = *cfh_b->cfd()->GetLatestMutableCFOptions(); + + ASSERT_EQ(opt_default.disable_auto_compactions, false); + ASSERT_EQ(opt_a.disable_auto_compactions, true); + ASSERT_EQ(opt_b.disable_auto_compactions, false); + + for (auto handle : handles) { + delete handle; + } +} + } // namespace rocksdb int main(int argc, char** argv) {