Add SST ingestion to ldb (#4205)

Summary:
We add two subcommands `write_extern_sst` and `ingest_extern_sst` to ldb. This PR avoids changing existing code because we hope to cherry-pick to earlier releases to support compatibility check for external SST file ingestion.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4205

Differential Revision: D9112711

Pulled By: riversand963

fbshipit-source-id: 7cae88380d4de86da8440230e87eca66755648e4
This commit is contained in:
Yanqin Jin 2018-08-09 14:18:59 -07:00
parent b6c49fc5b4
commit 940641c3cc
5 changed files with 287 additions and 11 deletions

View File

@ -210,12 +210,20 @@ class LDBCommand {
bool ParseStringOption(const std::map<std::string, std::string>& options,
const std::string& option, std::string* value);
/**
* Returns the value of the specified option as a boolean.
* default_val is used if the option is not found in options.
* Throws an exception if the value of the option is not
* "true" or "false" (case insensitive).
*/
bool ParseBooleanOption(const std::map<std::string, std::string>& options,
const std::string& option, bool default_val);
Options options_;
std::vector<ColumnFamilyDescriptor> column_families_;
LDBOptions ldb_options_;
private:
friend class WALDumperCommand;
/**
* Interpret command line options and flags to determine if the key
* should be input/output in hex.
@ -230,15 +238,6 @@ class LDBCommand {
bool IsValueHex(const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags);
/**
* Returns the value of the specified option as a boolean.
* default_val is used if the option is not found in options.
* Throws an exception if the value of the option is not
* "true" or "false" (case insensitive).
*/
bool ParseBooleanOption(const std::map<std::string, std::string>& options,
const std::string& option, bool default_val);
/**
* Converts val to a boolean.
* val must be either true or false (case insensitive).

View File

@ -242,6 +242,14 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) {
} else if (parsed_params.cmd == RestoreCommand::Name()) {
return new RestoreCommand(parsed_params.cmd_params,
parsed_params.option_map, parsed_params.flags);
} else if (parsed_params.cmd == WriteExternalSstFilesCommand::Name()) {
return new WriteExternalSstFilesCommand(parsed_params.cmd_params,
parsed_params.option_map,
parsed_params.flags);
} else if (parsed_params.cmd == IngestExternalSstFilesCommand::Name()) {
return new IngestExternalSstFilesCommand(parsed_params.cmd_params,
parsed_params.option_map,
parsed_params.flags);
}
return nullptr;
}
@ -2936,5 +2944,180 @@ void DBFileDumperCommand::DoCommand() {
}
}
void WriteExternalSstFilesCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(WriteExternalSstFilesCommand::Name());
ret.append(" <output_sst_path>");
ret.append("\n");
}
WriteExternalSstFilesCommand::WriteExternalSstFilesCommand(
const std::vector<std::string>& params,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(
options, flags, false /* is_read_only */,
BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX, ARG_FROM,
ARG_TO, ARG_CREATE_IF_MISSING})) {
create_if_missing_ =
IsFlagPresent(flags, ARG_CREATE_IF_MISSING) ||
ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false);
if (params.size() != 1) {
exec_state_ = LDBCommandExecuteResult::Failed(
"output SST file path must be specified");
} else {
output_sst_path_ = params.at(0);
}
}
void WriteExternalSstFilesCommand::DoCommand() {
if (!db_) {
assert(GetExecuteState().IsFailed());
return;
}
ColumnFamilyHandle* cfh = GetCfHandle();
SstFileWriter sst_file_writer(EnvOptions(), db_->GetOptions(), cfh);
Status status = sst_file_writer.Open(output_sst_path_);
if (!status.ok()) {
exec_state_ = LDBCommandExecuteResult::Failed("failed to open SST file: " +
status.ToString());
return;
}
int bad_lines = 0;
std::string line;
std::ifstream ifs_stdin("/dev/stdin");
std::istream* istream_p = ifs_stdin.is_open() ? &ifs_stdin : &std::cin;
while (getline(*istream_p, line, '\n')) {
std::string key;
std::string value;
if (ParseKeyValue(line, &key, &value, is_key_hex_, is_value_hex_)) {
status = sst_file_writer.Put(key, value);
if (!status.ok()) {
exec_state_ = LDBCommandExecuteResult::Failed(
"failed to write record to file: " + status.ToString());
return;
}
} else if (0 == line.find("Keys in range:")) {
// ignore this line
} else if (0 == line.find("Created bg thread 0x")) {
// ignore this line
} else {
bad_lines++;
}
}
status = sst_file_writer.Finish();
if (!status.ok()) {
exec_state_ = LDBCommandExecuteResult::Failed(
"Failed to finish writing to file: " + status.ToString());
return;
}
if (bad_lines > 0) {
fprintf(stderr, "Warning: %d bad lines ignored.\n", bad_lines);
}
exec_state_ = LDBCommandExecuteResult::Succeed(
"external SST file written to " + output_sst_path_);
}
Options WriteExternalSstFilesCommand::PrepareOptionsForOpenDB() {
Options opt = LDBCommand::PrepareOptionsForOpenDB();
opt.create_if_missing = create_if_missing_;
return opt;
}
const std::string IngestExternalSstFilesCommand::ARG_MOVE_FILES = "move_files";
const std::string IngestExternalSstFilesCommand::ARG_SNAPSHOT_CONSISTENCY =
"snapshot_consistency";
const std::string IngestExternalSstFilesCommand::ARG_ALLOW_GLOBAL_SEQNO =
"allow_global_seqno";
const std::string IngestExternalSstFilesCommand::ARG_ALLOW_BLOCKING_FLUSH =
"allow_blocking_flush";
const std::string IngestExternalSstFilesCommand::ARG_INGEST_BEHIND =
"ingest_behind";
void IngestExternalSstFilesCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(IngestExternalSstFilesCommand::Name());
ret.append(" <input_sst_path>");
ret.append(" [--" + ARG_MOVE_FILES + "] ");
ret.append(" [--" + ARG_SNAPSHOT_CONSISTENCY + "] ");
ret.append(" [--" + ARG_ALLOW_GLOBAL_SEQNO + "] ");
ret.append(" [--" + ARG_ALLOW_BLOCKING_FLUSH + "] ");
ret.append(" [--" + ARG_INGEST_BEHIND + "] ");
ret.append("\n");
}
IngestExternalSstFilesCommand::IngestExternalSstFilesCommand(
const std::vector<std::string>& params,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(
options, flags, false /* is_read_only */,
BuildCmdLineOptions({ARG_MOVE_FILES, ARG_SNAPSHOT_CONSISTENCY,
ARG_ALLOW_GLOBAL_SEQNO, ARG_CREATE_IF_MISSING,
ARG_ALLOW_BLOCKING_FLUSH, ARG_INGEST_BEHIND})),
move_files_(false),
snapshot_consistency_(true),
allow_global_seqno_(true),
allow_blocking_flush_(true),
ingest_behind_(false) {
create_if_missing_ =
IsFlagPresent(flags, ARG_CREATE_IF_MISSING) ||
ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false);
move_files_ = IsFlagPresent(flags, ARG_MOVE_FILES) ||
ParseBooleanOption(options, ARG_MOVE_FILES, false);
snapshot_consistency_ =
IsFlagPresent(flags, ARG_SNAPSHOT_CONSISTENCY) ||
ParseBooleanOption(options, ARG_SNAPSHOT_CONSISTENCY, true);
allow_global_seqno_ =
IsFlagPresent(flags, ARG_ALLOW_GLOBAL_SEQNO) ||
ParseBooleanOption(options, ARG_ALLOW_GLOBAL_SEQNO, true);
allow_blocking_flush_ =
IsFlagPresent(flags, ARG_ALLOW_BLOCKING_FLUSH) ||
ParseBooleanOption(options, ARG_ALLOW_BLOCKING_FLUSH, true);
ingest_behind_ = IsFlagPresent(flags, ARG_INGEST_BEHIND) ||
ParseBooleanOption(options, ARG_INGEST_BEHIND, false);
if (params.size() != 1) {
exec_state_ =
LDBCommandExecuteResult::Failed("input SST path must be specified");
} else {
input_sst_path_ = params.at(0);
}
}
void IngestExternalSstFilesCommand::DoCommand() {
if (!db_) {
assert(GetExecuteState().IsFailed());
return;
}
if (GetExecuteState().IsFailed()) {
return;
}
ColumnFamilyHandle* cfh = GetCfHandle();
IngestExternalFileOptions ifo;
ifo.move_files = move_files_;
ifo.snapshot_consistency = snapshot_consistency_;
ifo.allow_global_seqno = allow_global_seqno_;
ifo.allow_blocking_flush = allow_blocking_flush_;
ifo.ingest_behind = ingest_behind_;
Status status = db_->IngestExternalFile(cfh, {input_sst_path_}, ifo);
if (!status.ok()) {
exec_state_ = LDBCommandExecuteResult::Failed(
"failed to ingest external SST: " + status.ToString());
} else {
exec_state_ =
LDBCommandExecuteResult::Succeed("external SST files ingested");
}
}
Options IngestExternalSstFilesCommand::PrepareOptionsForOpenDB() {
Options opt = LDBCommand::PrepareOptionsForOpenDB();
opt.create_if_missing = create_if_missing_;
return opt;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -522,4 +522,55 @@ class RestoreCommand : public BackupableCommand {
static void Help(std::string& ret);
};
class WriteExternalSstFilesCommand : public LDBCommand {
public:
static std::string Name() { return "write_extern_sst"; }
WriteExternalSstFilesCommand(
const std::vector<std::string>& params,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags);
virtual void DoCommand() override;
virtual bool NoDBOpen() override { return false; }
virtual Options PrepareOptionsForOpenDB() override;
static void Help(std::string& ret);
private:
std::string output_sst_path_;
};
class IngestExternalSstFilesCommand : public LDBCommand {
public:
static std::string Name() { return "ingest_extern_sst"; }
IngestExternalSstFilesCommand(
const std::vector<std::string>& params,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags);
virtual void DoCommand() override;
virtual bool NoDBOpen() override { return false; }
virtual Options PrepareOptionsForOpenDB() override;
static void Help(std::string& ret);
private:
std::string input_sst_path_;
bool move_files_;
bool snapshot_consistency_;
bool allow_global_seqno_;
bool allow_blocking_flush_;
bool ingest_behind_;
static const std::string ARG_MOVE_FILES;
static const std::string ARG_SNAPSHOT_CONSISTENCY;
static const std::string ARG_ALLOW_GLOBAL_SEQNO;
static const std::string ARG_ALLOW_BLOCKING_FLUSH;
static const std::string ARG_INGEST_BEHIND;
};
} // namespace rocksdb

View File

@ -76,7 +76,7 @@ class LDBTestCase(unittest.TestCase):
my_check_output("./ldb %s >/dev/null 2>&1 |grep -v \"Created bg \
thread\"" % params, shell=True)
except Exception, e:
except Exception:
return
self.fail(
"Exception should have been raised for command with params: %s" %
@ -146,6 +146,14 @@ class LDBTestCase(unittest.TestCase):
def loadDb(self, params, dumpFile):
return 0 == run_err_null("cat %s | ./ldb load %s" % (dumpFile, params))
def writeExternSst(self, params, inputDumpFile, outputSst):
return 0 == run_err_null("cat %s | ./ldb write_extern_sst %s %s"
% (inputDumpFile, outputSst, params))
def ingestExternSst(self, params, inputSst):
return 0 == run_err_null("./ldb ingest_extern_sst %s %s"
% (inputSst, params))
def testStringBatchPut(self):
print "Running testStringBatchPut..."
self.assertRunOK("batchput x1 y1 --create_if_missing", "OK")
@ -547,5 +555,38 @@ class LDBTestCase(unittest.TestCase):
# non-existing column family.
self.assertRunFAIL("get cf3_1 --column_family=four")
def testIngestExternalSst(self):
print "Running testIngestExternalSst..."
# Dump, load, write external sst and ingest it in another db
dbPath = os.path.join(self.TMP_DIR, "db1")
self.assertRunOK(
"batchput --db=%s --create_if_missing x1 y1 x2 y2 x3 y3 x4 y4"
% dbPath,
"OK")
self.assertRunOK("scan --db=%s" % dbPath,
"x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4")
dumpFilePath = os.path.join(self.TMP_DIR, "dump1")
with open(dumpFilePath, 'w') as f:
f.write("x1 ==> y10\nx2 ==> y20\nx3 ==> y30\nx4 ==> y40")
externSstPath = os.path.join(self.TMP_DIR, "extern_data1.sst")
self.assertTrue(self.writeExternSst("--create_if_missing --db=%s"
% dbPath,
dumpFilePath,
externSstPath))
# cannot ingest if allow_global_seqno is false
self.assertFalse(
self.ingestExternSst(
"--create_if_missing --allow_global_seqno=false --db=%s"
% dbPath,
externSstPath))
self.assertTrue(
self.ingestExternSst(
"--create_if_missing --allow_global_seqno --db=%s"
% dbPath,
externSstPath))
self.assertRunOKFull("scan --db=%s" % dbPath,
"x1 : y10\nx2 : y20\nx3 : y30\nx4 : y40")
if __name__ == "__main__":
unittest.main()

View File

@ -88,6 +88,8 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
BackupCommand::Help(ret);
RestoreCommand::Help(ret);
CheckPointCommand::Help(ret);
WriteExternalSstFilesCommand::Help(ret);
IngestExternalSstFilesCommand::Help(ret);
fprintf(stderr, "%s\n", ret.c_str());
}