rocksdb/table/meta_blocks.cc
sdong 5cef458a2c RocksDB 2.8 to be able to read files generated by 2.6
Summary:
From 2.6 to 2.7, property block name is renamed from rocksdb.stats to rocksdb.properties. Older properties were not able to be loaded. In 2.8, we seem to have added some logic that uses property block without checking null pointers, which create segment faults.

In this patch, we fix it by:
(1) try rocksdb.stats if rocksdb.properties is not found
(2) add some null checking before consuming rep->table_properties

Test Plan: make sure a file generated in 2.7 couldn't be opened now can be opened.

Reviewers: haobo, igor, yhchiang

Reviewed By: igor

CC: ljin, xjin, dhruba, kailiu, leveldb

Differential Revision: https://reviews.facebook.net/D17961
2014-04-17 09:51:43 -07:00

298 lines
9.5 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "table/meta_blocks.h"
#include <map>
#include <string>
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/block.h"
#include "table/format.h"
#include "util/coding.h"
namespace rocksdb {
MetaIndexBuilder::MetaIndexBuilder()
: meta_index_block_(
new BlockBuilder(1 /* restart interval */, BytewiseComparator())) {
}
void MetaIndexBuilder::Add(const std::string& key,
const BlockHandle& handle) {
std::string handle_encoding;
handle.EncodeTo(&handle_encoding);
meta_block_handles_.insert({key, handle_encoding});
}
Slice MetaIndexBuilder::Finish() {
for (const auto& metablock : meta_block_handles_) {
meta_index_block_->Add(metablock.first, metablock.second);
}
return meta_index_block_->Finish();
}
PropertyBlockBuilder::PropertyBlockBuilder()
: properties_block_(
new BlockBuilder(1 /* restart interval */, BytewiseComparator())) {
}
void PropertyBlockBuilder::Add(const std::string& name,
const std::string& val) {
props_.insert({name, val});
}
void PropertyBlockBuilder::Add(const std::string& name, uint64_t val) {
assert(props_.find(name) == props_.end());
std::string dst;
PutVarint64(&dst, val);
Add(name, dst);
}
void PropertyBlockBuilder::Add(
const UserCollectedProperties& user_collected_properties) {
for (const auto& prop : user_collected_properties) {
Add(prop.first, prop.second);
}
}
void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) {
Add(TablePropertiesNames::kRawKeySize, props.raw_key_size);
Add(TablePropertiesNames::kRawValueSize, props.raw_value_size);
Add(TablePropertiesNames::kDataSize, props.data_size);
Add(TablePropertiesNames::kIndexSize, props.index_size);
Add(TablePropertiesNames::kNumEntries, props.num_entries);
Add(TablePropertiesNames::kNumDataBlocks, props.num_data_blocks);
Add(TablePropertiesNames::kFilterSize, props.filter_size);
Add(TablePropertiesNames::kFormatVersion, props.format_version);
Add(TablePropertiesNames::kFixedKeyLen, props.fixed_key_len);
if (!props.filter_policy_name.empty()) {
Add(TablePropertiesNames::kFilterPolicy,
props.filter_policy_name);
}
}
Slice PropertyBlockBuilder::Finish() {
for (const auto& prop : props_) {
properties_block_->Add(prop.first, prop.second);
}
return properties_block_->Finish();
}
void LogPropertiesCollectionError(
Logger* info_log, const std::string& method, const std::string& name) {
assert(method == "Add" || method == "Finish");
std::string msg =
"[Warning] encountered error when calling TablePropertiesCollector::" +
method + "() with collector name: " + name;
Log(info_log, "%s", msg.c_str());
}
bool NotifyCollectTableCollectorsOnAdd(
const Slice& key,
const Slice& value,
const Options::TablePropertiesCollectors& collectors,
Logger* info_log) {
bool all_succeeded = true;
for (auto collector : collectors) {
Status s = collector->Add(key, value);
all_succeeded = all_succeeded && s.ok();
if (!s.ok()) {
LogPropertiesCollectionError(info_log, "Add" /* method */,
collector->Name());
}
}
return all_succeeded;
}
bool NotifyCollectTableCollectorsOnFinish(
const Options::TablePropertiesCollectors& collectors,
Logger* info_log,
PropertyBlockBuilder* builder) {
bool all_succeeded = true;
for (auto collector : collectors) {
UserCollectedProperties user_collected_properties;
Status s = collector->Finish(&user_collected_properties);
all_succeeded = all_succeeded && s.ok();
if (!s.ok()) {
LogPropertiesCollectionError(info_log, "Finish" /* method */,
collector->Name());
} else {
builder->Add(user_collected_properties);
}
}
return all_succeeded;
}
Status ReadProperties(const Slice& handle_value, RandomAccessFile* file,
Env* env, Logger* logger,
TableProperties** table_properties) {
assert(table_properties);
Slice v = handle_value;
BlockHandle handle;
if (!handle.DecodeFrom(&v).ok()) {
return Status::InvalidArgument("Failed to decode properties block handle");
}
BlockContents block_contents;
ReadOptions read_options;
read_options.verify_checksums = false;
Status s = ReadBlockContents(file, read_options, handle, &block_contents, env,
false);
if (!s.ok()) {
return s;
}
Block properties_block(block_contents);
std::unique_ptr<Iterator> iter(
properties_block.NewIterator(BytewiseComparator()));
auto new_table_properties = new TableProperties();
// All pre-defined properties of type uint64_t
std::unordered_map<std::string, uint64_t*> predefined_uint64_properties = {
{TablePropertiesNames::kDataSize, &new_table_properties->data_size},
{TablePropertiesNames::kIndexSize, &new_table_properties->index_size},
{TablePropertiesNames::kFilterSize, &new_table_properties->filter_size},
{TablePropertiesNames::kRawKeySize, &new_table_properties->raw_key_size},
{TablePropertiesNames::kRawValueSize,
&new_table_properties->raw_value_size},
{TablePropertiesNames::kNumDataBlocks,
&new_table_properties->num_data_blocks},
{TablePropertiesNames::kNumEntries, &new_table_properties->num_entries},
{TablePropertiesNames::kFormatVersion,
&new_table_properties->format_version},
{TablePropertiesNames::kFixedKeyLen,
&new_table_properties->fixed_key_len}, };
std::string last_key;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
s = iter->status();
if (!s.ok()) {
break;
}
auto key = iter->key().ToString();
// properties block is strictly sorted with no duplicate key.
assert(last_key.empty() ||
BytewiseComparator()->Compare(key, last_key) > 0);
last_key = key;
auto raw_val = iter->value();
auto pos = predefined_uint64_properties.find(key);
if (pos != predefined_uint64_properties.end()) {
// handle predefined rocksdb properties
uint64_t val;
if (!GetVarint64(&raw_val, &val)) {
// skip malformed value
auto error_msg =
"[Warning] detect malformed value in properties meta-block:"
"\tkey: " + key + "\tval: " + raw_val.ToString();
Log(logger, "%s", error_msg.c_str());
continue;
}
*(pos->second) = val;
} else if (key == TablePropertiesNames::kFilterPolicy) {
new_table_properties->filter_policy_name = raw_val.ToString();
} else {
// handle user-collected properties
new_table_properties->user_collected_properties.insert(
{key, raw_val.ToString()});
}
}
if (s.ok()) {
*table_properties = new_table_properties;
} else {
delete new_table_properties;
}
return s;
}
Status ReadTableProperties(RandomAccessFile* file, uint64_t file_size,
uint64_t table_magic_number, Env* env,
Logger* info_log, TableProperties** properties) {
// -- Read metaindex block
Footer footer(table_magic_number);
auto s = ReadFooterFromFile(file, file_size, &footer);
if (!s.ok()) {
return s;
}
auto metaindex_handle = footer.metaindex_handle();
BlockContents metaindex_contents;
ReadOptions read_options;
read_options.verify_checksums = false;
s = ReadBlockContents(file, read_options, metaindex_handle,
&metaindex_contents, env, false);
if (!s.ok()) {
return s;
}
Block metaindex_block(metaindex_contents);
std::unique_ptr<Iterator> meta_iter(
metaindex_block.NewIterator(BytewiseComparator()));
// -- Read property block
// This function is not used by BlockBasedTable, so we don't have to
// worry about old properties block name.
meta_iter->Seek(kPropertiesBlock);
TableProperties table_properties;
if (meta_iter->Valid() &&
meta_iter->key() == kPropertiesBlock &&
meta_iter->status().ok()) {
s = ReadProperties(meta_iter->value(), file, env, info_log, properties);
} else {
s = Status::Corruption(
"Unable to read the property block from the plain table");
}
return s;
}
Status ReadTableMagicNumber(const std::string& file_path,
const Options& options,
const EnvOptions& env_options,
uint64_t* table_magic_number) {
unique_ptr<RandomAccessFile> file;
Status s = options.env->NewRandomAccessFile(file_path, &file, env_options);
if (!s.ok()) {
return s;
}
uint64_t file_size;
options.env->GetFileSize(file_path, &file_size);
return ReadTableMagicNumber(file.get(), file_size, options, env_options,
table_magic_number);
}
Status ReadTableMagicNumber(RandomAccessFile* file, uint64_t file_size,
const Options& options,
const EnvOptions& env_options,
uint64_t* table_magic_number) {
if (file_size < Footer::kEncodedLength) {
return Status::InvalidArgument("file is too short to be an sstable");
}
Footer footer;
auto s = ReadFooterFromFile(file, file_size, &footer);
if (!s.ok()) {
return s;
}
*table_magic_number = footer.table_magic_number();
return Status::OK();
}
} // namespace rocksdb