rocksdb/db/builder.cc
Andres Notzli f32a572099 Simplify querying of merge results
Summary:
While working on supporting mixing merge operators with
single deletes ( https://reviews.facebook.net/D43179 ),
I realized that returning and dealing with merge results
can be made simpler. Submitting this as a separate diff
because it is not directly related to single deletes.

Before, callers of merge helper had to retrieve the merge
result in one of two ways depending on whether the merge
was successful or not (success = result of merge was single
kTypeValue). For successful merges, the caller could query
the resulting key/value pair and for unsuccessful merges,
the result could be retrieved in the form of two deques of
keys and values. However, with single deletes, a successful merge
does not return a single key/value pair (if merge
operands are merged with a single delete, we have to generate
a value and keep the original single delete around to make
sure that we are not accidentially producing a key overwrite).
In addition, the two existing call sites of the merge
helper were taking the same actions independently from whether
the merge was successful or not, so this patch simplifies that.

Test Plan: make clean all check

Reviewers: rven, sdong, yhchiang, anthony, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D43353
2015-08-17 17:34:38 -07:00

275 lines
10 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/builder.h"
#include <deque>
#include <vector>
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/internal_stats.h"
#include "db/merge_helper.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "table/block_based_table_builder.h"
#include "util/file_reader_writer.h"
#include "util/iostats_context_imp.h"
#include "util/stop_watch.h"
#include "util/thread_status_util.h"
namespace rocksdb {
class TableFactory;
TableBuilder* NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, const bool skip_filters) {
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, skip_filters),
file);
}
Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
const EnvOptions& env_options, TableCache* table_cache, Iterator* iter,
FileMetaData* meta, const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, const Env::IOPriority io_priority,
TableProperties* table_properties) {
// Reports the IOStats for flush for every following bytes.
const size_t kReportFlushIOStatsEvery = 1048576;
Status s;
meta->fd.file_size = 0;
meta->smallest_seqno = meta->largest_seqno = 0;
iter->SeekToFirst();
// If the sequence number of the smallest entry in the memtable is
// smaller than the most recent snapshot, then we do not trigger
// removal of duplicate/deleted keys as part of this builder.
bool purge = true;
if (earliest_seqno_in_memtable <= newest_snapshot) {
purge = false;
}
std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
if (iter->Valid()) {
TableBuilder* builder;
unique_ptr<WritableFileWriter> file_writer;
{
unique_ptr<WritableFile> file;
s = env->NewWritableFile(fname, &file, env_options);
if (!s.ok()) {
return s;
}
file->SetIOPriority(io_priority);
file_writer.reset(new WritableFileWriter(std::move(file), env_options));
builder = NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories,
file_writer.get(), compression, compression_opts);
}
{
// the first key is the smallest key
Slice key = iter->key();
meta->smallest.DecodeFrom(key);
meta->smallest_seqno = GetInternalKeySeqno(key);
meta->largest_seqno = meta->smallest_seqno;
}
MergeHelper merge(internal_comparator.user_comparator(),
ioptions.merge_operator, ioptions.info_log,
ioptions.min_partial_merge_operands,
true /* internal key corruption is not ok */);
if (purge) {
// Ugly walkaround to avoid compiler error for release build
bool ok __attribute__((unused)) = true;
// Will write to builder if current key != prev key
ParsedInternalKey prev_ikey;
std::string prev_key;
bool is_first_key = true; // Also write if this is the very first key
while (iter->Valid()) {
bool iterator_at_next = false;
// Get current key
ParsedInternalKey this_ikey;
Slice key = iter->key();
Slice value = iter->value();
// In-memory key corruption is not ok;
// TODO: find a clean way to treat in memory key corruption
ok = ParseInternalKey(key, &this_ikey);
assert(ok);
assert(this_ikey.sequence >= earliest_seqno_in_memtable);
// If the key is the same as the previous key (and it is not the
// first key), then we skip it, since it is an older version.
// Otherwise we output the key and mark it as the "new" previous key.
if (!is_first_key && !internal_comparator.user_comparator()->Compare(
prev_ikey.user_key, this_ikey.user_key)) {
// seqno within the same key are in decreasing order
assert(this_ikey.sequence < prev_ikey.sequence);
} else {
is_first_key = false;
if (this_ikey.type == kTypeMerge) {
// TODO(tbd): Add a check here to prevent RocksDB from crash when
// reopening a DB w/o properly specifying the merge operator. But
// currently we observed a memory leak on failing in RocksDB
// recovery, so we decide to let it crash instead of causing
// memory leak for now before we have identified the real cause
// of the memory leak.
// Handle merge-type keys using the MergeHelper
// TODO: pass statistics to MergeUntil
merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
iterator_at_next = true;
// Write them out one-by-one. (Proceed back() to front())
// If the merge successfully merged the input into
// a kTypeValue, the list contains a single element.
const std::deque<std::string>& keys = merge.keys();
const std::deque<std::string>& values = merge.values();
assert(keys.size() == values.size() && keys.size() >= 1);
std::deque<std::string>::const_reverse_iterator key_iter;
std::deque<std::string>::const_reverse_iterator value_iter;
for (key_iter = keys.rbegin(), value_iter = values.rbegin();
key_iter != keys.rend() && value_iter != values.rend();
++key_iter, ++value_iter) {
builder->Add(Slice(*key_iter), Slice(*value_iter));
}
// Sanity check. Both iterators should end at the same time
assert(key_iter == keys.rend() && value_iter == values.rend());
prev_key.assign(keys.front());
ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
assert(ok);
} else {
// Handle Put/Delete-type keys by simply writing them
builder->Add(key, value);
prev_key.assign(key.data(), key.size());
ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
assert(ok);
}
}
if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN,
IOSTATS(bytes_written));
IOSTATS_RESET(bytes_written);
}
if (!iterator_at_next) iter->Next();
}
// The last key is the largest key
meta->largest.DecodeFrom(Slice(prev_key));
SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key));
meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
meta->largest_seqno = std::max(meta->largest_seqno, seqno);
} else {
for (; iter->Valid(); iter->Next()) {
Slice key = iter->key();
meta->largest.DecodeFrom(key);
builder->Add(key, iter->value());
SequenceNumber seqno = GetInternalKeySeqno(key);
meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
meta->largest_seqno = std::max(meta->largest_seqno, seqno);
if (io_priority == Env::IO_HIGH &&
IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::FLUSH_BYTES_WRITTEN,
IOSTATS(bytes_written));
IOSTATS_RESET(bytes_written);
}
}
}
// Finish and check for builder errors
if (s.ok()) {
s = builder->Finish();
} else {
builder->Abandon();
}
if (s.ok()) {
meta->fd.file_size = builder->FileSize();
meta->marked_for_compaction = builder->NeedCompact();
assert(meta->fd.GetFileSize() > 0);
if (table_properties) {
*table_properties = builder->GetTableProperties();
}
}
delete builder;
// Finish and check for file errors
if (s.ok() && !ioptions.disable_data_sync) {
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
file_writer->Sync(ioptions.use_fsync);
}
if (s.ok()) {
s = file_writer->Close();
}
if (s.ok()) {
// Verify that the table is usable
Iterator* it = table_cache->NewIterator(
ReadOptions(), env_options, internal_comparator, meta->fd, nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
false);
s = it->status();
if (s.ok() && paranoid_file_checks) {
for (it->SeekToFirst(); it->Valid(); it->Next()) {}
s = it->status();
}
delete it;
}
}
// Check for input iterator errors
if (!iter->status().ok()) {
s = iter->status();
}
if (s.ok() && meta->fd.GetFileSize() > 0) {
// Keep it
} else {
env->DeleteFile(fname);
}
return s;
}
} // namespace rocksdb