[CF] Column family support for LDB tool

Summary: Added list_column_family command and also updated dump_manifest

Test Plan: no

Reviewers: dhruba, haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16419
This commit is contained in:
Igor Canadi 2014-02-27 16:18:23 -08:00
parent 85b1b5e1b9
commit 492c9f71c6
4 changed files with 151 additions and 33 deletions

View File

@ -16,6 +16,7 @@
#include <map>
#include <set>
#include <climits>
#include <unordered_map>
#include <stdio.h>
#include "db/filename.h"
@ -2109,18 +2110,23 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
return s;
}
bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
int count = 0;
// TODO(icanadi) works only for default column family currently
ColumnFamilyData* default_cfd = column_family_set_->GetDefault();
VersionSet::Builder builder(default_cfd, default_cfd->current());
std::unordered_map<uint32_t, std::string> comparators;
std::unordered_map<uint32_t, Builder*> builders;
// add default column family
VersionEdit default_cf_edit;
default_cf_edit.AddColumnFamily(default_column_family_name);
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd =
CreateColumnFamily(ColumnFamilyOptions(options), &default_cf_edit);
builders.insert({0, new Builder(default_cfd, default_cfd->current())});
{
VersionSet::LogReporter reporter;
@ -2132,13 +2138,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
s = edit.DecodeFrom(record);
if (s.ok()) {
if (edit.column_family_ == 0 && edit.has_comparator_ &&
edit.comparator_ != default_cfd->user_comparator()->Name()) {
s = Status::InvalidArgument(
default_cfd->user_comparator()->Name(),
"does not match existing comparator " + edit.comparator_);
}
if (!s.ok()) {
break;
}
// Write out each individual edit
@ -2148,13 +2149,58 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
}
count++;
if (s.ok() && edit.column_family_ == 0) {
builder.Apply(&edit);
bool cf_in_builders =
builders.find(edit.column_family_) != builders.end();
if (edit.has_comparator_) {
comparators.insert({edit.column_family_, edit.comparator_});
}
if (edit.is_column_family_add_) {
if (cf_in_builders) {
s = Status::Corruption(
"Manifest adding the same column family twice");
break;
}
ColumnFamilyData* new_cfd =
CreateColumnFamily(ColumnFamilyOptions(options), &edit);
builders.insert(
{edit.column_family_, new Builder(new_cfd, new_cfd->current())});
} else if (edit.is_column_family_drop_) {
if (!cf_in_builders) {
s = Status::Corruption(
"Manifest - dropping non-existing column family");
break;
}
auto builder_iter = builders.find(edit.column_family_);
delete builder_iter->second;
builders.erase(builder_iter);
comparators.erase(edit.column_family_);
auto cfd = column_family_set_->GetColumnFamily(edit.column_family_);
assert(cfd != nullptr);
cfd->Unref();
delete cfd;
} else {
if (!cf_in_builders) {
s = Status::Corruption(
"Manifest record referencing unknown column family");
break;
}
auto cfd = column_family_set_->GetColumnFamily(edit.column_family_);
// this should never happen since cf_in_builders is true
assert(cfd != nullptr);
if (edit.has_log_number_) {
log_number = std::max(log_number, edit.log_number_);
have_log_number = true;
cfd->SetLogNumber(edit.log_number_);
}
// if it is not column family add or column family drop,
// then it's a file add/delete, which should be forwarded
// to builder
auto builder = builders.find(edit.column_family_);
assert(builder != builders.end());
builder->second->Apply(&edit);
}
if (edit.has_prev_log_number_) {
@ -2179,9 +2225,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
printf("no meta-nextfile entry in descriptor");
} else if (!have_log_number) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
printf("no meta-lognumber entry in descriptor");
} else if (!have_last_sequence) {
printf("no last-sequence-number entry in descriptor");
s = Status::Corruption("no last-sequence-number entry in descriptor");
@ -2190,28 +2233,41 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
if (!have_prev_log_number) {
prev_log_number = 0;
}
MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}
if (s.ok()) {
Version* v = new Version(column_family_set_->GetDefault(), this, 0);
builder.SaveTo(v);
for (auto cfd : *column_family_set_) {
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
auto builder = builders_iter->second;
Version* v = new Version(cfd, this, current_version_number_++);
builder->SaveTo(v);
delete builder;
printf("--------------- Column family \"%s\" (ID %u) --------------\n",
cfd->GetName().c_str(), (unsigned int)cfd->GetID());
printf("log number: %lu\n", (unsigned long)cfd->GetLogNumber());
auto comparator = comparators.find(cfd->GetID());
if (comparator != comparators.end()) {
printf("comparator: %s\n", comparator->second.c_str());
} else {
printf("comparator: <NO COMPARATOR>\n");
}
printf("%s \n", v->DebugString(hex).c_str());
delete v;
}
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
prev_log_number_ = prev_log_number;
printf("manifest_file_number %lu next_file_number %lu last_sequence "
"%lu log_number %lu prev_log_number %lu\n",
(unsigned long)manifest_file_number_,
(unsigned long)next_file_number_,
(unsigned long)last_sequence,
(unsigned long)log_number,
(unsigned long)prev_log_number);
printf("%s \n", v->DebugString(hex).c_str());
printf(
"manifest_file_number %lu next_file_number %lu last_sequence "
"%lu prev_log_number %lu\n",
(unsigned long)manifest_file_number_, (unsigned long)next_file_number_,
(unsigned long)last_sequence, (unsigned long)prev_log_number);
}
return s;

View File

@ -153,6 +153,8 @@ LDBCommand* LDBCommand::SelectCommand(
return new DBLoaderCommand(cmdParams, option_map, flags);
} else if (cmd == ManifestDumpCommand::Name()) {
return new ManifestDumpCommand(cmdParams, option_map, flags);
} else if (cmd == ListColumnFamiliesCommand::Name()) {
return new ListColumnFamiliesCommand(cmdParams, option_map, flags);
} else if (cmd == InternalDumpCommand::Name()) {
return new InternalDumpCommand(cmdParams, option_map, flags);
}
@ -555,6 +557,48 @@ void ManifestDumpCommand::DoCommand() {
// ----------------------------------------------------------------------------
void ListColumnFamiliesCommand::Help(string& ret) {
ret.append(" ");
ret.append(ListColumnFamiliesCommand::Name());
ret.append(" full_path_to_db_directory ");
ret.append("\n");
}
ListColumnFamiliesCommand::ListColumnFamiliesCommand(
const vector<string>& params, const map<string, string>& options,
const vector<string>& flags)
: LDBCommand(options, flags, false, {}) {
if (params.size() != 1) {
exec_state_ = LDBCommandExecuteResult::FAILED(
"dbname must be specified for the list_column_families command");
} else {
dbname_ = params[0];
}
}
void ListColumnFamiliesCommand::DoCommand() {
vector<string> column_families;
Status s = DB::ListColumnFamilies(DBOptions(), dbname_, &column_families);
if (!s.ok()) {
printf("Error in processing db %s %s\n", dbname_.c_str(),
s.ToString().c_str());
} else {
printf("Column families in %s: \n{", dbname_.c_str());
bool first = true;
for (auto cf : column_families) {
if (!first) {
printf(", ");
}
first = false;
printf("%s", cf.c_str());
}
printf("}\n");
}
}
// ----------------------------------------------------------------------------
string ReadableTime(int unixtime) {
char time_buffer [80];
time_t rawtime = unixtime;

View File

@ -484,6 +484,23 @@ private:
static const string ARG_PATH;
};
class ListColumnFamiliesCommand : public LDBCommand {
public:
static string Name() { return "list_column_families"; }
ListColumnFamiliesCommand(const vector<string>& params,
const map<string, string>& options,
const vector<string>& flags);
static void Help(string& ret);
virtual void DoCommand();
virtual bool NoDBOpen() { return true; }
private:
string dbname_;
};
class ReduceDBLevelsCommand : public LDBCommand {
public:
static string Name() { return "reduce_levels"; }

View File

@ -63,6 +63,7 @@ public:
DBDumperCommand::Help(ret);
DBLoaderCommand::Help(ret);
ManifestDumpCommand::Help(ret);
ListColumnFamiliesCommand::Help(ret);
InternalDumpCommand::Help(ret);
fprintf(stderr, "%s\n", ret.c_str());