From b3585a11b4d9745038dcd022dfdb0cd54b8d0d53 Mon Sep 17 00:00:00 2001 From: Zhichao Cao Date: Thu, 11 Jun 2020 14:25:01 -0700 Subject: [PATCH] Ingest SST files with checksum information (#6891) Summary: Applications can ingest SST files with file checksum information, such that during ingestion, DB is able to check data integrity and identity of the SST file. The PR introduces generate_and_verify_file_checksum to IngestExternalFileOptions to control if the ingested checksum information should be verified with the generated checksum. 1. If the generate_and_verify_file_checksum option is *FALSE*: *1)* if DB does not enable SST file checksum, the checksum information ingested will be ignored; *2)* if DB enables the SST file checksum and the checksum function name matches the checksum function name in DB, we trust the ingested checksum and store it in the Manifest. If the checksum function name does not match, we treat that as an error and fail the IngestExternalFile() call. 2. If the generate_and_verify_file_checksum option is *TRUE*: *1)* if DB does not enable SST file checksum, the checksum information ingested will be ignored; *2)* if DB enables the SST file checksum, we will use the checksum generator from DB to calculate the checksum for each ingested SST file after it is copied or moved. Then, compare the checksum results with the ingested checksum information: _A)_ if the checksum function name does not match, _verification always reports true_ and we store the DB generated checksum information in the Manifest. _B)_ if the checksum function name matches and the checksum matches, ingestion continues and stores the checksum information in the Manifest. Otherwise, terminate file ingestion and report file corruption. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6891 Test Plan: added unit test, pass make asan_check Reviewed By: pdillinger Differential Revision: D21935988 Pulled By: zhichao-cao fbshipit-source-id: 7b55f486632db467e76d72602218d0658aa7f6ed --- HISTORY.md | 1 + db/db_impl/db_impl.cc | 6 +- db/external_sst_file_basic_test.cc | 283 ++++++++++++++++++++++++++ db/external_sst_file_ingestion_job.cc | 167 ++++++++++++++- db/external_sst_file_ingestion_job.h | 11 + file/file_util.cc | 64 ++++++ file/file_util.h | 7 + include/rocksdb/db.h | 6 + include/rocksdb/options.h | 20 ++ 9 files changed, 558 insertions(+), 7 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 301c18618..3690fafb6 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -29,6 +29,7 @@ * sst_dump to add a new --readahead_size argument. Users can specify read size when scanning the data. Sst_dump also tries to prefetch tail part of the SST files so usually some number of I/Os are saved there too. * Generate file checksum in SstFileWriter if Options.file_checksum_gen_factory is set. The checksum and checksum function name are stored in ExternalSstFileInfo after the sst file write is finished. * Add a value_size_soft_limit in read options which limits the cumulative value size of keys read in batches in MultiGet. Once the cumulative value size of found keys exceeds read_options.value_size_soft_limit, all the remaining keys are returned with status Abort without further finding their values. By default the value_size_soft_limit is std::numeric_limits::max(). +* Enable SST file ingestion with file checksum information when calling IngestExternalFiles(const std::vector& args). Added files_checksums and files_checksum_func_names to IngestExternalFileArg such that user can ingest the sst files with their file checksum information. Added verify_file_checksum to IngestExternalFileOptions (default is True). 
To be backward compatible, if DB does not enable file checksum or the user does not provide checksum information (vectors of files_checksums and files_checksum_func_names are both empty), verification of file checksum is always successful. If DB enables file checksum, DB will always generate the checksum for each ingested SST file during the Prepare stage of ingestion and store the checksum in the Manifest, unless verify_file_checksum is False and checksum information is provided by the application. In this case, we only verify the checksum function name and directly store the ingested checksum in the Manifest. If verify_file_checksum is set to True, DB will verify the ingested checksum and function name with the generated ones. Any mismatch will fail the ingestion. Note that, if IngestExternalFileOptions::write_global_seqno is True, the seqno will be changed in the ingested file. Therefore, the checksum of the file will be changed. In this case, a new checksum will be generated after the seqno is updated and be stored in the Manifest. ### Performance Improvements * Eliminate redundant key comparisons during random access in block-based tables. 
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index c9d7aab3a..bf8185175 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -4180,7 +4180,8 @@ Status DBImpl::IngestExternalFiles( static_cast(args[i].column_family)->cfd(); SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); exec_results[i].second = ingestion_jobs[i].Prepare( - args[i].external_files, start_file_number, super_version); + args[i].external_files, args[i].files_checksums, + args[i].files_checksum_func_names, start_file_number, super_version); exec_results[i].first = true; CleanupSuperVersion(super_version); } @@ -4191,7 +4192,8 @@ Status DBImpl::IngestExternalFiles( static_cast(args[0].column_family)->cfd(); SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); exec_results[0].second = ingestion_jobs[0].Prepare( - args[0].external_files, next_file_number, super_version); + args[0].external_files, args[0].files_checksums, + args[0].files_checksum_func_names, next_file_number, super_version); exec_results[0].first = true; CleanupSuperVersion(super_version); } diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 610faf5dd..7e62963ec 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -42,6 +42,29 @@ class ExternalSSTFileBasicTest return db_->IngestExternalFile(files, opts); } + Status AddFileWithFileChecksum( + const std::vector& files, + const std::vector& files_checksums, + const std::vector& files_checksum_func_names, + bool verify_file_checksum = true, bool move_files = false, + bool skip_snapshot_check = false, bool write_global_seqno = true) { + IngestExternalFileOptions opts; + opts.move_files = move_files; + opts.snapshot_consistency = !skip_snapshot_check; + opts.allow_global_seqno = false; + opts.allow_blocking_flush = false; + opts.write_global_seqno = write_global_seqno; + opts.verify_file_checksum = verify_file_checksum; + + IngestExternalFileArg arg; + 
arg.column_family = db_->DefaultColumnFamily(); + arg.external_files = files; + arg.options = opts; + arg.files_checksums = files_checksums; + arg.files_checksum_func_names = files_checksum_func_names; + return db_->IngestExternalFiles({arg}); + } + Status GenerateAndAddExternalFile( const Options options, std::vector keys, const std::vector& value_types, @@ -298,6 +321,266 @@ TEST_F(ExternalSSTFileBasicTest, BasicWithFileChecksumCrc32c) { DestroyAndRecreateExternalSSTFilesDir(); } +TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) { + Options old_options = CurrentOptions(); + Options options = CurrentOptions(); + options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory(); + const ImmutableCFOptions ioptions(options); + ChecksumVerifyHelper checksum_helper(options); + + SstFileWriter sst_file_writer(EnvOptions(), options); + + // file01.sst (1000 => 1099) + std::string file1 = sst_files_dir_ + "file01.sst"; + ASSERT_OK(sst_file_writer.Open(file1)); + for (int k = 1000; k < 1100; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file1_info; + Status s = sst_file_writer.Finish(&file1_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file1_info.file_path, file1); + ASSERT_EQ(file1_info.num_entries, 100); + ASSERT_EQ(file1_info.smallest_key, Key(1000)); + ASSERT_EQ(file1_info.largest_key, Key(1099)); + std::string file_checksum1, file_checksum_func_name1; + ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName( + file1, &file_checksum1, &file_checksum_func_name1)); + ASSERT_EQ(file1_info.file_checksum, file_checksum1); + ASSERT_EQ(file1_info.file_checksum_func_name, file_checksum_func_name1); + + // file02.sst (1100 => 1299) + std::string file2 = sst_files_dir_ + "file02.sst"; + ASSERT_OK(sst_file_writer.Open(file2)); + for (int k = 1100; k < 1300; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val")); + } + ExternalSstFileInfo file2_info; + s = sst_file_writer.Finish(&file2_info); 
+ ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file2_info.file_path, file2); + ASSERT_EQ(file2_info.num_entries, 200); + ASSERT_EQ(file2_info.smallest_key, Key(1100)); + ASSERT_EQ(file2_info.largest_key, Key(1299)); + std::string file_checksum2, file_checksum_func_name2; + ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName( + file2, &file_checksum2, &file_checksum_func_name2)); + ASSERT_EQ(file2_info.file_checksum, file_checksum2); + ASSERT_EQ(file2_info.file_checksum_func_name, file_checksum_func_name2); + + // file03.sst (1300 => 1499) + std::string file3 = sst_files_dir_ + "file03.sst"; + ASSERT_OK(sst_file_writer.Open(file3)); + for (int k = 1300; k < 1500; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file3_info; + s = sst_file_writer.Finish(&file3_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file3_info.file_path, file3); + ASSERT_EQ(file3_info.num_entries, 200); + ASSERT_EQ(file3_info.smallest_key, Key(1300)); + ASSERT_EQ(file3_info.largest_key, Key(1499)); + std::string file_checksum3, file_checksum_func_name3; + ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName( + file3, &file_checksum3, &file_checksum_func_name3)); + ASSERT_EQ(file3_info.file_checksum, file_checksum3); + ASSERT_EQ(file3_info.file_checksum_func_name, file_checksum_func_name3); + + // file04.sst (1500 => 1799) + std::string file4 = sst_files_dir_ + "file04.sst"; + ASSERT_OK(sst_file_writer.Open(file4)); + for (int k = 1500; k < 1800; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file4_info; + s = sst_file_writer.Finish(&file4_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file4_info.file_path, file4); + ASSERT_EQ(file4_info.num_entries, 300); + ASSERT_EQ(file4_info.smallest_key, Key(1500)); + ASSERT_EQ(file4_info.largest_key, Key(1799)); + std::string file_checksum4, file_checksum_func_name4; + 
ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName( + file4, &file_checksum4, &file_checksum_func_name4)); + ASSERT_EQ(file4_info.file_checksum, file_checksum4); + ASSERT_EQ(file4_info.file_checksum_func_name, file_checksum_func_name4); + + // file05.sst (1800 => 1999) + std::string file5 = sst_files_dir_ + "file05.sst"; + ASSERT_OK(sst_file_writer.Open(file5)); + for (int k = 1800; k < 2000; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file5_info; + s = sst_file_writer.Finish(&file5_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file5_info.file_path, file5); + ASSERT_EQ(file5_info.num_entries, 200); + ASSERT_EQ(file5_info.smallest_key, Key(1800)); + ASSERT_EQ(file5_info.largest_key, Key(1999)); + std::string file_checksum5, file_checksum_func_name5; + ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName( + file5, &file_checksum5, &file_checksum_func_name5)); + ASSERT_EQ(file5_info.file_checksum, file_checksum5); + ASSERT_EQ(file5_info.file_checksum_func_name, file_checksum_func_name5); + + // file06.sst (2000 => 2199) + std::string file6 = sst_files_dir_ + "file06.sst"; + ASSERT_OK(sst_file_writer.Open(file6)); + for (int k = 2000; k < 2200; k++) { + ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val_overlap")); + } + ExternalSstFileInfo file6_info; + s = sst_file_writer.Finish(&file6_info); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_EQ(file6_info.file_path, file6); + ASSERT_EQ(file6_info.num_entries, 200); + ASSERT_EQ(file6_info.smallest_key, Key(2000)); + ASSERT_EQ(file6_info.largest_key, Key(2199)); + std::string file_checksum6, file_checksum_func_name6; + ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName( + file6, &file_checksum6, &file_checksum_func_name6)); + ASSERT_EQ(file6_info.file_checksum, file_checksum6); + ASSERT_EQ(file6_info.file_checksum_func_name, file_checksum_func_name6); + + s = AddFileWithFileChecksum({file1}, {file_checksum1, "xyz"}, + 
{file_checksum1}, true, false, false, false); + // does not care the checksum input since db does not enable file checksum + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_OK(env_->FileExists(file1)); + std::vector live_files; + dbfull()->GetLiveFilesMetaData(&live_files); + std::set set1; + for (auto f : live_files) { + set1.insert(f.name); + ASSERT_EQ(f.file_checksum, kUnknownFileChecksum); + ASSERT_EQ(f.file_checksum_func_name, kUnknownFileChecksumFuncName); + } + + // Reopen Db with checksum enabled + Reopen(options); + // Enable verify_file_checksum option + // The checksum vector does not match, fail the ingestion + s = AddFileWithFileChecksum({file2}, {file_checksum2, "xyz"}, + {file_checksum_func_name2}, true, false, false, + false); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // Enable verify_file_checksum option + // The checksum name does not match, fail the ingestion + s = AddFileWithFileChecksum({file2}, {file_checksum2}, {"xyz"}, true, false, + false, false); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // Enable verify_file_checksum option + // The checksum itself does not match, fail the ingestion + s = AddFileWithFileChecksum({file2}, {"xyz"}, {file_checksum_func_name2}, + true, false, false, false); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // Enable verify_file_checksum option + // All matches, ingestion is successful + s = AddFileWithFileChecksum({file2}, {file_checksum2}, + {file_checksum_func_name2}, true, false, false, + false); + ASSERT_TRUE(s.ok()) << s.ToString(); + std::vector live_files1; + dbfull()->GetLiveFilesMetaData(&live_files1); + for (auto f : live_files1) { + if (set1.find(f.name) == set1.end()) { + ASSERT_EQ(f.file_checksum, file_checksum2); + ASSERT_EQ(f.file_checksum_func_name, file_checksum_func_name2); + set1.insert(f.name); + } + } + ASSERT_OK(env_->FileExists(file2)); + + // Enable verify_file_checksum option + // No checksum information is provided, generate it when ingesting + std::vector checksum, checksum_func; + s = 
AddFileWithFileChecksum({file3}, checksum, checksum_func, true, false, + false, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + std::vector live_files2; + dbfull()->GetLiveFilesMetaData(&live_files2); + for (auto f : live_files2) { + if (set1.find(f.name) == set1.end()) { + ASSERT_EQ(f.file_checksum, file_checksum3); + ASSERT_EQ(f.file_checksum_func_name, file_checksum_func_name3); + set1.insert(f.name); + } + } + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_OK(env_->FileExists(file3)); + + // Does not enable verify_file_checksum options + // The checksum name does not match, fail the ingestion + s = AddFileWithFileChecksum({file4}, {file_checksum4}, {"xyz"}, false, false, + false, false); + ASSERT_FALSE(s.ok()) << s.ToString(); + + // Does not enable verify_file_checksum options + // Checksum function name matches, store the checksum being ingested. + s = AddFileWithFileChecksum({file4}, {"asd"}, {file_checksum_func_name4}, + false, false, false, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + std::vector live_files3; + dbfull()->GetLiveFilesMetaData(&live_files3); + for (auto f : live_files3) { + if (set1.find(f.name) == set1.end()) { + ASSERT_FALSE(f.file_checksum == file_checksum4); + ASSERT_EQ(f.file_checksum, "asd"); + ASSERT_EQ(f.file_checksum_func_name, file_checksum_func_name4); + set1.insert(f.name); + } + } + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_OK(env_->FileExists(file4)); + + // enable verify_file_checksum options, DB enable checksum, and enable + // write_global_seq. So the checksum stored is different from the one + // ingested due to the sequence number changes. 
+ s = AddFileWithFileChecksum({file5}, {file_checksum5}, + {file_checksum_func_name5}, true, false, false, + true); + ASSERT_OK(s); + ASSERT_TRUE(s.ok()) << s.ToString(); + std::vector live_files4; + dbfull()->GetLiveFilesMetaData(&live_files4); + for (auto f : live_files4) { + if (set1.find(f.name) == set1.end()) { + std::string cur_checksum5, cur_checksum_func_name5; + ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName( + dbname_ + f.name, &cur_checksum5, &cur_checksum_func_name5)); + ASSERT_EQ(f.file_checksum, cur_checksum5); + ASSERT_EQ(f.file_checksum_func_name, file_checksum_func_name5); + set1.insert(f.name); + } + } + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_OK(env_->FileExists(file5)); + + // Does not enable verify_file_checksum options and also the ingested file + // checksum information is empty. DB will generate and store the checksum + // in Manifest. + std::vector files_c6, files_name6; + s = AddFileWithFileChecksum({file6}, files_c6, files_name6, false, false, + false, false); + ASSERT_TRUE(s.ok()) << s.ToString(); + std::vector live_files6; + dbfull()->GetLiveFilesMetaData(&live_files6); + for (auto f : live_files6) { + if (set1.find(f.name) == set1.end()) { + ASSERT_EQ(f.file_checksum, file_checksum6); + ASSERT_EQ(f.file_checksum_func_name, file_checksum_func_name6); + set1.insert(f.name); + } + } + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_OK(env_->FileExists(file6)); +} + TEST_F(ExternalSSTFileBasicTest, NoCopy) { Options options = CurrentOptions(); const ImmutableCFOptions ioptions(options); diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 2c5c2804e..7ae2ca375 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -28,6 +28,8 @@ namespace ROCKSDB_NAMESPACE { Status ExternalSstFileIngestionJob::Prepare( const std::vector& external_files_paths, + const std::vector& files_checksums, + const std::vector& files_checksum_func_names, uint64_t 
next_file_number, SuperVersion* sv) { Status status; @@ -142,6 +144,9 @@ Status ExternalSstFileIngestionJob::Prepare( break; } f.internal_file_path = path_inside_db; + // Initialize the checksum information of ingested files. + f.file_checksum = kUnknownFileChecksum; + f.file_checksum_func_name = kUnknownFileChecksumFuncName; ingestion_path_ids.insert(f.fd.GetPathId()); } @@ -160,6 +165,128 @@ Status ExternalSstFileIngestionJob::Prepare( } TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir"); + // Generate and check the sst file checksum. Note that, if + // IngestExternalFileOptions::write_global_seqno is true, we will not update + // the checksum information in the files_to_ingest_ here, since the file is + // updated with the new global_seqno. After global_seqno is updated, DB will + // generate the new checksum and store it in the Manifest. In all other cases + // if ingestion_options_.write_global_seqno == true and + // verify_file_checksum is false, we only check the checksum function name. + if (status.ok() && db_options_.file_checksum_gen_factory != nullptr) { + if (ingestion_options_.verify_file_checksum == false && + files_checksums.size() == files_to_ingest_.size() && + files_checksum_func_names.size() == files_to_ingest_.size()) { + // Only when verify_file_checksum == false and the checksum for ingested + // files are provided, DB will use the provided checksum and does not + // generate the checksum for ingested files. + need_generate_file_checksum_ = false; + } else { + need_generate_file_checksum_ = true; + } + FileChecksumGenContext gen_context; + std::unique_ptr file_checksum_gen = + db_options_.file_checksum_gen_factory->CreateFileChecksumGenerator( + gen_context); + std::vector generated_checksums; + std::vector generated_checksum_func_names; + // Step 1: generate the checksum for ingested sst file. 
+ if (need_generate_file_checksum_) { + for (size_t i = 0; i < files_to_ingest_.size(); i++) { + std::string generated_checksum, generated_checksum_func_name; + IOStatus io_s = GenerateOneFileChecksum( + fs_, files_to_ingest_[i].internal_file_path, + db_options_.file_checksum_gen_factory.get(), &generated_checksum, + &generated_checksum_func_name, + ingestion_options_.verify_checksums_readahead_size, + db_options_.allow_mmap_reads); + if (!io_s.ok()) { + status = io_s; + ROCKS_LOG_WARN(db_options_.info_log, + "Sst file checksum generation of file: %s failed: %s", + files_to_ingest_[i].internal_file_path.c_str(), + status.ToString().c_str()); + break; + } + if (ingestion_options_.write_global_seqno == false) { + files_to_ingest_[i].file_checksum = generated_checksum; + files_to_ingest_[i].file_checksum_func_name = + generated_checksum_func_name; + } + generated_checksums.push_back(generated_checksum); + generated_checksum_func_names.push_back(generated_checksum_func_name); + } + } + + // Step 2: based on the verify_file_checksum and ingested checksum + // information, do the verification. + if (status.ok()) { + if (files_checksums.size() == files_to_ingest_.size() && + files_checksum_func_names.size() == files_to_ingest_.size()) { + // Verify the checksum and checksum function name. 
+ if (ingestion_options_.verify_file_checksum) { + for (size_t i = 0; i < files_to_ingest_.size(); i++) { + if (files_checksum_func_names[i] != + generated_checksum_func_names[i]) { + status = Status::InvalidArgument( + "Checksum function name does not match with the checksum " + "function name of this DB"); + ROCKS_LOG_WARN( + db_options_.info_log, + "Sst file checksum verification of file: %s failed: %s", + external_files_paths[i].c_str(), status.ToString().c_str()); + break; + } + if (files_checksums[i] != generated_checksums[i]) { + status = Status::Corruption( + "Ingested checksum does not match with the generated " + "checksum"); + ROCKS_LOG_WARN( + db_options_.info_log, + "Sst file checksum verification of file: %s failed: %s", + files_to_ingest_[i].internal_file_path.c_str(), + status.ToString().c_str()); + break; + } + } + } else { + // If verify_file_checksum is not enabled, we only verify the + // checksum function name. If it does not match, fail the ingestion. + // If matches, we trust the ingested checksum information and store + // in the Manifest. + for (size_t i = 0; i < files_to_ingest_.size(); i++) { + if (files_checksum_func_names[i] != file_checksum_gen->Name()) { + status = Status::InvalidArgument( + "Checksum function name does not match with the checksum " + "function name of this DB"); + ROCKS_LOG_WARN( + db_options_.info_log, + "Sst file checksum verification of file: %s failed: %s", + external_files_paths[i].c_str(), status.ToString().c_str()); + break; + } + files_to_ingest_[i].file_checksum = files_checksums[i]; + files_to_ingest_[i].file_checksum_func_name = + files_checksum_func_names[i]; + } + } + } else if (files_checksums.size() != files_checksum_func_names.size() || + (files_checksums.size() == files_checksum_func_names.size() && + files_checksums.size() != 0)) { + // The checksum or checksum function name vector are not both empty + // and they are incomplete. 
+ status = Status::InvalidArgument( + "The checksum information of ingested sst files are nonempty and " + "the size of checksums or the size of the checksum function " + "names " + "does not match with the number of ingested sst files"); + ROCKS_LOG_WARN( + db_options_.info_log, + "The ingested sst files checksum information is incomplete: %s", + status.ToString().c_str()); + } + } + } + // TODO: The following is duplicated with Cleanup(). if (!status.ok()) { // We failed, remove all files that we copied into the db @@ -245,6 +372,11 @@ Status ExternalSstFileIngestionJob::Run() { return status; } + status = GenerateChecksumForIngestedFile(&f); + if (!status.ok()) { + return status; + } + // We use the import time as the ancester time. This is the time the data // is written to the database. int64_t temp_current_time = 0; @@ -255,11 +387,11 @@ Status ExternalSstFileIngestionJob::Run() { static_cast(temp_current_time); } - edit_.AddFile( - f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(), - f.smallest_internal_key, f.largest_internal_key, f.assigned_seqno, - f.assigned_seqno, false, kInvalidBlobFileNumber, oldest_ancester_time, - current_time, kUnknownFileChecksum, kUnknownFileChecksumFuncName); + edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(), + f.fd.GetFileSize(), f.smallest_internal_key, + f.largest_internal_key, f.assigned_seqno, f.assigned_seqno, + false, kInvalidBlobFileNumber, oldest_ancester_time, + current_time, f.file_checksum, f.file_checksum_func_name); } return status; } @@ -687,6 +819,31 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile( return Status::OK(); } +IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile( + IngestedFileInfo* file_to_ingest) { + if (db_options_.file_checksum_gen_factory == nullptr || + need_generate_file_checksum_ == false || + ingestion_options_.write_global_seqno == false) { + // If file_checksum_gen_factory is not set, we are not able to generate 
+ // the checksum. if write_global_seqno is false, it means we will use + // file checksum generated during Prepare(). This step will be skipped. + return IOStatus::OK(); + } + std::string file_checksum, file_checksum_func_name; + IOStatus io_s = GenerateOneFileChecksum( + fs_, file_to_ingest->internal_file_path, + db_options_.file_checksum_gen_factory.get(), &file_checksum, + &file_checksum_func_name, + ingestion_options_.verify_checksums_readahead_size, + db_options_.allow_mmap_reads); + if (!io_s.ok()) { + return io_s; + } + file_to_ingest->file_checksum = file_checksum; + file_to_ingest->file_checksum_func_name = file_checksum_func_name; + return IOStatus::OK(); +} + bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( const IngestedFileInfo* file_to_ingest, int level) { if (level == 0) { diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index 7ddb6f3e8..f7723989d 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -63,6 +63,10 @@ struct IngestedFileInfo { // ingestion_options.move_files is false by default, thus copy_file is true // by default. bool copy_file = true; + // The checksum of ingested file + std::string file_checksum; + // The name of checksum function that generate the checksum + std::string file_checksum_func_name; }; class ExternalSstFileIngestionJob { @@ -90,6 +94,8 @@ class ExternalSstFileIngestionJob { // Prepare the job by copying external files into the DB. 
Status Prepare(const std::vector& external_files_paths, + const std::vector& files_checksums, + const std::vector& files_checksum_func_names, uint64_t next_file_number, SuperVersion* sv); // Check if we need to flush the memtable before running the ingestion job @@ -148,6 +154,8 @@ class ExternalSstFileIngestionJob { // Set the file global sequence number to `seqno` Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest, SequenceNumber seqno); + // Generate the file checksum and store in the IngestedFileInfo + IOStatus GenerateChecksumForIngestedFile(IngestedFileInfo* file_to_ingest); // Check if `file_to_ingest` can fit in level `level` // REQUIRES: Mutex held @@ -175,6 +183,9 @@ class ExternalSstFileIngestionJob { // Set in ExternalSstFileIngestionJob::Prepare(), if true all files are // ingested in L0 bool files_overlap_{false}; + // Set in ExternalSstFileIngestionJob::Prepare(), if true and DB + // file_checksum_gen_factory is set, DB will generate checksum each file. 
+ bool need_generate_file_checksum_{true}; }; } // namespace ROCKSDB_NAMESPACE diff --git a/file/file_util.cc b/file/file_util.cc index 8ee2bc490..603b22937 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -122,4 +122,68 @@ bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options) { return same; } +IOStatus GenerateOneFileChecksum(FileSystem* fs, const std::string& file_path, + FileChecksumGenFactory* checksum_factory, + std::string* file_checksum, + std::string* file_checksum_func_name, + size_t verify_checksums_readahead_size, + bool allow_mmap_reads) { + if (checksum_factory == nullptr) { + return IOStatus::InvalidArgument("Checksum factory is invalid"); + } + assert(file_checksum != nullptr); + assert(file_checksum_func_name != nullptr); + + FileChecksumGenContext gen_context; + std::unique_ptr checksum_generator = + checksum_factory->CreateFileChecksumGenerator(gen_context); + uint64_t size; + IOStatus io_s; + std::unique_ptr reader; + { + std::unique_ptr r_file; + io_s = fs->NewRandomAccessFile(file_path, FileOptions(), &r_file, nullptr); + if (!io_s.ok()) { + return io_s; + } + io_s = fs->GetFileSize(file_path, IOOptions(), &size, nullptr); + if (!io_s.ok()) { + return io_s; + } + reader.reset(new RandomAccessFileReader(std::move(r_file), file_path)); + } + + // Found that 256 KB readahead size provides the best performance, based on + // experiments, for auto readahead. Experiment data is in PR #3282. + size_t default_max_read_ahead_size = 256 * 1024; + size_t readahead_size = (verify_checksums_readahead_size != 0) + ? 
verify_checksums_readahead_size + : default_max_read_ahead_size; + + FilePrefetchBuffer prefetch_buffer( + reader.get(), readahead_size /* readadhead_size */, + readahead_size /* max_readahead_size */, !allow_mmap_reads /* enable */); + + Slice slice; + uint64_t offset = 0; + while (size > 0) { + size_t bytes_to_read = + static_cast(std::min(uint64_t{readahead_size}, size)); + if (!prefetch_buffer.TryReadFromCache(offset, bytes_to_read, &slice, + false)) { + return IOStatus::Corruption("file read failed"); + } + if (slice.size() == 0) { + return IOStatus::Corruption("file too small"); + } + checksum_generator->Update(slice.data(), slice.size()); + size -= slice.size(); + offset += slice.size(); + } + checksum_generator->Finalize(); + *file_checksum = checksum_generator->GetChecksum(); + *file_checksum_func_name = checksum_generator->Name(); + return IOStatus::OK(); +} + } // namespace ROCKSDB_NAMESPACE diff --git a/file/file_util.h b/file/file_util.h index 04694a5b9..ac7257e05 100644 --- a/file/file_util.h +++ b/file/file_util.h @@ -10,6 +10,7 @@ #include "options/db_options.h" #include "rocksdb/env.h" #include "rocksdb/file_system.h" +#include "rocksdb/sst_file_writer.h" #include "rocksdb/status.h" #include "rocksdb/types.h" @@ -30,6 +31,12 @@ extern Status DeleteDBFile(const ImmutableDBOptions* db_options, extern bool IsWalDirSameAsDBPath(const ImmutableDBOptions* db_options); +extern IOStatus GenerateOneFileChecksum( + FileSystem* fs, const std::string& file_path, + FileChecksumGenFactory* checksum_factory, std::string* file_checksum, + std::string* file_checksum_func_name, + size_t verify_checksums_readahead_size, bool allow_mmap_reads); + inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, Env* env, IOOptions& opts) { if (!env) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 6d59111b8..337827191 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -111,10 +111,16 @@ struct RangePtr { RangePtr(const Slice* s, const Slice* 
l) : start(s), limit(l) {} }; +// It is valid that files_checksums and files_checksum_func_names are both +// empty (no checksum information is provided for ingestion). Otherwise, +// their sizes should be the same as external_files. The file order should +// be the same in three vectors and guaranteed by the caller. struct IngestExternalFileArg { ColumnFamilyHandle* column_family = nullptr; std::vector external_files; IngestExternalFileOptions options; + std::vector files_checksums; + std::vector files_checksum_func_names; }; struct GetMergeOperandsOptions { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 1404031a0..040892e1e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1571,6 +1571,26 @@ struct IngestExternalFileOptions { // Using a large readahead size (> 2MB) can typically improve the performance // of forward iteration on spinning disks. size_t verify_checksums_readahead_size = 0; + // Set to TRUE if user wants to verify the sst file checksum of ingested + // files. The DB checksum function will generate the checksum of each + // ingested file (if file_checksum_gen_factory is set) and compare the + // checksum function name and checksum with the ingested checksum information. + // + // If this option is set to True: 1) if DB does not enable checksum + // (file_checksum_gen_factory == nullptr), the ingested checksum information + // will be ignored; 2) If DB enables the checksum function, we calculate the + // sst file checksum after the file is moved or copied and compare the + // checksum and checksum name. If checksum or checksum function name does + // not match, ingestion will be failed. If the verification is successful, + // checksum and checksum function name will be stored in Manifest. 
+ // If this option is set to FALSE, 1) if DB does not enable checksum, + // the ingested checksum information will be ignored; 2) if DB enables the + // checksum, we only verify the ingested checksum function name and we + // trust the ingested checksum. If the checksum function name matches, we + // store the checksum in Manifest. DB does not calculate the checksum during + // ingestion. However, if no checksum information is provided with the + // ingested files, DB will generate the checksum and store in the Manifest. + bool verify_file_checksum = true; }; enum TraceFilterType : uint64_t {