diff --git a/include/rocksdb/utilities/ldb_cmd.h b/include/rocksdb/utilities/ldb_cmd.h index 4d91f0a66..907c9daf2 100644 --- a/include/rocksdb/utilities/ldb_cmd.h +++ b/include/rocksdb/utilities/ldb_cmd.h @@ -210,12 +210,20 @@ class LDBCommand { bool ParseStringOption(const std::map& options, const std::string& option, std::string* value); + /** + * Returns the value of the specified option as a boolean. + * default_val is used if the option is not found in options. + * Throws an exception if the value of the option is not + * "true" or "false" (case insensitive). + */ + bool ParseBooleanOption(const std::map& options, + const std::string& option, bool default_val); + Options options_; std::vector column_families_; LDBOptions ldb_options_; private: - friend class WALDumperCommand; /** * Interpret command line options and flags to determine if the key * should be input/output in hex. @@ -230,15 +238,6 @@ class LDBCommand { bool IsValueHex(const std::map& options, const std::vector& flags); - /** - * Returns the value of the specified option as a boolean. - * default_val is used if the option is not found in options. - * Throws an exception if the value of the option is not - * "true" or "false" (case insensitive). - */ - bool ParseBooleanOption(const std::map& options, - const std::string& option, bool default_val); - /** * Converts val to a boolean. * val must be either true or false (case insensitive). diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 075731259..41ed52cc1 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -242,6 +242,14 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) { } else if (parsed_params.cmd == RestoreCommand::Name()) { return new RestoreCommand(parsed_params.cmd_params, parsed_params.option_map, parsed_params.flags); + } else if (parsed_params.cmd == WriteExternalSstFilesCommand::Name()) { + return new WriteExternalSstFilesCommand(parsed_params.cmd_params, + parsed_params.option_map, + parsed_params.flags); + } else if (parsed_params.cmd == IngestExternalSstFilesCommand::Name()) { + return new IngestExternalSstFilesCommand(parsed_params.cmd_params, + parsed_params.option_map, + parsed_params.flags); } return nullptr; } @@ -2936,5 +2944,180 @@ void DBFileDumperCommand::DoCommand() { } } +void WriteExternalSstFilesCommand::Help(std::string& ret) { + ret.append(" "); + ret.append(WriteExternalSstFilesCommand::Name()); + ret.append(" "); + ret.append("\n"); +} + +WriteExternalSstFilesCommand::WriteExternalSstFilesCommand( + const std::vector& params, + const std::map& options, + const std::vector& flags) + : LDBCommand( + options, flags, false /* is_read_only */, + BuildCmdLineOptions({ARG_HEX, ARG_KEY_HEX, ARG_VALUE_HEX, ARG_FROM, + ARG_TO, ARG_CREATE_IF_MISSING})) { + create_if_missing_ = + IsFlagPresent(flags, ARG_CREATE_IF_MISSING) || + ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false); + if (params.size() != 1) { + exec_state_ = LDBCommandExecuteResult::Failed( + "output SST file path must be specified"); + } else { + output_sst_path_ = params.at(0); + } +} + +void WriteExternalSstFilesCommand::DoCommand() { + if (!db_) { + assert(GetExecuteState().IsFailed()); + return; + } + ColumnFamilyHandle* cfh = GetCfHandle(); + SstFileWriter sst_file_writer(EnvOptions(), db_->GetOptions(), cfh); + Status status = sst_file_writer.Open(output_sst_path_); + if (!status.ok()) { + exec_state_ = LDBCommandExecuteResult::Failed("failed to open SST file: " + + status.ToString()); + return; + } + + int bad_lines = 0; + std::string line; + std::ifstream ifs_stdin("/dev/stdin"); + std::istream* istream_p = ifs_stdin.is_open() ? &ifs_stdin : &std::cin; + while (getline(*istream_p, line, '\n')) { + std::string key; + std::string value; + if (ParseKeyValue(line, &key, &value, is_key_hex_, is_value_hex_)) { + status = sst_file_writer.Put(key, value); + if (!status.ok()) { + exec_state_ = LDBCommandExecuteResult::Failed( + "failed to write record to file: " + status.ToString()); + return; + } + } else if (0 == line.find("Keys in range:")) { + // ignore this line + } else if (0 == line.find("Created bg thread 0x")) { + // ignore this line + } else { + bad_lines++; + } + } + + status = sst_file_writer.Finish(); + if (!status.ok()) { + exec_state_ = LDBCommandExecuteResult::Failed( + "Failed to finish writing to file: " + status.ToString()); + return; + } + + if (bad_lines > 0) { + fprintf(stderr, "Warning: %d bad lines ignored.\n", bad_lines); + } + exec_state_ = LDBCommandExecuteResult::Succeed( + "external SST file written to " + output_sst_path_); +} + +Options WriteExternalSstFilesCommand::PrepareOptionsForOpenDB() { + Options opt = LDBCommand::PrepareOptionsForOpenDB(); + opt.create_if_missing = create_if_missing_; + return opt; +} + +const std::string IngestExternalSstFilesCommand::ARG_MOVE_FILES = "move_files"; +const std::string IngestExternalSstFilesCommand::ARG_SNAPSHOT_CONSISTENCY = + "snapshot_consistency"; +const std::string IngestExternalSstFilesCommand::ARG_ALLOW_GLOBAL_SEQNO = + "allow_global_seqno"; +const std::string IngestExternalSstFilesCommand::ARG_ALLOW_BLOCKING_FLUSH = + "allow_blocking_flush"; +const std::string IngestExternalSstFilesCommand::ARG_INGEST_BEHIND = + "ingest_behind"; + +void IngestExternalSstFilesCommand::Help(std::string& ret) { + ret.append(" "); + ret.append(IngestExternalSstFilesCommand::Name()); + ret.append(" "); + ret.append(" [--" + ARG_MOVE_FILES + "] "); + ret.append(" [--" + ARG_SNAPSHOT_CONSISTENCY + "] "); + ret.append(" [--" + ARG_ALLOW_GLOBAL_SEQNO + "] "); + ret.append(" [--" + ARG_ALLOW_BLOCKING_FLUSH + "] "); + ret.append(" [--" + ARG_INGEST_BEHIND + "] "); + ret.append("\n"); +} + +IngestExternalSstFilesCommand::IngestExternalSstFilesCommand( + const std::vector& params, + const std::map& options, + const std::vector& flags) + : LDBCommand( + options, flags, false /* is_read_only */, + BuildCmdLineOptions({ARG_MOVE_FILES, ARG_SNAPSHOT_CONSISTENCY, + ARG_ALLOW_GLOBAL_SEQNO, ARG_CREATE_IF_MISSING, + ARG_ALLOW_BLOCKING_FLUSH, ARG_INGEST_BEHIND})), + move_files_(false), + snapshot_consistency_(true), + allow_global_seqno_(true), + allow_blocking_flush_(true), + ingest_behind_(false) { + create_if_missing_ = + IsFlagPresent(flags, ARG_CREATE_IF_MISSING) || + ParseBooleanOption(options, ARG_CREATE_IF_MISSING, false); + move_files_ = IsFlagPresent(flags, ARG_MOVE_FILES) || + ParseBooleanOption(options, ARG_MOVE_FILES, false); + snapshot_consistency_ = + IsFlagPresent(flags, ARG_SNAPSHOT_CONSISTENCY) || + ParseBooleanOption(options, ARG_SNAPSHOT_CONSISTENCY, true); + allow_global_seqno_ = + IsFlagPresent(flags, ARG_ALLOW_GLOBAL_SEQNO) || + ParseBooleanOption(options, ARG_ALLOW_GLOBAL_SEQNO, true); + allow_blocking_flush_ = + IsFlagPresent(flags, ARG_ALLOW_BLOCKING_FLUSH) || + ParseBooleanOption(options, ARG_ALLOW_BLOCKING_FLUSH, true); + ingest_behind_ = IsFlagPresent(flags, ARG_INGEST_BEHIND) || + ParseBooleanOption(options, ARG_INGEST_BEHIND, false); + + if (params.size() != 1) { + exec_state_ = + LDBCommandExecuteResult::Failed("input SST path must be specified"); + } else { + input_sst_path_ = params.at(0); + } +} + +void IngestExternalSstFilesCommand::DoCommand() { + if (!db_) { + assert(GetExecuteState().IsFailed()); + return; + } + if (GetExecuteState().IsFailed()) { + return; + } + ColumnFamilyHandle* cfh = GetCfHandle(); + IngestExternalFileOptions ifo; + ifo.move_files = move_files_; + ifo.snapshot_consistency = snapshot_consistency_; + ifo.allow_global_seqno = allow_global_seqno_; + ifo.allow_blocking_flush = allow_blocking_flush_; + ifo.ingest_behind = ingest_behind_; + Status status = db_->IngestExternalFile(cfh, {input_sst_path_}, ifo); + if (!status.ok()) { + exec_state_ = LDBCommandExecuteResult::Failed( + "failed to ingest external SST: " + status.ToString()); + } else { + exec_state_ = + LDBCommandExecuteResult::Succeed("external SST files ingested"); + } +} + +Options IngestExternalSstFilesCommand::PrepareOptionsForOpenDB() { + Options opt = LDBCommand::PrepareOptionsForOpenDB(); + opt.create_if_missing = create_if_missing_; + return opt; +} + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/tools/ldb_cmd_impl.h b/tools/ldb_cmd_impl.h index e49e851b3..6443e3cec 100644 --- a/tools/ldb_cmd_impl.h +++ b/tools/ldb_cmd_impl.h @@ -522,4 +522,55 @@ class RestoreCommand : public BackupableCommand { static void Help(std::string& ret); }; +class WriteExternalSstFilesCommand : public LDBCommand { + public: + static std::string Name() { return "write_extern_sst"; } + WriteExternalSstFilesCommand( + const std::vector& params, + const std::map& options, + const std::vector& flags); + + virtual void DoCommand() override; + + virtual bool NoDBOpen() override { return false; } + + virtual Options PrepareOptionsForOpenDB() override; + + static void Help(std::string& ret); + + private: + std::string output_sst_path_; +}; + +class IngestExternalSstFilesCommand : public LDBCommand { + public: + static std::string Name() { return "ingest_extern_sst"; } + IngestExternalSstFilesCommand( + const std::vector& params, + const std::map& options, + const std::vector& flags); + + virtual void DoCommand() override; + + virtual bool NoDBOpen() override { return false; } + + virtual Options PrepareOptionsForOpenDB() override; + + static void Help(std::string& ret); + + private: + std::string input_sst_path_; + bool move_files_; + bool snapshot_consistency_; + bool allow_global_seqno_; + bool allow_blocking_flush_; + bool ingest_behind_; + + static const std::string ARG_MOVE_FILES; + static const std::string ARG_SNAPSHOT_CONSISTENCY; + static const std::string ARG_ALLOW_GLOBAL_SEQNO; + static const std::string ARG_ALLOW_BLOCKING_FLUSH; + static const std::string ARG_INGEST_BEHIND; +}; + } // namespace rocksdb diff --git a/tools/ldb_test.py b/tools/ldb_test.py index fa0ded438..2200fb464 100644 --- a/tools/ldb_test.py +++ b/tools/ldb_test.py @@ -76,7 +76,7 @@ class LDBTestCase(unittest.TestCase): my_check_output("./ldb %s >/dev/null 2>&1 |grep -v \"Created bg \ thread\"" % params, shell=True) - except Exception, e: + except Exception: return self.fail( "Exception should have been raised for command with params: %s" % @@ -146,6 +146,14 @@ class LDBTestCase(unittest.TestCase): def loadDb(self, params, dumpFile): return 0 == run_err_null("cat %s | ./ldb load %s" % (dumpFile, params)) + def writeExternSst(self, params, inputDumpFile, outputSst): + return 0 == run_err_null("cat %s | ./ldb write_extern_sst %s %s" + % (inputDumpFile, outputSst, params)) + + def ingestExternSst(self, params, inputSst): + return 0 == run_err_null("./ldb ingest_extern_sst %s %s" + % (inputSst, params)) + def testStringBatchPut(self): print "Running testStringBatchPut..." self.assertRunOK("batchput x1 y1 --create_if_missing", "OK") @@ -547,5 +555,38 @@ class LDBTestCase(unittest.TestCase): # non-existing column family. self.assertRunFAIL("get cf3_1 --column_family=four") + def testIngestExternalSst(self): + print "Running testIngestExternalSst..." + + # Dump, load, write external sst and ingest it in another db + dbPath = os.path.join(self.TMP_DIR, "db1") + self.assertRunOK( + "batchput --db=%s --create_if_missing x1 y1 x2 y2 x3 y3 x4 y4" + % dbPath, + "OK") + self.assertRunOK("scan --db=%s" % dbPath, + "x1 : y1\nx2 : y2\nx3 : y3\nx4 : y4") + dumpFilePath = os.path.join(self.TMP_DIR, "dump1") + with open(dumpFilePath, 'w') as f: + f.write("x1 ==> y10\nx2 ==> y20\nx3 ==> y30\nx4 ==> y40") + externSstPath = os.path.join(self.TMP_DIR, "extern_data1.sst") + self.assertTrue(self.writeExternSst("--create_if_missing --db=%s" + % dbPath, + dumpFilePath, + externSstPath)) + # cannot ingest if allow_global_seqno is false + self.assertFalse( + self.ingestExternSst( + "--create_if_missing --allow_global_seqno=false --db=%s" + % dbPath, + externSstPath)) + self.assertTrue( + self.ingestExternSst( + "--create_if_missing --allow_global_seqno --db=%s" + % dbPath, + externSstPath)) + self.assertRunOKFull("scan --db=%s" % dbPath, + "x1 : y10\nx2 : y20\nx3 : y30\nx4 : y40") + if __name__ == "__main__": unittest.main() diff --git a/tools/ldb_tool.cc b/tools/ldb_tool.cc index b09076ecc..fe307eab7 100644 --- a/tools/ldb_tool.cc +++ b/tools/ldb_tool.cc @@ -88,6 +88,8 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options, BackupCommand::Help(ret); RestoreCommand::Help(ret); CheckPointCommand::Help(ret); + WriteExternalSstFilesCommand::Help(ret); + IngestExternalSstFilesCommand::Help(ret); fprintf(stderr, "%s\n", ret.c_str()); }