Checksum for each SST file and stores in MANIFEST (#6216)

Summary:
In the current code base, RocksDB generate the checksum for each block and verify the checksum at usage. Current PR enable SST file checksum. After a SST file is generated by Flush or Compaction, RocksDB generate the SST file checksum and store the checksum value and checksum method name in the vs_info and MANIFEST as part for the FileMetadata.

Added the enable_sst_file_checksum to Options to enable or disable file checksum. Added sst_file_checksum to Options such that user can plugin their own SST file checksum calculate method via overriding the SstFileChecksum class. The checksum information inlcuding uint32_t checksum value and a checksum name (string).  A new tool is added to LDB such that user can dump out a list of file checksum information from MANIFEST. If user enables the file checksum but does not provide the sst_file_checksum instance, RocksDB will use the default crc32checksum implemented in table/sst_file_checksum_crc32c.h
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6216

Test Plan: Added the testing case in table_test and ldb_cmd_test to verify checksum is correct in different level. Pass make asan_check.

Differential Revision: D19171461

Pulled By: zhichao-cao

fbshipit-source-id: b2e53479eefc5bb0437189eaa1941670e5ba8b87
This commit is contained in:
Zhichao Cao 2020-02-10 15:42:46 -08:00 committed by Facebook Github Bot
parent 594e815e32
commit 4369f2c7bb
49 changed files with 1355 additions and 71 deletions

View File

@ -665,6 +665,7 @@ set(SOURCES
util/random.cc
util/rate_limiter.cc
util/slice.cc
util/file_checksum_helper.cc
util/status.cc
util/string_util.cc
util/thread_local.cc

View File

@ -18,6 +18,9 @@
* The BlobDB garbage collector now emits the statistics `BLOB_DB_GC_NUM_FILES` (number of blob files obsoleted during GC), `BLOB_DB_GC_NUM_NEW_FILES` (number of new blob files generated during GC), `BLOB_DB_GC_FAILURES` (number of failed GC passes), `BLOB_DB_GC_NUM_KEYS_RELOCATED` (number of blobs relocated during GC), and `BLOB_DB_GC_BYTES_RELOCATED` (total size of blobs relocated during GC). On the other hand, the following statistics, which are not relevant for the new GC implementation, are now deprecated: `BLOB_DB_GC_NUM_KEYS_OVERWRITTEN`, `BLOB_DB_GC_NUM_KEYS_EXPIRED`, `BLOB_DB_GC_BYTES_OVERWRITTEN`, `BLOB_DB_GC_BYTES_EXPIRED`, and `BLOB_DB_GC_MICROS`.
* Disable recycle_log_file_num when an inconsistent recovery modes are requested: kPointInTimeRecovery and kAbsoluteConsistency
### New Features
* Added the checksum for each SST file generated by Flush or Compaction. Added sst_file_checksum_func to Options such that user can plugin their own SST file checksum function via override the FileChecksumFunc class. If user does not set the sst_file_checksum_func, SST file checksum calculation will not be enabled. The checksum information inlcuding uint32_t checksum value and a checksum function name (string). The checksum information is stored in FileMetadata in version store and also logged to MANIFEST. A new tool is added to LDB such that user can dump out a list of file checksum information from MANIFEST (stored in an unordered_map).
## 6.7.0 (01/21/2020)
### Public API Change
* Added a rocksdb::FileSystem class in include/rocksdb/file_system.h to encapsulate file creation/read/write operations, and an option DBOptions::file_system to allow a user to pass in an instance of rocksdb::FileSystem. If its a non-null value, this will take precendence over DBOptions::env for file operations. A new API rocksdb::FileSystem::Default() returns a platform default object. The DBOptions::env option and Env::Default() API will continue to be used for threading and other OS related functions, and where DBOptions::file_system is not specified, for file operations. For storage developers who are accustomed to rocksdb::Env, the interface in rocksdb::FileSystem is new and will probably undergo some changes as more storage systems are ported to it from rocksdb::Env. As of now, no env other than Posix has been ported to the new interface.

View File

@ -283,6 +283,7 @@ cpp_library(
"util/concurrent_task_limiter_impl.cc",
"util/crc32c.cc",
"util/dynamic_bloom.cc",
"util/file_checksum_helper.cc",
"util/hash.cc",
"util/murmurhash.cc",
"util/random.cc",

View File

@ -131,9 +131,10 @@ Status BuildTable(
file->SetIOPriority(io_priority);
file->SetWriteLifeTimeHint(write_hint);
file_writer.reset(
new WritableFileWriter(std::move(file), fname, file_options, env,
ioptions.statistics, ioptions.listeners));
file_writer.reset(new WritableFileWriter(
std::move(file), fname, file_options, env, ioptions.statistics,
ioptions.listeners, ioptions.sst_file_checksum_func));
builder = NewTableBuilder(
ioptions, mutable_cf_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id,
@ -199,6 +200,9 @@ Status BuildTable(
if (table_properties) {
*table_properties = tp;
}
// Add the checksum information to file metadata.
meta->file_checksum = builder->GetFileChecksum();
meta->file_checksum_func_name = builder->GetFileChecksumFuncName();
}
delete builder;

View File

@ -1296,6 +1296,11 @@ Status CompactionJob::FinishCompactionOutputFile(
}
const uint64_t current_bytes = sub_compact->builder->FileSize();
if (s.ok()) {
// Add the checksum information to file metadata.
meta->file_checksum = sub_compact->builder->GetFileChecksum();
meta->file_checksum_func_name =
sub_compact->builder->GetFileChecksumFuncName();
meta->fd.file_size = current_bytes;
}
sub_compact->current_output()->finished = true;
@ -1508,7 +1513,8 @@ Status CompactionJob::OpenCompactionOutputFile(
sub_compact->compaction->immutable_cf_options()->listeners;
sub_compact->outfile.reset(
new WritableFileWriter(std::move(writable_file), fname, file_options_,
env_, db_options_.statistics.get(), listeners));
env_, db_options_.statistics.get(), listeners,
db_options_.sst_file_checksum_func.get()));
// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where

View File

@ -187,7 +187,8 @@ class CompactionJobTest : public testing::Test {
VersionEdit edit;
edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key,
smallest_seqno, largest_seqno, false, oldest_blob_file_number,
kUnknownOldestAncesterTime, kUnknownFileCreationTime);
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
mutex_.Lock();
versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),

View File

@ -95,7 +95,8 @@ class CompactionPickerTest : public testing::Test {
InternalKey(smallest, smallest_seq, kTypeValue),
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime);
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size;
vstorage_->AddFile(level, f);

View File

@ -21,7 +21,7 @@ using std::unique_ptr;
namespace rocksdb {
namespace {
static const Comparator* comparator;
static const Comparator* kTestComparator = nullptr;
class KVIter : public Iterator {
public:
@ -74,7 +74,7 @@ void AssertItersEqual(Iterator* iter1, Iterator* iter2) {
void DoRandomIteraratorTest(DB* db, std::vector<std::string> source_strings,
Random* rnd, int num_writes, int num_iter_ops,
int num_trigger_flush) {
stl_wrappers::KVMap map((stl_wrappers::LessOfComparator(comparator)));
stl_wrappers::KVMap map((stl_wrappers::LessOfComparator(kTestComparator)));
for (int i = 0; i < num_writes; i++) {
if (num_trigger_flush > 0 && i != 0 && i % num_trigger_flush == 0) {
@ -263,7 +263,7 @@ class ComparatorDBTest
public:
ComparatorDBTest() : env_(Env::Default()), db_(nullptr) {
comparator = BytewiseComparator();
kTestComparator = BytewiseComparator();
dbname_ = test::PerThreadDBPath("comparator_db_test");
BlockBasedTableOptions toptions;
toptions.format_version = GetParam();
@ -275,7 +275,7 @@ class ComparatorDBTest
~ComparatorDBTest() override {
delete db_;
EXPECT_OK(DestroyDB(dbname_, last_options_));
comparator = BytewiseComparator();
kTestComparator = BytewiseComparator();
}
DB* GetDB() { return db_; }
@ -286,7 +286,7 @@ class ComparatorDBTest
} else {
comparator_guard.reset();
}
comparator = cmp;
kTestComparator = cmp;
last_options_.comparator = cmp;
}
@ -334,7 +334,7 @@ TEST_P(ComparatorDBTest, SimpleSuffixReverseComparator) {
for (int rnd_seed = 301; rnd_seed < 316; rnd_seed++) {
Options* opt = GetOptions();
opt->comparator = comparator;
opt->comparator = kTestComparator;
DestroyAndReopen();
Random rnd(rnd_seed);
@ -360,7 +360,7 @@ TEST_P(ComparatorDBTest, Uint64Comparator) {
for (int rnd_seed = 301; rnd_seed < 316; rnd_seed++) {
Options* opt = GetOptions();
opt->comparator = comparator;
opt->comparator = kTestComparator;
DestroyAndReopen();
Random rnd(rnd_seed);
Random64 rnd64(rnd_seed);
@ -384,7 +384,7 @@ TEST_P(ComparatorDBTest, DoubleComparator) {
for (int rnd_seed = 301; rnd_seed < 316; rnd_seed++) {
Options* opt = GetOptions();
opt->comparator = comparator;
opt->comparator = kTestComparator;
DestroyAndReopen();
Random rnd(rnd_seed);
@ -409,7 +409,7 @@ TEST_P(ComparatorDBTest, HashComparator) {
for (int rnd_seed = 301; rnd_seed < 316; rnd_seed++) {
Options* opt = GetOptions();
opt->comparator = comparator;
opt->comparator = kTestComparator;
DestroyAndReopen();
Random rnd(rnd_seed);
@ -428,7 +428,7 @@ TEST_P(ComparatorDBTest, TwoStrComparator) {
for (int rnd_seed = 301; rnd_seed < 316; rnd_seed++) {
Options* opt = GetOptions();
opt->comparator = comparator;
opt->comparator = kTestComparator;
DestroyAndReopen();
Random rnd(rnd_seed);

View File

@ -1258,7 +1258,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->oldest_blob_file_number,
f->oldest_ancester_time, f->file_creation_time);
f->oldest_ancester_time, f->file_creation_time,
f->file_checksum, f->file_checksum_func_name);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
@ -2669,7 +2670,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction,
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time);
f->file_creation_time, f->file_checksum,
f->file_checksum_func_name);
ROCKS_LOG_BUFFER(
log_buffer,

View File

@ -129,7 +129,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->oldest_blob_file_number,
f->oldest_ancester_time, f->file_creation_time);
f->oldest_ancester_time, f->file_creation_time,
f->file_checksum, f->file_checksum_func_name);
}
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),

View File

@ -1259,7 +1259,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction, meta.oldest_blob_file_number,
meta.oldest_ancester_time, meta.file_creation_time);
meta.oldest_ancester_time, meta.file_creation_time,
meta.file_checksum, meta.file_checksum_func_name);
}
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);

View File

@ -255,11 +255,11 @@ Status ExternalSstFileIngestionJob::Run() {
static_cast<uint64_t>(temp_current_time);
}
edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
f.fd.GetFileSize(), f.smallest_internal_key,
f.largest_internal_key, f.assigned_seqno, f.assigned_seqno,
false, kInvalidBlobFileNumber, oldest_ancester_time,
current_time);
edit_.AddFile(
f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(),
f.smallest_internal_key, f.largest_internal_key, f.assigned_seqno,
f.assigned_seqno, false, kInvalidBlobFileNumber, oldest_ancester_time,
current_time, kUnknownFileChecksum, kUnknownFileChecksumFuncName);
}
return status;
}

View File

@ -416,7 +416,8 @@ Status FlushJob::WriteLevel0Table() {
meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction, meta_.oldest_blob_file_number,
meta_.oldest_ancester_time, meta_.file_creation_time);
meta_.oldest_ancester_time, meta_.file_creation_time,
meta_.file_checksum, meta_.file_checksum_func_name);
}
#ifndef ROCKSDB_LITE
// Piggyback FlushJobInfo on the first first flushed memtable.

View File

@ -153,7 +153,8 @@ Status ImportColumnFamilyJob::Run() {
f.fd.GetFileSize(), f.smallest_internal_key,
f.largest_internal_key, file_metadata.smallest_seqno,
file_metadata.largest_seqno, false, kInvalidBlobFileNumber,
oldest_ancester_time, current_time);
oldest_ancester_time, current_time, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
// If incoming sequence number is higher, update local sequence number.
if (file_metadata.largest_seqno > versions_->LastSequence()) {

View File

@ -586,7 +586,8 @@ class Repairer {
table->meta.largest, table->meta.fd.smallest_seqno,
table->meta.fd.largest_seqno, table->meta.marked_for_compaction,
table->meta.oldest_blob_file_number,
table->meta.oldest_ancester_time, table->meta.file_creation_time);
table->meta.oldest_ancester_time, table->meta.file_creation_time,
table->meta.file_checksum, table->meta.file_checksum_func_name);
}
assert(next_file_number_ > 0);
vset_.MarkFileNumberUsed(next_file_number_ - 1);

View File

@ -63,7 +63,8 @@ class VersionBuilderTest : public testing::Test {
file_number, path_id, file_size, GetInternalKey(smallest, smallest_seq),
GetInternalKey(largest, largest_seq), smallest_seqno, largest_seqno,
/* marked_for_compact */ false, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime);
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
f->compensated_file_size = file_size;
f->num_entries = num_entries;
f->num_deletions = num_deletions;
@ -115,7 +116,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveTo) {
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.DeleteFile(3, 27U);
EnvOptions env_options;
@ -151,7 +153,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic) {
version_edit.AddFile(3, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.DeleteFile(0, 1U);
version_edit.DeleteFile(0, 88U);
@ -190,7 +193,8 @@ TEST_F(VersionBuilderTest, ApplyAndSaveToDynamic2) {
version_edit.AddFile(4, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.DeleteFile(0, 1U);
version_edit.DeleteFile(0, 88U);
version_edit.DeleteFile(4, 6U);
@ -220,23 +224,28 @@ TEST_F(VersionBuilderTest, ApplyMultipleAndSaveTo) {
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.AddFile(2, 676, 0, 100U, GetInternalKey("401"),
GetInternalKey("450"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.AddFile(2, 636, 0, 100U, GetInternalKey("601"),
GetInternalKey("650"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.AddFile(2, 616, 0, 100U, GetInternalKey("501"),
GetInternalKey("550"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.AddFile(2, 606, 0, 100U, GetInternalKey("701"),
GetInternalKey("750"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
EnvOptions env_options;
@ -264,36 +273,43 @@ TEST_F(VersionBuilderTest, ApplyDeleteAndSaveTo) {
version_edit.AddFile(2, 666, 0, 100U, GetInternalKey("301"),
GetInternalKey("350"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.AddFile(2, 676, 0, 100U, GetInternalKey("401"),
GetInternalKey("450"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.AddFile(2, 636, 0, 100U, GetInternalKey("601"),
GetInternalKey("650"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.AddFile(2, 616, 0, 100U, GetInternalKey("501"),
GetInternalKey("550"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit.AddFile(2, 606, 0, 100U, GetInternalKey("701"),
GetInternalKey("750"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_builder.Apply(&version_edit);
VersionEdit version_edit2;
version_edit.AddFile(2, 808, 0, 100U, GetInternalKey("901"),
GetInternalKey("950"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_edit2.DeleteFile(2, 616);
version_edit2.DeleteFile(2, 636);
version_edit.AddFile(2, 806, 0, 100U, GetInternalKey("801"),
GetInternalKey("850"), 200, 200, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
version_builder.Apply(&version_edit2);
version_builder.SaveTo(&new_vstorage);

View File

@ -18,7 +18,10 @@
#include "util/string_util.h"
namespace rocksdb {
// The unknown file checksum.
const std::string kUnknownFileChecksum("");
// The unknown sst file checksum function name.
const std::string kUnknownFileChecksumFuncName("Unknown");
// Mask for an identified tag from the future which can be safely ignored.
const uint32_t kTagSafeIgnoreMask = 1 << 13;
@ -63,6 +66,8 @@ enum CustomTag : uint32_t {
kOldestBlobFileNumber = 4,
kOldestAncesterTime = 5,
kFileCreationTime = 6,
kFileChecksum = 7,
kFileChecksumFuncName = 8,
kPathId = 65,
};
// If this bit for the custom tag is set, opening DB should fail if
@ -226,6 +231,12 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
&varint_file_creation_time);
PutLengthPrefixedSlice(dst, Slice(varint_file_creation_time));
PutVarint32(dst, CustomTag::kFileChecksum);
PutLengthPrefixedSlice(dst, Slice(f.file_checksum));
PutVarint32(dst, CustomTag::kFileChecksumFuncName);
PutLengthPrefixedSlice(dst, Slice(f.file_checksum_func_name));
if (f.fd.GetPathId() != 0) {
PutVarint32(dst, CustomTag::kPathId);
char p = static_cast<char>(f.fd.GetPathId());
@ -349,6 +360,12 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
return "invalid file creation time";
}
break;
case kFileChecksum:
f.file_checksum = field.ToString();
break;
case kFileChecksumFuncName:
f.file_checksum_func_name = field.ToString();
break;
case kNeedCompaction:
if (field.size() != 1) {
return "need_compaction field wrong size";
@ -678,6 +695,10 @@ std::string VersionEdit::DebugString(bool hex_key) const {
AppendNumberTo(&r, f.oldest_ancester_time);
r.append(" file_creation_time:");
AppendNumberTo(&r, f.file_creation_time);
r.append(" file_checksum:");
r.append(f.file_checksum);
r.append(" file_checksum_func_name: ");
r.append(f.file_checksum_func_name);
}
r.append("\n ColumnFamily: ");
AppendNumberTo(&r, column_family_);

View File

@ -28,6 +28,9 @@ constexpr uint64_t kInvalidBlobFileNumber = 0;
constexpr uint64_t kUnknownOldestAncesterTime = 0;
constexpr uint64_t kUnknownFileCreationTime = 0;
extern const std::string kUnknownFileChecksum;
extern const std::string kUnknownFileChecksumFuncName;
extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id);
// A copyable structure contains information needed to read data from an SST
@ -134,6 +137,12 @@ struct FileMetaData {
// Unix time when the SST file is created.
uint64_t file_creation_time = kUnknownFileCreationTime;
// File checksum
std::string file_checksum = kUnknownFileChecksum;
// File checksum function name
std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
FileMetaData() = default;
FileMetaData(uint64_t file, uint32_t file_path_id, uint64_t file_size,
@ -141,14 +150,17 @@ struct FileMetaData {
const SequenceNumber& smallest_seq,
const SequenceNumber& largest_seq, bool marked_for_compact,
uint64_t oldest_blob_file, uint64_t _oldest_ancester_time,
uint64_t _file_creation_time)
uint64_t _file_creation_time, const std::string& _file_checksum,
const std::string& _file_checksum_func_name)
: fd(file, file_path_id, file_size, smallest_seq, largest_seq),
smallest(smallest_key),
largest(largest_key),
marked_for_compaction(marked_for_compact),
oldest_blob_file_number(oldest_blob_file),
oldest_ancester_time(_oldest_ancester_time),
file_creation_time(_file_creation_time) {
file_creation_time(_file_creation_time),
file_checksum(_file_checksum),
file_checksum_func_name(_file_checksum_func_name) {
TEST_SYNC_POINT_CALLBACK("FileMetaData::FileMetaData", this);
}
@ -314,13 +326,15 @@ class VersionEdit {
const InternalKey& largest, const SequenceNumber& smallest_seqno,
const SequenceNumber& largest_seqno, bool marked_for_compaction,
uint64_t oldest_blob_file_number, uint64_t oldest_ancester_time,
uint64_t file_creation_time) {
uint64_t file_creation_time, const std::string& file_checksum,
const std::string& file_checksum_func_name) {
assert(smallest_seqno <= largest_seqno);
new_files_.emplace_back(
level, FileMetaData(file, file_path_id, file_size, smallest, largest,
smallest_seqno, largest_seqno,
marked_for_compaction, oldest_blob_file_number,
oldest_ancester_time, file_creation_time));
oldest_ancester_time, file_creation_time,
file_checksum, file_checksum_func_name));
}
void AddFile(int level, const FileMetaData& f) {

View File

@ -37,7 +37,7 @@ TEST_F(VersionEditTest, EncodeDecode) {
InternalKey("foo", kBig + 500 + i, kTypeValue),
InternalKey("zoo", kBig + 600 + i, kTypeDeletion),
kBig + 500 + i, kBig + 600 + i, false, kInvalidBlobFileNumber,
888, 678);
888, 678, "234", "crc32c");
edit.DeleteFile(4, kBig + 700 + i);
}
@ -55,18 +55,22 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) {
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
kBig + 600, true, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime);
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
edit.AddFile(4, 301, 3, 100, InternalKey("foo", kBig + 501, kTypeValue),
InternalKey("zoo", kBig + 601, kTypeDeletion), kBig + 501,
kBig + 601, false, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime);
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
edit.AddFile(5, 302, 0, 100, InternalKey("foo", kBig + 502, kTypeValue),
InternalKey("zoo", kBig + 602, kTypeDeletion), kBig + 502,
kBig + 602, true, kInvalidBlobFileNumber, 666, 888);
kBig + 602, true, kInvalidBlobFileNumber, 666, 888,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
edit.AddFile(5, 303, 0, 100, InternalKey("foo", kBig + 503, kTypeBlobIndex),
InternalKey("zoo", kBig + 603, kTypeBlobIndex), kBig + 503,
kBig + 603, true, 1001, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
;
edit.DeleteFile(4, 700);
@ -106,10 +110,12 @@ TEST_F(VersionEditTest, ForwardCompatibleNewFile4) {
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
kBig + 600, true, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime);
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
edit.AddFile(4, 301, 3, 100, InternalKey("foo", kBig + 501, kTypeValue),
InternalKey("zoo", kBig + 601, kTypeDeletion), kBig + 501,
kBig + 601, false, kInvalidBlobFileNumber, 686, 868);
kBig + 601, false, kInvalidBlobFileNumber, 686, 868, "234",
"crc32c");
edit.DeleteFile(4, 700);
edit.SetComparatorName("foo");
@ -156,7 +162,8 @@ TEST_F(VersionEditTest, NewFile4NotSupportedField) {
edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue),
InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500,
kBig + 600, true, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime);
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
edit.SetComparatorName("foo");
edit.SetLogNumber(kBig + 100);
@ -185,7 +192,8 @@ TEST_F(VersionEditTest, EncodeEmptyFile) {
VersionEdit edit;
edit.AddFile(0, 0, 0, 0, InternalKey(), InternalKey(), 0, 0, false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
std::string buffer;
ASSERT_TRUE(!edit.EncodeTo(&buffer));
}

View File

@ -1464,7 +1464,8 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
file->largest.user_key().ToString(),
file->stats.num_reads_sampled.load(std::memory_order_relaxed),
file->being_compacted, file->oldest_blob_file_number,
file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime()});
file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
file->file_checksum, file->file_checksum_func_name});
files.back().num_entries = file->num_entries;
files.back().num_deletions = file->num_deletions;
level_size += file->fd.GetFileSize();
@ -4713,6 +4714,34 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
}
// Get the checksum information including the checksum and checksum function
// name of all SST files in VersionSet. Store the information in
// FileChecksumList which contains a map from file number to its checksum info.
// If DB is not running, make sure call VersionSet::Recover() to load the file
// metadata from Manifest to VersionSet before calling this function.
Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
// Clean the previously stored checksum information if any.
if (checksum_list == nullptr) {
return Status::InvalidArgument("checksum_list is nullptr");
}
checksum_list->reset();
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped() || !cfd->initialized()) {
continue;
}
for (int level = 0; level < cfd->NumberLevels(); level++) {
for (const auto& file :
cfd->current()->storage_info()->LevelFiles(level)) {
checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
file->file_checksum,
file->file_checksum_func_name);
}
}
}
return Status::OK();
}
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
bool verbose, bool hex, bool json) {
// Open the specified manifest file.
@ -5002,7 +5031,8 @@ Status VersionSet::WriteCurrentStateToManifest(
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->oldest_blob_file_number,
f->oldest_ancester_time, f->file_creation_time);
f->oldest_ancester_time, f->file_creation_time,
f->file_checksum, f->file_checksum_func_name);
}
}
const auto iter = curr_state.find(cfd->GetID());
@ -5429,6 +5459,8 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
filemetadata.num_entries = file->num_entries;
filemetadata.num_deletions = file->num_deletions;
filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
filemetadata.file_checksum = file->file_checksum;
filemetadata.file_checksum_func_name = file->file_checksum_func_name;
metadata->push_back(filemetadata);
}
}

View File

@ -44,6 +44,7 @@
#include "options/db_options.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "table/get_context.h"
#include "table/multiget_context.h"
#include "trace_replay/block_cache_tracer.h"
@ -895,6 +896,9 @@ class VersionSet {
const FileOptions& file_options,
int new_levels);
// Get the checksum information of all live files
Status GetLiveFilesChecksumInfo(FileChecksumList* checksum_list);
// printf contents (for debugging)
Status DumpManifest(Options& options, std::string& manifestFileName,
bool verbose, bool hex = false, bool json = false);

View File

@ -40,7 +40,8 @@ class GenerateLevelFilesBriefTest : public testing::Test {
InternalKey(smallest, smallest_seq, kTypeValue),
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
largest_seq, /* marked_for_compact */ false, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime);
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
files_.push_back(f);
}
@ -135,7 +136,8 @@ class VersionStorageInfoTest : public testing::Test {
file_number, 0, file_size, GetInternalKey(smallest, 0),
GetInternalKey(largest, 0), /* smallest_seq */ 0, /* largest_seq */ 0,
/* marked_for_compact */ false, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime);
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
f->compensated_file_size = file_size;
vstorage_.AddFile(level, f);
}
@ -147,7 +149,8 @@ class VersionStorageInfoTest : public testing::Test {
file_number, 0, file_size, smallest, largest, /* smallest_seq */ 0,
/* largest_seq */ 0, /* marked_for_compact */ false,
kInvalidBlobFileNumber, kUnknownOldestAncesterTime,
kUnknownFileCreationTime);
kUnknownFileCreationTime, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
f->compensated_file_size = file_size;
vstorage_.AddFile(level, f);
}

View File

@ -12,6 +12,7 @@
#include <algorithm>
#include <mutex>
#include "db/version_edit.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
@ -88,6 +89,7 @@ Status WritableFileWriter::Append(const Slice& data) {
TEST_KILL_RANDOM("WritableFileWriter::Append:1", rocksdb_kill_odds);
if (s.ok()) {
filesize_ += data.size();
CalculateFileChecksum(data);
}
return s;
}
@ -214,6 +216,14 @@ Status WritableFileWriter::Flush() {
return s;
}
const char* WritableFileWriter::GetFileChecksumFuncName() const {
if (checksum_func_ != nullptr) {
return checksum_func_->Name();
} else {
return kUnknownFileChecksumFuncName.c_str();
}
}
Status WritableFileWriter::Sync(bool use_fsync) {
Status s = Flush();
if (!s.ok()) {
@ -321,6 +331,18 @@ Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
return s;
}
void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
if (checksum_func_ != nullptr) {
if (is_first_checksum_) {
file_checksum_ = checksum_func_->Value(data.data(), data.size());
is_first_checksum_ = false;
} else {
file_checksum_ =
checksum_func_->Extend(file_checksum_, data.data(), data.size());
}
}
}
// This flushes the accumulated data in the buffer. We pad data with zeros if
// necessary to the whole page.
// However, during automatic flushes padding would not be necessary.

View File

@ -10,8 +10,10 @@
#pragma once
#include <atomic>
#include <string>
#include "db/version_edit.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/file_system.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
@ -47,6 +49,7 @@ class WritableFileWriter {
#endif // ROCKSDB_LITE
bool ShouldNotifyListeners() const { return !listeners_.empty(); }
void CalculateFileChecksum(const Slice& data);
std::unique_ptr<FSWritableFile> writable_file_;
std::string file_name_;
@ -68,13 +71,17 @@ class WritableFileWriter {
RateLimiter* rate_limiter_;
Statistics* stats_;
std::vector<std::shared_ptr<EventListener>> listeners_;
FileChecksumFunc* checksum_func_;
std::string file_checksum_ = kUnknownFileChecksum;
bool is_first_checksum_ = true;
public:
WritableFileWriter(
std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name,
const FileOptions& options, Env* env = nullptr,
Statistics* stats = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {})
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
FileChecksumFunc* checksum_func = nullptr)
: writable_file_(std::move(file)),
file_name_(_file_name),
env_(env),
@ -89,7 +96,8 @@ class WritableFileWriter {
bytes_per_sync_(options.bytes_per_sync),
rate_limiter_(options.rate_limiter),
stats_(stats),
listeners_() {
listeners_(),
checksum_func_(checksum_func) {
TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
reinterpret_cast<void*>(max_buffer_size_));
buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
@ -141,6 +149,14 @@ class WritableFileWriter {
bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; }
void TEST_SetFileChecksumFunc(FileChecksumFunc* checksum_func) {
checksum_func_ = checksum_func;
}
const std::string& GetFileChecksum() const { return file_checksum_; }
const char* GetFileChecksumFuncName() const;
private:
// Used when os buffering is OFF and we are writing
// DMA such as in Direct I/O mode

View File

@ -0,0 +1,86 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "rocksdb/status.h"
namespace rocksdb {
// FileChecksumFunc is the function class to generates the checksum value
// for each file when the file is written to the file system.
class FileChecksumFunc {
public:
virtual ~FileChecksumFunc() {}
// Return the checksum of concat (A, data[0,n-1]) where init_checksum is the
// returned value of some string A. It is used to maintain the checksum of a
// stream of data
virtual std::string Extend(const std::string& init_checksum, const char* data,
size_t n) = 0;
// Return the checksum value of data[0,n-1]
virtual std::string Value(const char* data, size_t n) = 0;
// Return a processed value of the checksum for store in somewhere
virtual std::string ProcessChecksum(const std::string& checksum) = 0;
// Returns a name that identifies the current file checksum function.
virtual const char* Name() const = 0;
};
// FileChecksumList stores the checksum information of a list of files (e.g.,
// SST files). The FileChecksumLIst can be used to store the checksum
// information of all SST file getting from the MANIFEST, which are
// the checksum information of all valid SST file of a DB instance. It can
// also be used to store the checksum information of a list of SST files to
// be ingested.
class FileChecksumList {
public:
virtual ~FileChecksumList() {}
// Clean the previously stored file checksum information.
virtual void reset() = 0;
// Get the number of checksums in the checksum list
virtual size_t size() const = 0;
// Return all the file checksum information being stored in a unordered_map.
// File_number is the key, the first part of the value is checksum value,
// and the second part of the value is checksum function name.
virtual Status GetAllFileChecksums(
std::vector<uint64_t>* file_numbers, std::vector<std::string>* checksums,
std::vector<std::string>* checksum_func_names) = 0;
// Given the file_number, it searches if the file checksum information is
// stored.
virtual Status SearchOneFileChecksum(uint64_t file_number,
std::string* checksum,
std::string* checksum_func_name) = 0;
// Insert the checksum information of one file to the FileChecksumList.
virtual Status InsertOneFileChecksum(
uint64_t file_number, const std::string& checksum,
const std::string& checksum_func_name) = 0;
// Remove the checksum information of one SST file.
virtual Status RemoveOneFileChecksum(uint64_t file_number) = 0;
};
// Create a new file checksum list.
extern FileChecksumList* NewFileChecksumList();
// Create a Crc32c based file checksum function
extern FileChecksumFunc* CreateFileChecksumFuncCrc32c();
} // namespace rocksdb

View File

@ -70,7 +70,9 @@ struct SstFileMetaData {
const std::string& _smallestkey,
const std::string& _largestkey, uint64_t _num_reads_sampled,
bool _being_compacted, uint64_t _oldest_blob_file_number,
uint64_t _oldest_ancester_time, uint64_t _file_creation_time)
uint64_t _oldest_ancester_time, uint64_t _file_creation_time,
std::string& _file_checksum,
std::string& _file_checksum_func_name)
: size(_size),
name(_file_name),
file_number(_file_number),
@ -85,7 +87,9 @@ struct SstFileMetaData {
num_deletions(0),
oldest_blob_file_number(_oldest_blob_file_number),
oldest_ancester_time(_oldest_ancester_time),
file_creation_time(_file_creation_time) {}
file_creation_time(_file_creation_time),
file_checksum(_file_checksum),
file_checksum_func_name(_file_checksum_func_name) {}
// File size in bytes.
size_t size;
@ -117,6 +121,18 @@ struct SstFileMetaData {
// Timestamp when the SST file is created, provided by Env::GetCurrentTime().
// 0 if the information is not available.
uint64_t file_creation_time;
// The checksum of a SST file, the value is decided by the file content and
// the checksum algorithm used for this SST file. The checksum function is
// identified by the file_checksum_func_name. If the checksum function is
// not specified, file_checksum is "0" by default.
std::string file_checksum;
// The name of the checksum function used to generate the file checksum
// value. If file checksum is not enabled (e.g., sst_file_checksum_func is
// null), file_checksum_func_name is UnknownFileChecksumFuncName, which is
// "Unknown".
std::string file_checksum_func_name;
};
// The full set of metadata associated with each SST file.

View File

@ -19,6 +19,7 @@
#include "rocksdb/advanced_options.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/listener.h"
#include "rocksdb/universal_compaction.h"
#include "rocksdb/version.h"
@ -1122,6 +1123,13 @@ struct DBOptions {
//
// Default: 0
size_t log_readahead_size = 0;
// If user does NOT provide SST file checksum function, the SST file checksum
// will NOT be used. The single checksum instance are shared by options and
// file writers. Make sure the algorithm is thread safe.
//
// Default: nullptr
std::shared_ptr<FileChecksumFunc> sst_file_checksum_func = nullptr;
};
// Options to control the behavior of a database (passed to DB::Open)

View File

@ -77,7 +77,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
memtable_insert_with_hint_prefix_extractor(
cf_options.memtable_insert_with_hint_prefix_extractor.get()),
cf_paths(cf_options.cf_paths),
compaction_thread_limiter(cf_options.compaction_thread_limiter) {}
compaction_thread_limiter(cf_options.compaction_thread_limiter),
sst_file_checksum_func(db_options.sst_file_checksum_func.get()) {}
// Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) {

View File

@ -125,6 +125,8 @@ struct ImmutableCFOptions {
std::vector<DbPath> cf_paths;
std::shared_ptr<ConcurrentTaskLimiter> compaction_thread_limiter;
FileChecksumFunc* sst_file_checksum_func;
};
struct MutableCFOptions {

View File

@ -7,6 +7,7 @@
#include <cinttypes>
#include "db/version_edit.h"
#include "logging/logging.h"
#include "port/port.h"
#include "rocksdb/cache.h"
@ -93,7 +94,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
avoid_unnecessary_blocking_io(options.avoid_unnecessary_blocking_io),
persist_stats_to_disk(options.persist_stats_to_disk),
write_dbid_to_manifest(options.write_dbid_to_manifest),
log_readahead_size(options.log_readahead_size) {
log_readahead_size(options.log_readahead_size),
sst_file_checksum_func(options.sst_file_checksum_func) {
}
void ImmutableDBOptions::Dump(Logger* log) const {
@ -244,6 +246,10 @@ void ImmutableDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(
log, " Options.log_readahead_size: %" ROCKSDB_PRIszt,
log_readahead_size);
ROCKS_LOG_HEADER(log, " Options.sst_file_checksum_func: %s",
sst_file_checksum_func
? sst_file_checksum_func->Name()
: kUnknownFileChecksumFuncName.c_str());
}
MutableDBOptions::MutableDBOptions()

View File

@ -87,6 +87,7 @@ struct ImmutableDBOptions {
bool persist_stats_to_disk;
bool write_dbid_to_manifest;
size_t log_readahead_size;
std::shared_ptr<FileChecksumFunc> sst_file_checksum_func;
};
struct MutableDBOptions {

View File

@ -144,6 +144,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.avoid_unnecessary_blocking_io =
immutable_db_options.avoid_unnecessary_blocking_io;
options.log_readahead_size = immutable_db_options.log_readahead_size;
options.sst_file_checksum_func = immutable_db_options.sst_file_checksum_func;
return options;
}

View File

@ -199,6 +199,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
sizeof(std::vector<std::shared_ptr<EventListener>>)},
{offsetof(struct DBOptions, row_cache), sizeof(std::shared_ptr<Cache>)},
{offsetof(struct DBOptions, wal_filter), sizeof(const WalFilter*)},
{offsetof(struct DBOptions, sst_file_checksum_func),
sizeof(std::shared_ptr<FileChecksumFunc>)},
};
char* options_ptr = new char[sizeof(DBOptions)];

1
src.mk
View File

@ -173,6 +173,7 @@ LIB_SOURCES = \
util/random.cc \
util/rate_limiter.cc \
util/slice.cc \
util/file_checksum_helper.cc \
util/status.cc \
util/string_util.cc \
util/thread_local.cc \

View File

@ -1164,6 +1164,9 @@ Status BlockBasedTableBuilder::Finish() {
if (ok()) {
WriteFooter(metaindex_block_handle, index_block_handle);
}
if (r->file != nullptr) {
file_checksum_ = r->file->GetFileChecksum();
}
r->state = Rep::State::kClosed;
return r->status;
}
@ -1199,6 +1202,14 @@ TableProperties BlockBasedTableBuilder::GetTableProperties() const {
return ret;
}
const char* BlockBasedTableBuilder::GetFileChecksumFuncName() const {
if (rep_->file != nullptr) {
return rep_->file->GetFileChecksumFuncName();
} else {
return kUnknownFileChecksumFuncName.c_str();
}
}
const std::string BlockBasedTable::kFilterBlockPrefix = "filter.";
const std::string BlockBasedTable::kFullFilterBlockPrefix = "fullfilter.";
const std::string BlockBasedTable::kPartitionedFilterBlockPrefix =

View File

@ -14,6 +14,7 @@
#include <utility>
#include <vector>
#include "db/version_edit.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"
@ -91,6 +92,12 @@ class BlockBasedTableBuilder : public TableBuilder {
// Get table properties
TableProperties GetTableProperties() const override;
// Get file checksum
const std::string& GetFileChecksum() const override { return file_checksum_; }
// Get file checksum function name
const char* GetFileChecksumFuncName() const override;
private:
bool ok() const { return status().ok(); }
@ -136,6 +143,9 @@ class BlockBasedTableBuilder : public TableBuilder {
// Some compression libraries fail when the raw size is bigger than int. If
// uncompressed size is bigger than kCompressionSizeLimit, don't compress it
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
// Store file checksum. If checksum is disabled, its value is "0".
std::string file_checksum_ = kUnknownFileChecksum;
};
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,

View File

@ -387,6 +387,10 @@ Status CuckooTableBuilder::Finish() {
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
s = file_->Append(footer_encoding);
if (file_ != nullptr) {
file_checksum_ = file_->GetFileChecksum();
}
return s;
}
@ -512,5 +516,13 @@ bool CuckooTableBuilder::MakeSpaceForKey(
return null_found;
}
const char* CuckooTableBuilder::GetFileChecksumFuncName() const {
if (file_ != nullptr) {
return file_->GetFileChecksumFuncName();
} else {
return kUnknownFileChecksumFuncName.c_str();
}
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -10,11 +10,12 @@
#include <string>
#include <utility>
#include <vector>
#include "db/version_edit.h"
#include "port/port.h"
#include "rocksdb/status.h"
#include "table/table_builder.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/table_builder.h"
#include "util/autovector.h"
namespace rocksdb {
@ -66,6 +67,12 @@ class CuckooTableBuilder: public TableBuilder {
TableProperties GetTableProperties() const override { return properties_; }
// Get file checksum
const std::string& GetFileChecksum() const override { return file_checksum_; }
// Get file checksum function name
const char* GetFileChecksumFuncName() const override;
private:
struct CuckooBucket {
CuckooBucket()
@ -119,6 +126,9 @@ class CuckooTableBuilder: public TableBuilder {
std::string smallest_user_key_ = "";
bool closed_; // Either Finish() or Abandon() has been called.
// Store file checksum. If checksum is disabled, its value is "0"
std::string file_checksum_ = kUnknownFileChecksum;
};
} // namespace rocksdb

View File

@ -12,6 +12,7 @@
#include <string>
#include <utility>
#include "db/version_edit.h"
#include "port/port.h"
#include "rocksdb/comparator.h"
#include "rocksdb/table.h"
@ -153,10 +154,18 @@ class MockTableBuilder : public TableBuilder {
return TableProperties();
}
// Get file checksum
const std::string& GetFileChecksum() const override { return file_checksum_; }
// Get file checksum function name
const char* GetFileChecksumFuncName() const override {
return kUnknownFileChecksumFuncName.c_str();
}
private:
uint32_t id_;
MockTableFileSystem* file_system_;
stl_wrappers::KVMap table_;
std::string file_checksum_ = kUnknownFileChecksum;
};
class MockTableFactory : public TableFactory {

View File

@ -284,6 +284,9 @@ Status PlainTableBuilder::Finish() {
offset_ += footer_encoding.size();
}
if (file_ != nullptr) {
file_checksum_ = file_->GetFileChecksum();
}
return s;
}
@ -299,5 +302,13 @@ uint64_t PlainTableBuilder::FileSize() const {
return offset_;
}
const char* PlainTableBuilder::GetFileChecksumFuncName() const {
if (file_ != nullptr) {
return file_->GetFileChecksumFuncName();
} else {
return kUnknownFileChecksumFuncName.c_str();
}
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -9,6 +9,7 @@
#include <stdint.h>
#include <string>
#include <vector>
#include "db/version_edit.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
@ -83,6 +84,12 @@ class PlainTableBuilder: public TableBuilder {
bool SaveIndexInFile() const { return store_index_in_file_; }
// Get file checksum
const std::string& GetFileChecksum() const override { return file_checksum_; }
// Get file checksum function name
const char* GetFileChecksumFuncName() const override;
private:
Arena arena_;
const ImmutableCFOptions& ioptions_;
@ -108,6 +115,9 @@ class PlainTableBuilder: public TableBuilder {
const SliceTransform* prefix_extractor_;
// Store file checksum. If checksum is disabled, its value is "0".
std::string file_checksum_ = kUnknownFileChecksum;
Slice GetPrefix(const Slice& target) const {
assert(target.size() >= 8); // target is internal key
return GetPrefixFromUserKey(GetUserKey(target));

View File

@ -159,6 +159,12 @@ class TableBuilder {
// Returns table properties
virtual TableProperties GetTableProperties() const = 0;
// Return file checksum
virtual const std::string& GetFileChecksum() const = 0;
// Return file checksum function name
virtual const char* GetFileChecksumFuncName() const = 0;
};
} // namespace rocksdb

View File

@ -8,7 +8,6 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <stdio.h>
#include <algorithm>
#include <iostream>
#include <map>
@ -28,6 +27,7 @@
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/file_system.h"
#include "rocksdb/iterator.h"
#include "rocksdb/memtablerep.h"
@ -51,6 +51,7 @@
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/compression.h"
#include "util/file_checksum_helper.h"
#include "util/random.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"
@ -1171,6 +1172,119 @@ class PlainTableTest : public TableTest {};
class TablePropertyTest : public testing::Test {};
class BBTTailPrefetchTest : public TableTest {};
// The helper class to test the file checksum
class FileChecksumTestHelper {
public:
FileChecksumTestHelper(bool convert_to_internal_key = false)
: convert_to_internal_key_(convert_to_internal_key) {
sink_ = new test::StringSink();
}
~FileChecksumTestHelper() {}
void CreateWriteableFile() {
file_writer_.reset(test::GetWritableFileWriter(sink_, "" /* don't care */));
}
void SetFileChecksumFunc(FileChecksumFunc* checksum_func) {
if (file_writer_ != nullptr) {
file_writer_->TEST_SetFileChecksumFunc(checksum_func);
}
}
WritableFileWriter* GetFileWriter() { return file_writer_.get(); }
Status ResetTableBuilder(std::unique_ptr<TableBuilder>&& builder) {
assert(builder != nullptr);
table_builder_ = std::move(builder);
return Status::OK();
}
void AddKVtoKVMap(int num_entries) {
Random rnd(test::RandomSeed());
for (int i = 0; i < num_entries; i++) {
std::string v;
test::RandomString(&rnd, 100, &v);
kv_map_[test::RandomKey(&rnd, 20)] = v;
}
}
Status WriteKVAndFlushTable() {
for (const auto kv : kv_map_) {
if (convert_to_internal_key_) {
ParsedInternalKey ikey(kv.first, kMaxSequenceNumber, kTypeValue);
std::string encoded;
AppendInternalKey(&encoded, ikey);
table_builder_->Add(encoded, kv.second);
} else {
table_builder_->Add(kv.first, kv.second);
}
EXPECT_TRUE(table_builder_->status().ok());
}
Status s = table_builder_->Finish();
file_writer_->Flush();
EXPECT_TRUE(s.ok());
EXPECT_EQ(sink_->contents().size(), table_builder_->FileSize());
return s;
}
std::string GetFileChecksum() { return table_builder_->GetFileChecksum(); }
const char* GetFileChecksumFuncName() {
return table_builder_->GetFileChecksumFuncName();
}
Status CalculateFileChecksum(FileChecksumFunc* file_checksum_func,
std::string* checksum) {
assert(file_checksum_func != nullptr);
cur_uniq_id_ = checksum_uniq_id_++;
test::StringSink* ss_rw =
rocksdb::test::GetStringSinkFromLegacyWriter(file_writer_.get());
file_reader_.reset(test::GetRandomAccessFileReader(
new test::StringSource(ss_rw->contents())));
std::unique_ptr<char[]> scratch(new char[2048]);
Slice result;
uint64_t offset = 0;
std::string tmp_checksum;
bool first_read = true;
Status s;
s = file_reader_->Read(offset, 2048, &result, scratch.get(), false);
if (!s.ok()) {
return s;
}
while (result.size() != 0) {
if (first_read) {
first_read = false;
tmp_checksum = file_checksum_func->Value(scratch.get(), result.size());
} else {
tmp_checksum = file_checksum_func->Extend(tmp_checksum, scratch.get(),
result.size());
}
offset += static_cast<uint64_t>(result.size());
s = file_reader_->Read(offset, 2048, &result, scratch.get(), false);
if (!s.ok()) {
return s;
}
}
EXPECT_EQ(offset, static_cast<uint64_t>(table_builder_->FileSize()));
*checksum = tmp_checksum;
return Status::OK();
}
private:
bool convert_to_internal_key_;
uint64_t cur_uniq_id_;
std::unique_ptr<WritableFileWriter> file_writer_;
std::unique_ptr<RandomAccessFileReader> file_reader_;
std::unique_ptr<TableBuilder> table_builder_;
stl_wrappers::KVMap kv_map_;
test::StringSink* sink_;
static uint64_t checksum_uniq_id_;
};
uint64_t FileChecksumTestHelper::checksum_uniq_id_ = 1;
INSTANTIATE_TEST_CASE_P(FormatDef, BlockBasedTableTest,
testing::Values(test::kDefaultFormatVersion));
INSTANTIATE_TEST_CASE_P(FormatLatest, BlockBasedTableTest,
@ -3044,6 +3158,90 @@ TEST_P(BlockBasedTableTest, MemoryAllocator) {
EXPECT_GT(custom_memory_allocator->numAllocations.load(), 0);
}
// Test the file checksum of block based table
TEST_P(BlockBasedTableTest, NoFileChecksum) {
Options options;
ImmutableCFOptions ioptions(options);
MutableCFOptions moptions(options);
BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
std::unique_ptr<InternalKeyComparator> comparator(
new InternalKeyComparator(BytewiseComparator()));
SequenceNumber largest_seqno = 0;
int level = 0;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
if (largest_seqno != 0) {
// Pretend that it's an external file written by SstFileWriter.
int_tbl_prop_collector_factories.emplace_back(
new SstFileWriterPropertiesCollectorFactory(2 /* version */,
0 /* global_seqno*/));
}
std::string column_family_name;
FileChecksumTestHelper f(true);
f.CreateWriteableFile();
std::unique_ptr<TableBuilder> builder;
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, *comparator,
&int_tbl_prop_collector_factories,
options.compression, options.sample_for_compression,
options.compression_opts, false /* skip_filters */,
column_family_name, level),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
f.GetFileWriter()));
f.ResetTableBuilder(std::move(builder));
f.AddKVtoKVMap(1000);
f.WriteKVAndFlushTable();
ASSERT_STREQ(f.GetFileChecksumFuncName(),
kUnknownFileChecksumFuncName.c_str());
ASSERT_STREQ(f.GetFileChecksum().c_str(), kUnknownFileChecksum.c_str());
}
TEST_P(BlockBasedTableTest, Crc32FileChecksum) {
Options options;
options.sst_file_checksum_func =
std::shared_ptr<FileChecksumFunc>(CreateFileChecksumFuncCrc32c());
ImmutableCFOptions ioptions(options);
MutableCFOptions moptions(options);
BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
std::unique_ptr<InternalKeyComparator> comparator(
new InternalKeyComparator(BytewiseComparator()));
SequenceNumber largest_seqno = 0;
int level = 0;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
if (largest_seqno != 0) {
// Pretend that it's an external file written by SstFileWriter.
int_tbl_prop_collector_factories.emplace_back(
new SstFileWriterPropertiesCollectorFactory(2 /* version */,
0 /* global_seqno*/));
}
std::string column_family_name;
FileChecksumTestHelper f(true);
f.CreateWriteableFile();
f.SetFileChecksumFunc(options.sst_file_checksum_func.get());
std::unique_ptr<TableBuilder> builder;
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, *comparator,
&int_tbl_prop_collector_factories,
options.compression, options.sample_for_compression,
options.compression_opts, false /* skip_filters */,
column_family_name, level),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
f.GetFileWriter()));
f.ResetTableBuilder(std::move(builder));
f.AddKVtoKVMap(1000);
f.WriteKVAndFlushTable();
ASSERT_STREQ(f.GetFileChecksumFuncName(), "FileChecksumCrc32c");
std::string checksum;
ASSERT_OK(
f.CalculateFileChecksum(options.sst_file_checksum_func.get(), &checksum));
ASSERT_STREQ(f.GetFileChecksum().c_str(), checksum.c_str());
}
// Plain table is not supported in ROCKSDB_LITE
#ifndef ROCKSDB_LITE
TEST_F(PlainTableTest, BasicPlainTableProperties) {
@ -3101,6 +3299,78 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
ASSERT_EQ(26ul, props->num_entries);
ASSERT_EQ(1ul, props->num_data_blocks);
}
TEST_F(PlainTableTest, NoFileChecksum) {
PlainTableOptions plain_table_options;
plain_table_options.user_key_len = 20;
plain_table_options.bloom_bits_per_key = 8;
plain_table_options.hash_table_ratio = 0;
PlainTableFactory factory(plain_table_options);
Options options;
const ImmutableCFOptions ioptions(options);
const MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
std::string column_family_name;
int unknown_level = -1;
FileChecksumTestHelper f(true);
f.CreateWriteableFile();
std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder(
TableBuilderOptions(
ioptions, moptions, ikc, &int_tbl_prop_collector_factories,
kNoCompression, 0 /* sample_for_compression */, CompressionOptions(),
false /* skip_filters */, column_family_name, unknown_level),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
f.GetFileWriter()));
f.ResetTableBuilder(std::move(builder));
f.AddKVtoKVMap(1000);
f.WriteKVAndFlushTable();
ASSERT_STREQ(f.GetFileChecksumFuncName(),
kUnknownFileChecksumFuncName.c_str());
EXPECT_EQ(f.GetFileChecksum(), kUnknownFileChecksum.c_str());
}
TEST_F(PlainTableTest, Crc32FileChecksum) {
PlainTableOptions plain_table_options;
plain_table_options.user_key_len = 20;
plain_table_options.bloom_bits_per_key = 8;
plain_table_options.hash_table_ratio = 0;
PlainTableFactory factory(plain_table_options);
Options options;
options.sst_file_checksum_func =
std::shared_ptr<FileChecksumFunc>(CreateFileChecksumFuncCrc32c());
const ImmutableCFOptions ioptions(options);
const MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
std::string column_family_name;
int unknown_level = -1;
FileChecksumTestHelper f(true);
f.CreateWriteableFile();
f.SetFileChecksumFunc(options.sst_file_checksum_func.get());
std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder(
TableBuilderOptions(
ioptions, moptions, ikc, &int_tbl_prop_collector_factories,
kNoCompression, 0 /* sample_for_compression */, CompressionOptions(),
false /* skip_filters */, column_family_name, unknown_level),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
f.GetFileWriter()));
f.ResetTableBuilder(std::move(builder));
f.AddKVtoKVMap(1000);
f.WriteKVAndFlushTable();
ASSERT_STREQ(f.GetFileChecksumFuncName(), "FileChecksumCrc32c");
std::string checksum;
ASSERT_OK(
f.CalculateFileChecksum(options.sst_file_checksum_func.get(), &checksum));
EXPECT_STREQ(f.GetFileChecksum().c_str(), checksum.c_str());
}
#endif // !ROCKSDB_LITE
TEST_F(GeneralTableTest, ApproximateOffsetOfPlain) {

View File

@ -17,6 +17,7 @@
#include "file/filename.h"
#include "port/port_dirent.h"
#include "rocksdb/cache.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/utilities/backupable_db.h"
#include "rocksdb/utilities/checkpoint.h"
@ -29,6 +30,7 @@
#include "tools/sst_dump_tool_imp.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/file_checksum_helper.h"
#include "util/stderr_logger.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"
@ -46,6 +48,8 @@
namespace rocksdb {
class FileChecksumFuncCrc32c;
const std::string LDBCommand::ARG_ENV_URI = "env_uri";
const std::string LDBCommand::ARG_DB = "db";
const std::string LDBCommand::ARG_PATH = "path";
@ -218,6 +222,10 @@ LDBCommand* LDBCommand::SelectCommand(const ParsedParams& parsed_params) {
return new ManifestDumpCommand(parsed_params.cmd_params,
parsed_params.option_map,
parsed_params.flags);
} else if (parsed_params.cmd == FileChecksumDumpCommand::Name()) {
return new FileChecksumDumpCommand(parsed_params.cmd_params,
parsed_params.option_map,
parsed_params.flags);
} else if (parsed_params.cmd == ListColumnFamiliesCommand::Name()) {
return new ListColumnFamiliesCommand(parsed_params.cmd_params,
parsed_params.option_map,
@ -1139,6 +1147,100 @@ void ManifestDumpCommand::DoCommand() {
}
}
// ----------------------------------------------------------------------------
namespace {
void GetLiveFilesChecksumInfoFromVersionSet(Options options,
const std::string& db_path,
FileChecksumList* checksum_list) {
EnvOptions sopt;
Status s;
std::string dbname(db_path);
std::shared_ptr<Cache> tc(NewLRUCache(options.max_open_files - 10,
options.table_cache_numshardbits));
// Notice we are using the default options not through SanitizeOptions(),
// if VersionSet::GetLiveFilesChecksumInfo depends on any option done by
// SanitizeOptions(), we need to initialize it manually.
options.db_paths.emplace_back(db_path, 0);
options.num_levels = 64;
WriteController wc(options.delayed_write_rate);
WriteBufferManager wb(options.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options);
VersionSet versions(dbname, &immutable_db_options, sopt, tc.get(), &wb, &wc,
/*block_cache_tracer=*/nullptr);
std::vector<std::string> cf_name_list;
s = versions.ListColumnFamilies(&cf_name_list, db_path,
options.file_system.get());
if (s.ok()) {
std::vector<ColumnFamilyDescriptor> cf_list;
for (const auto& name : cf_name_list) {
cf_list.emplace_back(name, ColumnFamilyOptions(options));
}
s = versions.Recover(cf_list, true);
}
if (s.ok()) {
s = versions.GetLiveFilesChecksumInfo(checksum_list);
}
if (!s.ok()) {
fprintf(stderr, "Error Status: %s", s.ToString().c_str());
}
}
} // namespace
const std::string FileChecksumDumpCommand::ARG_PATH = "path";
void FileChecksumDumpCommand::Help(std::string& ret) {
ret.append(" ");
ret.append(FileChecksumDumpCommand::Name());
ret.append(" [--" + ARG_PATH + "=<path_to_manifest_file>]");
ret.append("\n");
}
FileChecksumDumpCommand::FileChecksumDumpCommand(
const std::vector<std::string>& /*params*/,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags)
: LDBCommand(options, flags, false, BuildCmdLineOptions({ARG_PATH})),
path_("") {
std::map<std::string, std::string>::const_iterator itr =
options.find(ARG_PATH);
if (itr != options.end()) {
path_ = itr->second;
if (path_.empty()) {
exec_state_ = LDBCommandExecuteResult::Failed("--path: missing pathname");
}
}
}
void FileChecksumDumpCommand::DoCommand() {
// print out the checksum information in the following format:
// sst file number, checksum function name, checksum value
// sst file number, checksum function name, checksum value
// ......
std::unique_ptr<FileChecksumList> checksum_list(NewFileChecksumList());
GetLiveFilesChecksumInfoFromVersionSet(options_, db_path_,
checksum_list.get());
if (checksum_list != nullptr) {
std::vector<uint64_t> file_numbers;
std::vector<std::string> checksums;
std::vector<std::string> checksum_func_names;
Status s = checksum_list->GetAllFileChecksums(&file_numbers, &checksums,
&checksum_func_names);
if (s.ok()) {
for (size_t i = 0; i < file_numbers.size(); i++) {
assert(i < file_numbers.size());
assert(i < checksums.size());
assert(i < checksum_func_names.size());
fprintf(stdout, "%" PRId64 ", %s, %s\n", file_numbers[i],
checksum_func_names[i].c_str(), checksums[i].c_str());
}
}
fprintf(stdout, "Print SST file checksum information finished \n");
}
}
// ----------------------------------------------------------------------------
void ListColumnFamiliesCommand::Help(std::string& ret) {

View File

@ -171,6 +171,25 @@ class ManifestDumpCommand : public LDBCommand {
static const std::string ARG_PATH;
};
class FileChecksumDumpCommand : public LDBCommand {
public:
static std::string Name() { return "file_checksum_dump"; }
FileChecksumDumpCommand(const std::vector<std::string>& params,
const std::map<std::string, std::string>& options,
const std::vector<std::string>& flags);
static void Help(std::string& ret);
void DoCommand() override;
bool NoDBOpen() override { return true; }
private:
std::string path_;
static const std::string ARG_PATH;
};
class ListColumnFamiliesCommand : public LDBCommand {
public:
static std::string Name() { return "list_column_families"; }

View File

@ -6,10 +6,16 @@
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/ldb_cmd.h"
#include "db/version_edit.h"
#include "db/version_set.h"
#include "env/composite_env_wrapper.h"
#include "file/filename.h"
#include "port/stack_trace.h"
#include "rocksdb/file_checksum.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/file_checksum_helper.h"
using std::string;
using std::vector;
@ -103,6 +109,327 @@ TEST_F(LdbCmdTest, MemEnv) {
LDBCommandRunner::RunCommand(3, argv, opts, LDBOptions(), nullptr));
}
class FileChecksumTestHelper {
private:
Options options_;
DB* db_;
std::string dbname_;
Status VerifyChecksum(LiveFileMetaData& file_meta) {
std::string cur_checksum;
std::string checksum_func_name;
Status s;
EnvOptions soptions;
std::unique_ptr<SequentialFile> file_reader;
std::string file_path = dbname_ + "/" + file_meta.name;
s = options_.env->NewSequentialFile(file_path, &file_reader, soptions);
if (!s.ok()) {
return s;
}
std::unique_ptr<char[]> scratch(new char[2048]);
bool first_read = true;
Slice result;
FileChecksumFunc* file_checksum_func =
options_.sst_file_checksum_func.get();
if (file_checksum_func == nullptr) {
cur_checksum = kUnknownFileChecksum;
checksum_func_name = kUnknownFileChecksumFuncName;
} else {
checksum_func_name = file_checksum_func->Name();
s = file_reader->Read(2048, &result, scratch.get());
if (!s.ok()) {
return s;
}
while (result.size() != 0) {
if (first_read) {
first_read = false;
cur_checksum =
file_checksum_func->Value(scratch.get(), result.size());
} else {
cur_checksum = file_checksum_func->Extend(cur_checksum, scratch.get(),
result.size());
}
s = file_reader->Read(2048, &result, scratch.get());
if (!s.ok()) {
return s;
}
}
}
std::string stored_checksum = file_meta.file_checksum;
std::string stored_checksum_func_name = file_meta.file_checksum_func_name;
if ((cur_checksum != stored_checksum) ||
(checksum_func_name != stored_checksum_func_name)) {
return Status::Corruption(
"Checksum does not match! The file: " + file_meta.name +
", checksum name: " + stored_checksum_func_name + " and checksum " +
stored_checksum + ". However, expected checksum name: " +
checksum_func_name + " and checksum " + cur_checksum);
}
return Status::OK();
}
public:
FileChecksumTestHelper(Options& options, DB* db, std::string db_name)
: options_(options), db_(db), dbname_(db_name) {}
~FileChecksumTestHelper() {}
// Verify the checksum information in Manifest.
Status VerifyChecksumInManifest(
const std::vector<LiveFileMetaData>& live_files) {
// Step 1: verify if the dbname_ is correct
if (dbname_[dbname_.length() - 1] != '/') {
dbname_.append("/");
}
// Step 2, get the the checksum information by recovering the VersionSet
// from Manifest.
std::unique_ptr<FileChecksumList> checksum_list(NewFileChecksumList());
EnvOptions sopt;
std::shared_ptr<Cache> tc(NewLRUCache(options_.max_open_files - 10,
options_.table_cache_numshardbits));
options_.db_paths.emplace_back(dbname_, 0);
options_.num_levels = 64;
WriteController wc(options_.delayed_write_rate);
WriteBufferManager wb(options_.db_write_buffer_size);
ImmutableDBOptions immutable_db_options(options_);
VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb,
&wc, nullptr);
std::vector<std::string> cf_name_list;
Status s;
s = versions.ListColumnFamilies(&cf_name_list, dbname_,
options_.file_system.get());
if (s.ok()) {
std::vector<ColumnFamilyDescriptor> cf_list;
for (const auto& name : cf_name_list) {
fprintf(stdout, "cf_name: %s", name.c_str());
cf_list.emplace_back(name, ColumnFamilyOptions(options_));
}
s = versions.Recover(cf_list, true);
}
if (s.ok()) {
s = versions.GetLiveFilesChecksumInfo(checksum_list.get());
}
if (!s.ok()) {
return s;
}
// Step 3 verify the checksum
if (live_files.size() != checksum_list->size()) {
return Status::Corruption("The number of files does not match!");
}
for (size_t i = 0; i < live_files.size(); i++) {
std::string stored_checksum = "";
std::string stored_func_name = "";
s = checksum_list->SearchOneFileChecksum(
live_files[i].file_number, &stored_checksum, &stored_func_name);
if (s.IsNotFound()) {
return s;
}
if (live_files[i].file_checksum != stored_checksum ||
live_files[i].file_checksum_func_name != stored_func_name) {
return Status::Corruption(
"Checksum does not match! The file: " +
ToString(live_files[i].file_number) +
". In Manifest, checksum name: " + stored_func_name +
" and checksum " + stored_checksum +
". However, expected checksum name: " +
live_files[i].file_checksum_func_name + " and checksum " +
live_files[i].file_checksum);
}
}
return Status::OK();
}
// Verify the checksum of each file by recalculting the checksum and
// comparing it with the one being generated when a SST file is created.
Status VerifyEachFileChecksum() {
assert(db_ != nullptr);
std::vector<LiveFileMetaData> live_files;
db_->GetLiveFilesMetaData(&live_files);
for (auto a_file : live_files) {
Status cs = VerifyChecksum(a_file);
if (!cs.ok()) {
return cs;
}
}
return Status::OK();
}
};
TEST_F(LdbCmdTest, DumpFileChecksumNoChecksum) {
Env* base_env = TryLoadCustomOrDefaultEnv();
std::unique_ptr<Env> env(NewMemEnv(base_env));
Options opts;
opts.env = env.get();
opts.create_if_missing = true;
opts.file_system.reset(new LegacyFileSystemWrapper(opts.env));
DB* db = nullptr;
std::string dbname = test::TmpDir();
ASSERT_OK(DB::Open(opts, dbname, &db));
WriteOptions wopts;
FlushOptions fopts;
fopts.wait = true;
Random rnd(test::RandomSeed());
for (int i = 0; i < 200; i++) {
char buf[16];
snprintf(buf, sizeof(buf), "%08d", i);
std::string v;
test::RandomString(&rnd, 100, &v);
ASSERT_OK(db->Put(wopts, buf, v));
}
ASSERT_OK(db->Flush(fopts));
for (int i = 100; i < 300; i++) {
char buf[16];
snprintf(buf, sizeof(buf), "%08d", i);
std::string v;
test::RandomString(&rnd, 100, &v);
ASSERT_OK(db->Put(wopts, buf, v));
}
ASSERT_OK(db->Flush(fopts));
for (int i = 200; i < 400; i++) {
char buf[16];
snprintf(buf, sizeof(buf), "%08d", i);
std::string v;
test::RandomString(&rnd, 100, &v);
ASSERT_OK(db->Put(wopts, buf, v));
}
ASSERT_OK(db->Flush(fopts));
for (int i = 300; i < 400; i++) {
char buf[16];
snprintf(buf, sizeof(buf), "%08d", i);
std::string v;
test::RandomString(&rnd, 100, &v);
ASSERT_OK(db->Put(wopts, buf, v));
}
ASSERT_OK(db->Flush(fopts));
char arg1[] = "./ldb";
char arg2[1024];
snprintf(arg2, sizeof(arg2), "--db=%s", dbname.c_str());
char arg3[] = "file_checksum_dump";
char* argv[] = {arg1, arg2, arg3};
ASSERT_EQ(0,
LDBCommandRunner::RunCommand(3, argv, opts, LDBOptions(), nullptr));
// Verify each sst file checksum value and checksum name
FileChecksumTestHelper fct_helper(opts, db, dbname);
ASSERT_OK(fct_helper.VerifyEachFileChecksum());
// Manually trigger compaction
char b_buf[16];
snprintf(b_buf, sizeof(b_buf), "%08d", 0);
char e_buf[16];
snprintf(e_buf, sizeof(e_buf), "%08d", 399);
Slice begin(b_buf);
Slice end(e_buf);
CompactRangeOptions options;
ASSERT_OK(db->CompactRange(options, &begin, &end));
// Verify each sst file checksum after compaction
FileChecksumTestHelper fct_helper_ac(opts, db, dbname);
ASSERT_OK(fct_helper_ac.VerifyEachFileChecksum());
ASSERT_EQ(0,
LDBCommandRunner::RunCommand(3, argv, opts, LDBOptions(), nullptr));
// Verify the checksum information in memory is the same as that in Manifest;
std::vector<LiveFileMetaData> live_files;
db->GetLiveFilesMetaData(&live_files);
delete db;
ASSERT_OK(fct_helper_ac.VerifyChecksumInManifest(live_files));
}
TEST_F(LdbCmdTest, DumpFileChecksumCRC32) {
Env* base_env = TryLoadCustomOrDefaultEnv();
std::unique_ptr<Env> env(NewMemEnv(base_env));
Options opts;
opts.env = env.get();
opts.create_if_missing = true;
opts.sst_file_checksum_func =
std::shared_ptr<FileChecksumFunc>(CreateFileChecksumFuncCrc32c());
opts.file_system.reset(new LegacyFileSystemWrapper(opts.env));
DB* db = nullptr;
std::string dbname = test::TmpDir();
ASSERT_OK(DB::Open(opts, dbname, &db));
WriteOptions wopts;
FlushOptions fopts;
fopts.wait = true;
Random rnd(test::RandomSeed());
for (int i = 0; i < 100; i++) {
char buf[16];
snprintf(buf, sizeof(buf), "%08d", i);
std::string v;
test::RandomString(&rnd, 100, &v);
ASSERT_OK(db->Put(wopts, buf, v));
}
ASSERT_OK(db->Flush(fopts));
for (int i = 50; i < 150; i++) {
char buf[16];
snprintf(buf, sizeof(buf), "%08d", i);
std::string v;
test::RandomString(&rnd, 100, &v);
ASSERT_OK(db->Put(wopts, buf, v));
}
ASSERT_OK(db->Flush(fopts));
for (int i = 100; i < 200; i++) {
char buf[16];
snprintf(buf, sizeof(buf), "%08d", i);
std::string v;
test::RandomString(&rnd, 100, &v);
ASSERT_OK(db->Put(wopts, buf, v));
}
ASSERT_OK(db->Flush(fopts));
for (int i = 150; i < 250; i++) {
char buf[16];
snprintf(buf, sizeof(buf), "%08d", i);
std::string v;
test::RandomString(&rnd, 100, &v);
ASSERT_OK(db->Put(wopts, buf, v));
}
ASSERT_OK(db->Flush(fopts));
char arg1[] = "./ldb";
char arg2[1024];
snprintf(arg2, sizeof(arg2), "--db=%s", dbname.c_str());
char arg3[] = "file_checksum_dump";
char* argv[] = {arg1, arg2, arg3};
ASSERT_EQ(0,
LDBCommandRunner::RunCommand(3, argv, opts, LDBOptions(), nullptr));
// Verify each sst file checksum value and checksum name
FileChecksumTestHelper fct_helper(opts, db, dbname);
ASSERT_OK(fct_helper.VerifyEachFileChecksum());
// Manually trigger compaction
char b_buf[16];
snprintf(b_buf, sizeof(b_buf), "%08d", 0);
char e_buf[16];
snprintf(e_buf, sizeof(e_buf), "%08d", 249);
Slice begin(b_buf);
Slice end(e_buf);
CompactRangeOptions options;
ASSERT_OK(db->CompactRange(options, &begin, &end));
// Verify each sst file checksum after compaction
FileChecksumTestHelper fct_helper_ac(opts, db, dbname);
ASSERT_OK(fct_helper_ac.VerifyEachFileChecksum());
ASSERT_EQ(0,
LDBCommandRunner::RunCommand(3, argv, opts, LDBOptions(), nullptr));
// Verify the checksum information in memory is the same as that in Manifest;
std::vector<LiveFileMetaData> live_files;
db->GetLiveFilesMetaData(&live_files);
delete db;
ASSERT_OK(fct_helper_ac.VerifyChecksumInManifest(live_files));
}
TEST_F(LdbCmdTest, OptionParsing) {
// test parsing flags
Options opts;

View File

@ -84,6 +84,7 @@ void LDBCommandRunner::PrintHelp(const LDBOptions& ldb_options,
DBDumperCommand::Help(ret);
DBLoaderCommand::Help(ret);
ManifestDumpCommand::Help(ret);
FileChecksumDumpCommand::Help(ret);
ListColumnFamiliesCommand::Help(ret);
CreateColumnFamilyCommand::Help(ret);
DropColumnFamilyCommand::Help(ret);

View File

@ -0,0 +1,85 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/file_checksum_helper.h"
namespace rocksdb {
void FileChecksumListImpl::reset() { checksum_map_.clear(); }
size_t FileChecksumListImpl::size() const { return checksum_map_.size(); }
Status FileChecksumListImpl::GetAllFileChecksums(
std::vector<uint64_t>* file_numbers, std::vector<std::string>* checksums,
std::vector<std::string>* checksum_func_names) {
if (file_numbers == nullptr || checksums == nullptr ||
checksum_func_names == nullptr) {
return Status::InvalidArgument("Pointer has not been initiated");
}
for (auto i : checksum_map_) {
file_numbers->push_back(i.first);
checksums->push_back(i.second.first);
checksum_func_names->push_back(i.second.second);
}
return Status::OK();
}
Status FileChecksumListImpl::SearchOneFileChecksum(
uint64_t file_number, std::string* checksum,
std::string* checksum_func_name) {
if (checksum == nullptr || checksum_func_name == nullptr) {
return Status::InvalidArgument("Pointer has not been initiated");
}
auto it = checksum_map_.find(file_number);
if (it == checksum_map_.end()) {
return Status::NotFound();
} else {
*checksum = it->second.first;
*checksum_func_name = it->second.second;
}
return Status::OK();
}
Status FileChecksumListImpl::InsertOneFileChecksum(
uint64_t file_number, const std::string& checksum,
const std::string& checksum_func_name) {
auto it = checksum_map_.find(file_number);
if (it == checksum_map_.end()) {
checksum_map_.insert(std::make_pair(
file_number, std::make_pair(checksum, checksum_func_name)));
} else {
it->second.first = checksum;
it->second.second = checksum_func_name;
}
return Status::OK();
}
Status FileChecksumListImpl::RemoveOneFileChecksum(uint64_t file_number) {
auto it = checksum_map_.find(file_number);
if (it == checksum_map_.end()) {
return Status::NotFound();
} else {
checksum_map_.erase(it);
}
return Status::OK();
}
FileChecksumList* NewFileChecksumList() {
FileChecksumListImpl* checksum_list = new FileChecksumListImpl();
return checksum_list;
}
FileChecksumFunc* CreateFileChecksumFuncCrc32c() {
FileChecksumFunc* file_checksum_crc32c = new FileChecksumFuncCrc32c();
return file_checksum_crc32c;
}
} // namespace rocksdb

117
util/file_checksum_helper.h Normal file
View File

@ -0,0 +1,117 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cassert>
#include <unordered_map>
#include "port/port.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/status.h"
#include "util/crc32c.h"
#include "util/string_util.h"
namespace rocksdb {
// This is the class to generate the file checksum based on Crc32. It
// will be used as the default checksum method for SST file checksum
class FileChecksumFuncCrc32c : public FileChecksumFunc {
public:
std::string Extend(const std::string& init_checksum, const char* data,
size_t n) override {
assert(data != nullptr);
uint32_t checksum_value = StringToUint32(init_checksum);
return Uint32ToString(crc32c::Extend(checksum_value, data, n));
}
std::string Value(const char* data, size_t n) override {
assert(data != nullptr);
return Uint32ToString(crc32c::Value(data, n));
}
std::string ProcessChecksum(const std::string& checksum) override {
uint32_t checksum_value = StringToUint32(checksum);
return Uint32ToString(crc32c::Mask(checksum_value));
}
const char* Name() const override { return "FileChecksumCrc32c"; }
// Convert a uint32_t type data into a 4 bytes string.
static std::string Uint32ToString(uint32_t v) {
std::string s;
if (port::kLittleEndian) {
s.append(reinterpret_cast<char*>(&v), sizeof(v));
} else {
char buf[sizeof(v)];
buf[0] = v & 0xff;
buf[1] = (v >> 8) & 0xff;
buf[2] = (v >> 16) & 0xff;
buf[3] = (v >> 24) & 0xff;
s.append(buf, sizeof(v));
}
size_t i = 0, j = s.size() - 1;
while (i < j) {
char tmp = s[i];
s[i] = s[j];
s[j] = tmp;
++i;
--j;
}
return s;
}
// Convert a 4 bytes size string into a uint32_t type data.
static uint32_t StringToUint32(std::string s) {
assert(s.size() == sizeof(uint32_t));
size_t i = 0, j = s.size() - 1;
while (i < j) {
char tmp = s[i];
s[i] = s[j];
s[j] = tmp;
++i;
--j;
}
uint32_t v = 0;
if (port::kLittleEndian) {
memcpy(&v, s.c_str(), sizeof(uint32_t));
} else {
const char* buf = s.c_str();
v |= static_cast<uint32_t>(buf[0]);
v |= (static_cast<uint32_t>(buf[1]) << 8);
v |= (static_cast<uint32_t>(buf[2]) << 16);
v |= (static_cast<uint32_t>(buf[3]) << 24);
}
return v;
}
};
// The default implementaion of FileChecksumList
class FileChecksumListImpl : public FileChecksumList {
public:
FileChecksumListImpl() {}
void reset() override;
size_t size() const override;
Status GetAllFileChecksums(
std::vector<uint64_t>* file_numbers, std::vector<std::string>* checksums,
std::vector<std::string>* checksum_func_names) override;
Status SearchOneFileChecksum(uint64_t file_number, std::string* checksum,
std::string* checksum_func_name) override;
Status InsertOneFileChecksum(uint64_t file_number,
const std::string& checksum,
const std::string& checksum_func_name) override;
Status RemoveOneFileChecksum(uint64_t file_number) override;
private:
// Key is the file number, the first portion of the value is checksum, the
// second portion of the value is checksum function name.
std::unordered_map<uint64_t, std::pair<std::string, std::string>>
checksum_map_;
};
} // namespace rocksdb