Add bulk create/drop column family API

Summary:
Adding DB::CreateColumnFamilie() and DB::DropColumnFamilies() to bulk create/drop column families. This is to address the problem creating/dropping 1k column families takes minutes. The bottleneck is we persist options files for every single column family create/drop, and it parses the persisted options file for verification, which take a lot CPU time.

The new APIs simply create/drop column families individually, and persist options file once at the end. This improves create 1k column families to within ~0.1s. Further improvement can be merge manifest write to one IO.
Closes https://github.com/facebook/rocksdb/pull/2248

Differential Revision: D5001578

Pulled By: yiwu-arbug

fbshipit-source-id: d4e00bda671451e0b314c13e12ad194b1704aa03
This commit is contained in:
Yi Wu 2017-05-07 22:12:55 -07:00
parent dc0bbf78f7
commit ded1d5a1af
7 changed files with 264 additions and 69 deletions

View File

@ -1,6 +1,14 @@
# Rocksdb Change Log # Rocksdb Change Log
## 5.4.1 (04/28/2017) ## 5.4.4 (05/11/2017)
### Buf Fxies ### New Features
* Added DB::CreateColumnFamilie() and DB::DropColumnFamilies() to bulk create/drop column families.
## 5.4.3 (05/10/2017)
## 5.4.2 (05/08/2017)
## 5.4.1 (05/02/2017)
### Bug Fxies
* Fix WriteBatchWithIndex address use after scope error. * Fix WriteBatchWithIndex address use after scope error.
* Fix WritableFile buffer size in direct IO. * Fix WritableFile buffer size in direct IO.
* Add prefetch to PosixRandomAccessFile in buffered io. * Add prefetch to PosixRandomAccessFile in buffered io.

View File

@ -676,6 +676,38 @@ TEST_F(ColumnFamilyTest, AddDrop) {
std::vector<std::string>({"default", "four", "three"})); std::vector<std::string>({"default", "four", "three"}));
} }
TEST_F(ColumnFamilyTest, BulkAddDrop) {
constexpr int kNumCF = 1000;
ColumnFamilyOptions cf_options;
WriteOptions write_options;
Open();
std::vector<std::string> cf_names;
std::vector<ColumnFamilyHandle*> cf_handles;
for (int i = 1; i <= kNumCF; i++) {
cf_names.push_back("cf1-" + ToString(i));
}
ASSERT_OK(db_->CreateColumnFamilies(cf_options, cf_names, &cf_handles));
for (int i = 1; i <= kNumCF; i++) {
ASSERT_OK(db_->Put(write_options, cf_handles[i - 1], "foo", "bar"));
}
ASSERT_OK(db_->DropColumnFamilies(cf_handles));
std::vector<ColumnFamilyDescriptor> cf_descriptors;
cf_handles.clear();
for (int i = 1; i <= kNumCF; i++) {
cf_descriptors.emplace_back("cf2-" + ToString(i), ColumnFamilyOptions());
}
ASSERT_OK(db_->CreateColumnFamilies(cf_descriptors, &cf_handles));
for (int i = 1; i <= kNumCF; i++) {
ASSERT_OK(db_->Put(write_options, cf_handles[i - 1], "foo", "bar"));
}
ASSERT_OK(db_->DropColumnFamilies(cf_handles));
Close();
std::vector<std::string> families;
ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families));
std::sort(families.begin(), families.end());
ASSERT_TRUE(families == std::vector<std::string>({"default"}));
}
TEST_F(ColumnFamilyTest, DropTest) { TEST_F(ColumnFamilyTest, DropTest) {
// first iteration - dont reopen DB before dropping // first iteration - dont reopen DB before dropping
// second iteration - reopen DB before dropping // second iteration - reopen DB before dropping

View File

@ -514,9 +514,8 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options); InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
delete old_sv; delete old_sv;
write_thread_.EnterUnbatched(&w, &mutex_); persist_options_status = WriteOptionsFile(
persist_options_status = WriteOptionsFile(); false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
write_thread_.ExitUnbatched(&w);
} }
} }
@ -532,14 +531,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
"[%s] SetOptions() succeeded", cfd->GetName().c_str()); "[%s] SetOptions() succeeded", cfd->GetName().c_str());
new_options.Dump(immutable_db_options_.info_log.get()); new_options.Dump(immutable_db_options_.info_log.get());
if (!persist_options_status.ok()) { if (!persist_options_status.ok()) {
if (immutable_db_options_.fail_if_options_file_error) { s = persist_options_status;
s = Status::IOError(
"SetOptions() succeeded, but unable to persist options",
persist_options_status.ToString());
}
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to persist options in SetOptions() -- %s",
persist_options_status.ToString().c_str());
} }
} else { } else {
ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed", ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] SetOptions() failed",
@ -591,7 +583,8 @@ Status DBImpl::SetDBOptions(
purge_wal_status.ToString().c_str()); purge_wal_status.ToString().c_str());
} }
} }
persist_options_status = WriteOptionsFile(); persist_options_status = WriteOptionsFile(
false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
write_thread_.ExitUnbatched(&w); write_thread_.ExitUnbatched(&w);
} }
} }
@ -1121,6 +1114,74 @@ std::vector<Status> DBImpl::MultiGet(
} }
Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family,
ColumnFamilyHandle** handle) {
assert(handle != nullptr);
Status s = CreateColumnFamilyImpl(cf_options, column_family, handle);
if (s.ok()) {
s = WriteOptionsFile(true /*need_mutex_lock*/,
true /*need_enter_write_thread*/);
}
return s;
}
Status DBImpl::CreateColumnFamilies(
const ColumnFamilyOptions& cf_options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) {
assert(handles != nullptr);
handles->clear();
size_t num_cf = column_family_names.size();
Status s;
bool success_once = false;
for (size_t i = 0; i < num_cf; i++) {
ColumnFamilyHandle* handle;
s = CreateColumnFamilyImpl(cf_options, column_family_names[i], &handle);
if (!s.ok()) {
break;
}
handles->push_back(handle);
success_once = true;
}
if (success_once) {
Status persist_options_status = WriteOptionsFile(
true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
if (s.ok() && !persist_options_status.ok()) {
s = persist_options_status;
}
}
return s;
}
Status DBImpl::CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) {
assert(handles != nullptr);
handles->clear();
size_t num_cf = column_families.size();
Status s;
bool success_once = false;
for (size_t i = 0; i < num_cf; i++) {
ColumnFamilyHandle* handle;
s = CreateColumnFamilyImpl(column_families[i].options,
column_families[i].name, &handle);
if (!s.ok()) {
break;
}
handles->push_back(handle);
success_once = true;
}
if (success_once) {
Status persist_options_status = WriteOptionsFile(
true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
if (s.ok() && !persist_options_status.ok()) {
s = persist_options_status;
}
}
return s;
}
Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
const std::string& column_family_name, const std::string& column_family_name,
ColumnFamilyHandle** handle) { ColumnFamilyHandle** handle) {
Status s; Status s;
@ -1159,12 +1220,6 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit, s = versions_->LogAndApply(nullptr, MutableCFOptions(cf_options), &edit,
&mutex_, directories_.GetDbDir(), false, &mutex_, directories_.GetDbDir(), false,
&cf_options); &cf_options);
if (s.ok()) {
// If the column family was created successfully, we then persist
// the updated RocksDB options under the same single write thread
persist_options_status = WriteOptionsFile();
}
write_thread_.ExitUnbatched(&w); write_thread_.ExitUnbatched(&w);
} }
if (s.ok()) { if (s.ok()) {
@ -1194,22 +1249,42 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
if (s.ok()) { if (s.ok()) {
NewThreadStatusCfInfo( NewThreadStatusCfInfo(
reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd()); reinterpret_cast<ColumnFamilyHandleImpl*>(*handle)->cfd());
if (!persist_options_status.ok()) {
if (immutable_db_options_.fail_if_options_file_error) {
s = Status::IOError(
"ColumnFamily has been created, but unable to persist"
"options in CreateColumnFamily()",
persist_options_status.ToString().c_str());
}
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to persist options in CreateColumnFamily() -- %s",
persist_options_status.ToString().c_str());
}
} }
return s; return s;
} }
Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
assert(column_family != nullptr);
Status s = DropColumnFamilyImpl(column_family);
if (s.ok()) {
s = WriteOptionsFile(true /*need_mutex_lock*/,
true /*need_enter_write_thread*/);
}
return s;
}
Status DBImpl::DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) {
Status s;
bool success_once = false;
for (auto* handle : column_families) {
s = DropColumnFamilyImpl(handle);
if (!s.ok()) {
break;
}
success_once = true;
}
if (success_once) {
Status persist_options_status = WriteOptionsFile(
true /*need_mutex_lock*/, true /*need_enter_write_thread*/);
if (s.ok() && !persist_options_status.ok()) {
s = persist_options_status;
}
}
return s;
}
Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd(); auto cfd = cfh->cfd();
if (cfd->GetID() == 0) { if (cfd->GetID() == 0) {
@ -1223,7 +1298,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
edit.SetColumnFamily(cfd->GetID()); edit.SetColumnFamily(cfd->GetID());
Status s; Status s;
Status options_persist_status;
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
if (cfd->IsDropped()) { if (cfd->IsDropped()) {
@ -1235,11 +1309,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
write_thread_.EnterUnbatched(&w, &mutex_); write_thread_.EnterUnbatched(&w, &mutex_);
s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_); &edit, &mutex_);
if (s.ok()) {
// If the column family was dropped successfully, we then persist
// the updated RocksDB options under the same single write thread
options_persist_status = WriteOptionsFile();
}
write_thread_.ExitUnbatched(&w); write_thread_.ExitUnbatched(&w);
} }
@ -1268,18 +1337,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
mutable_cf_options->max_write_buffer_number; mutable_cf_options->max_write_buffer_number;
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Dropped column family with id %u\n", cfd->GetID()); "Dropped column family with id %u\n", cfd->GetID());
if (!options_persist_status.ok()) {
if (immutable_db_options_.fail_if_options_file_error) {
s = Status::IOError(
"ColumnFamily has been dropped, but unable to persist "
"options in DropColumnFamily()",
options_persist_status.ToString().c_str());
}
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unable to persist options in DropColumnFamily() -- %s",
options_persist_status.ToString().c_str());
}
} else { } else {
ROCKS_LOG_ERROR(immutable_db_options_.info_log, ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Dropping column family with id %u FAILED -- %s\n", "Dropping column family with id %u FAILED -- %s\n",
@ -2107,9 +2164,29 @@ Status DB::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
ColumnFamilyHandle** handle) { ColumnFamilyHandle** handle) {
return Status::NotSupported(""); return Status::NotSupported("");
} }
Status DB::CreateColumnFamilies(
const ColumnFamilyOptions& cf_options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) {
return Status::NotSupported("");
}
Status DB::CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) {
return Status::NotSupported("");
}
Status DB::DropColumnFamily(ColumnFamilyHandle* column_family) { Status DB::DropColumnFamily(ColumnFamilyHandle* column_family) {
return Status::NotSupported(""); return Status::NotSupported("");
} }
Status DB::DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) {
return Status::NotSupported("");
}
Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) { Status DB::DestroyColumnFamilyHandle(ColumnFamilyHandle* column_family) {
delete column_family; delete column_family;
return Status::OK(); return Status::OK();
@ -2216,9 +2293,18 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
return result; return result;
} }
Status DBImpl::WriteOptionsFile() { Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
bool need_enter_write_thread) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
WriteThread::Writer w;
if (need_mutex_lock) {
mutex_.Lock();
} else {
mutex_.AssertHeld(); mutex_.AssertHeld();
}
if (need_enter_write_thread) {
write_thread_.EnterUnbatched(&w, &mutex_);
}
std::vector<std::string> cf_names; std::vector<std::string> cf_names;
std::vector<ColumnFamilyOptions> cf_opts; std::vector<ColumnFamilyOptions> cf_opts;
@ -2246,11 +2332,23 @@ Status DBImpl::WriteOptionsFile() {
if (s.ok()) { if (s.ok()) {
s = RenameTempFileToOptionsFile(file_name); s = RenameTempFileToOptionsFile(file_name);
} }
// restore lock
if (!need_mutex_lock) {
mutex_.Lock(); mutex_.Lock();
return s; }
#else if (need_enter_write_thread) {
return Status::OK(); write_thread_.ExitUnbatched(&w);
}
if (!s.ok()) {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"Unnable to persist options -- %s", s.ToString().c_str());
if (immutable_db_options_.fail_if_options_file_error) {
return Status::IOError("Unable to persist options.",
s.ToString().c_str());
}
}
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
return Status::OK();
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE

View File

@ -100,10 +100,19 @@ class DBImpl : public DB {
const std::vector<Slice>& keys, const std::vector<Slice>& keys,
std::vector<std::string>* values) override; std::vector<std::string>* values) override;
virtual Status CreateColumnFamily(const ColumnFamilyOptions& options, virtual Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
const std::string& column_family, const std::string& column_family,
ColumnFamilyHandle** handle) override; ColumnFamilyHandle** handle) override;
virtual Status CreateColumnFamilies(
const ColumnFamilyOptions& cf_options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) override;
virtual Status CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) override;
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override; virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
virtual Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) override;
// Returns false if key doesn't exist in the database and true if it may. // Returns false if key doesn't exist in the database and true if it may.
// If value_found is not passed in as null, then return the value if found in // If value_found is not passed in as null, then return the value if found in
@ -549,9 +558,10 @@ class DBImpl : public DB {
RangeDelAggregator* range_del_agg); RangeDelAggregator* range_del_agg);
// Except in DB::Open(), WriteOptionsFile can only be called when: // Except in DB::Open(), WriteOptionsFile can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used. // Persist options to options file.
// 2. db_mutex is held // If need_mutex_lock = false, the method will lock DB mutex.
Status WriteOptionsFile(); // If need_enter_write_thread = false, the method will enter write thread.
Status WriteOptionsFile(bool need_mutex_lock, bool need_enter_write_thread);
// The following two functions can only be called when: // The following two functions can only be called when:
// 1. WriteThread::Writer::EnterUnbatched() is used. // 1. WriteThread::Writer::EnterUnbatched() is used.
@ -634,6 +644,12 @@ class DBImpl : public DB {
const Status CreateArchivalDirectory(); const Status CreateArchivalDirectory();
Status CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
const std::string& cf_name,
ColumnFamilyHandle** handle);
Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family);
// Delete any unneeded files and stale in-memory entries. // Delete any unneeded files and stale in-memory entries.
void DeleteObsoleteFiles(); void DeleteObsoleteFiles();
// Delete obsolete files and log status and information of file deletion // Delete obsolete files and log status and information of file deletion

View File

@ -1032,7 +1032,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
if (s.ok()) { if (s.ok()) {
// Persist RocksDB Options before scheduling the compaction. // Persist RocksDB Options before scheduling the compaction.
// The WriteOptionsFile() will release and lock the mutex internally. // The WriteOptionsFile() will release and lock the mutex internally.
persist_options_status = impl->WriteOptionsFile(); persist_options_status = impl->WriteOptionsFile(
false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
*dbptr = impl; *dbptr = impl;
impl->opened_successfully_ = true; impl->opened_successfully_ = true;
@ -1065,15 +1066,10 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl); ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
LogFlush(impl->immutable_db_options_.info_log); LogFlush(impl->immutable_db_options_.info_log);
if (!persist_options_status.ok()) { if (!persist_options_status.ok()) {
if (db_options.fail_if_options_file_error) {
s = Status::IOError( s = Status::IOError(
"DB::Open() failed --- Unable to persist Options file", "DB::Open() failed --- Unable to persist Options file",
persist_options_status.ToString()); persist_options_status.ToString());
} }
ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
"Unable to persist options in DB::Open() -- %s",
persist_options_status.ToString().c_str());
}
} }
if (!s.ok()) { if (!s.ok()) {
for (auto* h : *handles) { for (auto* h : *handles) {

View File

@ -179,10 +179,37 @@ class DB {
const std::string& column_family_name, const std::string& column_family_name,
ColumnFamilyHandle** handle); ColumnFamilyHandle** handle);
// Bulk create column families with the same column family options.
// Return the handles of the column families through the argument handles.
// In case of error, the request may succeed partially, and handles will
// contain column family handles that it managed to create, and have size
// equal to the number of created column families.
virtual Status CreateColumnFamilies(
const ColumnFamilyOptions& options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles);
// Bulk create column families.
// Return the handles of the column families through the argument handles.
// In case of error, the request may succeed partially, and handles will
// contain column family handles that it managed to create, and have size
// equal to the number of created column families.
virtual Status CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles);
// Drop a column family specified by column_family handle. This call // Drop a column family specified by column_family handle. This call
// only records a drop record in the manifest and prevents the column // only records a drop record in the manifest and prevents the column
// family from flushing and compacting. // family from flushing and compacting.
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family); virtual Status DropColumnFamily(ColumnFamilyHandle* column_family);
// Bulk drop column families. This call only records drop records in the
// manifest and prevents the column families from flushing and compacting.
// In case of error, the request may succeed partially. User may call
// ListColumnFamilies to check the result.
virtual Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families);
// Close a column family specified by column_family handle and destroy // Close a column family specified by column_family handle and destroy
// the column family handle specified to avoid double deletion. This call // the column family handle specified to avoid double deletion. This call
// deletes the column family handle by default. Use this method to // deletes the column family handle by default. Use this method to

View File

@ -37,10 +37,28 @@ class StackableDB : public DB {
return db_->CreateColumnFamily(options, column_family_name, handle); return db_->CreateColumnFamily(options, column_family_name, handle);
} }
virtual Status CreateColumnFamilies(
const ColumnFamilyOptions& options,
const std::vector<std::string>& column_family_names,
std::vector<ColumnFamilyHandle*>* handles) override {
return db_->CreateColumnFamilies(options, column_family_names, handles);
}
virtual Status CreateColumnFamilies(
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles) override {
return db_->CreateColumnFamilies(column_families, handles);
}
virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override { virtual Status DropColumnFamily(ColumnFamilyHandle* column_family) override {
return db_->DropColumnFamily(column_family); return db_->DropColumnFamily(column_family);
} }
virtual Status DropColumnFamilies(
const std::vector<ColumnFamilyHandle*>& column_families) override {
return db_->DropColumnFamilies(column_families);
}
virtual Status DestroyColumnFamilyHandle( virtual Status DestroyColumnFamilyHandle(
ColumnFamilyHandle* column_family) override { ColumnFamilyHandle* column_family) override {
return db_->DestroyColumnFamilyHandle(column_family); return db_->DestroyColumnFamilyHandle(column_family);