Implement XXH3 block checksum type (#9069)

Summary:
XXH3 - latest hash function that is extremely fast on large
data, easily faster than crc32c on most any x86_64 hardware. In
integrating this hash function, I have handled the compression type byte
in a non-standard way to avoid using the streaming API (extra data
movement and active code size because of hash function complexity). This
approach got a thumbs-up from Yann Collet.

Existing functionality change:
* reject bad ChecksumType in options with InvalidArgument

This change split off from https://github.com/facebook/rocksdb/issues/9058 because context-aware checksum is
likely to be handled through different configuration than ChecksumType.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9069

Test Plan:
tests updated, and substantially expanded. Unit tests now check
that we don't accidentally change the values generated by the checksum
algorithms ("schema test") and that we properly handle
invalid/unrecognized checksum types in options or in file footer.

DBTestBase::ChangeOptions (etc.) updated from two to one configuration
changing from default CRC32c ChecksumType. The point of this test code
is to detect possible interactions among features, and the likelihood of
some bad interaction being detected by including configurations other
than XXH3 and CRC32c--and then not detected by stress/crash test--is
extremely low.

Stress/crash test also updated (manual run long enough to see it accepts
new checksum type). db_bench also updated for microbenchmarking
checksums.

 ### Performance microbenchmark (PORTABLE=0 DEBUG_LEVEL=0, Broadwell processor)

./db_bench -benchmarks=crc32c,xxhash,xxhash64,xxh3,crc32c,xxhash,xxhash64,xxh3,crc32c,xxhash,xxhash64,xxh3
crc32c       :       0.200 micros/op 5005220 ops/sec; 19551.6 MB/s (4096 per op)
xxhash       :       0.807 micros/op 1238408 ops/sec; 4837.5 MB/s (4096 per op)
xxhash64     :       0.421 micros/op 2376514 ops/sec; 9283.3 MB/s (4096 per op)
xxh3         :       0.171 micros/op 5858391 ops/sec; 22884.3 MB/s (4096 per op)
crc32c       :       0.206 micros/op 4859566 ops/sec; 18982.7 MB/s (4096 per op)
xxhash       :       0.793 micros/op 1260850 ops/sec; 4925.2 MB/s (4096 per op)
xxhash64     :       0.410 micros/op 2439182 ops/sec; 9528.1 MB/s (4096 per op)
xxh3         :       0.161 micros/op 6202872 ops/sec; 24230.0 MB/s (4096 per op)
crc32c       :       0.203 micros/op 4924686 ops/sec; 19237.1 MB/s (4096 per op)
xxhash       :       0.839 micros/op 1192388 ops/sec; 4657.8 MB/s (4096 per op)
xxhash64     :       0.424 micros/op 2357391 ops/sec; 9208.6 MB/s (4096 per op)
xxh3         :       0.162 micros/op 6182678 ops/sec; 24151.1 MB/s (4096 per op)

As you can see, especially once warmed up, xxh3 is fastest.

 ### Performance macrobenchmark (PORTABLE=0 DEBUG_LEVEL=0, Broadwell processor)

Test

    for I in `seq 1 50`; do for CHK in 0 1 2 3 4; do TEST_TMPDIR=/dev/shm/rocksdb$CHK ./db_bench -benchmarks=fillseq -memtablerep=vector -allow_concurrent_memtable_write=false -num=30000000 -checksum_type=$CHK 2>&1 | grep 'micros/op' | tee -a results-$CHK & done; wait; done

Results (ops/sec)

    for FILE in results*; do echo -n "$FILE "; awk '{ s += $5; c++; } END { print 1.0 * s / c; }' < $FILE; done

results-0 252118 # kNoChecksum
results-1 251588 # kCRC32c
results-2 251863 # kxxHash
results-3 252016 # kxxHash64
results-4 252038 # kXXH3

Reviewed By: mrambacher

Differential Revision: D31905249

Pulled By: pdillinger

fbshipit-source-id: cb9b998ebe2523fc7c400eedf62124a78bf4b4d1
This commit is contained in:
Peter Dillinger 2021-10-28 22:13:47 -07:00 committed by Facebook GitHub Bot
parent f24c39ab3d
commit a7d4bea43a
16 changed files with 305 additions and 121 deletions

View File

@ -1,5 +1,8 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### New Features
* Added new ChecksumType kXXH3 which is faster than kCRC32c on almost all x86\_64 hardware.
### Bug Fixes ### Bug Fixes
* Prevent a `CompactRange()` with `CompactRangeOptions::change_level == true` from possibly causing corruption to the LSM state (overlapping files within a level) when run in parallel with another manual compaction. Note that setting `force_consistency_checks == true` (the default) would cause the DB to enter read-only mode in this scenario and return `Status::Corruption`, rather than committing any corruption. * Prevent a `CompactRange()` with `CompactRangeOptions::change_level == true` from possibly causing corruption to the LSM state (overlapping files within a level) when run in parallel with another manual compaction. Note that setting `force_consistency_checks == true` (the default) would cause the DB to enter read-only mode in this scenario and return `Status::Corruption`, rather than committing any corruption.

View File

@ -10,6 +10,7 @@
#include <cstring> #include <cstring>
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "options/options_helper.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/flush_block_policy.h" #include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h" #include "rocksdb/merge_operator.h"
@ -974,13 +975,14 @@ TEST_F(DBBasicTest, MultiGetEmpty) {
TEST_F(DBBasicTest, ChecksumTest) { TEST_F(DBBasicTest, ChecksumTest) {
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
Options options = CurrentOptions(); Options options = CurrentOptions();
// change when new checksum type added
int max_checksum = static_cast<int>(kxxHash64);
const int kNumPerFile = 2; const int kNumPerFile = 2;
const auto algs = GetSupportedChecksums();
const int algs_size = static_cast<int>(algs.size());
// generate one table with each type of checksum // generate one table with each type of checksum
for (int i = 0; i <= max_checksum; ++i) { for (int i = 0; i < algs_size; ++i) {
table_options.checksum = static_cast<ChecksumType>(i); table_options.checksum = algs[i];
options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options); Reopen(options);
for (int j = 0; j < kNumPerFile; ++j) { for (int j = 0; j < kNumPerFile; ++j) {
@ -990,15 +992,20 @@ TEST_F(DBBasicTest, ChecksumTest) {
} }
// with each valid checksum type setting... // with each valid checksum type setting...
for (int i = 0; i <= max_checksum; ++i) { for (int i = 0; i < algs_size; ++i) {
table_options.checksum = static_cast<ChecksumType>(i); table_options.checksum = algs[i];
options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options); Reopen(options);
// verify every type of checksum (should be regardless of that setting) // verify every type of checksum (should be regardless of that setting)
for (int j = 0; j < (max_checksum + 1) * kNumPerFile; ++j) { for (int j = 0; j < algs_size * kNumPerFile; ++j) {
ASSERT_EQ(Key(j), Get(Key(j))); ASSERT_EQ(Key(j), Get(Key(j)));
} }
} }
// Now test invalid checksum type
table_options.checksum = static_cast<ChecksumType>(123);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
ASSERT_TRUE(TryReopen(options).IsInvalidArgument());
} }
// On Windows you can have either memory mapped file or a file // On Windows you can have either memory mapped file or a file

View File

@ -475,12 +475,8 @@ Options DBTestBase::GetOptions(
case kInfiniteMaxOpenFiles: case kInfiniteMaxOpenFiles:
options.max_open_files = -1; options.max_open_files = -1;
break; break;
case kxxHashChecksum: { case kXXH3Checksum: {
table_options.checksum = kxxHash; table_options.checksum = kXXH3;
break;
}
case kxxHash64Checksum: {
table_options.checksum = kxxHash64;
break; break;
} }
case kFIFOCompaction: { case kFIFOCompaction: {

View File

@ -854,7 +854,7 @@ class DBTestBase : public testing::Test {
kUniversalCompactionMultiLevel = 20, kUniversalCompactionMultiLevel = 20,
kCompressedBlockCache = 21, kCompressedBlockCache = 21,
kInfiniteMaxOpenFiles = 22, kInfiniteMaxOpenFiles = 22,
kxxHashChecksum = 23, kXXH3Checksum = 23,
kFIFOCompaction = 24, kFIFOCompaction = 24,
kOptimizeFiltersForHits = 25, kOptimizeFiltersForHits = 25,
kRowCache = 26, kRowCache = 26,
@ -869,7 +869,6 @@ class DBTestBase : public testing::Test {
kBlockBasedTableWithPartitionedIndexFormat4, kBlockBasedTableWithPartitionedIndexFormat4,
kPartitionedFilterWithNewTableReaderForCompactions, kPartitionedFilterWithNewTableReaderForCompactions,
kUniversalSubcompactions, kUniversalSubcompactions,
kxxHash64Checksum,
kUnorderedWrite, kUnorderedWrite,
// This must be the last line // This must be the last line
kEnd, kEnd,

View File

@ -10,6 +10,7 @@
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "file/filename.h" #include "file/filename.h"
#include "options/options_helper.h"
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/sst_file_reader.h" #include "rocksdb/sst_file_reader.h"
@ -2383,10 +2384,9 @@ TEST_F(ExternalSSTFileTest, IngestFileWrittenWithCompressionDictionary) {
// Very slow, not worth the cost to run regularly // Very slow, not worth the cost to run regularly
TEST_F(ExternalSSTFileTest, DISABLED_HugeBlockChecksum) { TEST_F(ExternalSSTFileTest, DISABLED_HugeBlockChecksum) {
int max_checksum = static_cast<int>(kxxHash64); for (auto t : GetSupportedChecksums()) {
for (int i = 0; i <= max_checksum; ++i) {
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
table_options.checksum = static_cast<ChecksumType>(i); table_options.checksum = t;
Options options = CurrentOptions(); Options options = CurrentOptions();
options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.table_factory.reset(NewBlockBasedTableFactory(table_options));

View File

@ -49,6 +49,7 @@ enum ChecksumType : char {
kCRC32c = 0x1, kCRC32c = 0x1,
kxxHash = 0x2, kxxHash = 0x2,
kxxHash64 = 0x3, kxxHash64 = 0x3,
kXXH3 = 0x4, // Supported since RocksDB 6.27
}; };
// `PinningTier` is used to specify which tier of block-based tables should // `PinningTier` is used to specify which tier of block-based tables should

View File

@ -7,6 +7,7 @@
#include <cassert> #include <cassert>
#include <cctype> #include <cctype>
#include <cstdlib> #include <cstdlib>
#include <set>
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
@ -329,7 +330,8 @@ std::unordered_map<std::string, ChecksumType>
OptionsHelper::checksum_type_string_map = {{"kNoChecksum", kNoChecksum}, OptionsHelper::checksum_type_string_map = {{"kNoChecksum", kNoChecksum},
{"kCRC32c", kCRC32c}, {"kCRC32c", kCRC32c},
{"kxxHash", kxxHash}, {"kxxHash", kxxHash},
{"kxxHash64", kxxHash64}}; {"kxxHash64", kxxHash64},
{"kXXH3", kXXH3}};
std::unordered_map<std::string, CompressionType> std::unordered_map<std::string, CompressionType>
OptionsHelper::compression_type_string_map = { OptionsHelper::compression_type_string_map = {
@ -345,25 +347,37 @@ std::unordered_map<std::string, CompressionType>
{"kDisableCompressionOption", kDisableCompressionOption}}; {"kDisableCompressionOption", kDisableCompressionOption}};
std::vector<CompressionType> GetSupportedCompressions() { std::vector<CompressionType> GetSupportedCompressions() {
std::vector<CompressionType> supported_compressions; // std::set internally to deduplicate potential name aliases
std::set<CompressionType> supported_compressions;
for (const auto& comp_to_name : OptionsHelper::compression_type_string_map) { for (const auto& comp_to_name : OptionsHelper::compression_type_string_map) {
CompressionType t = comp_to_name.second; CompressionType t = comp_to_name.second;
if (t != kDisableCompressionOption && CompressionTypeSupported(t)) { if (t != kDisableCompressionOption && CompressionTypeSupported(t)) {
supported_compressions.push_back(t); supported_compressions.insert(t);
} }
} }
return supported_compressions; return std::vector<CompressionType>(supported_compressions.begin(),
supported_compressions.end());
} }
std::vector<CompressionType> GetSupportedDictCompressions() { std::vector<CompressionType> GetSupportedDictCompressions() {
std::vector<CompressionType> dict_compression_types; std::set<CompressionType> dict_compression_types;
for (const auto& comp_to_name : OptionsHelper::compression_type_string_map) { for (const auto& comp_to_name : OptionsHelper::compression_type_string_map) {
CompressionType t = comp_to_name.second; CompressionType t = comp_to_name.second;
if (t != kDisableCompressionOption && DictCompressionTypeSupported(t)) { if (t != kDisableCompressionOption && DictCompressionTypeSupported(t)) {
dict_compression_types.push_back(t); dict_compression_types.insert(t);
} }
} }
return dict_compression_types; return std::vector<CompressionType>(dict_compression_types.begin(),
dict_compression_types.end());
}
std::vector<ChecksumType> GetSupportedChecksums() {
std::set<ChecksumType> checksum_types;
for (const auto& e : OptionsHelper::checksum_type_string_map) {
checksum_types.insert(e.second);
}
return std::vector<ChecksumType>(checksum_types.begin(),
checksum_types.end());
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE

View File

@ -28,6 +28,8 @@ std::vector<CompressionType> GetSupportedCompressions();
std::vector<CompressionType> GetSupportedDictCompressions(); std::vector<CompressionType> GetSupportedDictCompressions();
std::vector<ChecksumType> GetSupportedChecksums();
// Checks that the combination of DBOptions and ColumnFamilyOptions are valid // Checks that the combination of DBOptions and ColumnFamilyOptions are valid
Status ValidateOptions(const DBOptions& db_opts, Status ValidateOptions(const DBOptions& db_opts,
const ColumnFamilyOptions& cf_opts); const ColumnFamilyOptions& cf_opts);

View File

@ -1207,6 +1207,60 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
} }
} }
void BlockBasedTableBuilder::ComputeBlockTrailer(
const Slice& block_contents, CompressionType compression_type,
ChecksumType checksum_type, std::array<char, kBlockTrailerSize>* trailer) {
(*trailer)[0] = compression_type;
uint32_t checksum = 0;
switch (checksum_type) {
case kNoChecksum:
break;
case kCRC32c: {
uint32_t crc =
crc32c::Value(block_contents.data(), block_contents.size());
// Extend to cover compression type
crc = crc32c::Extend(crc, trailer->data(), 1);
checksum = crc32c::Mask(crc);
break;
}
case kxxHash: {
XXH32_state_t* const state = XXH32_createState();
XXH32_reset(state, 0);
XXH32_update(state, block_contents.data(), block_contents.size());
// Extend to cover compression type
XXH32_update(state, trailer->data(), 1);
checksum = XXH32_digest(state);
XXH32_freeState(state);
break;
}
case kxxHash64: {
XXH64_state_t* const state = XXH64_createState();
XXH64_reset(state, 0);
XXH64_update(state, block_contents.data(), block_contents.size());
// Extend to cover compression type
XXH64_update(state, trailer->data(), 1);
checksum = Lower32of64(XXH64_digest(state));
XXH64_freeState(state);
break;
}
case kXXH3: {
// XXH3 is a complicated hash function that is extremely fast on
// contiguous input, but that makes its streaming support rather
// complex. It is worth custom handling of the last byte (`type`)
// in order to avoid allocating a large state object and bringing
// that code complexity into CPU working set.
checksum = Lower32of64(
XXH3_64bits(block_contents.data(), block_contents.size()));
checksum = ModifyChecksumForCompressionType(checksum, compression_type);
break;
}
default:
assert(false);
break;
}
EncodeFixed32(trailer->data() + 1, checksum);
}
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type, CompressionType type,
BlockHandle* handle, BlockHandle* handle,
@ -1223,50 +1277,14 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
assert(io_status().ok()); assert(io_status().ok());
io_s = r->file->Append(block_contents); io_s = r->file->Append(block_contents);
if (io_s.ok()) { if (io_s.ok()) {
char trailer[kBlockTrailerSize]; std::array<char, kBlockTrailerSize> trailer;
trailer[0] = type; ComputeBlockTrailer(block_contents, type, r->table_options.checksum,
uint32_t checksum = 0; &trailer);
switch (r->table_options.checksum) {
case kNoChecksum:
break;
case kCRC32c: {
uint32_t crc =
crc32c::Value(block_contents.data(), block_contents.size());
// Extend to cover compression type
crc = crc32c::Extend(crc, trailer, 1);
checksum = crc32c::Mask(crc);
break;
}
case kxxHash: {
XXH32_state_t* const state = XXH32_createState();
XXH32_reset(state, 0);
XXH32_update(state, block_contents.data(), block_contents.size());
// Extend to cover compression type
XXH32_update(state, trailer, 1);
checksum = XXH32_digest(state);
XXH32_freeState(state);
break;
}
case kxxHash64: {
XXH64_state_t* const state = XXH64_createState();
XXH64_reset(state, 0);
XXH64_update(state, block_contents.data(), block_contents.size());
// Extend to cover compression type
XXH64_update(state, trailer, 1);
checksum = Lower32of64(XXH64_digest(state));
XXH64_freeState(state);
break;
}
default:
assert(false);
break;
}
EncodeFixed32(trailer + 1, checksum);
assert(io_s.ok()); assert(io_s.ok());
TEST_SYNC_POINT_CALLBACK( TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum", "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
static_cast<char*>(trailer)); trailer.data());
io_s = r->file->Append(Slice(trailer, kBlockTrailerSize)); io_s = r->file->Append(Slice(trailer.data(), trailer.size()));
if (io_s.ok()) { if (io_s.ok()) {
assert(s.ok()); assert(s.ok());
bool warm_cache; bool warm_cache;

View File

@ -10,6 +10,7 @@
#pragma once #pragma once
#include <stdint.h> #include <stdint.h>
#include <array>
#include <limits> #include <limits>
#include <string> #include <string>
#include <utility> #include <utility>
@ -20,6 +21,7 @@
#include "rocksdb/listener.h" #include "rocksdb/listener.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/meta_blocks.h" #include "table/meta_blocks.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "util/compression.h" #include "util/compression.h"
@ -98,6 +100,12 @@ class BlockBasedTableBuilder : public TableBuilder {
// Get file checksum function name // Get file checksum function name
const char* GetFileChecksumFuncName() const override; const char* GetFileChecksumFuncName() const override;
// Computes and populates block trailer for a block
static void ComputeBlockTrailer(const Slice& block_contents,
CompressionType compression_type,
ChecksumType checksum_type,
std::array<char, kBlockTrailerSize>* trailer);
private: private:
bool ok() const { return status().ok(); } bool ok() const { return status().ok(); }
@ -117,7 +125,6 @@ class BlockBasedTableBuilder : public TableBuilder {
BlockType block_type); BlockType block_type);
// Directly write data to the file. // Directly write data to the file.
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle, void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle,
BlockType block_type, const Slice* raw_data = nullptr); BlockType block_type, const Slice* raw_data = nullptr);
void SetupCacheKeyPrefix(const TableBuilderOptions& tbo); void SetupCacheKeyPrefix(const TableBuilderOptions& tbo);

View File

@ -16,11 +16,13 @@
#include <string> #include <string>
#include "logging/logging.h" #include "logging/logging.h"
#include "options/options_helper.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/filter_policy.h" #include "rocksdb/filter_policy.h"
#include "rocksdb/flush_block_policy.h" #include "rocksdb/flush_block_policy.h"
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/utilities/options_type.h" #include "rocksdb/utilities/options_type.h"
#include "table/block_based/block_based_table_builder.h" #include "table/block_based/block_based_table_builder.h"
#include "table/block_based/block_based_table_reader.h" #include "table/block_based/block_based_table_reader.h"
@ -564,6 +566,14 @@ Status BlockBasedTableFactory::ValidateOptions(
"max_successive_merges larger than 0 is currently inconsistent with " "max_successive_merges larger than 0 is currently inconsistent with "
"unordered_write"); "unordered_write");
} }
std::string garbage;
if (!SerializeEnum<ChecksumType>(checksum_type_string_map,
table_options_.checksum, &garbage)) {
return Status::InvalidArgument(
"Unrecognized ChecksumType for checksum: " +
ROCKSDB_NAMESPACE::ToString(
static_cast<uint32_t>(table_options_.checksum)));
}
return TableFactory::ValidateOptions(db_opts, cf_opts); return TableFactory::ValidateOptions(db_opts, cf_opts);
} }

View File

@ -9,6 +9,7 @@
#include "table/block_based/reader_common.h" #include "table/block_based/reader_common.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "table/format.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/hash.h" #include "util/hash.h"
@ -47,6 +48,11 @@ Status VerifyBlockChecksum(ChecksumType type, const char* data,
case kxxHash64: case kxxHash64:
computed = Lower32of64(XXH64(data, len, 0)); computed = Lower32of64(XXH64(data, len, 0));
break; break;
case kXXH3:
computed = Lower32of64(XXH3_64bits(data, block_size));
// Treat compression type separately for speed in building table files
computed = ModifyChecksumForCompressionType(computed, data[block_size]);
break;
default: default:
s = Status::Corruption( s = Status::Corruption(
"unknown checksum type " + ToString(type) + " from footer of " + "unknown checksum type " + ToString(type) + " from footer of " +
@ -56,8 +62,9 @@ Status VerifyBlockChecksum(ChecksumType type, const char* data,
if (s.ok() && stored != computed) { if (s.ok() && stored != computed) {
s = Status::Corruption( s = Status::Corruption(
"block checksum mismatch: stored = " + ToString(stored) + "block checksum mismatch: stored = " + ToString(stored) +
", computed = " + ToString(computed) + " in " + file_name + ", computed = " + ToString(computed) + ", type = " + ToString(type) +
" offset " + ToString(offset) + " size " + ToString(block_size)); " in " + file_name + " offset " + ToString(offset) + " size " +
ToString(block_size));
} }
return s; return s;
} }

View File

@ -134,6 +134,7 @@ class Footer {
// Use this constructor when you plan to write out the footer using // Use this constructor when you plan to write out the footer using
// EncodeTo(). Never use this constructor with DecodeFrom(). // EncodeTo(). Never use this constructor with DecodeFrom().
// `version` is same as `format_version` for block-based table.
Footer(uint64_t table_magic_number, uint32_t version); Footer(uint64_t table_magic_number, uint32_t version);
// The version of the footer in this file // The version of the footer in this file
@ -225,6 +226,18 @@ inline CompressionType get_block_compression_type(const char* block_data,
return static_cast<CompressionType>(block_data[block_size]); return static_cast<CompressionType>(block_data[block_size]);
} }
// Custom handling for the last byte of a block, to avoid invoking streaming
// API to get an effective block checksum. This function is its own inverse
// because it uses xor.
inline uint32_t ModifyChecksumForCompressionType(uint32_t checksum,
char compression_type) {
// This strategy bears some resemblance to extending a CRC checksum by one
// more byte, except we don't need to re-mix the input checksum as long as
// we do this step only once (per checksum).
const uint32_t kRandomPrime = 0x6b9083d9;
return checksum ^ static_cast<uint8_t>(compression_type) * kRandomPrime;
}
// Represents the contents of a block read from an SST file. Depending on how // Represents the contents of a block read from an SST file. Depending on how
// it's created, it may or may not own the actual block bytes. As an example, // it's created, it may or may not own the actual block bytes. As an example,
// BlockContents objects representing data read from mmapped files only point // BlockContents objects representing data read from mmapped files only point

View File

@ -7,6 +7,9 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/table.h"
#include <gtest/gtest.h>
#include <stddef.h> #include <stddef.h>
#include <stdio.h> #include <stdio.h>
@ -26,8 +29,10 @@
#include "memtable/stl_wrappers.h" #include "memtable/stl_wrappers.h"
#include "meta_blocks.h" #include "meta_blocks.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
#include "options/options_helper.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/file_checksum.h" #include "rocksdb/file_checksum.h"
@ -2193,6 +2198,115 @@ TEST_P(BlockBasedTableTest, SkipPrefixBloomFilter) {
} }
} }
TEST_P(BlockBasedTableTest, BadChecksumType) {
BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
Options options;
options.comparator = BytewiseComparator();
options.table_factory.reset(new BlockBasedTableFactory(table_options));
TableConstructor c(options.comparator);
InternalKey key("abc", 1, kTypeValue);
c.Add(key.Encode().ToString(), "test");
std::vector<std::string> keys;
stl_wrappers::KVMap kvmap;
const ImmutableOptions ioptions(options);
const MutableCFOptions moptions(options);
const InternalKeyComparator internal_comparator(options.comparator);
c.Finish(options, ioptions, moptions, table_options, internal_comparator,
&keys, &kvmap);
// Corrupt checksum type (123 is invalid)
auto& sink = *c.TEST_GetSink();
size_t len = sink.contents_.size();
ASSERT_EQ(sink.contents_[len - Footer::kNewVersionsEncodedLength], kCRC32c);
sink.contents_[len - Footer::kNewVersionsEncodedLength] = char{123};
// (Re-)Open table file with bad checksum type
const ImmutableOptions new_ioptions(options);
const MutableCFOptions new_moptions(options);
Status s = c.Reopen(new_ioptions, new_moptions);
ASSERT_NOK(s);
ASSERT_MATCHES_REGEX(s.ToString(), "Corruption: unknown checksum type 123.*");
}
namespace {
std::string TrailerAsString(const std::string& contents,
CompressionType compression_type,
ChecksumType checksum_type) {
std::array<char, kBlockTrailerSize> trailer;
BlockBasedTableBuilder::ComputeBlockTrailer(contents, compression_type,
checksum_type, &trailer);
return Slice(trailer.data(), trailer.size()).ToString(/*hex*/ true);
}
} // namespace
// Make sure that checksum values don't change in later versions, even if
// consistent within current version. (Other tests check for consistency
// between written checksums and read-time validation, so here we only
// have to verify the writer side.)
TEST_P(BlockBasedTableTest, ChecksumSchemas) {
std::string b1 = "This is a short block!";
std::string b2;
for (int i = 0; i < 100; ++i) {
b2.append("This is a long block!");
}
CompressionType ct1 = kNoCompression;
CompressionType ct2 = kSnappyCompression;
CompressionType ct3 = kZSTD;
// Note: first byte of trailer is compression type, last 4 are checksum
for (ChecksumType t : GetSupportedChecksums()) {
switch (t) {
case kNoChecksum:
EXPECT_EQ(TrailerAsString(b1, ct1, t), "0000000000");
EXPECT_EQ(TrailerAsString(b1, ct2, t), "0100000000");
EXPECT_EQ(TrailerAsString(b1, ct3, t), "0700000000");
EXPECT_EQ(TrailerAsString(b2, ct1, t), "0000000000");
EXPECT_EQ(TrailerAsString(b2, ct2, t), "0100000000");
EXPECT_EQ(TrailerAsString(b2, ct3, t), "0700000000");
break;
case kCRC32c:
EXPECT_EQ(TrailerAsString(b1, ct1, t), "00583F0355");
EXPECT_EQ(TrailerAsString(b1, ct2, t), "012F9B0A57");
EXPECT_EQ(TrailerAsString(b1, ct3, t), "07ECE7DA1D");
EXPECT_EQ(TrailerAsString(b2, ct1, t), "00943EF0AB");
EXPECT_EQ(TrailerAsString(b2, ct2, t), "0143A2EDB1");
EXPECT_EQ(TrailerAsString(b2, ct3, t), "0700E53D63");
break;
case kxxHash:
EXPECT_EQ(TrailerAsString(b1, ct1, t), "004A2E5FB0");
EXPECT_EQ(TrailerAsString(b1, ct2, t), "010BD9F652");
EXPECT_EQ(TrailerAsString(b1, ct3, t), "07B4107E50");
EXPECT_EQ(TrailerAsString(b2, ct1, t), "0020F4D4BA");
EXPECT_EQ(TrailerAsString(b2, ct2, t), "018F1A1F99");
EXPECT_EQ(TrailerAsString(b2, ct3, t), "07A191A338");
break;
case kxxHash64:
EXPECT_EQ(TrailerAsString(b1, ct1, t), "00B74655EF");
EXPECT_EQ(TrailerAsString(b1, ct2, t), "01B6C8BBBE");
EXPECT_EQ(TrailerAsString(b1, ct3, t), "07AED9E3B4");
EXPECT_EQ(TrailerAsString(b2, ct1, t), "000D4999FE");
EXPECT_EQ(TrailerAsString(b2, ct2, t), "01F5932423");
EXPECT_EQ(TrailerAsString(b2, ct3, t), "076B31BAB1");
break;
case kXXH3:
EXPECT_EQ(TrailerAsString(b1, ct1, t), "00B37FB5E6");
EXPECT_EQ(TrailerAsString(b1, ct2, t), "016AFC258D");
EXPECT_EQ(TrailerAsString(b1, ct3, t), "075CE54616");
EXPECT_EQ(TrailerAsString(b2, ct1, t), "00FA2D482E");
EXPECT_EQ(TrailerAsString(b2, ct2, t), "0123AED845");
EXPECT_EQ(TrailerAsString(b2, ct3, t), "0715B7BBDE");
break;
default:
// Force this test to be updated on new ChecksumTypes
assert(false);
break;
}
}
}
void AddInternalKey(TableConstructor* c, const std::string& prefix, void AddInternalKey(TableConstructor* c, const std::string& prefix,
std::string value = "v", int /*suffix_len*/ = 800) { std::string value = "v", int /*suffix_len*/ = 800) {
static Random rnd(1023); static Random rnd(1023);
@ -4036,40 +4150,20 @@ TEST(TableTest, FooterTests) {
ASSERT_EQ(decoded_footer.index_handle().size(), index.size()); ASSERT_EQ(decoded_footer.index_handle().size(), index.size());
ASSERT_EQ(decoded_footer.version(), 0U); ASSERT_EQ(decoded_footer.version(), 0U);
} }
{ for (auto t : GetSupportedChecksums()) {
// xxhash block based // block based, various checksums
std::string encoded; std::string encoded;
Footer footer(kBlockBasedTableMagicNumber, 1); Footer footer(kBlockBasedTableMagicNumber, 1);
BlockHandle meta_index(10, 5), index(20, 15); BlockHandle meta_index(10, 5), index(20, 15);
footer.set_metaindex_handle(meta_index); footer.set_metaindex_handle(meta_index);
footer.set_index_handle(index); footer.set_index_handle(index);
footer.set_checksum(kxxHash); footer.set_checksum(t);
footer.EncodeTo(&encoded); footer.EncodeTo(&encoded);
Footer decoded_footer; Footer decoded_footer;
Slice encoded_slice(encoded); Slice encoded_slice(encoded);
ASSERT_OK(decoded_footer.DecodeFrom(&encoded_slice)); ASSERT_OK(decoded_footer.DecodeFrom(&encoded_slice));
ASSERT_EQ(decoded_footer.table_magic_number(), kBlockBasedTableMagicNumber); ASSERT_EQ(decoded_footer.table_magic_number(), kBlockBasedTableMagicNumber);
ASSERT_EQ(decoded_footer.checksum(), kxxHash); ASSERT_EQ(decoded_footer.checksum(), t);
ASSERT_EQ(decoded_footer.metaindex_handle().offset(), meta_index.offset());
ASSERT_EQ(decoded_footer.metaindex_handle().size(), meta_index.size());
ASSERT_EQ(decoded_footer.index_handle().offset(), index.offset());
ASSERT_EQ(decoded_footer.index_handle().size(), index.size());
ASSERT_EQ(decoded_footer.version(), 1U);
}
{
// xxhash64 block based
std::string encoded;
Footer footer(kBlockBasedTableMagicNumber, 1);
BlockHandle meta_index(10, 5), index(20, 15);
footer.set_metaindex_handle(meta_index);
footer.set_index_handle(index);
footer.set_checksum(kxxHash64);
footer.EncodeTo(&encoded);
Footer decoded_footer;
Slice encoded_slice(encoded);
ASSERT_OK(decoded_footer.DecodeFrom(&encoded_slice));
ASSERT_EQ(decoded_footer.table_magic_number(), kBlockBasedTableMagicNumber);
ASSERT_EQ(decoded_footer.checksum(), kxxHash64);
ASSERT_EQ(decoded_footer.metaindex_handle().offset(), meta_index.offset()); ASSERT_EQ(decoded_footer.metaindex_handle().offset(), meta_index.offset());
ASSERT_EQ(decoded_footer.metaindex_handle().size(), meta_index.size()); ASSERT_EQ(decoded_footer.metaindex_handle().size(), meta_index.size());
ASSERT_EQ(decoded_footer.index_handle().offset(), index.offset()); ASSERT_EQ(decoded_footer.index_handle().offset(), index.offset());
@ -4098,7 +4192,7 @@ TEST(TableTest, FooterTests) {
ASSERT_EQ(decoded_footer.version(), 0U); ASSERT_EQ(decoded_footer.version(), 0U);
} }
{ {
// xxhash block based // xxhash plain table (not currently used)
std::string encoded; std::string encoded;
Footer footer(kPlainTableMagicNumber, 1); Footer footer(kPlainTableMagicNumber, 1);
BlockHandle meta_index(10, 5), index(20, 15); BlockHandle meta_index(10, 5), index(20, 15);

View File

@ -59,6 +59,7 @@
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/stats_history.h" #include "rocksdb/stats_history.h"
#include "rocksdb/table.h"
#include "rocksdb/utilities/object_registry.h" #include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/options_type.h" #include "rocksdb/utilities/options_type.h"
@ -147,6 +148,8 @@ IF_ROCKSDB_LITE("",
"fill100K," "fill100K,"
"crc32c," "crc32c,"
"xxhash," "xxhash,"
"xxhash64,"
"xxh3,"
"compress," "compress,"
"uncompress," "uncompress,"
"acquireload," "acquireload,"
@ -205,8 +208,10 @@ IF_ROCKSDB_LITE("",
"overwrite\n" "overwrite\n"
"\tseekrandomwhilemerging -- seekrandom and 1 thread doing " "\tseekrandomwhilemerging -- seekrandom and 1 thread doing "
"merge\n" "merge\n"
"\tcrc32c -- repeated crc32c of 4K of data\n" "\tcrc32c -- repeated crc32c of <block size> data\n"
"\txxhash -- repeated xxHash of 4K of data\n" "\txxhash -- repeated xxHash of <block size> data\n"
"\txxhash64 -- repeated xxHash64 of <block size> data\n"
"\txxh3 -- repeated XXH3 of <block size> data\n"
"\tacquireload -- load N*1000 times\n" "\tacquireload -- load N*1000 times\n"
"\tfillseekseq -- write N values in sequential key, then read " "\tfillseekseq -- write N values in sequential key, then read "
"them by seeking to each key\n" "them by seeking to each key\n"
@ -733,6 +738,10 @@ DEFINE_bool(verify_checksum, true,
"Verify checksum for every block read" "Verify checksum for every block read"
" from storage"); " from storage");
DEFINE_int32(checksum_type,
ROCKSDB_NAMESPACE::BlockBasedTableOptions().checksum,
"ChecksumType as an int");
DEFINE_bool(statistics, false, "Database statistics"); DEFINE_bool(statistics, false, "Database statistics");
DEFINE_int32(stats_level, ROCKSDB_NAMESPACE::StatsLevel::kExceptDetailedTimers, DEFINE_int32(stats_level, ROCKSDB_NAMESPACE::StatsLevel::kExceptDetailedTimers,
"stats level for statistics"); "stats level for statistics");
@ -3434,6 +3443,10 @@ class Benchmark {
method = &Benchmark::Crc32c; method = &Benchmark::Crc32c;
} else if (name == "xxhash") { } else if (name == "xxhash") {
method = &Benchmark::xxHash; method = &Benchmark::xxHash;
} else if (name == "xxhash64") {
method = &Benchmark::xxHash64;
} else if (name == "xxh3") {
method = &Benchmark::xxh3;
} else if (name == "acquireload") { } else if (name == "acquireload") {
method = &Benchmark::AcquireLoad; method = &Benchmark::AcquireLoad;
} else if (name == "compress") { } else if (name == "compress") {
@ -3778,44 +3791,42 @@ class Benchmark {
return merge_stats; return merge_stats;
} }
void Crc32c(ThreadState* thread) { template <OperationType kOpType, typename FnType, typename... Args>
// Checksum about 500MB of data total static inline void ChecksumBenchmark(FnType fn, ThreadState* thread,
Args... args) {
const int size = FLAGS_block_size; // use --block_size option for db_bench const int size = FLAGS_block_size; // use --block_size option for db_bench
std::string labels = "(" + ToString(FLAGS_block_size) + " per op)"; std::string labels = "(" + ToString(FLAGS_block_size) + " per op)";
const char* label = labels.c_str(); const char* label = labels.c_str();
std::string data(size, 'x'); std::string data(size, 'x');
int64_t bytes = 0; uint64_t bytes = 0;
uint32_t crc = 0; uint32_t val = 0;
while (bytes < 500 * 1048576) { while (bytes < 5000U * uint64_t{1048576}) { // ~5GB
crc = crc32c::Value(data.data(), size); val += static_cast<uint32_t>(fn(data.data(), size, args...));
thread->stats.FinishedOps(nullptr, nullptr, 1, kCrc); thread->stats.FinishedOps(nullptr, nullptr, 1, kOpType);
bytes += size; bytes += size;
} }
// Print so result is not dead // Print so result is not dead
fprintf(stderr, "... crc=0x%x\r", static_cast<unsigned int>(crc)); fprintf(stderr, "... val=0x%x\r", static_cast<unsigned int>(val));
thread->stats.AddBytes(bytes); thread->stats.AddBytes(bytes);
thread->stats.AddMessage(label); thread->stats.AddMessage(label);
} }
void xxHash(ThreadState* thread) { void Crc32c(ThreadState* thread) {
// Checksum about 500MB of data total ChecksumBenchmark<kCrc>(crc32c::Value, thread);
const int size = 4096; }
const char* label = "(4K per op)";
std::string data(size, 'x');
int64_t bytes = 0;
unsigned int xxh32 = 0;
while (bytes < 500 * 1048576) {
xxh32 = XXH32(data.data(), size, 0);
thread->stats.FinishedOps(nullptr, nullptr, 1, kHash);
bytes += size;
}
// Print so result is not dead
fprintf(stderr, "... xxh32=0x%x\r", static_cast<unsigned int>(xxh32));
thread->stats.AddBytes(bytes); void xxHash(ThreadState* thread) {
thread->stats.AddMessage(label); ChecksumBenchmark<kHash>(XXH32, thread, /*seed*/ 0);
}
void xxHash64(ThreadState* thread) {
ChecksumBenchmark<kHash>(XXH64, thread, /*seed*/ 0);
}
void xxh3(ThreadState* thread) {
ChecksumBenchmark<kHash>(XXH3_64bits, thread);
} }
void AcquireLoad(ThreadState* thread) { void AcquireLoad(ThreadState* thread) {
@ -4067,6 +4078,8 @@ class Benchmark {
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} else { } else {
BlockBasedTableOptions block_based_options; BlockBasedTableOptions block_based_options;
block_based_options.checksum =
static_cast<ChecksumType>(FLAGS_checksum_type);
if (FLAGS_use_hash_search) { if (FLAGS_use_hash_search) {
if (FLAGS_prefix_size == 0) { if (FLAGS_prefix_size == 0) {
fprintf(stderr, fprintf(stderr,

View File

@ -45,7 +45,7 @@ default_params = {
random.choice( random.choice(
["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress", ["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress",
"zstd"]), "zstd"]),
"checksum_type" : lambda: random.choice(["kCRC32c", "kxxHash", "kxxHash64"]), "checksum_type" : lambda: random.choice(["kCRC32c", "kxxHash", "kxxHash64", "kXXH3"]),
"compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1), "compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
"compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1), "compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
# Disabled compression_parallel_threads as the feature is not stable # Disabled compression_parallel_threads as the feature is not stable