05e8854085
Summary: This diff introduces a new Merge operation into rocksdb. The purpose of this review is mostly to get feedback from the team (everyone, please) on the design. Please focus on the four files under include/leveldb/, as they spell out the client-visible interface change: include/leveldb/db.h include/leveldb/merge_operator.h include/leveldb/options.h include/leveldb/write_batch.h Please go over local/my_test.cc carefully, as it is a concrete use case. Please also review the implementation files to see if the straw-man implementation makes sense. Note that the diff passes all of `make check` and fully supports forward iteration over the DB, as well as a version of Get that is based on an iterator. Future work: - Integration with compaction - A raw Get implementation I am working on a wiki that explains the design and implementation choices, but coding comes naturally and I think it is a good idea to share the code early. The code is heavily commented. Test Plan: run all local tests Reviewers: dhruba, heyongqiang Reviewed By: dhruba CC: leveldb, zshao, sheki, emayanke, MarkCallaghan Differential Revision: https://reviews.facebook.net/D9651
184 lines
5.9 KiB
C++
184 lines
5.9 KiB
C++
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/builder.h"
|
|
|
|
#include "db/filename.h"
|
|
#include "db/dbformat.h"
|
|
#include "db/merge_helper.h"
|
|
#include "db/table_cache.h"
|
|
#include "db/version_edit.h"
|
|
#include "leveldb/db.h"
|
|
#include "leveldb/env.h"
|
|
#include "leveldb/iterator.h"
|
|
|
|
namespace leveldb {
|
|
|
|
// Builds the table file TableFileName(dbname, meta->number) from the entries
// produced by `iter`.
//
// On return:
//  - meta->smallest is decoded from the first key the iterator yields and
//    meta->largest from the last key written.
//  - meta->file_size is the finished file's size, or 0 if the iterator was
//    empty or building failed.
//  - The file is deleted again unless the returned status is ok and
//    meta->file_size > 0.
//
// When options.purge_redundant_kvs_while_flush is set and
// earliest_seqno_in_memtable > newest_snapshot, only the first entry seen
// for each user key is written. A run of merge operands is collapsed through
// MergeHelper::MergeUntil before being written. Otherwise every entry is
// copied through unchanged.
// NOTE(review): the purge logic assumes `iter` yields entries of the same
// user key newest-first (it asserts decreasing sequence numbers); confirm
// against the caller.
Status BuildTable(const std::string& dbname,
                  Env* env,
                  const Options& options,
                  const StorageOptions& soptions,
                  TableCache* table_cache,
                  Iterator* iter,
                  FileMetaData* meta,
                  const Comparator* user_comparator,
                  const SequenceNumber newest_snapshot,
                  const SequenceNumber earliest_seqno_in_memtable) {
  Status s;
  meta->file_size = 0;
  iter->SeekToFirst();

  // If the sequence number of the smallest entry in the memtable is less
  // than or equal to the most recent snapshot, that snapshot may still need
  // older versions, so do not remove duplicate/deleted keys in this builder.
  bool purge = options.purge_redundant_kvs_while_flush;
  if (earliest_seqno_in_memtable <= newest_snapshot) {
    purge = false;
  }

  std::string fname = TableFileName(dbname, meta->number);
  if (iter->Valid()) {
    unique_ptr<WritableFile> file;
    s = env->NewWritableFile(fname, &file, soptions);
    if (!s.ok()) {
      return s;
    }
    // Owned here; released explicitly below, before `file` is synced/closed.
    unique_ptr<TableBuilder> builder(new TableBuilder(options, file.get(), 0));

    // The first key is the smallest key.
    Slice key = iter->key();
    meta->smallest.DecodeFrom(key);

    if (purge) {
      MergeHelper merge(user_comparator, options.merge_operator,
                        options.info_log.get(),
                        true /* internal key corruption is not ok */);

      // The entry to be written for the user key currently being processed.
      // It is emitted once a different user key (or the end) is reached.
      ParsedInternalKey prev_ikey;
      std::string prev_value;
      std::string prev_key;

      // Copies the entry under `iter` (already parsed into `current`) into
      // prev_key/prev_value, folding a run of merge operands into one entry.
      // Leaves `iter` on the first entry that was not consumed.
      auto capture_current = [&](const ParsedInternalKey& current) {
        if (current.type == kTypeMerge) {
          // MergeUntil moves `iter` past everything it folds in.
          merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
          prev_key.assign(merge.key().data(), merge.key().size());
          prev_value.assign(merge.value().data(), merge.value().size());
        } else {
          prev_key.assign(iter->key().data(), iter->key().size());
          prev_value.assign(iter->value().data(), iter->value().size());
          iter->Next();
        }
        // Re-parse from our own copy so prev_ikey.user_key does not point
        // into iterator memory that has since moved on.
        bool prev_ok __attribute__((unused)) =
            ParseInternalKey(Slice(prev_key), &prev_ikey);
        assert(prev_ok);  // in-memory key corruption is not ok
      };

      ParsedInternalKey first_ikey;
      // Pre-set the type: with NDEBUG the parse result below is never
      // checked, and this keeps release builds free of
      // "maybe uninitialized" errors.
      // TODO: find a clean way to treat in-memory key corruption.
      first_ikey.type = kTypeValue;
      bool first_ok __attribute__((unused)) = ParseInternalKey(key, &first_ikey);
      assert(first_ok);  // in-memory key corruption is not ok
      assert(first_ikey.sequence >= earliest_seqno_in_memtable);
      capture_current(first_ikey);

      while (iter->Valid()) {
        ParsedInternalKey this_ikey;
        bool this_ok __attribute__((unused)) =
            ParseInternalKey(iter->key(), &this_ikey);
        assert(this_ok);  // in-memory key corruption is not ok
        assert(this_ikey.sequence >= earliest_seqno_in_memtable);

        if (user_comparator->Compare(prev_ikey.user_key,
                                     this_ikey.user_key) != 0) {
          // A different user key: write out the previous one and start
          // tracking this one.
          builder->Add(Slice(prev_key), Slice(prev_value));
          capture_current(this_ikey);
        } else {
          // Seqnos within the same user key are in decreasing order, so this
          // is an earlier version of the key held in prev_key. Skip it.
          assert(this_ikey.sequence < prev_ikey.sequence);
          iter->Next();
        }
      }

      // Output the last key.
      builder->Add(Slice(prev_key), Slice(prev_value));
      meta->largest.DecodeFrom(Slice(prev_key));

    } else {
      for (; iter->Valid(); iter->Next()) {
        Slice entry_key = iter->key();
        meta->largest.DecodeFrom(entry_key);
        builder->Add(entry_key, iter->value());
      }
    }

    // Finish and check for builder errors.
    if (s.ok()) {
      s = builder->Finish();
      if (s.ok()) {
        meta->file_size = builder->FileSize();
        assert(meta->file_size > 0);
      }
    } else {
      builder->Abandon();
    }
    // Destroy the builder (constructed over `file`) before syncing/closing
    // the file, preserving the original teardown order.
    builder.reset();

    // Finish and check for file errors.
    if (s.ok() && !options.disableDataSync) {
      if (options.use_fsync) {
        s = file->Fsync();
      } else {
        s = file->Sync();
      }
    }
    if (s.ok()) {
      s = file->Close();
    }

    if (s.ok()) {
      // Verify that the table is usable.
      Iterator* it = table_cache->NewIterator(ReadOptions(),
                                              soptions,
                                              meta->number,
                                              meta->file_size);
      s = it->status();
      delete it;
    }
  }

  // Check for input iterator errors.
  if (!iter->status().ok()) {
    s = iter->status();
  }

  if (s.ok() && meta->file_size > 0) {
    // Keep it.
  } else {
    env->DeleteFile(fname);
  }
  return s;
}
|
|
|
|
} // namespace leveldb
|