rocksdb/table/block_based_table_factory.cc

478 lines
18 KiB
C++
Raw Normal View History

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based_table_factory.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <stdint.h>
#include <memory>
#include <string>
#include "options/options_helper.h"
#include "port/port.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/flush_block_policy.h"
#include "table/block_based_table_builder.h"
#include "table/block_based_table_reader.h"
#include "table/format.h"
#include "util/string_util.h"
namespace rocksdb {
// Constructs a factory around a private copy of `_table_options` and then
// normalizes that copy so the rest of the class can rely on usable values:
//  - a flush block policy factory is always present,
//  - the block cache is dropped when caching is disabled, or created when
//    caching is enabled but no cache was supplied,
//  - out-of-range numeric settings are clamped,
//  - partitioned filters are switched off unless the two-level index is used.
BlockBasedTableFactory::BlockBasedTableFactory(
    const BlockBasedTableOptions& _table_options)
    : table_options_(_table_options) {
  auto& opts = table_options_;

  // Fall back to the size-based flush policy when none was configured.
  if (!opts.flush_block_policy_factory) {
    opts.flush_block_policy_factory.reset(new FlushBlockBySizePolicyFactory());
  }

  // Block cache: release it when disabled, otherwise make sure one exists.
  // NOTE(review): 8 << 20 is presumably a capacity in bytes (8 MiB) --
  // confirm against NewLRUCache.
  if (opts.no_block_cache) {
    opts.block_cache.reset();
  } else if (!opts.block_cache) {
    opts.block_cache = NewLRUCache(8 << 20);
  }

  // A deviation outside [0, 100] is replaced by 0.
  const bool deviation_in_range =
      opts.block_size_deviation >= 0 && opts.block_size_deviation <= 100;
  if (!deviation_in_range) {
    opts.block_size_deviation = 0;
  }

  // Restart intervals must be at least 1.
  if (opts.block_restart_interval < 1) {
    opts.block_restart_interval = 1;
  }
  if (opts.index_block_restart_interval < 1) {
    opts.index_block_restart_interval = 1;
  }

  // We do not support partitioned filters without partitioning indexes.
  const bool two_level_index =
      opts.index_type == BlockBasedTableOptions::kTwoLevelIndexSearch;
  if (opts.partition_filters && !two_level_index) {
    opts.partition_filters = false;
  }
}
// Opens `file` (of `file_size` bytes) as a block-based table by forwarding to
// BlockBasedTable::Open with this factory's table options and the per-call
// settings carried in `table_reader_options`.
//
// `file` is moved into the Open call; the resulting reader is delivered
// through `table_reader`. The Status produced by BlockBasedTable::Open is
// returned unchanged.
Status BlockBasedTableFactory::NewTableReader(
    const TableReaderOptions& table_reader_options,
    unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
    unique_ptr<TableReader>* table_reader,
    bool prefetch_index_and_filter_in_cache) const {
  const auto& reader_opts = table_reader_options;
  return BlockBasedTable::Open(
      reader_opts.ioptions, reader_opts.env_options, table_options_,
      reader_opts.internal_comparator, std::move(file), file_size,
      table_reader, prefetch_index_and_filter_in_cache,
      reader_opts.skip_filters, reader_opts.level);
}
TableBuilder* BlockBasedTableFactory::NewTableBuilder(
const TableBuilderOptions& table_builder_options, uint32_t column_family_id,
WritableFileWriter* file) const {
auto table_builder = new BlockBasedTableBuilder(
table_builder_options.ioptions, table_options_,
table_builder_options.internal_comparator,
table_builder_options.int_tbl_prop_collector_factories, column_family_id,
file, table_builder_options.compression_type,
table_builder_options.compression_opts,
table_builder_options.compression_dict,
table_builder_options.skip_filters,
FIFO Compaction with TTL Summary: Introducing FIFO compactions with TTL. FIFO compaction is based on size only which makes it tricky to enable in production as use cases can have organic growth. A user requested an option to drop files based on the time of their creation instead of the total size. To address that request: - Added a new TTL option to FIFO compaction options. - Updated FIFO compaction score to take TTL into consideration. - Added a new table property, creation_time, to keep track of when the SST file is created. - Creation_time is set as below: - On Flush: Set to the time of flush. - On Compaction: Set to the max creation_time of all the files involved in the compaction. - On Repair and Recovery: Set to the time of repair/recovery. - Old files created prior to this code change will have a creation_time of 0. - FIFO compaction with TTL is enabled when ttl > 0. All files older than ttl will be deleted during compaction. i.e. `if (file.creation_time < (current_time - ttl)) then delete(file)`. This will enable cases where you might want to delete all files older than, say, 1 day. - FIFO compaction will fall back to the prior way of deleting files based on size if: - the creation_time of all files involved in compaction is 0. - the total size (of all SST files combined) does not drop below `compaction_options_fifo.max_table_files_size` even if the files older than ttl are deleted. This feature is not supported if max_open_files != -1 or with table formats other than Block-based. **Test Plan:** Added tests. 
**Benchmark results:** Base: FIFO with max size: 100MB :: ``` svemuri@dev15905 ~/rocksdb (fifo-compaction) $ TEST_TMPDIR=/dev/shm ./db_bench --benchmarks=readwhilewriting --num=5000000 --threads=16 --compaction_style=2 --fifo_compaction_max_table_files_size_mb=100 readwhilewriting : 1.924 micros/op 519858 ops/sec; 13.6 MB/s (1176277 of 5000000 found) ``` With TTL (a low one for testing) :: ``` svemuri@dev15905 ~/rocksdb (fifo-compaction) $ TEST_TMPDIR=/dev/shm ./db_bench --benchmarks=readwhilewriting --num=5000000 --threads=16 --compaction_style=2 --fifo_compaction_max_table_files_size_mb=100 --fifo_compaction_ttl=20 readwhilewriting : 1.902 micros/op 525817 ops/sec; 13.7 MB/s (1185057 of 5000000 found) ``` Example Log lines: ``` 2017/06/26-15:17:24.609249 7fd5a45ff700 (Original Log Time 2017/06/26-15:17:24.609177) [db/compaction_picker.cc:1471] [default] FIFO compaction: picking file 40 with creation time 1498515423 for deletion 2017/06/26-15:17:24.609255 7fd5a45ff700 (Original Log Time 2017/06/26-15:17:24.609234) [db/db_impl_compaction_flush.cc:1541] [default] Deleted 1 files ... 2017/06/26-15:17:25.553185 7fd5a61a5800 [DEBUG] [db/db_impl_files.cc:309] [JOB 0] Delete /dev/shm/dbbench/000040.sst type=2 #40 -- OK 2017/06/26-15:17:25.553205 7fd5a61a5800 EVENT_LOG_v1 {"time_micros": 1498515445553199, "job": 0, "event": "table_file_deletion", "file_number": 40} ``` SST Files remaining in the dbbench dir, after db_bench execution completed: ``` svemuri@dev15905 ~/rocksdb (fifo-compaction) $ ls -l /dev/shm//dbbench/*.sst -rw-r--r--. 1 svemuri users 30749887 Jun 26 15:17 /dev/shm//dbbench/000042.sst -rw-r--r--. 1 svemuri users 30768779 Jun 26 15:17 /dev/shm//dbbench/000044.sst -rw-r--r--. 1 svemuri users 30757481 Jun 26 15:17 /dev/shm//dbbench/000046.sst ``` Closes https://github.com/facebook/rocksdb/pull/2480 Differential Revision: D5305116 Pulled By: sagar0 fbshipit-source-id: 3e5cfcf5dd07ed2211b5b37492eb235b45139174
2017-06-28 02:02:20 +02:00
table_builder_options.column_family_name,
table_builder_options.creation_time,
table_builder_options.oldest_key_time);
return table_builder;
}
// Validates this factory's table options against the column family options.
//
// Returns Status::InvalidArgument describing the first violated rule, or
// Status::OK() when every rule holds. Rules, in the order checked:
//  1. A hash index requires cf_opts.prefix_extractor to be set.
//  2. cache_index_and_filter_blocks requires the block cache to be enabled.
//  3. pin_l0_filter_and_index_blocks_in_cache requires the block cache too.
//  4. format_version must be accepted by BlockBasedTableSupportedVersion().
//  5. block_align cannot be combined with compression.
//  6. block_align requires block_size to be a power of two.
//
// The DBOptions argument is unused.
Status BlockBasedTableFactory::SanitizeOptions(
    const DBOptions& /*db_opts*/, const ColumnFamilyOptions& cf_opts) const {
  if (table_options_.index_type == BlockBasedTableOptions::kHashSearch &&
      cf_opts.prefix_extractor == nullptr) {
    return Status::InvalidArgument(
        "Hash index is specified for block-based "
        "table, but prefix_extractor is not given");
  }
  if (table_options_.cache_index_and_filter_blocks &&
      table_options_.no_block_cache) {
    // The message used to contain a doubled comma (", , but ...").
    return Status::InvalidArgument(
        "Enable cache_index_and_filter_blocks, "
        "but block cache is disabled");
  }
  if (table_options_.pin_l0_filter_and_index_blocks_in_cache &&
      table_options_.no_block_cache) {
    // Same doubled-comma fix as above.
    return Status::InvalidArgument(
        "Enable pin_l0_filter_and_index_blocks_in_cache, "
        "but block cache is disabled");
  }
  if (!BlockBasedTableSupportedVersion(table_options_.format_version)) {
    return Status::InvalidArgument(
        "Unsupported BlockBasedTable format_version. Please check "
        "include/rocksdb/table.h for more info");
  }
  if (table_options_.block_align && (cf_opts.compression != kNoCompression)) {
    return Status::InvalidArgument(
        "Enable block_align, but compression "
        "enabled");
  }
  // x & (x - 1) is non-zero exactly when x has more than one bit set, i.e.
  // when a non-zero x is not a power of two.
  if (table_options_.block_align &&
      (table_options_.block_size & (table_options_.block_size - 1))) {
    return Status::InvalidArgument(
        "Block alignment requested but block size is not a power of 2");
  }
  return Status::OK();
}
// Renders the factory's table options as human-readable text, one
// "name: value" line per option.
//
// Each line is formatted into a fixed 200-byte scratch buffer with snprintf
// (which truncates rather than overflows) and appended to the result. For
// every configured cache (block cache, compressed block cache, persistent
// cache) the cache's own GetPrintableOptions() output is appended as well.
std::string BlockBasedTableFactory::GetPrintableTableOptions() const {
  std::string ret;
  ret.reserve(20000);
  const int kBufferSize = 200;
  char buffer[kBufferSize];

  snprintf(buffer, kBufferSize, " flush_block_policy_factory: %s (%p)\n",
           table_options_.flush_block_policy_factory->Name(),
           static_cast<void*>(table_options_.flush_block_policy_factory.get()));
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " cache_index_and_filter_blocks: %d\n",
           table_options_.cache_index_and_filter_blocks);
  ret.append(buffer);
  snprintf(buffer, kBufferSize,
           " cache_index_and_filter_blocks_with_high_priority: %d\n",
           table_options_.cache_index_and_filter_blocks_with_high_priority);
  ret.append(buffer);
  snprintf(buffer, kBufferSize,
           " pin_l0_filter_and_index_blocks_in_cache: %d\n",
           table_options_.pin_l0_filter_and_index_blocks_in_cache);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " index_type: %d\n",
           table_options_.index_type);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " hash_index_allow_collision: %d\n",
           table_options_.hash_index_allow_collision);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " checksum: %d\n", table_options_.checksum);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " no_block_cache: %d\n",
           table_options_.no_block_cache);
  ret.append(buffer);

  // Uncompressed block cache: pointer, optional name, then its own options.
  snprintf(buffer, kBufferSize, " block_cache: %p\n",
           static_cast<void*>(table_options_.block_cache.get()));
  ret.append(buffer);
  if (table_options_.block_cache) {
    const char* block_cache_name = table_options_.block_cache->Name();
    if (block_cache_name != nullptr) {
      snprintf(buffer, kBufferSize, " block_cache_name: %s\n",
               block_cache_name);
      ret.append(buffer);
    }
    ret.append(" block_cache_options:\n");
    ret.append(table_options_.block_cache->GetPrintableOptions());
  }

  // Compressed block cache: same layout as above, with its own labels.
  snprintf(buffer, kBufferSize, " block_cache_compressed: %p\n",
           static_cast<void*>(table_options_.block_cache_compressed.get()));
  ret.append(buffer);
  if (table_options_.block_cache_compressed) {
    const char* block_cache_compressed_name =
        table_options_.block_cache_compressed->Name();
    if (block_cache_compressed_name != nullptr) {
      // Previously labelled "block_cache_name", which made this line
      // indistinguishable from the uncompressed cache's name line.
      snprintf(buffer, kBufferSize, " block_cache_compressed_name: %s\n",
               block_cache_compressed_name);
      ret.append(buffer);
    }
    ret.append(" block_cache_compressed_options:\n");
    ret.append(table_options_.block_cache_compressed->GetPrintableOptions());
  }

  // Persistent cache: pointer, then its own options when configured.
  snprintf(buffer, kBufferSize, " persistent_cache: %p\n",
           static_cast<void*>(table_options_.persistent_cache.get()));
  ret.append(buffer);
  if (table_options_.persistent_cache) {
    snprintf(buffer, kBufferSize, " persistent_cache_options:\n");
    ret.append(buffer);
    ret.append(table_options_.persistent_cache->GetPrintableOptions());
  }

  snprintf(buffer, kBufferSize, " block_size: %" ROCKSDB_PRIszt "\n",
           table_options_.block_size);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " block_size_deviation: %d\n",
           table_options_.block_size_deviation);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " block_restart_interval: %d\n",
           table_options_.block_restart_interval);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " index_block_restart_interval: %d\n",
           table_options_.index_block_restart_interval);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " metadata_block_size: %" PRIu64 "\n",
           table_options_.metadata_block_size);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " partition_filters: %d\n",
           table_options_.partition_filters);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " use_delta_encoding: %d\n",
           table_options_.use_delta_encoding);
  ret.append(buffer);
  // A missing filter policy is printed as the literal text "nullptr".
  snprintf(buffer, kBufferSize, " filter_policy: %s\n",
           table_options_.filter_policy == nullptr
               ? "nullptr"
               : table_options_.filter_policy->Name());
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " whole_key_filtering: %d\n",
           table_options_.whole_key_filtering);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " verify_compression: %d\n",
           table_options_.verify_compression);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " read_amp_bytes_per_bit: %d\n",
           table_options_.read_amp_bytes_per_bit);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " format_version: %d\n",
           table_options_.format_version);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " enable_index_compression: %d\n",
           table_options_.enable_index_compression);
  ret.append(buffer);
  snprintf(buffer, kBufferSize, " block_align: %d\n",
           table_options_.block_align);
  ret.append(buffer);
  return ret;
}
#ifndef ROCKSDB_LITE
namespace {
// Serializes the single option called `name` from `bbt_options` and stores
// "<name>=<value><delimiter>" in `*opt_string`.
//
// Returns false, leaving `*opt_string` untouched, when `name` is not listed in
// block_based_table_type_info or when SerializeSingleOptionHelper reports
// failure; returns true otherwise.
bool SerializeSingleBlockBasedTableOption(
    std::string* opt_string, const BlockBasedTableOptions& bbt_options,
    const std::string& name, const std::string& delimiter) {
  const auto entry = block_based_table_type_info.find(name);
  if (entry == block_based_table_type_info.end()) {
    return false;
  }
  const auto& info = entry->second;
  // The type-info table records each option's byte offset inside the options
  // struct; use it to address the field generically.
  const char* field_address =
      reinterpret_cast<const char*>(&bbt_options) + info.offset;
  std::string serialized;
  if (!SerializeSingleOptionHelper(field_address, info.type, &serialized)) {
    return false;
  }
  *opt_string = name + "=" + serialized + delimiter;
  return true;
}
}  // namespace
// Writes every non-deprecated block-based table option into `*opt_string` as
// a sequence of "<name>=<value><delimiter>" entries, in the iteration order of
// block_based_table_type_info. `*opt_string` is cleared first.
//
// Always returns Status::OK(). An option that fails to serialize trips an
// assert in debug builds and is skipped otherwise.
Status BlockBasedTableFactory::GetOptionString(
    std::string* opt_string, const std::string& delimiter) const {
  assert(opt_string);
  opt_string->clear();
  for (const auto& entry : block_based_table_type_info) {
    // If the option is no longer used in rocksdb and marked as deprecated,
    // we skip it in the serialization.
    if (entry.second.verification == OptionVerificationType::kDeprecated) {
      continue;
    }
    std::string piece;
    const bool serialized = SerializeSingleBlockBasedTableOption(
        &piece, table_options_, entry.first, delimiter);
    assert(serialized);
    if (serialized) {
      opt_string->append(piece);
    }
  }
  return Status::OK();
}
#else
// ROCKSDB_LITE variant: option serialization is compiled out, so both
// arguments are ignored, the output string is left untouched and the call
// reports success.
Status BlockBasedTableFactory::GetOptionString(
    std::string* /*opt_string*/, const std::string& /*delimiter*/) const {
  return Status::OK();
}
#endif // !ROCKSDB_LITE
2015-10-30 23:58:46 +01:00
// Read-only access to the options held by this factory, i.e. the copy taken
// and normalized by the constructor. The reference refers to a member of this
// object.
const BlockBasedTableOptions& BlockBasedTableFactory::table_options() const {
  return this->table_options_;
}
#ifndef ROCKSDB_LITE
namespace {
// Applies one "name=value" setting to `*new_options`.
//
// Returns an empty string on success, otherwise a short description of the
// problem ("Invalid cache options", "Invalid filter policy name", "Invalid
// filter policy config, missing bits_per_key", "Unrecognized option" or
// "Invalid value").
//
// When `input_strings_escaped` is true the value is first passed through
// UnescapeOptionString. When it is false the call comes from SetOptions with
// the old format, and three options get dedicated parsing: "block_cache",
// "block_cache_compressed" and "filter_policy". Everything else is resolved
// through block_based_table_type_info; unknown names are accepted silently
// only when `ignore_unknown_options` is set, and deprecated options are
// accepted without being parsed.
// NOTE(review): ParseSizeT / ParseInt / ParseBoolean are defined elsewhere;
// how they report malformed input is not visible here.
std::string ParseBlockBasedTableOption(const std::string& name,
                                       const std::string& org_value,
                                       BlockBasedTableOptions* new_options,
                                       bool input_strings_escaped = false,
                                       bool ignore_unknown_options = false) {
  const std::string& value =
      input_strings_escaped ? UnescapeOptionString(org_value) : org_value;
  const bool old_format = !input_strings_escaped;

  if (old_format &&
      (name == "block_cache" || name == "block_cache_compressed")) {
    // Cache options can be specified in the following format
    //   "block_cache={capacity=1M;num_shard_bits=4;
    //    strict_capacity_limit=true;high_pri_pool_ratio=0.5;}"
    // To support backward compatibility, a bare size is also accepted:
    //   "block_cache=1M"
    std::shared_ptr<Cache> cache;
    const bool has_key_value_pairs = value.find('=') != std::string::npos;
    if (has_key_value_pairs) {
      LRUCacheOptions cache_opts;
      if (!ParseOptionHelper(reinterpret_cast<char*>(&cache_opts),
                             OptionType::kLRUCacheOptions, value)) {
        return "Invalid cache options";
      }
      cache = NewLRUCache(cache_opts);
    } else {
      // block_cache is specified in format block_cache=<cache_size>.
      cache = NewLRUCache(ParseSizeT(value));
    }
    if (name == "block_cache") {
      new_options->block_cache = cache;
    } else {
      new_options->block_cache_compressed = cache;
    }
    return "";
  }

  if (old_format && name == "filter_policy") {
    // Expect the following format
    //   bloomfilter:int:bool
    const std::string kName = "bloomfilter:";
    if (value.compare(0, kName.size(), kName) != 0) {
      return "Invalid filter policy name";
    }
    const size_t pos = value.find(':', kName.size());
    if (pos == std::string::npos) {
      return "Invalid filter policy config, missing bits_per_key";
    }
    // Text between the prefix and the second ':' is bits_per_key; the
    // remainder selects the block-based builder.
    const int bits_per_key =
        ParseInt(trim(value.substr(kName.size(), pos - kName.size())));
    const bool use_block_based_builder =
        ParseBoolean("use_block_based_builder", trim(value.substr(pos + 1)));
    new_options->filter_policy.reset(
        NewBloomFilterPolicy(bits_per_key, use_block_based_builder));
    return "";
  }

  // Generic path: look the option up in the type-info table.
  const auto iter = block_based_table_type_info.find(name);
  if (iter == block_based_table_type_info.end()) {
    return ignore_unknown_options ? "" : "Unrecognized option";
  }
  const auto& opt_info = iter->second;
  if (opt_info.verification == OptionVerificationType::kDeprecated) {
    // Deprecated options are accepted without touching the struct.
    return "";
  }
  char* field_address =
      reinterpret_cast<char*>(new_options) + opt_info.offset;
  if (!ParseOptionHelper(field_address, opt_info.type, value)) {
    return "Invalid value";
  }
  return "";
}
}  // namespace
// Parses `opts_str` into a name -> value map with StringToMap and applies the
// result on top of `table_options` via GetBlockBasedTableOptionsFromMap,
// storing the outcome in `*new_table_options`.
//
// Returns StringToMap's status if the string cannot be split, otherwise
// whatever GetBlockBasedTableOptionsFromMap returns.
Status GetBlockBasedTableOptionsFromString(
    const BlockBasedTableOptions& table_options, const std::string& opts_str,
    BlockBasedTableOptions* new_table_options) {
  std::unordered_map<std::string, std::string> parsed_opts;
  Status split_status = StringToMap(opts_str, &parsed_opts);
  if (split_status.ok()) {
    return GetBlockBasedTableOptionsFromMap(table_options, parsed_opts,
                                            new_table_options);
  }
  return split_status;
}
// Starts from a copy of `table_options` in `*new_table_options` and applies
// every entry of `opts_map` to it through ParseBlockBasedTableOption.
//
// A parse error is tolerated only when all of the following hold: the option
// is known to block_based_table_type_info, the new (escaped) API is in use,
// and the option's verification type is kByName, kByNameAllowNull,
// kByNameAllowFromNull or kDeprecated. Any other parse error resets
// `*new_table_options` to `table_options` and returns
// Status::InvalidArgument naming the offending option.
Status GetBlockBasedTableOptionsFromMap(
    const BlockBasedTableOptions& table_options,
    const std::unordered_map<std::string, std::string>& opts_map,
    BlockBasedTableOptions* new_table_options, bool input_strings_escaped,
    bool ignore_unknown_options) {
  assert(new_table_options);
  *new_table_options = table_options;
  for (const auto& o : opts_map) {
    const std::string error_message = ParseBlockBasedTableOption(
        o.first, o.second, new_table_options, input_strings_escaped,
        ignore_unknown_options);
    if (error_message == "") {
      continue;
    }

    // Decide whether this failure may be ignored. The type info is consulted
    // only for known options under the escaped API; !input_strings_escaped
    // indicates the old API, where everything is parsable.
    bool fatal = true;
    const auto iter = block_based_table_type_info.find(o.first);
    if (iter != block_based_table_type_info.end() && input_strings_escaped) {
      switch (iter->second.verification) {
        case OptionVerificationType::kByName:
        case OptionVerificationType::kByNameAllowNull:
        case OptionVerificationType::kByNameAllowFromNull:
        case OptionVerificationType::kDeprecated:
          fatal = false;
          break;
        default:
          break;
      }
    }

    if (fatal) {
      // Restore "new_options" to the default "base_options".
      *new_table_options = table_options;
      return Status::InvalidArgument("Can't parse BlockBasedTableOptions:",
                                     o.first + " " + error_message);
    }
  }
  return Status::OK();
}
// Compares the table options of two block-based table factories: `base_tf`
// and `file_tf` (the "[RocksDBOptionsParser]" message prefix suggests the
// latter is the one read from an options file).
//
// Returns:
//  - Status::Corruption when exactly one factory is null and
//    `sanity_check_level` is above kSanityLevelNone;
//  - Status::OK() when either factory is null otherwise (nothing to compare);
//  - Status::Corruption naming the first option that differs, considering only
//    non-deprecated options whose BBTOptionSanityCheckLevel() is at or below
//    `sanity_check_level`;
//  - Status::OK() when no such difference exists.
Status VerifyBlockBasedTableFactory(
    const BlockBasedTableFactory* base_tf,
    const BlockBasedTableFactory* file_tf,
    OptionsSanityCheckLevel sanity_check_level) {
  const bool has_base = base_tf != nullptr;
  const bool has_file = file_tf != nullptr;
  if (has_base != has_file && sanity_check_level > kSanityLevelNone) {
    return Status::Corruption(
        "[RocksDBOptionsParser]: Inconsistent TableFactory class type");
  }
  // With kSanityLevelNone a one-sided mismatch is tolerated above, so either
  // pointer may still be null here. Previously only base_tf was checked and a
  // null file_tf was dereferenced below in builds without asserts.
  if (!has_base || !has_file) {
    return Status::OK();
  }

  const auto& base_opt = base_tf->table_options();
  const auto& file_opt = file_tf->table_options();

  for (const auto& pair : block_based_table_type_info) {
    if (pair.second.verification == OptionVerificationType::kDeprecated) {
      // We skip checking deprecated variables as they might
      // contain random values since they might not be initialized
      continue;
    }
    if (BBTOptionSanityCheckLevel(pair.first) <= sanity_check_level) {
      if (!AreEqualOptions(reinterpret_cast<const char*>(&base_opt),
                           reinterpret_cast<const char*>(&file_opt),
                           pair.second, pair.first, nullptr)) {
        return Status::Corruption(
            "[RocksDBOptionsParser]: "
            "failed the verification on BlockBasedTableOptions::",
            pair.first);
      }
    }
  }
  return Status::OK();
}
#endif // !ROCKSDB_LITE
// Allocates a BlockBasedTableFactory configured with `_table_options` and
// returns it as a raw pointer to its TableFactory base.
// NOTE(review): the object is created with `new`; the caller presumably takes
// ownership -- confirm at the call sites.
TableFactory* NewBlockBasedTableFactory(
    const BlockBasedTableOptions& _table_options) {
  TableFactory* factory = new BlockBasedTableFactory(_table_options);
  return factory;
}
// Name constant of the block-based table factory.
const std::string BlockBasedTableFactory::kName("BlockBasedTable");

// Property-name strings declared in BlockBasedTablePropertyNames.
const std::string BlockBasedTablePropertyNames::kIndexType(
    "rocksdb.block.based.table.index.type");
const std::string BlockBasedTablePropertyNames::kWholeKeyFiltering(
    "rocksdb.block.based.table.whole.key.filtering");
const std::string BlockBasedTablePropertyNames::kPrefixFiltering(
    "rocksdb.block.based.table.prefix.filtering");

// Block-name strings for the hash index prefixes and their metadata.
const std::string kHashIndexPrefixesBlock("rocksdb.hashindex.prefixes");
const std::string kHashIndexPrefixesMetadataBlock(
    "rocksdb.hashindex.metadata");

// String forms used for boolean property values.
const std::string kPropTrue("1");
const std::string kPropFalse("0");
} // namespace rocksdb