Generate file checksum in SstFileWriter (#6859)
Summary: If Options.file_checksum_gen_factory is set, RocksDB generates the file checksum during flush and compaction using the checksum generator created by the factory, and stores the checksum and checksum function name in vstorage and the Manifest. This PR enables file checksum generation in SstFileWriter and stores the checksum and checksum function name in ExternalSstFileInfo, so that applications can use them for other purposes, for example, ingesting the file checksum together with the files in IngestExternalFile(). Pull Request resolved: https://github.com/facebook/rocksdb/pull/6859 Test Plan: add unit test and pass make asan_check. Reviewed By: ajkr Differential Revision: D21656247 Pulled By: zhichao-cao fbshipit-source-id: 78a3570c76031d8832e3d2de3d6c79cdf2b675d0
This commit is contained in:
parent
aaafcb80ab
commit
545e14b53b
@ -18,6 +18,7 @@
|
||||
|
||||
### New Feature
|
||||
* sst_dump to add a new --readahead_size argument. Users can specify read size when scanning the data. Sst_dump also tries to prefetch tail part of the SST files so usually some number of I/Os are saved there too.
|
||||
* Generate file checksum in SstFileWriter if Options.file_checksum_gen_factory is set. The checksum and checksum function name are stored in ExternalSstFileInfo after the sst file write is finished.
|
||||
|
||||
## 6.10 (5/2/2020)
|
||||
### Bug Fixes
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <functional>
|
||||
|
||||
#include "db/db_test_util.h"
|
||||
#include "db/version_edit.h"
|
||||
#include "port/port.h"
|
||||
#include "port/stack_trace.h"
|
||||
#include "rocksdb/sst_file_writer.h"
|
||||
@ -174,6 +175,111 @@ TEST_F(ExternalSSTFileBasicTest, Basic) {
|
||||
ASSERT_EQ(file1_info.num_range_del_entries, 0);
|
||||
ASSERT_EQ(file1_info.smallest_range_del_key, "");
|
||||
ASSERT_EQ(file1_info.largest_range_del_key, "");
|
||||
ASSERT_EQ(file1_info.file_checksum, kUnknownFileChecksum);
|
||||
ASSERT_EQ(file1_info.file_checksum_func_name, kUnknownFileChecksumFuncName);
|
||||
// sst_file_writer already finished, cannot add this value
|
||||
s = sst_file_writer.Put(Key(100), "bad_val");
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
s = sst_file_writer.DeleteRange(Key(100), Key(200));
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
|
||||
DestroyAndReopen(options);
|
||||
// Add file using file path
|
||||
s = DeprecatedAddFile({file1});
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
|
||||
for (int k = 0; k < 100; k++) {
|
||||
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
|
||||
}
|
||||
|
||||
DestroyAndRecreateExternalSSTFilesDir();
|
||||
}
|
||||
|
||||
class ChecksumVerifyHelper {
|
||||
private:
|
||||
Options options_;
|
||||
|
||||
public:
|
||||
ChecksumVerifyHelper(Options& options) : options_(options) {}
|
||||
~ChecksumVerifyHelper() {}
|
||||
|
||||
Status GetSingleFileChecksumAndFuncName(
|
||||
const std::string& file_path, std::string* file_checksum,
|
||||
std::string* file_checksum_func_name) {
|
||||
Status s;
|
||||
EnvOptions soptions;
|
||||
std::unique_ptr<SequentialFile> file_reader;
|
||||
s = options_.env->NewSequentialFile(file_path, &file_reader, soptions);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
std::unique_ptr<char[]> scratch(new char[2048]);
|
||||
Slice result;
|
||||
FileChecksumGenFactory* file_checksum_gen_factory =
|
||||
options_.file_checksum_gen_factory.get();
|
||||
if (file_checksum_gen_factory == nullptr) {
|
||||
*file_checksum = kUnknownFileChecksum;
|
||||
*file_checksum_func_name = kUnknownFileChecksumFuncName;
|
||||
return Status::OK();
|
||||
} else {
|
||||
FileChecksumGenContext gen_context;
|
||||
std::unique_ptr<FileChecksumGenerator> file_checksum_gen =
|
||||
file_checksum_gen_factory->CreateFileChecksumGenerator(gen_context);
|
||||
*file_checksum_func_name = file_checksum_gen->Name();
|
||||
s = file_reader->Read(2048, &result, scratch.get());
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
while (result.size() != 0) {
|
||||
file_checksum_gen->Update(scratch.get(), result.size());
|
||||
s = file_reader->Read(2048, &result, scratch.get());
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
file_checksum_gen->Finalize();
|
||||
*file_checksum = file_checksum_gen->GetChecksum();
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(ExternalSSTFileBasicTest, BasicWithFileChecksumCrc32c) {
|
||||
Options options = CurrentOptions();
|
||||
options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
|
||||
ChecksumVerifyHelper checksum_helper(options);
|
||||
|
||||
SstFileWriter sst_file_writer(EnvOptions(), options);
|
||||
|
||||
// Current file size should be 0 after sst_file_writer init and before open a
|
||||
// file.
|
||||
ASSERT_EQ(sst_file_writer.FileSize(), 0);
|
||||
|
||||
// file1.sst (0 => 99)
|
||||
std::string file1 = sst_files_dir_ + "file1.sst";
|
||||
ASSERT_OK(sst_file_writer.Open(file1));
|
||||
for (int k = 0; k < 100; k++) {
|
||||
ASSERT_OK(sst_file_writer.Put(Key(k), Key(k) + "_val"));
|
||||
}
|
||||
ExternalSstFileInfo file1_info;
|
||||
Status s = sst_file_writer.Finish(&file1_info);
|
||||
ASSERT_TRUE(s.ok()) << s.ToString();
|
||||
std::string file_checksum, file_checksum_func_name;
|
||||
ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
|
||||
file1, &file_checksum, &file_checksum_func_name));
|
||||
|
||||
// Current file size should be non-zero after success write.
|
||||
ASSERT_GT(sst_file_writer.FileSize(), 0);
|
||||
|
||||
ASSERT_EQ(file1_info.file_path, file1);
|
||||
ASSERT_EQ(file1_info.num_entries, 100);
|
||||
ASSERT_EQ(file1_info.smallest_key, Key(0));
|
||||
ASSERT_EQ(file1_info.largest_key, Key(99));
|
||||
ASSERT_EQ(file1_info.num_range_del_entries, 0);
|
||||
ASSERT_EQ(file1_info.smallest_range_del_key, "");
|
||||
ASSERT_EQ(file1_info.largest_range_del_key, "");
|
||||
ASSERT_EQ(file1_info.file_checksum, file_checksum);
|
||||
ASSERT_EQ(file1_info.file_checksum_func_name, file_checksum_func_name);
|
||||
// sst_file_writer already finished, cannot add this value
|
||||
s = sst_file_writer.Put(Key(100), "bad_val");
|
||||
ASSERT_FALSE(s.ok()) << s.ToString();
|
||||
|
@ -34,6 +34,8 @@ struct ExternalSstFileInfo {
|
||||
largest_key(""),
|
||||
smallest_range_del_key(""),
|
||||
largest_range_del_key(""),
|
||||
file_checksum(""),
|
||||
file_checksum_func_name(""),
|
||||
sequence_number(0),
|
||||
file_size(0),
|
||||
num_entries(0),
|
||||
@ -50,6 +52,8 @@ struct ExternalSstFileInfo {
|
||||
largest_key(_largest_key),
|
||||
smallest_range_del_key(""),
|
||||
largest_range_del_key(""),
|
||||
file_checksum(""),
|
||||
file_checksum_func_name(""),
|
||||
sequence_number(_sequence_number),
|
||||
file_size(_file_size),
|
||||
num_entries(_num_entries),
|
||||
@ -62,6 +66,8 @@ struct ExternalSstFileInfo {
|
||||
std::string
|
||||
smallest_range_del_key; // smallest range deletion user key in file
|
||||
std::string largest_range_del_key; // largest range deletion user key in file
|
||||
std::string file_checksum; // sst file checksum;
|
||||
std::string file_checksum_func_name; // The name of file checksum function
|
||||
SequenceNumber sequence_number; // sequence number of all keys in file
|
||||
uint64_t file_size; // file size in bytes
|
||||
uint64_t num_entries; // number of entries in file
|
||||
|
@ -243,10 +243,10 @@ Status SstFileWriter::Open(const std::string& file_path) {
|
||||
&int_tbl_prop_collector_factories, compression_type,
|
||||
sample_for_compression, compression_opts, r->skip_filters,
|
||||
r->column_family_name, unknown_level);
|
||||
r->file_writer.reset(
|
||||
new WritableFileWriter(NewLegacyWritableFileWrapper(std::move(sst_file)),
|
||||
file_path, r->env_options, r->ioptions.env,
|
||||
nullptr /* stats */, r->ioptions.listeners));
|
||||
r->file_writer.reset(new WritableFileWriter(
|
||||
NewLegacyWritableFileWrapper(std::move(sst_file)), file_path,
|
||||
r->env_options, r->ioptions.env, nullptr /* stats */,
|
||||
r->ioptions.listeners, r->ioptions.file_checksum_gen_factory));
|
||||
|
||||
// TODO(tec) : If table_factory is using compressed block cache, we will
|
||||
// be adding the external sst file blocks into it, which is wasteful.
|
||||
@ -300,6 +300,11 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
|
||||
s = r->file_writer->Close();
|
||||
}
|
||||
}
|
||||
if (s.ok()) {
|
||||
r->file_info.file_checksum = r->file_writer->GetFileChecksum();
|
||||
r->file_info.file_checksum_func_name =
|
||||
r->file_writer->GetFileChecksumFuncName();
|
||||
}
|
||||
if (!s.ok()) {
|
||||
r->ioptions.env->DeleteFile(r->file_info.file_path);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user