rocksdb/db/version_builder.cc
Islam AbdelRahman ff4499e297 Update DB::AddFile() to have less restrictions
Summary:
Update DB::AddFile() restrictions to be
  - The key range in the loaded table file doesn't overlap with existing keys or tombstones in the DB.
  - No other writes happen during AddFile call.

The updated AddFile() will verify that the file's key range doesn't overlap with any keys or tombstones in the DB, and then add the file to L0.

Test Plan: unit tests

Reviewers: igor, rven, anthony, kradhakrishnan, sdong

Reviewed By: sdong

Subscribers: adsharma, ameyag, dhruba

Differential Revision: https://reviews.facebook.net/D49233
2015-10-30 16:38:10 -07:00

375 lines
12 KiB
C++

// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/version_builder.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <algorithm>
#include <atomic>
#include <set>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "db/dbformat.h"
#include "db/internal_stats.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "table/table_reader.h"
namespace rocksdb {
// Strict ordering that places "newer" files first. Files are compared by
// smallest_seqno, then by largest_seqno, and finally by file number; in each
// case the larger value sorts earlier.
bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
  if (a->smallest_seqno == b->smallest_seqno) {
    if (a->largest_seqno == b->largest_seqno) {
      // Both sequence numbers match: fall back to the file number.
      return a->fd.GetNumber() > b->fd.GetNumber();
    }
    return a->largest_seqno > b->largest_seqno;
  }
  return a->smallest_seqno > b->smallest_seqno;
}
namespace {
// Strict ordering by ascending smallest key, as judged by `cmp`. Files whose
// smallest keys compare equal are ordered by ascending file number.
bool BySmallestKey(FileMetaData* a, FileMetaData* b,
                   const InternalKeyComparator* cmp) {
  const int key_order = cmp->Compare(a->smallest, b->smallest);
  if (key_order == 0) {
    // Equal smallest keys: the lower file number goes first.
    return a->fd.GetNumber() < b->fd.GetNumber();
  }
  return key_order < 0;
}
}  // namespace
// Private implementation of VersionBuilder.
//
// Collects, per level, the file numbers deleted and the file metadata added by
// the VersionEdits passed to Apply(), relative to a base VersionStorageInfo.
// SaveTo() then merges the base files with the added files (skipping deleted
// ones) into another VersionStorageInfo.
class VersionBuilder::Rep {
 private:
  // Ordering functor for the files of one level.
  //   kLevel0    -- NewestFirstBySeqNo
  //   kLevelNon0 -- BySmallestKey (needs internal_comparator)
  struct FileComparator {
    enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
    const InternalKeyComparator* internal_comparator;

    bool operator()(FileMetaData* f1, FileMetaData* f2) const {
      switch (sort_method) {
        case kLevel0:
          return NewestFirstBySeqNo(f1, f2);
        case kLevelNon0:
          return BySmallestKey(f1, f2, internal_comparator);
      }
      assert(false);
      return false;
    }
  };

  // Changes accumulated for a single level.
  struct LevelState {
    std::unordered_set<uint64_t> deleted_files;
    // Map from file number to file meta data.
    std::unordered_map<uint64_t, FileMetaData*> added_files;
  };

  const EnvOptions& env_options_;
  Logger* info_log_;
  TableCache* table_cache_;
  VersionStorageInfo* base_vstorage_;
  // One LevelState per level of base_vstorage_. Held in a vector so the
  // storage is released automatically.
  std::vector<LevelState> levels_;
  FileComparator level_zero_cmp_;
  FileComparator level_nonzero_cmp_;

 public:
  Rep(const EnvOptions& env_options, Logger* info_log, TableCache* table_cache,
      VersionStorageInfo* base_vstorage)
      : env_options_(env_options),
        info_log_(info_log),
        table_cache_(table_cache),
        base_vstorage_(base_vstorage),
        levels_(static_cast<size_t>(base_vstorage->num_levels())) {
    level_zero_cmp_.sort_method = FileComparator::kLevel0;
    // Unused for level 0, but the comparator object is copied by value into
    // std::sort / std::upper_bound, so the pointer must not be left
    // uninitialized.
    level_zero_cmp_.internal_comparator = nullptr;
    level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
    level_nonzero_cmp_.internal_comparator =
        base_vstorage_->InternalComparator();
  }

  // This object holds one reference on every FileMetaData in added_files and
  // drops it in the destructor; a copy would drop each reference twice.
  Rep(const Rep&) = delete;
  Rep& operator=(const Rep&) = delete;

  // Drops the builder's reference on every file that is still recorded as
  // added.
  ~Rep() {
    for (auto& state : levels_) {
      for (auto& entry : state.added_files) {
        UnrefFile(entry.second);
      }
    }
  }

  // Decrements f->refs. When the count reaches zero (or below), releases the
  // file's table reader handle through table_cache_ (if it has one) and
  // deletes f.
  void UnrefFile(FileMetaData* f) {
    f->refs--;
    if (f->refs <= 0) {
      if (f->table_reader_handle) {
        assert(table_cache_ != nullptr);
        table_cache_->ReleaseHandle(f->table_reader_handle);
        f->table_reader_handle = nullptr;
      }
      delete f;
    }
  }

  // Debug-only check (compiled out under NDEBUG) that the files of every level
  // of `vstorage` are in the expected order, and that files in levels > 0 do
  // not overlap. An overlap prints a message to stderr and aborts.
  void CheckConsistency(VersionStorageInfo* vstorage) {
#ifndef NDEBUG
    // make sure the files are sorted correctly
    for (int level = 0; level < vstorage->num_levels(); level++) {
      auto& level_files = vstorage->LevelFiles(level);
      for (size_t i = 1; i < level_files.size(); i++) {
        auto f1 = level_files[i - 1];
        auto f2 = level_files[i];
        if (level == 0) {
          assert(level_zero_cmp_(f1, f2));
          assert(f1->largest_seqno > f2->largest_seqno ||
                 // We can have multiple files with seqno = 0 as a result of
                 // using DB::AddFile()
                 (f1->largest_seqno == 0 && f2->largest_seqno == 0));
        } else {
          assert(level_nonzero_cmp_(f1, f2));
          // Make sure there is no overlap in levels > 0
          if (vstorage->InternalComparator()->Compare(f1->largest,
                                                      f2->smallest) >= 0) {
            fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
                    (f1->largest).DebugString().c_str(),
                    (f2->smallest).DebugString().c_str());
            abort();
          }
        }
      }
    }
#endif
  }

  // Debug-only check (compiled out under NDEBUG) that file `number`, which is
  // being deleted from `level`, is known: either it is present in some level
  // of the base version, or it was added by a previously applied edit at
  // `level` or at a level above it. `edit` is not inspected.
  void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
                                  int level) {
#ifndef NDEBUG
    // a file to be deleted better exist in the previous version
    bool found = false;
    for (int l = 0; !found && l < base_vstorage_->num_levels(); l++) {
      for (FileMetaData* f : base_vstorage_->LevelFiles(l)) {
        if (f->fd.GetNumber() == number) {
          found = true;
          break;
        }
      }
    }
    // if the file did not exist in the previous version, then it
    // is possibly moved from lower level to higher level in current
    // version
    for (int l = level + 1; !found && l < base_vstorage_->num_levels(); l++) {
      found = levels_[l].added_files.count(number) > 0;
    }
    // maybe this file was added in a previous edit that was Applied
    if (!found) {
      found = levels_[level].added_files.count(number) > 0;
    }
    if (!found) {
      fprintf(stderr, "not found %" PRIu64 "\n", number);
    }
    assert(found);
#endif
  }

  // Apply all of the edits in *edit to the current state.
  void Apply(VersionEdit* edit) {
    CheckConsistency(base_vstorage_);

    // Delete files
    const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles();
    for (const auto& del_file : del) {
      const auto level = del_file.first;
      const auto number = del_file.second;
      LevelState& state = levels_[level];
      state.deleted_files.insert(number);
      CheckConsistencyForDeletes(edit, number, level);

      // If an earlier edit added this file, forget it and drop our reference.
      auto existing = state.added_files.find(number);
      if (existing != state.added_files.end()) {
        UnrefFile(existing->second);
        state.added_files.erase(existing);
      }
    }

    // Add new files
    for (const auto& new_file : edit->GetNewFiles()) {
      const int level = new_file.first;
      FileMetaData* f = new FileMetaData(new_file.second);
      f->refs = 1;
      const uint64_t number = f->fd.GetNumber();
      LevelState& state = levels_[level];

      // An add cancels a pending delete of the same file number.
      state.deleted_files.erase(number);

      auto prior = state.added_files.find(number);
      assert(prior == state.added_files.end());
      if (prior == state.added_files.end()) {
        state.added_files.emplace(number, f);
      } else {
        // Only reachable when asserts are compiled out: release the entry
        // being replaced so that it is not leaked.
        UnrefFile(prior->second);
        prior->second = f;
      }
    }
  }

  // Save the current state in *v.
  void SaveTo(VersionStorageInfo* vstorage) {
    CheckConsistency(base_vstorage_);
    CheckConsistency(vstorage);

    for (int level = 0; level < base_vstorage_->num_levels(); level++) {
      const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
      // Merge the set of added files with the set of pre-existing files.
      // Drop any deleted files.  Store the result in *v.
      const auto& base_files = base_vstorage_->LevelFiles(level);
      auto base_iter = base_files.begin();
      auto base_end = base_files.end();
      const auto& unordered_added_files = levels_[level].added_files;
      vstorage->Reserve(level,
                        base_files.size() + unordered_added_files.size());

      // Sort added files for the level.
      std::vector<FileMetaData*> added_files;
      added_files.reserve(unordered_added_files.size());
      for (const auto& pair : unordered_added_files) {
        added_files.push_back(pair.second);
      }
      std::sort(added_files.begin(), added_files.end(), cmp);

#ifndef NDEBUG
      FileMetaData* prev_file = nullptr;
#endif

      for (const auto& added : added_files) {
#ifndef NDEBUG
        if (level > 0 && prev_file != nullptr) {
          assert(base_vstorage_->InternalComparator()->Compare(
                     prev_file->smallest, added->smallest) <= 0);
        }
        prev_file = added;
#endif

        // Add all smaller files listed in base_
        for (auto bpos = std::upper_bound(base_iter, base_end, added, cmp);
             base_iter != bpos; ++base_iter) {
          MaybeAddFile(vstorage, level, *base_iter);
        }

        MaybeAddFile(vstorage, level, added);
      }

      // Add remaining base files
      for (; base_iter != base_end; ++base_iter) {
        MaybeAddFile(vstorage, level, *base_iter);
      }
    }

    CheckConsistency(vstorage);
  }

  // Looks up (via table_cache_->FindTable) a table reader handle for every
  // file currently recorded as added, and stores the handle and its table
  // reader in the file's metadata. The work is done inline when
  // max_threads <= 1 or there is at most one file; otherwise it is shared
  // among up to max_threads worker threads, all joined before returning.
  void LoadTableHandlers(InternalStats* internal_stats, int max_threads) {
    assert(table_cache_ != nullptr);
    // <file metadata, level>
    std::vector<std::pair<FileMetaData*, int>> files_meta;
    for (int level = 0; level < base_vstorage_->num_levels(); level++) {
      for (auto& file_meta_pair : levels_[level].added_files) {
        auto* file_meta = file_meta_pair.second;
        assert(!file_meta->table_reader_handle);
        files_meta.emplace_back(file_meta, level);
      }
    }

    // Workers claim entries of files_meta through this shared counter.
    std::atomic<size_t> next_file_meta_idx(0);
    // A plain lambda (rather than std::function) needs no <functional> and
    // avoids type erasure. Reference captures are safe because every thread
    // is joined before this function returns.
    auto load_handlers_func = [&]() {
      while (true) {
        size_t file_idx = next_file_meta_idx.fetch_add(1);
        if (file_idx >= files_meta.size()) {
          break;
        }

        auto* file_meta = files_meta[file_idx].first;
        int level = files_meta[file_idx].second;
        // The value returned by FindTable is not inspected; a non-null
        // handle is what is checked below.
        table_cache_->FindTable(env_options_,
                                *(base_vstorage_->InternalComparator()),
                                file_meta->fd, &file_meta->table_reader_handle,
                                false /*no_io */, true /* record_read_stats */,
                                internal_stats->GetFileReadHist(level));
        if (file_meta->table_reader_handle != nullptr) {
          // Load table_reader
          file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
              file_meta->table_reader_handle);
        }
      }
    };

    if (max_threads <= 1 || files_meta.size() <= 1) {
      load_handlers_func();
      return;
    }

    // Never start more threads than there are files to load.
    const size_t num_threads =
        std::min(static_cast<size_t>(max_threads), files_meta.size());
    std::vector<std::thread> threads;
    threads.reserve(num_threads);
    for (size_t i = 0; i < num_threads; i++) {
      threads.emplace_back(load_handlers_func);
    }
    for (auto& t : threads) {
      t.join();
    }
  }

  // Adds f to `level` of vstorage unless its file number is recorded as
  // deleted for that level.
  void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
    if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
      // File is deleted: do nothing
    } else {
      vstorage->AddFile(level, f, info_log_);
    }
  }
};
// Allocates the implementation object; the arguments are passed through to
// Rep's constructor (note that Rep takes info_log as its second argument).
VersionBuilder::VersionBuilder(const EnvOptions& env_options,
                               TableCache* table_cache,
                               VersionStorageInfo* base_vstorage,
                               Logger* info_log)
    : rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {
}
// Destroys the implementation object held in rep_.
VersionBuilder::~VersionBuilder() {
  delete rep_;
}
// Forwards to Rep::CheckConsistency.
void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
  Rep* const impl = rep_;
  impl->CheckConsistency(vstorage);
}
// Forwards to Rep::CheckConsistencyForDeletes.
void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
                                                uint64_t number, int level) {
  Rep* const impl = rep_;
  impl->CheckConsistencyForDeletes(edit, number, level);
}
// Forwards to Rep::Apply.
void VersionBuilder::Apply(VersionEdit* edit) {
  rep_->Apply(edit);
}
// Forwards to Rep::SaveTo.
void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
  Rep* const impl = rep_;
  impl->SaveTo(vstorage);
}
// Forwards to Rep::LoadTableHandlers.
void VersionBuilder::LoadTableHandlers(InternalStats* internal_stats,
                                       int max_threads) {
  Rep* const impl = rep_;
  impl->LoadTableHandlers(internal_stats, max_threads);
}
// Forwards to Rep::MaybeAddFile.
void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
                                  FileMetaData* f) {
  Rep* const impl = rep_;
  impl->MaybeAddFile(vstorage, level, f);
}
} // namespace rocksdb