diff --git a/HISTORY.md b/HISTORY.md
index 0ced2e137..48cf931ca 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -8,6 +8,7 @@
 ### New Features
 * Add avoid_flush_during_recovery option.
 * Add a read option background_purge_on_iterator_cleanup to avoid deleting files in foreground when destroying iterators. Instead, a job is scheduled in high priority queue and would be executed in a separate background thread.
+* RepairDB support for column families. RepairDB now associates data with non-default column families using information embedded in the SST/WAL files (4.7 or later). For data written by 4.6 or earlier, RepairDB associates it with the default column family.
 
 ## 4.9.0 (6/9/2016)
 ### Public API changes
diff --git a/db/repair.cc b/db/repair.cc
index baedb5320..f676fa4d9 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -154,6 +154,14 @@ class Repairer {
       status = vset_.Recover({{kDefaultColumnFamilyName, cf_options_}}, false);
     }
     if (status.ok()) {
+      // Need to scan existing SST files first so the column families are
+      // created before we process WAL files
+      ExtractMetaData();
+
+      // ExtractMetaData() uses table_fds_ to know which SST files' metadata to
+      // extract -- we need to clear it here since metadata for existing SST
+      // files has been extracted already
+      table_fds_.clear();
       ConvertLogFilesToTables();
       ExtractMetaData();
       status = AddTables();
@@ -177,6 +185,8 @@ class Repairer {
  private:
   struct TableInfo {
     FileMetaData meta;
+    uint32_t column_family_id;
+    std::string column_family_name;
     SequenceNumber min_sequence;
     SequenceNumber max_sequence;
   };
@@ -294,16 +304,17 @@ class Repairer {
     log::Reader reader(options_.info_log, std::move(lfile_reader), &reporter,
                        true /*enable checksum*/, 0 /*initial_offset*/, log);
 
+    // Initialize per-column family memtables
+    for (auto* cfd : *vset_.GetColumnFamilySet()) {
+      cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                             kMaxSequenceNumber);
+    }
+    auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());
+
     // Read all the records and add to a memtable
     std::string scratch;
     Slice record;
     WriteBatch batch;
-    WriteBuffer wb(options_.db_write_buffer_size);
-    MemTable* mem =
-        new MemTable(icmp_, ioptions_, MutableCFOptions(options_, ioptions_),
-                     &wb, kMaxSequenceNumber);
-    auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem);
-    mem->Ref();
     int counter = 0;
     while (reader.ReadRecord(&record, &scratch)) {
       if (record.size() < WriteBatchInternal::kHeader) {
@@ -312,7 +323,7 @@ class Repairer {
         continue;
       }
       WriteBatchInternal::SetContents(&batch, record);
-      status = WriteBatchInternal::InsertInto(&batch, cf_mems_default, nullptr);
+      status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr);
       if (status.ok()) {
         counter += WriteBatchInternal::Count(&batch);
       } else {
@@ -323,36 +334,40 @@ class Repairer {
       }
     }
 
-    // Do not record a version edit for this conversion to a Table
-    // since ExtractMetaData() will also generate edits.
-    FileMetaData meta;
-    meta.fd = FileDescriptor(next_file_number_++, 0, 0);
-    {
+    // Dump a table for each column family with entries in this log file.
+    for (auto* cfd : *vset_.GetColumnFamilySet()) {
+      // Do not record a version edit for this conversion to a Table
+      // since ExtractMetaData() will also generate edits.
+      MemTable* mem = cfd->mem();
+      if (mem->IsEmpty()) {
+        continue;
+      }
+
+      FileMetaData meta;
+      meta.fd = FileDescriptor(next_file_number_++, 0, 0);
       ReadOptions ro;
       ro.total_order_seek = true;
       Arena arena;
       ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
-      MutableCFOptions mutable_cf_options(options_, ioptions_);
       status = BuildTable(
-          dbname_, env_, ioptions_, mutable_cf_options, env_options_,
-          table_cache_, iter.get(), &meta, icmp_,
-          &int_tbl_prop_collector_factories_,
-          TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
-          std::string() /* column_family_name */, {}, kMaxSequenceNumber,
-          kNoCompression, CompressionOptions(), false,
-          nullptr /* internal_stats */, TableFileCreationReason::kRecovery);
-    }
-    delete mem->Unref();
-    delete cf_mems_default;
-    mem = nullptr;
-    if (status.ok()) {
-      if (meta.fd.GetFileSize() > 0) {
-        table_fds_.push_back(meta.fd);
+          dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
+          env_options_, table_cache_, iter.get(), &meta,
+          cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
+          cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, kNoCompression,
+          CompressionOptions(), false, nullptr /* internal_stats */,
+          TableFileCreationReason::kRecovery);
+      Log(InfoLogLevel::INFO_LEVEL, options_.info_log,
+          "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log,
+          counter, meta.fd.GetNumber(), status.ToString().c_str());
+      if (status.ok()) {
+        if (meta.fd.GetFileSize() > 0) {
+          table_fds_.push_back(meta.fd);
+        }
+      } else {
+        break;
       }
     }
-    Log(InfoLogLevel::INFO_LEVEL, options_.info_log,
-        "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
-        log, counter, meta.fd.GetNumber(), status.ToString().c_str());
+    delete cf_mems;
     return status;
   }
 
@@ -385,9 +400,45 @@ class Repairer {
     Status status = env_->GetFileSize(fname, &file_size);
     t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
                                 file_size);
+    std::shared_ptr<const TableProperties> props;
+    if (status.ok()) {
+      status = table_cache_->GetTableProperties(env_options_, icmp_, t->meta.fd,
+                                                &props);
+    }
+    if (status.ok()) {
+      t->column_family_id = static_cast<uint32_t>(props->column_family_id);
+      if (t->column_family_id ==
+          TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) {
+        Log(InfoLogLevel::WARN_LEVEL, options_.info_log,
+            "Table #%" PRIu64
+            ": column family unknown (probably due to legacy format); "
+            "adding to default column family id 0.",
+            t->meta.fd.GetNumber());
+        t->column_family_id = 0;
+      }
+
+      if (vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id) ==
+          nullptr) {
+        status =
+            AddColumnFamily(props->column_family_name, t->column_family_id);
+      }
+    }
+    ColumnFamilyData* cfd;
+    if (status.ok()) {
+      cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id);
+      if (cfd->GetName() != props->column_family_name) {
+        Log(InfoLogLevel::ERROR_LEVEL, options_.info_log,
+            "Table #%" PRIu64
+            ": inconsistent column family name '%s'; expected '%s' for column "
+            "family id %" PRIu32 ".",
+            t->meta.fd.GetNumber(), props->column_family_name.c_str(),
+            cfd->GetName().c_str(), t->column_family_id);
+        status = Status::Corruption(dbname_, "inconsistent column family name");
+      }
+    }
     if (status.ok()) {
       InternalIterator* iter = table_cache_->NewIterator(
-          ReadOptions(), env_options_, icmp_, t->meta.fd);
+          ReadOptions(), env_options_, cfd->internal_comparator(), t->meta.fd);
       bool empty = true;
       ParsedInternalKey parsed;
       t->min_sequence = 0;
@@ -395,9 +446,9 @@ class Repairer {
       for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
         Slice key = iter->key();
         if (!ParseInternalKey(key, &parsed)) {
-          Log(InfoLogLevel::ERROR_LEVEL,
-              options_.info_log, "Table #%" PRIu64 ": unparsable key %s",
-              t->meta.fd.GetNumber(), EscapeString(key).c_str());
+          Log(InfoLogLevel::ERROR_LEVEL, options_.info_log,
+              "Table #%" PRIu64 ": unparsable key %s", t->meta.fd.GetNumber(),
+              EscapeString(key).c_str());
           continue;
         }
 
@@ -418,42 +469,51 @@ class Repairer {
         status = iter->status();
       }
       delete iter;
+
+      Log(InfoLogLevel::INFO_LEVEL, options_.info_log,
+          "Table #%" PRIu64 ": %d entries %s", t->meta.fd.GetNumber(), counter,
+          status.ToString().c_str());
     }
-    Log(InfoLogLevel::INFO_LEVEL,
-        options_.info_log, "Table #%" PRIu64 ": %d entries %s",
-        t->meta.fd.GetNumber(), counter, status.ToString().c_str());
     return status;
   }
 
   Status AddTables() {
+    std::unordered_map<uint32_t, std::vector<const TableInfo*>> cf_id_to_tables;
     SequenceNumber max_sequence = 0;
     for (size_t i = 0; i < tables_.size(); i++) {
+      cf_id_to_tables[tables_[i].column_family_id].push_back(&tables_[i]);
       if (max_sequence < tables_[i].max_sequence) {
         max_sequence = tables_[i].max_sequence;
       }
     }
     vset_.SetLastSequence(max_sequence);
 
-    auto* cfd = vset_.GetColumnFamilySet()->GetDefault();
-    VersionEdit edit;
-    edit.SetComparatorName(cfd->user_comparator()->Name());
-    edit.SetLogNumber(0);
-    edit.SetNextFile(next_file_number_);
-    edit.SetColumnFamily(cfd->GetID());
+    for (const auto& cf_id_and_tables : cf_id_to_tables) {
+      auto* cfd =
+          vset_.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables.first);
+      VersionEdit edit;
+      edit.SetComparatorName(cfd->user_comparator()->Name());
+      edit.SetLogNumber(0);
+      edit.SetNextFile(next_file_number_);
+      edit.SetColumnFamily(cfd->GetID());
 
-    // TODO(opt): separate out into multiple levels
-    for (const auto& table : tables_) {
-      edit.AddFile(0, table.meta.fd.GetNumber(), table.meta.fd.GetPathId(),
-                   table.meta.fd.GetFileSize(), table.meta.smallest,
-                   table.meta.largest, table.min_sequence, table.max_sequence,
-                   table.meta.marked_for_compaction);
+      // TODO(opt): separate out into multiple levels
+      for (const auto* table : cf_id_and_tables.second) {
+        edit.AddFile(0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
+                     table->meta.fd.GetFileSize(), table->meta.smallest,
+                     table->meta.largest, table->min_sequence,
+                     table->max_sequence, table->meta.marked_for_compaction);
+      }
+      mutex_.Lock();
+      Status status = vset_.LogAndApply(
+          cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
+          nullptr /* db_directory */, false /* new_descriptor_log */);
+      mutex_.Unlock();
+      if (!status.ok()) {
+        return status;
+      }
     }
-    mutex_.Lock();
-    Status status = vset_.LogAndApply(
-        cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
-        nullptr /* db_directory */, false /* new_descriptor_log */);
-    mutex_.Unlock();
-    return status;
+    return Status::OK();
   }
 
   void ArchiveFile(const std::string& fname) {
diff --git a/db/repair_test.cc b/db/repair_test.cc
index f0c1d231d..86bda9053 100644
--- a/db/repair_test.cc
+++ b/db/repair_test.cc
@@ -12,6 +12,7 @@
 #include "rocksdb/db.h"
 #include "rocksdb/transaction_log.h"
 #include "util/file_util.h"
+#include "util/string_util.h"
 
 namespace rocksdb {
 
@@ -169,6 +170,44 @@ TEST_F(RepairTest, UnflushedSst) {
   ASSERT_EQ(Get("key"), "val");
 }
 
+TEST_F(RepairTest, RepairMultipleColumnFamilies) {
+  // Verify repair logic associates SST files with their original column
+  // families.
+  const int kNumCfs = 3;
+  const int kEntriesPerCf = 2;
+  DestroyAndReopen(CurrentOptions());
+  CreateAndReopenWithCF({"pikachu1", "pikachu2"}, CurrentOptions());
+  for (int i = 0; i < kNumCfs; ++i) {
+    for (int j = 0; j < kEntriesPerCf; ++j) {
+      Put(i, "key" + ToString(j), "val" + ToString(j));
+      if (j == kEntriesPerCf - 1 && i == kNumCfs - 1) {
+        // Leave one unflushed so we can verify WAL entries are properly
+        // associated with column families.
+        continue;
+      }
+      Flush(i);
+    }
+  }
+
+  // Need to get path before Close() deletes db_, but delete it after Close() to
+  // ensure Close() doesn't re-create the manifest.
+  std::string manifest_path =
+      DescriptorFileName(dbname_, dbfull()->TEST_Current_Manifest_FileNo());
+  Close();
+  ASSERT_OK(env_->FileExists(manifest_path));
+  ASSERT_OK(env_->DeleteFile(manifest_path));
+
+  ASSERT_OK(RepairDB(dbname_, CurrentOptions()));
+
+  ReopenWithColumnFamilies({"default", "pikachu1", "pikachu2"},
+                           CurrentOptions());
+  for (int i = 0; i < kNumCfs; ++i) {
+    for (int j = 0; j < kEntriesPerCf; ++j) {
+      ASSERT_EQ(Get(i, "key" + ToString(j)), "val" + ToString(j));
+    }
+  }
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {