Fix SstFileReader not able to open ingested file (#5097)
Summary:
Since `SstFileReader` don't know largest seqno of a file, it will fail this check when it open a file with global seqno: ca89ac2ba9/table/block_based_table_reader.cc (L730)
Changes:
* Pass largest_seqno=kMaxSequenceNumber from `SstFileReader` and allow it to bypass the above check.
* `BlockBasedTable::VerifyChecksum` also double check if checksum will match when excluding global seqno (this is to make the new test in sst_table_reader_test pass).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5097
Differential Revision: D14607434
Pulled By: riversand963
fbshipit-source-id: 9008599227c5fccbf9b73fee46b3bf4a1523f023
This commit is contained in:
parent
7ca9eb7542
commit
75133b1b6b
@ -9,6 +9,7 @@
|
||||
|
||||
### Bug Fixes
|
||||
* Fix JEMALLOC_CXX_THROW macro missing from older Jemalloc versions, causing build failures on some platforms.
|
||||
* Fix SstFileReader not able to open file ingested with write_glbal_seqno=true.
|
||||
|
||||
|
||||
## Unreleased
|
||||
|
@ -727,17 +727,24 @@ Status GetGlobalSequenceNumber(const TableProperties& table_properties,
|
||||
if (seqno_pos != props.end()) {
|
||||
global_seqno = DecodeFixed64(seqno_pos->second.c_str());
|
||||
}
|
||||
if (global_seqno != 0 && global_seqno != largest_seqno) {
|
||||
// SstTableReader open table reader with kMaxSequenceNumber as largest_seqno
|
||||
// to denote it is unknown.
|
||||
if (largest_seqno < kMaxSequenceNumber) {
|
||||
if (global_seqno == 0) {
|
||||
global_seqno = largest_seqno;
|
||||
}
|
||||
if (global_seqno != largest_seqno) {
|
||||
std::array<char, 200> msg_buf;
|
||||
snprintf(msg_buf.data(), msg_buf.max_size(),
|
||||
snprintf(
|
||||
msg_buf.data(), msg_buf.max_size(),
|
||||
"An external sst file with version %u have global seqno property "
|
||||
"with value %s, while largest seqno in the file is %llu",
|
||||
version, seqno_pos->second.c_str(),
|
||||
static_cast<unsigned long long>(largest_seqno));
|
||||
return Status::Corruption(msg_buf.data());
|
||||
}
|
||||
global_seqno = largest_seqno;
|
||||
*seqno = largest_seqno;
|
||||
}
|
||||
*seqno = global_seqno;
|
||||
|
||||
if (global_seqno > kMaxSequenceNumber) {
|
||||
std::array<char, 200> msg_buf;
|
||||
@ -942,6 +949,41 @@ Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len,
|
||||
return s;
|
||||
}
|
||||
|
||||
Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno(
|
||||
Rep* rep, FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value,
|
||||
TableProperties** table_properties) {
|
||||
assert(table_properties != nullptr);
|
||||
// If this is an external SST file ingested with write_global_seqno set to
|
||||
// true, then we expect the checksum mismatch because checksum was written
|
||||
// by SstFileWriter, but its global seqno in the properties block may have
|
||||
// been changed during ingestion. In this case, we read the properties
|
||||
// block, copy it to a memory buffer, change the global seqno to its
|
||||
// original value, i.e. 0, and verify the checksum again.
|
||||
BlockHandle props_block_handle;
|
||||
CacheAllocationPtr tmp_buf;
|
||||
Status s = ReadProperties(handle_value, rep->file.get(), prefetch_buffer,
|
||||
rep->footer, rep->ioptions, table_properties,
|
||||
false /* verify_checksum */, &props_block_handle,
|
||||
&tmp_buf, false /* compression_type_missing */,
|
||||
nullptr /* memory_allocator */);
|
||||
if (s.ok() && tmp_buf) {
|
||||
const auto seqno_pos_iter =
|
||||
(*table_properties)
|
||||
->properties_offsets.find(
|
||||
ExternalSstFilePropertyNames::kGlobalSeqno);
|
||||
size_t block_size = props_block_handle.size();
|
||||
if (seqno_pos_iter != (*table_properties)->properties_offsets.end()) {
|
||||
uint64_t global_seqno_offset = seqno_pos_iter->second;
|
||||
EncodeFixed64(
|
||||
tmp_buf.get() + global_seqno_offset - props_block_handle.offset(), 0);
|
||||
}
|
||||
uint32_t value = DecodeFixed32(tmp_buf.get() + block_size + 1);
|
||||
s = rocksdb::VerifyChecksum(rep->footer.checksum(), tmp_buf.get(),
|
||||
block_size + 1, value);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
Status BlockBasedTable::ReadPropertiesBlock(
|
||||
Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter,
|
||||
const SequenceNumber largest_seqno) {
|
||||
@ -965,33 +1007,8 @@ Status BlockBasedTable::ReadPropertiesBlock(
|
||||
}
|
||||
|
||||
if (s.IsCorruption()) {
|
||||
// If this is an external SST file ingested with write_global_seqno set to
|
||||
// true, then we expect the checksum mismatch because checksum was written
|
||||
// by SstFileWriter, but its global seqno in the properties block may have
|
||||
// been changed during ingestion. In this case, we read the properties
|
||||
// block, copy it to a memory buffer, change the global seqno to its
|
||||
// original value, i.e. 0, and verify the checksum again.
|
||||
BlockHandle props_block_handle;
|
||||
CacheAllocationPtr tmp_buf;
|
||||
s = ReadProperties(meta_iter->value(), rep->file.get(), prefetch_buffer,
|
||||
rep->footer, rep->ioptions, &table_properties,
|
||||
false /* verify_checksum */, &props_block_handle,
|
||||
&tmp_buf, false /* compression_type_missing */,
|
||||
nullptr /* memory_allocator */);
|
||||
if (s.ok() && tmp_buf) {
|
||||
const auto seqno_pos_iter = table_properties->properties_offsets.find(
|
||||
ExternalSstFilePropertyNames::kGlobalSeqno);
|
||||
size_t block_size = props_block_handle.size();
|
||||
if (seqno_pos_iter != table_properties->properties_offsets.end()) {
|
||||
uint64_t global_seqno_offset = seqno_pos_iter->second;
|
||||
EncodeFixed64(
|
||||
tmp_buf.get() + global_seqno_offset - props_block_handle.offset(),
|
||||
0);
|
||||
}
|
||||
uint32_t value = DecodeFixed32(tmp_buf.get() + block_size + 1);
|
||||
s = rocksdb::VerifyChecksum(rep->footer.checksum(), tmp_buf.get(),
|
||||
block_size + 1, value);
|
||||
}
|
||||
s = TryReadPropertiesWithGlobalSeqno(
|
||||
rep, prefetch_buffer, meta_iter->value(), &table_properties);
|
||||
}
|
||||
std::unique_ptr<TableProperties> props_guard;
|
||||
if (table_properties != nullptr) {
|
||||
@ -2801,7 +2818,7 @@ Status BlockBasedTable::VerifyChecksum() {
|
||||
std::unique_ptr<InternalIterator> meta_iter;
|
||||
s = ReadMetaBlock(rep_, nullptr /* prefetch buffer */, &meta, &meta_iter);
|
||||
if (s.ok()) {
|
||||
s = VerifyChecksumInBlocks(meta_iter.get());
|
||||
s = VerifyChecksumInMetaBlocks(meta_iter.get());
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
@ -2848,7 +2865,7 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
|
||||
return s;
|
||||
}
|
||||
|
||||
Status BlockBasedTable::VerifyChecksumInBlocks(
|
||||
Status BlockBasedTable::VerifyChecksumInMetaBlocks(
|
||||
InternalIteratorBase<Slice>* index_iter) {
|
||||
Status s;
|
||||
for (index_iter->SeekToFirst(); index_iter->Valid(); index_iter->Next()) {
|
||||
@ -2866,6 +2883,13 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
|
||||
false /* decompress */, false /*maybe_compressed*/,
|
||||
UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);
|
||||
s = block_fetcher.ReadBlockContents();
|
||||
if (s.IsCorruption() && index_iter->key() == kPropertiesBlock) {
|
||||
TableProperties* table_properties;
|
||||
s = TryReadPropertiesWithGlobalSeqno(rep_, nullptr /* prefetch_buffer */,
|
||||
index_iter->value(),
|
||||
&table_properties);
|
||||
delete table_properties;
|
||||
}
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
|
@ -364,6 +364,9 @@ class BlockBasedTable : public TableReader {
|
||||
static Status ReadMetaBlock(Rep* rep, FilePrefetchBuffer* prefetch_buffer,
|
||||
std::unique_ptr<Block>* meta_block,
|
||||
std::unique_ptr<InternalIterator>* iter);
|
||||
static Status TryReadPropertiesWithGlobalSeqno(
|
||||
Rep* rep, FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value,
|
||||
TableProperties** table_properties);
|
||||
static Status ReadPropertiesBlock(Rep* rep,
|
||||
FilePrefetchBuffer* prefetch_buffer,
|
||||
InternalIterator* meta_iter,
|
||||
@ -382,7 +385,7 @@ class BlockBasedTable : public TableReader {
|
||||
const BlockBasedTableOptions& table_options, const int level,
|
||||
const bool prefetch_index_and_filter_in_cache);
|
||||
|
||||
Status VerifyChecksumInBlocks(InternalIteratorBase<Slice>* index_iter);
|
||||
Status VerifyChecksumInMetaBlocks(InternalIteratorBase<Slice>* index_iter);
|
||||
Status VerifyChecksumInBlocks(InternalIteratorBase<BlockHandle>* index_iter);
|
||||
|
||||
// Create the filter from the filter block.
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include "rocksdb/sst_file_reader.h"
|
||||
|
||||
#include "db/db_iter.h"
|
||||
#include "db/dbformat.h"
|
||||
#include "options/cf_options.h"
|
||||
#include "table/get_context.h"
|
||||
#include "table/table_builder.h"
|
||||
@ -49,10 +50,12 @@ Status SstFileReader::Open(const std::string& file_path) {
|
||||
file_reader.reset(new RandomAccessFileReader(std::move(file), file_path));
|
||||
}
|
||||
if (s.ok()) {
|
||||
s = r->options.table_factory->NewTableReader(
|
||||
TableReaderOptions(r->ioptions, r->moptions.prefix_extractor.get(),
|
||||
r->soptions, r->ioptions.internal_comparator),
|
||||
std::move(file_reader), file_size, &r->table_reader);
|
||||
TableReaderOptions t_opt(r->ioptions, r->moptions.prefix_extractor.get(),
|
||||
r->soptions, r->ioptions.internal_comparator);
|
||||
// Allow open file with global sequence number for backward compatibility.
|
||||
t_opt.largest_seqno = kMaxSequenceNumber;
|
||||
s = r->options.table_factory->NewTableReader(t_opt, std::move(file_reader),
|
||||
file_size, &r->table_reader);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
@ -7,8 +7,10 @@
|
||||
|
||||
#include <inttypes.h>
|
||||
|
||||
#include "rocksdb/db.h"
|
||||
#include "rocksdb/sst_file_reader.h"
|
||||
#include "rocksdb/sst_file_writer.h"
|
||||
#include "table/sst_file_writer_collectors.h"
|
||||
#include "util/testharness.h"
|
||||
#include "util/testutil.h"
|
||||
#include "utilities/merge_operators.h"
|
||||
@ -34,19 +36,29 @@ class SstFileReaderTest : public testing::Test {
|
||||
sst_name_ = test::PerThreadDBPath("sst_file");
|
||||
}
|
||||
|
||||
void CreateFileAndCheck(const std::vector<std::string>& keys) {
|
||||
~SstFileReaderTest() {
|
||||
Status s = Env::Default()->DeleteFile(sst_name_);
|
||||
assert(s.ok());
|
||||
}
|
||||
|
||||
void CreateFile(const std::string& file_name,
|
||||
const std::vector<std::string>& keys) {
|
||||
SstFileWriter writer(soptions_, options_);
|
||||
ASSERT_OK(writer.Open(sst_name_));
|
||||
ASSERT_OK(writer.Open(file_name));
|
||||
for (size_t i = 0; i + 2 < keys.size(); i += 3) {
|
||||
ASSERT_OK(writer.Put(keys[i], keys[i]));
|
||||
ASSERT_OK(writer.Merge(keys[i + 1], EncodeAsUint64(i + 1)));
|
||||
ASSERT_OK(writer.Delete(keys[i + 2]));
|
||||
}
|
||||
ASSERT_OK(writer.Finish());
|
||||
}
|
||||
|
||||
void CheckFile(const std::string& file_name,
|
||||
const std::vector<std::string>& keys,
|
||||
bool check_global_seqno = false) {
|
||||
ReadOptions ropts;
|
||||
SstFileReader reader(options_);
|
||||
ASSERT_OK(reader.Open(sst_name_));
|
||||
ASSERT_OK(reader.Open(file_name));
|
||||
ASSERT_OK(reader.VerifyChecksum());
|
||||
std::unique_ptr<Iterator> iter(reader.NewIterator(ropts));
|
||||
iter->SeekToFirst();
|
||||
@ -61,6 +73,18 @@ class SstFileReaderTest : public testing::Test {
|
||||
iter->Next();
|
||||
}
|
||||
ASSERT_FALSE(iter->Valid());
|
||||
if (check_global_seqno) {
|
||||
auto properties = reader.GetTableProperties();
|
||||
ASSERT_TRUE(properties);
|
||||
auto& user_properties = properties->user_collected_properties;
|
||||
ASSERT_TRUE(
|
||||
user_properties.count(ExternalSstFilePropertyNames::kGlobalSeqno));
|
||||
}
|
||||
}
|
||||
|
||||
void CreateFileAndCheck(const std::vector<std::string>& keys) {
|
||||
CreateFile(sst_name_, keys);
|
||||
CheckFile(sst_name_, keys);
|
||||
}
|
||||
|
||||
protected:
|
||||
@ -88,6 +112,49 @@ TEST_F(SstFileReaderTest, Uint64Comparator) {
|
||||
CreateFileAndCheck(keys);
|
||||
}
|
||||
|
||||
TEST_F(SstFileReaderTest, ReadFileWithGlobalSeqno) {
|
||||
std::vector<std::string> keys;
|
||||
for (uint64_t i = 0; i < kNumKeys; i++) {
|
||||
keys.emplace_back(EncodeAsString(i));
|
||||
}
|
||||
// Generate a SST file.
|
||||
CreateFile(sst_name_, keys);
|
||||
|
||||
// Ingest the file into a db, to assign it a global sequence number.
|
||||
Options options;
|
||||
options.create_if_missing = true;
|
||||
std::string db_name = test::PerThreadDBPath("test_db");
|
||||
DB* db;
|
||||
ASSERT_OK(DB::Open(options, db_name, &db));
|
||||
// Bump sequence number.
|
||||
ASSERT_OK(db->Put(WriteOptions(), keys[0], "foo"));
|
||||
ASSERT_OK(db->Flush(FlushOptions()));
|
||||
// Ingest the file.
|
||||
IngestExternalFileOptions ingest_options;
|
||||
ingest_options.write_global_seqno = true;
|
||||
ASSERT_OK(db->IngestExternalFile({sst_name_}, ingest_options));
|
||||
std::vector<std::string> live_files;
|
||||
uint64_t manifest_file_size = 0;
|
||||
ASSERT_OK(db->GetLiveFiles(live_files, &manifest_file_size));
|
||||
// Get the ingested file.
|
||||
std::string ingested_file;
|
||||
for (auto& live_file : live_files) {
|
||||
if (live_file.substr(live_file.size() - 4, std::string::npos) == ".sst") {
|
||||
if (ingested_file.empty() || ingested_file < live_file) {
|
||||
ingested_file = live_file;
|
||||
}
|
||||
}
|
||||
}
|
||||
ASSERT_FALSE(ingested_file.empty());
|
||||
delete db;
|
||||
|
||||
// Verify the file can be open and read by SstFileReader.
|
||||
CheckFile(db_name + ingested_file, keys, true /* check_global_seqno */);
|
||||
|
||||
// Cleanup.
|
||||
ASSERT_OK(DestroyDB(db_name, options));
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -5,6 +5,8 @@
|
||||
|
||||
#pragma once
|
||||
#include <string>
|
||||
#include "db/dbformat.h"
|
||||
#include "db/table_properties_collector.h"
|
||||
#include "rocksdb/types.h"
|
||||
#include "util/string_util.h"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user