rocksdb/table/block_based_table_builder.cc
Haobo Xu 5b825d6964 [RocksDB] Use raw pointer instead of shared pointer when passing Statistics object internally
Summary: The liveness of the Statistics object is already ensured by the shared pointer in the DB options, so there is no reason to pass a shared pointer again among internal functions. A raw pointer is sufficient and more efficient.

Test Plan: make check

Reviewers: dhruba, MarkCallaghan, igor

Reviewed By: dhruba

CC: leveldb, reconnect.grayhat

Differential Revision: https://reviews.facebook.net/D14289
2013-11-25 10:38:15 -08:00

559 lines
17 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based_table_builder.h"
#include <assert.h>
#include <inttypes.h>
#include <map>
#include <stdio.h>
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/table.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "table/block_based_table_reader.h"
#include "table/block.h"
#include "table/block_builder.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/stop_watch.h"
namespace rocksdb {
namespace {
// Strict "less than" ordering over byte strings, delegating to the comparator
// returned by `BytewiseComparator()`. Serves as the ordering predicate of
// `BytewiseSortedMap`.
struct BytewiseLessThan {
  // Returns true iff `key1` sorts strictly before `key2`.
  bool operator()(const std::string& key1, const std::string& key2) const {
    // Smaller entries will be placed in front.
    // The comparison must be strict (`<`, not `<=`): std::map requires a
    // strict weak ordering. With `<=`, two equal keys are never recognized as
    // equivalent, so find() cannot locate an existing key and duplicates can
    // be inserted.
    return comparator->Compare(key1, key2) < 0;
  }

  const Comparator* comparator = BytewiseComparator();
};
// Some blocks require their entries to be sorted according to
// `BytewiseComparator`. Content destined for such a block can be buffered in
// a `BytewiseSortedMap` first and written to the store afterwards.
using BytewiseSortedMap =
    std::map<std::string, std::string, BytewiseLessThan>;
void AddProperties(BytewiseSortedMap& props, std::string name, uint64_t val) {
assert(props.find(name) == props.end());
std::string dst;
PutVarint64(&dst, val);
props.insert(
std::make_pair(name, dst)
);
}
// Returns true when compression saved enough space to be worth keeping:
// the compressed size must be strictly smaller than the raw size minus one
// eighth of the raw size (integer division), i.e. a saving of more than
// roughly 12.5%.
static bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
  const size_t required_saving = raw_size / 8u;
  const size_t size_limit = raw_size - required_saving;
  return compressed_size < size_limit;
}
// If an error occurs during user-defined statistics collection, a warning
// message is written to the info log.
// `method` names the TablePropertiesCollector method that failed and must be
// either "Add" or "Finish"; `name` is the collector's name.
void LogPropertiesCollectionError(
    Logger* info_log, const std::string& method, const std::string& name) {
  assert(method == "Add" || method == "Finish");

  std::string warning(
      "[Warning] encountered error when calling TablePropertiesCollector::");
  warning += method;
  warning += "() with collector name: ";
  warning += name;

  Log(info_log, "%s", warning.c_str());
}
} // anonymous namespace
// Internal state of a BlockBasedTableBuilder.
struct BlockBasedTableBuilder::Rep {
  Options options;
  WritableFile* file;
  // Number of bytes appended to `file` so far; doubles as the offset of the
  // next block to be written.
  uint64_t offset = 0;
  Status status;
  BlockBuilder data_block;
  BlockBuilder index_block;

  std::string last_key;
  CompressionType compression_type;
  TableProperties props;

  bool closed = false;  // Either Finish() or Abandon() has been called.
  // nullptr when `options.filter_policy` is not set. Deleted by
  // ~BlockBasedTableBuilder().
  FilterBlockBuilder* filter_block;
  char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
  // Only filled in by the builder's constructor when a compressed block cache
  // is configured; zero-initialized so it never holds an indeterminate value.
  size_t compressed_cache_key_prefix_size = 0;

  BlockHandle pending_handle;  // Handle to add to index block

  std::string compressed_output;
  std::unique_ptr<FlushBlockPolicy> flush_block_policy;

  // Note: the initializer list relies on member declaration order --
  // `data_block` is built from the member `options`, and
  // `flush_block_policy` is built from `data_block`.
  Rep(const Options& opt,
      WritableFile* f,
      FlushBlockPolicyFactory* flush_block_policy_factory,
      CompressionType compression_type)
      : options(opt),
        file(f),
        data_block(options),
        // To avoid linear scan, we make the block_restart_interval to be `1`
        // in index block builder
        index_block(1 /* block_restart_interval */, options.comparator),
        compression_type(compression_type),
        filter_block(opt.filter_policy == nullptr ? nullptr
                     : new FilterBlockBuilder(opt)),
        flush_block_policy(
            flush_block_policy_factory->NewFlushBlockPolicy(data_block)) {
  }
};
// Creates a builder that writes a table to `file`. Starts the first filter
// block (when a filter block builder exists) and, when a compressed block
// cache is configured, generates the cache key prefix for this file.
BlockBasedTableBuilder::BlockBasedTableBuilder(
    const Options& options,
    WritableFile* file,
    FlushBlockPolicyFactory* flush_block_policy_factory,
    CompressionType compression_type)
    : rep_(new Rep(options,
                   file, flush_block_policy_factory, compression_type)) {
  Rep* const r = rep_;

  if (r->filter_block != nullptr) {
    r->filter_block->StartBlock(0);
  }

  const bool has_compressed_cache =
      options.block_cache_compressed.get() != nullptr;
  if (has_compressed_cache) {
    BlockBasedTable::GenerateCachePrefix(
        options.block_cache_compressed, file,
        &r->compressed_cache_key_prefix[0],
        &r->compressed_cache_key_prefix_size);
  }
}
// Releases the filter block builder and the internal state. The builder must
// already have been closed through Finish() or Abandon().
BlockBasedTableBuilder::~BlockBasedTableBuilder() {
  Rep* const r = rep_;
  // Catch errors where caller forgot to call Finish()
  assert(r->closed);
  delete r->filter_block;
  delete r;
}
// Appends the entry (`key`, `value`) to the table under construction.
// Does nothing if an earlier operation has already failed.
// REQUIRES: the builder is not closed, and `key` compares greater than the
// previously added key (both checked by assert).
void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
  Rep* r = rep_;
  assert(!r->closed);
  if (!ok()) return;
  if (r->props.num_entries > 0) {
    assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
  }

  auto should_flush = r->flush_block_policy->Update(key, value);
  if (should_flush) {
    assert(!r->data_block.empty());
    Flush();

    // Add item to index block.
    // We do not emit the index entry for a block until we have seen the
    // first key for the next data block.  This allows us to use shorter
    // keys in the index block.  For example, consider a block boundary
    // between the keys "the quick brown fox" and "the who".  We can use
    // "the r" as the key for the index block entry since it is >= all
    // entries in the first block and < all entries in subsequent
    // blocks.
    if (ok()) {
      r->options.comparator->FindShortestSeparator(&r->last_key, key);
      std::string handle_encoding;
      r->pending_handle.EncodeTo(&handle_encoding);
      r->index_block.Add(r->last_key, Slice(handle_encoding));
    }
  }

  if (r->filter_block != nullptr) {
    r->filter_block->AddKey(key);
  }

  r->last_key.assign(key.data(), key.size());
  r->data_block.Add(key, value);
  r->props.num_entries++;
  r->props.raw_key_size += key.size();
  r->props.raw_value_size += value.size();

  // Iterate by const reference: this runs once per added key, so copying
  // each collector handle on every iteration would be needless overhead.
  for (const auto& collector : r->options.table_properties_collectors) {
    Status s = collector->Add(key, value);
    if (!s.ok()) {
      // A failing collector is only logged; it does not fail the build.
      LogPropertiesCollectionError(
          r->options.info_log.get(),
          "Add", /* method */
          collector->Name()
      );
    }
  }
}
void BlockBasedTableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->status = r->file->Flush();
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
r->props.data_size = r->offset;
++r->props.num_data_blocks;
}
// Finishes `block`, compresses its contents according to the builder's
// compression type, writes the result via WriteRawBlock() and stores the
// block's location in `*handle`. `block` is reset afterwards.
//
// The compressed form is used only if the compressor succeeded AND
// GoodCompressionRatio() accepts the result; otherwise the block is stored
// uncompressed and tagged kNoCompression.
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
                                        BlockHandle* handle) {
  // File format contains a sequence of blocks where each block has:
  //    block_data: uint8[n]
  //    type: uint8
  //    crc: uint32
  assert(ok());
  Rep* r = rep_;
  Slice raw = block->Finish();

  std::string* compressed = &r->compressed_output;
  CompressionType type = r->compression_type;

  // Run the selected compressor. `compressed_ok` stays false when no
  // compression is requested or the compressor is unsupported / fails.
  bool compressed_ok = false;
  switch (type) {
    case kNoCompression:
      break;
    case kSnappyCompression:
      compressed_ok = port::Snappy_Compress(r->options.compression_opts,
                                            raw.data(), raw.size(),
                                            compressed);
      break;
    case kZlibCompression:
      compressed_ok = port::Zlib_Compress(r->options.compression_opts,
                                          raw.data(), raw.size(),
                                          compressed);
      break;
    case kBZip2Compression:
      compressed_ok = port::BZip2_Compress(r->options.compression_opts,
                                           raw.data(), raw.size(),
                                           compressed);
      break;
    default:
      // Unhandled compression type: fall through to the uncompressed path
      // below instead of writing an empty payload.
      break;
  }

  Slice block_contents;
  if (compressed_ok && GoodCompressionRatio(compressed->size(), raw.size())) {
    block_contents = *compressed;
  } else {
    // Compression not requested, not supported, or not good compression
    // ratio, so just store uncompressed form
    block_contents = raw;
    type = kNoCompression;
  }

  WriteRawBlock(block_contents, type, handle);
  r->compressed_output.clear();
  block->Reset();
}
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type,
BlockHandle* handle) {
Rep* r = rep_;
StopWatch sw(r->options.env, r->options.statistics.get(),
WRITE_RAW_BLOCK_MICROS);
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
EncodeFixed32(trailer+1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->status = InsertBlockInCache(block_contents, type, handle);
}
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
}
// Returns the builder's current status.
Status BlockBasedTableBuilder::status() const {
  const Rep* const state = rep_;
  return state->status;
}
// Deleter passed to the compressed block cache on insertion: destroys the
// cached Block. `key` is unused.
static void DeleteCachedBlock(const Slice& key, void* value) {
  delete static_cast<Block*>(value);
}
//
// Make a copy of the block contents and insert into compressed block cache.
//
// Only compressed blocks are cached, and only when a compressed block cache
// is configured. The cache key is the per-file key prefix followed by the
// varint64-encoded file offset of the block. Always returns Status::OK().
//
Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
                                 const CompressionType type,
                                 const BlockHandle* handle) {
  Rep* r = rep_;
  Cache* block_cache_compressed = r->options.block_cache_compressed.get();
  if (type == kNoCompression || block_cache_compressed == nullptr) {
    return Status::OK();
  }

  const size_t size = block_contents.size();

  char* ubuf = new char[size];             // make a new copy
  memcpy(ubuf, block_contents.data(), size);

  BlockContents results;
  Slice sl(ubuf, size);
  results.data = sl;
  results.cachable = true; // XXX
  results.heap_allocated = true;
  results.compression_type = type;

  Block* block = new Block(results);

  // Make cache key by appending the file offset to the cache prefix id.
  // The key is assembled in a local buffer that has room for the whole
  // prefix plus a varint64 (at most 10 bytes, ceil(64 / 7)). Encoding the
  // offset directly behind the prefix inside `compressed_cache_key_prefix`
  // would write past the end of that array whenever the prefix is close to
  // the array's full size.
  const size_t kMaxVarint64Bytes = 10;
  char cache_key[sizeof(r->compressed_cache_key_prefix) + kMaxVarint64Bytes];
  assert(r->compressed_cache_key_prefix_size <=
         sizeof(r->compressed_cache_key_prefix));
  memcpy(cache_key, r->compressed_cache_key_prefix,
         r->compressed_cache_key_prefix_size);
  char* end = EncodeVarint64(
      cache_key + r->compressed_cache_key_prefix_size,
      handle->offset());
  Slice key(cache_key, static_cast<size_t>(end - cache_key));

  // Insert into compressed block cache. The returned handle is released
  // right away since the builder does not keep using the cached block.
  Cache::Handle* cache_handle = block_cache_compressed->Insert(
      key, block, block->size(), &DeleteCachedBlock);
  block_cache_compressed->Release(cache_handle);

  // Invalidate OS cache.
  r->file->InvalidateCache(r->offset, size);

  return Status::OK();
}
// Completes the table. Flushes the pending data block, then writes, in this
// order: the filter block (if a filter is in use), the properties block, the
// metaindex block, the index block and the footer. Each step is skipped once
// the builder's status is no longer OK. Marks the builder as closed and
// returns the final status.
// REQUIRES: the builder has not been closed yet (checked by assert).
Status BlockBasedTableBuilder::Finish() {
  Rep* r = rep_;
  // Must be sampled before Flush(), which empties the data block.
  bool empty_data_block = r->data_block.empty();
  Flush();
  assert(!r->closed);
  r->closed = true;

  BlockHandle filter_block_handle,
              metaindex_block_handle,
              index_block_handle;

  // Write filter block
  if (ok() && r->filter_block != nullptr) {
    auto filter_contents = r->filter_block->Finish();
    r->props.filter_size = filter_contents.size();
    WriteRawBlock(filter_contents, kNoCompression, &filter_block_handle);
  }

  // To make sure properties block is able to keep the accurate size of index
  // block, we will finish writing all index entries here and flush them
  // to storage after metaindex block is written.
  if (ok() && !empty_data_block) {
    r->options.comparator->FindShortSuccessor(&r->last_key);

    std::string handle_encoding;
    r->pending_handle.EncodeTo(&handle_encoding);
    r->index_block.Add(r->last_key, handle_encoding);
  }

  // Write meta blocks and metaindex block with the following order.
  //    1. [meta block: filter]
  //    2. [meta block: properties]
  //    3. [metaindex block]
  if (ok()) {
    // We use `BytewiseComparator` as the comparator for meta block.
    BlockBuilder meta_index_block(
        r->options.block_restart_interval,
        BytewiseComparator()
    );
    // Key: meta block name
    // Value: block handle to that meta block
    BytewiseSortedMap meta_block_handles;

    // Write filter block.
    if (r->filter_block != nullptr) {
      // Add mapping from "<filter_block_prefix>.Name" to location
      // of filter data.
      std::string key = BlockBasedTable::kFilterBlockPrefix;
      key.append(r->options.filter_policy->Name());
      std::string handle_encoding;
      filter_block_handle.EncodeTo(&handle_encoding);
      meta_block_handles.insert(
          std::make_pair(key, handle_encoding)
      );
    }

    // Write properties block.
    {
      BlockBuilder properties_block(
          r->options.block_restart_interval,
          BytewiseComparator()
      );

      BytewiseSortedMap properties;

      // Add basic properties
      AddProperties(
          properties,
          BlockBasedTablePropertiesNames::kRawKeySize,
          r->props.raw_key_size
      );
      AddProperties(
          properties,
          BlockBasedTablePropertiesNames::kRawValueSize,
          r->props.raw_value_size
      );
      AddProperties(
          properties,
          BlockBasedTablePropertiesNames::kDataSize,
          r->props.data_size
      );
      // The index block has not been written yet, so its size is taken from
      // the builder's current estimate plus the block trailer.
      r->props.index_size =
        r->index_block.CurrentSizeEstimate() + kBlockTrailerSize;
      AddProperties(
          properties,
          BlockBasedTablePropertiesNames::kIndexSize,
          r->props.index_size
      );
      AddProperties(
          properties,
          BlockBasedTablePropertiesNames::kNumEntries,
          r->props.num_entries
      );
      AddProperties(
          properties,
          BlockBasedTablePropertiesNames::kNumDataBlocks,
          r->props.num_data_blocks);
      if (r->filter_block != nullptr) {
        properties.insert({
              BlockBasedTablePropertiesNames::kFilterPolicy,
              r->options.filter_policy->Name()
        });
      }
      AddProperties(
          properties,
          BlockBasedTablePropertiesNames::kFilterSize,
          r->props.filter_size
      );

      // Merge in the user-collected properties. Collectors are visited by
      // const reference to avoid copying each handle.
      for (const auto& collector : r->options.table_properties_collectors) {
        TableProperties::UserCollectedProperties user_collected_properties;
        Status s =
          collector->Finish(&user_collected_properties);

        if (!s.ok()) {
          // A failing collector is only logged; its properties are skipped.
          LogPropertiesCollectionError(
              r->options.info_log.get(),
              "Finish", /* method */
              collector->Name()
          );
        } else {
          properties.insert(
              user_collected_properties.begin(),
              user_collected_properties.end()
          );
        }
      }

      for (const auto& stat : properties) {
        properties_block.Add(stat.first, stat.second);
      }

      BlockHandle properties_block_handle;
      WriteBlock(&properties_block, &properties_block_handle);

      std::string handle_encoding;
      properties_block_handle.EncodeTo(&handle_encoding);
      meta_block_handles.insert(
          { BlockBasedTable::kPropertiesBlock, handle_encoding }
      );
    }  // end of properties block writing

    for (const auto& metablock : meta_block_handles) {
      meta_index_block.Add(metablock.first, metablock.second);
    }

    WriteBlock(&meta_index_block, &metaindex_block_handle);
  }  // meta blocks and metaindex block.

  // Write index block
  if (ok()) {
    WriteBlock(&r->index_block, &index_block_handle);
  }

  // Write footer
  if (ok()) {
    Footer footer;
    footer.set_metaindex_handle(metaindex_block_handle);
    footer.set_index_handle(index_block_handle);
    std::string footer_encoding;
    footer.EncodeTo(&footer_encoding);
    r->status = r->file->Append(footer_encoding);
    if (r->status.ok()) {
      r->offset += footer_encoding.size();
    }
  }

  // Print out the table stats
  if (ok()) {
    // user collected properties
    std::string user_collected;
    user_collected.reserve(1024);
    for (const auto& collector : r->options.table_properties_collectors) {
      for (const auto& prop : collector->GetReadableProperties()) {
        user_collected.append(prop.first);
        user_collected.append("=");
        user_collected.append(prop.second);
        user_collected.append("; ");
      }
    }

    Log(
        r->options.info_log,
        "Table was constructed:\n"
        "  [basic properties]: %s\n"
        "  [user collected properties]: %s",
        r->props.ToString().c_str(),
        user_collected.c_str()
    );
  }

  return r->status;
}
// Marks the builder as closed without writing anything further.
// REQUIRES: the builder has not been closed yet (checked by assert).
void BlockBasedTableBuilder::Abandon() {
  assert(!rep_->closed);
  rep_->closed = true;
}
uint64_t BlockBasedTableBuilder::NumEntries() const {
return rep_->props.num_entries;
}
uint64_t BlockBasedTableBuilder::FileSize() const {
return rep_->offset;
}
} // namespace rocksdb