Merge branch 'master' of https://github.com/facebook/rocksdb
This commit is contained in:
commit
6b835c6009
@ -2,6 +2,9 @@
|
||||
|
||||
## Unreleased
|
||||
|
||||
### New Features
|
||||
* HashLinklist reduces performance outlier caused by skewed bucket by switching data in the bucket from linked list to skip list. Add parameter threshold_use_skiplist in NewHashLinkListRepFactory().
|
||||
|
||||
|
||||
## 3.2.0 (06/20/2014)
|
||||
|
||||
|
@ -54,7 +54,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
|
||||
purge = false;
|
||||
}
|
||||
|
||||
std::string fname = TableFileName(dbname, meta->fd.GetNumber());
|
||||
std::string fname = TableFileName(options.db_paths, meta->fd.GetNumber(),
|
||||
meta->fd.GetPathId());
|
||||
if (iter->Valid()) {
|
||||
unique_ptr<WritableFile> file;
|
||||
s = env->NewWritableFile(fname, &file, soptions);
|
||||
|
@ -495,7 +495,7 @@ int main(int argc, char** argv) {
|
||||
rocksdb_filterpolicy_t* policy = rocksdb_filterpolicy_create_bloom(10);
|
||||
rocksdb_options_set_filter_policy(options, policy);
|
||||
rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3));
|
||||
rocksdb_options_set_hash_skip_list_rep(options, 50000, 4, 4);
|
||||
rocksdb_options_set_hash_skip_list_rep(options, 5000, 4, 4);
|
||||
rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16);
|
||||
|
||||
db = rocksdb_open(options, dbname, &err);
|
||||
|
@ -224,8 +224,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
|
||||
if (dummy_versions != nullptr) {
|
||||
internal_stats_.reset(new InternalStats(
|
||||
options_.num_levels, db_options->env, db_options->statistics.get()));
|
||||
table_cache_.reset(
|
||||
new TableCache(dbname, &options_, storage_options, table_cache));
|
||||
table_cache_.reset(new TableCache(&options_, storage_options, table_cache));
|
||||
if (options_.compaction_style == kCompactionStyleUniversal) {
|
||||
compaction_picker_.reset(
|
||||
new UniversalCompactionPicker(&options_, &internal_comparator_));
|
||||
|
@ -29,7 +29,8 @@ static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
|
||||
Compaction::Compaction(Version* input_version, int level, int out_level,
|
||||
uint64_t target_file_size,
|
||||
uint64_t max_grandparent_overlap_bytes,
|
||||
bool seek_compaction, bool enable_compression,
|
||||
uint32_t output_path_id,
|
||||
CompressionType output_compression, bool seek_compaction,
|
||||
bool deletion_compaction)
|
||||
: level_(level),
|
||||
out_level_(out_level),
|
||||
@ -38,8 +39,9 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
|
||||
input_version_(input_version),
|
||||
number_levels_(input_version_->NumberLevels()),
|
||||
cfd_(input_version_->cfd_),
|
||||
output_path_id_(output_path_id),
|
||||
output_compression_(output_compression),
|
||||
seek_compaction_(seek_compaction),
|
||||
enable_compression_(enable_compression),
|
||||
deletion_compaction_(deletion_compaction),
|
||||
grandparent_index_(0),
|
||||
seen_key_(false),
|
||||
|
@ -47,8 +47,11 @@ class Compaction {
|
||||
// Maximum size of files to build during this compaction.
|
||||
uint64_t MaxOutputFileSize() const { return max_output_file_size_; }
|
||||
|
||||
// Whether compression will be enabled for compaction outputs
|
||||
bool enable_compression() const { return enable_compression_; }
|
||||
// What compression for output
|
||||
CompressionType OutputCompressionType() const { return output_compression_; }
|
||||
|
||||
// Whether need to write output file to second DB path.
|
||||
uint32_t GetOutputPathId() const { return output_path_id_; }
|
||||
|
||||
// Is this a trivial compaction that can be implemented by just
|
||||
// moving a single input file to the next level (no merging or splitting)
|
||||
@ -104,8 +107,8 @@ class Compaction {
|
||||
|
||||
Compaction(Version* input_version, int level, int out_level,
|
||||
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
|
||||
bool seek_compaction = false, bool enable_compression = true,
|
||||
bool deletion_compaction = false);
|
||||
uint32_t output_path_id, CompressionType output_compression,
|
||||
bool seek_compaction = false, bool deletion_compaction = false);
|
||||
|
||||
int level_;
|
||||
int out_level_; // levels to which output files are stored
|
||||
@ -116,8 +119,9 @@ class Compaction {
|
||||
int number_levels_;
|
||||
ColumnFamilyData* cfd_;
|
||||
|
||||
uint32_t output_path_id_;
|
||||
CompressionType output_compression_;
|
||||
bool seek_compaction_;
|
||||
bool enable_compression_;
|
||||
// if true, just delete files in inputs_[0]
|
||||
bool deletion_compaction_;
|
||||
|
||||
|
@ -12,12 +12,38 @@
|
||||
#define __STDC_FORMAT_MACROS
|
||||
#include <inttypes.h>
|
||||
#include <limits>
|
||||
#include "db/filename.h"
|
||||
#include "util/log_buffer.h"
|
||||
#include "util/statistics.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
namespace {
|
||||
// Determine compression type, based on user options, level of the output
|
||||
// file and whether compression is disabled.
|
||||
// If enable_compression is false, then compression is always disabled no
|
||||
// matter what the values of the other two parameters are.
|
||||
// Otherwise, the compression type is determined based on options and level.
|
||||
CompressionType GetCompressionType(const Options& options, int level,
|
||||
const bool enable_compression = true) {
|
||||
if (!enable_compression) {
|
||||
// disable compression
|
||||
return kNoCompression;
|
||||
}
|
||||
// If the use has specified a different compression level for each level,
|
||||
// then pick the compresison for that level.
|
||||
if (!options.compression_per_level.empty()) {
|
||||
const int n = options.compression_per_level.size() - 1;
|
||||
// It is possible for level_ to be -1; in that case, we use level
|
||||
// 0's compression. This occurs mostly in backwards compatibility
|
||||
// situations when the builder doesn't know what level the file
|
||||
// belongs to. Likewise, if level_ is beyond the end of the
|
||||
// specified compression levels, use the last value.
|
||||
return options.compression_per_level[std::max(0, std::min(level, n))];
|
||||
} else {
|
||||
return options.compression;
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
|
||||
uint64_t sum = 0;
|
||||
@ -345,7 +371,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
|
||||
}
|
||||
Compaction* c = new Compaction(version, input_level, output_level,
|
||||
MaxFileSizeForLevel(output_level),
|
||||
MaxGrandParentOverlapBytes(input_level));
|
||||
MaxGrandParentOverlapBytes(input_level), 0,
|
||||
GetCompressionType(*options_, output_level));
|
||||
|
||||
c->inputs_[0] = inputs;
|
||||
if (ExpandWhileOverlapping(c) == false) {
|
||||
@ -465,7 +492,8 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
|
||||
assert(level >= 0);
|
||||
assert(level + 1 < NumberLevels());
|
||||
c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1),
|
||||
MaxGrandParentOverlapBytes(level));
|
||||
MaxGrandParentOverlapBytes(level), 0,
|
||||
GetCompressionType(*options_, level + 1));
|
||||
c->score_ = score;
|
||||
|
||||
// Pick the largest file in this level that is not already
|
||||
@ -585,15 +613,9 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
|
||||
newerfile = f;
|
||||
}
|
||||
|
||||
// The files are sorted from newest first to oldest last.
|
||||
std::vector<int>& file_by_time = c->input_version_->files_by_size_[level];
|
||||
|
||||
// Is the earliest file part of this compaction?
|
||||
int last_index = file_by_time[file_by_time.size()-1];
|
||||
FileMetaData* last_file = c->input_version_->files_[level][last_index];
|
||||
if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) {
|
||||
c->bottommost_level_ = true;
|
||||
}
|
||||
FileMetaData* last_file = c->input_version_->files_[level].back();
|
||||
c->bottommost_level_ = c->inputs_[0].back() == last_file;
|
||||
|
||||
// update statistics
|
||||
MeasureTime(options_->statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION,
|
||||
@ -628,12 +650,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
options_->compaction_options_universal.max_merge_width;
|
||||
|
||||
// The files are sorted from newest first to oldest last.
|
||||
std::vector<int>& file_by_time = version->files_by_size_[level];
|
||||
const auto& files = version->files_[level];
|
||||
|
||||
FileMetaData* f = nullptr;
|
||||
bool done = false;
|
||||
int start_index = 0;
|
||||
unsigned int candidate_count = 0;
|
||||
assert(file_by_time.size() == version->files_[level].size());
|
||||
|
||||
unsigned int max_files_to_compact = std::min(max_merge_width,
|
||||
max_number_of_files_to_compact);
|
||||
@ -641,14 +663,13 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
|
||||
// Considers a candidate file only if it is smaller than the
|
||||
// total size accumulated so far.
|
||||
for (unsigned int loop = 0; loop < file_by_time.size(); loop++) {
|
||||
for (unsigned int loop = 0; loop < files.size(); loop++) {
|
||||
|
||||
candidate_count = 0;
|
||||
|
||||
// Skip files that are already being compacted
|
||||
for (f = nullptr; loop < file_by_time.size(); loop++) {
|
||||
int index = file_by_time[loop];
|
||||
f = version->files_[level][index];
|
||||
for (f = nullptr; loop < files.size(); loop++) {
|
||||
f = files[loop];
|
||||
|
||||
if (!f->being_compacted) {
|
||||
candidate_count = 1;
|
||||
@ -664,17 +685,16 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
// first candidate to be compacted.
|
||||
uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0;
|
||||
if (f != nullptr) {
|
||||
LogToBuffer(log_buffer,
|
||||
"[%s] Universal: Possible candidate file %" PRIu64 "[%d].",
|
||||
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop);
|
||||
LogToBuffer(
|
||||
log_buffer, "[%s] Universal: Possible candidate file %s[%d].",
|
||||
version->cfd_->GetName().c_str(),
|
||||
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), loop);
|
||||
}
|
||||
|
||||
// Check if the suceeding files need compaction.
|
||||
for (unsigned int i = loop+1;
|
||||
candidate_count < max_files_to_compact && i < file_by_time.size();
|
||||
i++) {
|
||||
int index = file_by_time[i];
|
||||
FileMetaData* f = version->files_[level][index];
|
||||
for (unsigned int i = loop + 1;
|
||||
candidate_count < max_files_to_compact && i < files.size(); i++) {
|
||||
FileMetaData* f = files[i];
|
||||
if (f->being_compacted) {
|
||||
break;
|
||||
}
|
||||
@ -713,14 +733,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
break;
|
||||
} else {
|
||||
for (unsigned int i = loop;
|
||||
i < loop + candidate_count && i < file_by_time.size(); i++) {
|
||||
int index = file_by_time[i];
|
||||
FileMetaData* f = version->files_[level][index];
|
||||
LogToBuffer(log_buffer,
|
||||
"[%s] Universal: Skipping file %" PRIu64 "[%d] "
|
||||
"with size %" PRIu64 " (compensated size %" PRIu64 ") %d\n",
|
||||
version->cfd_->GetName().c_str(), f->fd.GetNumber(),
|
||||
i, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted);
|
||||
i < loop + candidate_count && i < files.size(); i++) {
|
||||
FileMetaData* f = files[i];
|
||||
LogToBuffer(log_buffer, "[%s] Universal: Skipping file %" PRIu64
|
||||
"[%d] with size %" PRIu64
|
||||
" (compensated size %" PRIu64 ") %d\n",
|
||||
version->cfd_->GetName().c_str(), f->fd.GetNumber(), i,
|
||||
f->fd.GetFileSize(), f->compensated_file_size,
|
||||
f->being_compacted);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -736,31 +756,29 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
||||
if (ratio_to_compress >= 0) {
|
||||
uint64_t total_size = version->NumLevelBytes(level);
|
||||
uint64_t older_file_size = 0;
|
||||
for (unsigned int i = file_by_time.size() - 1; i >= first_index_after;
|
||||
i--) {
|
||||
older_file_size +=
|
||||
version->files_[level][file_by_time[i]]->fd.GetFileSize();
|
||||
for (unsigned int i = files.size() - 1;
|
||||
i >= first_index_after; i--) {
|
||||
older_file_size += files[i]->fd.GetFileSize();
|
||||
if (older_file_size * 100L >= total_size * (long) ratio_to_compress) {
|
||||
enable_compression = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Compaction* c =
|
||||
new Compaction(version, level, level, MaxFileSizeForLevel(level),
|
||||
LLONG_MAX, false, enable_compression);
|
||||
Compaction* c = new Compaction(
|
||||
version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, 0,
|
||||
GetCompressionType(*options_, level, enable_compression));
|
||||
c->score_ = score;
|
||||
|
||||
for (unsigned int i = start_index; i < first_index_after; i++) {
|
||||
int index = file_by_time[i];
|
||||
FileMetaData* f = c->input_version_->files_[level][index];
|
||||
FileMetaData* f = c->input_version_->files_[level][i];
|
||||
c->inputs_[0].push_back(f);
|
||||
LogToBuffer(log_buffer,
|
||||
"[%s] Universal: Picking file %" PRIu64 "[%d] "
|
||||
"[%s] Universal: Picking file %s[%d] "
|
||||
"with size %" PRIu64 " (compensated size %" PRIu64 ")\n",
|
||||
version->cfd_->GetName().c_str(),
|
||||
f->fd.GetNumber(), i,
|
||||
f->fd.GetFileSize(), f->compensated_file_size);
|
||||
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(),
|
||||
i, f->fd.GetFileSize(), f->compensated_file_size);
|
||||
}
|
||||
return c;
|
||||
}
|
||||
@ -780,8 +798,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
||||
max_size_amplification_percent;
|
||||
|
||||
// The files are sorted from newest first to oldest last.
|
||||
std::vector<int>& file_by_time = version->files_by_size_[level];
|
||||
assert(file_by_time.size() == version->files_[level].size());
|
||||
const auto& files = version->files_[level];
|
||||
|
||||
unsigned int candidate_count = 0;
|
||||
uint64_t candidate_size = 0;
|
||||
@ -789,38 +806,35 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
||||
FileMetaData* f = nullptr;
|
||||
|
||||
// Skip files that are already being compacted
|
||||
for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) {
|
||||
int index = file_by_time[loop];
|
||||
f = version->files_[level][index];
|
||||
for (unsigned int loop = 0; loop < files.size() - 1; loop++) {
|
||||
f = files[loop];
|
||||
if (!f->being_compacted) {
|
||||
start_index = loop; // Consider this as the first candidate.
|
||||
break;
|
||||
}
|
||||
LogToBuffer(log_buffer,
|
||||
"[%s] Universal: skipping file %" PRIu64 "[%d] compacted %s",
|
||||
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop,
|
||||
" cannot be a candidate to reduce size amp.\n");
|
||||
LogToBuffer(log_buffer, "[%s] Universal: skipping file %s[%d] compacted %s",
|
||||
version->cfd_->GetName().c_str(),
|
||||
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(),
|
||||
loop, " cannot be a candidate to reduce size amp.\n");
|
||||
f = nullptr;
|
||||
}
|
||||
if (f == nullptr) {
|
||||
return nullptr; // no candidate files
|
||||
}
|
||||
|
||||
LogToBuffer(log_buffer,
|
||||
"[%s] Universal: First candidate file %" PRIu64 "[%d] %s",
|
||||
version->cfd_->GetName().c_str(), f->fd.GetNumber(), start_index,
|
||||
" to reduce size amp.\n");
|
||||
LogToBuffer(log_buffer, "[%s] Universal: First candidate file %s[%d] %s",
|
||||
version->cfd_->GetName().c_str(),
|
||||
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(),
|
||||
start_index, " to reduce size amp.\n");
|
||||
|
||||
// keep adding up all the remaining files
|
||||
for (unsigned int loop = start_index; loop < file_by_time.size() - 1;
|
||||
loop++) {
|
||||
int index = file_by_time[loop];
|
||||
f = version->files_[level][index];
|
||||
for (unsigned int loop = start_index; loop < files.size() - 1; loop++) {
|
||||
f = files[loop];
|
||||
if (f->being_compacted) {
|
||||
LogToBuffer(
|
||||
log_buffer,
|
||||
"[%s] Universal: Possible candidate file %" PRIu64 "[%d] %s.",
|
||||
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop,
|
||||
log_buffer, "[%s] Universal: Possible candidate file %s[%d] %s.",
|
||||
version->cfd_->GetName().c_str(),
|
||||
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), loop,
|
||||
" is already being compacted. No size amp reduction possible.\n");
|
||||
return nullptr;
|
||||
}
|
||||
@ -832,8 +846,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
||||
}
|
||||
|
||||
// size of earliest file
|
||||
int index = file_by_time[file_by_time.size() - 1];
|
||||
uint64_t earliest_file_size = version->files_[level][index]->fd.GetFileSize();
|
||||
uint64_t earliest_file_size = files.back()->fd.GetFileSize();
|
||||
|
||||
// size amplification = percentage of additional size
|
||||
if (candidate_size * 100 < ratio * earliest_file_size) {
|
||||
@ -850,23 +863,22 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
||||
"earliest-file-size %" PRIu64,
|
||||
version->cfd_->GetName().c_str(), candidate_size, earliest_file_size);
|
||||
}
|
||||
assert(start_index >= 0 && start_index < file_by_time.size() - 1);
|
||||
assert(start_index >= 0 && start_index < files.size() - 1);
|
||||
|
||||
// create a compaction request
|
||||
// We always compact all the files, so always compress.
|
||||
Compaction* c =
|
||||
new Compaction(version, level, level, MaxFileSizeForLevel(level),
|
||||
LLONG_MAX, false, true);
|
||||
LLONG_MAX, 0, GetCompressionType(*options_, level));
|
||||
c->score_ = score;
|
||||
for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) {
|
||||
int index = file_by_time[loop];
|
||||
f = c->input_version_->files_[level][index];
|
||||
for (unsigned int loop = start_index; loop < files.size(); loop++) {
|
||||
f = c->input_version_->files_[level][loop];
|
||||
c->inputs_[0].push_back(f);
|
||||
LogToBuffer(log_buffer,
|
||||
"[%s] Universal: size amp picking file %" PRIu64 "[%d] "
|
||||
"with size %" PRIu64 " (compensated size %" PRIu64 ")",
|
||||
version->cfd_->GetName().c_str(),
|
||||
f->fd.GetNumber(), index,
|
||||
f->fd.GetNumber(), loop,
|
||||
f->fd.GetFileSize(), f->compensated_file_size);
|
||||
}
|
||||
return c;
|
||||
@ -899,7 +911,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
Compaction* c = new Compaction(version, 0, 0, 0, 0, false, false,
|
||||
Compaction* c = new Compaction(version, 0, 0, 0, 0, 0, kNoCompression, false,
|
||||
true /* is deletion compaction */);
|
||||
// delete old files (FIFO)
|
||||
for (auto ritr = version->files_[0].rbegin();
|
||||
|
@ -98,7 +98,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
|
||||
}
|
||||
|
||||
// Make a set of all of the live *.sst files
|
||||
std::set<uint64_t> live;
|
||||
std::vector<FileDescriptor> live;
|
||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||
cfd->current()->AddLiveFiles(&live);
|
||||
}
|
||||
@ -109,7 +109,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
|
||||
// create names of the live files. The names are not absolute
|
||||
// paths, instead they are relative to dbname_;
|
||||
for (auto live_file : live) {
|
||||
ret.push_back(TableFileName("", live_file));
|
||||
ret.push_back(MakeTableFileName("", live_file.GetNumber()));
|
||||
}
|
||||
|
||||
ret.push_back(CurrentFileName(""));
|
||||
|
216
db/db_impl.cc
216
db/db_impl.cc
@ -98,6 +98,7 @@ struct DBImpl::CompactionState {
|
||||
// Files produced by compaction
|
||||
struct Output {
|
||||
uint64_t number;
|
||||
uint32_t path_id;
|
||||
uint64_t file_size;
|
||||
InternalKey smallest, largest;
|
||||
SequenceNumber smallest_seqno, largest_seqno;
|
||||
@ -294,30 +295,14 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
|
||||
result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
|
||||
}
|
||||
|
||||
if (result.db_paths.size() == 0) {
|
||||
result.db_paths.push_back(dbname);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
CompressionType GetCompressionType(const Options& options, int level,
|
||||
const bool enable_compression) {
|
||||
if (!enable_compression) {
|
||||
// disable compression
|
||||
return kNoCompression;
|
||||
}
|
||||
// If the use has specified a different compression level for each level,
|
||||
// then pick the compresison for that level.
|
||||
if (!options.compression_per_level.empty()) {
|
||||
const int n = options.compression_per_level.size() - 1;
|
||||
// It is possible for level_ to be -1; in that case, we use level
|
||||
// 0's compression. This occurs mostly in backwards compatibility
|
||||
// situations when the builder doesn't know what level the file
|
||||
// belongs to. Likewise, if level_ is beyond the end of the
|
||||
// specified compression levels, use the last value.
|
||||
return options.compression_per_level[std::max(0, std::min(level, n))];
|
||||
} else {
|
||||
return options.compression;
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
CompressionType GetCompressionFlush(const Options& options) {
|
||||
// Compressing memtable flushes might not help unless the sequential load
|
||||
// optimization is used for leveled compaction. Otherwise the CPU and
|
||||
@ -325,12 +310,13 @@ CompressionType GetCompressionFlush(const Options& options) {
|
||||
|
||||
bool can_compress;
|
||||
|
||||
if (options.compaction_style == kCompactionStyleUniversal) {
|
||||
if (options.compaction_style == kCompactionStyleUniversal) {
|
||||
can_compress =
|
||||
(options.compaction_options_universal.compression_size_percent < 0);
|
||||
} else {
|
||||
// For leveled compress when min_level_to_compress == 0.
|
||||
can_compress = (GetCompressionType(options, 0, true) != kNoCompression);
|
||||
can_compress = options.compression_per_level.empty() ||
|
||||
options.compression_per_level[0] != kNoCompression;
|
||||
}
|
||||
|
||||
if (can_compress) {
|
||||
@ -339,6 +325,7 @@ CompressionType GetCompressionFlush(const Options& options) {
|
||||
return kNoCompression;
|
||||
}
|
||||
}
|
||||
} // namespace
|
||||
|
||||
DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
|
||||
: env_(options.env),
|
||||
@ -591,30 +578,48 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
|
||||
}
|
||||
|
||||
// don't delete live files
|
||||
deletion_state.sst_live.assign(pending_outputs_.begin(),
|
||||
pending_outputs_.end());
|
||||
for (auto pair : pending_outputs_) {
|
||||
deletion_state.sst_live.emplace_back(pair.first, pair.second, 0);
|
||||
}
|
||||
/* deletion_state.sst_live.insert(pending_outputs_.begin(),
|
||||
pending_outputs_.end());*/
|
||||
versions_->AddLiveFiles(&deletion_state.sst_live);
|
||||
|
||||
if (doing_the_full_scan) {
|
||||
// set of all files in the directory. We'll exclude files that are still
|
||||
// alive in the subsequent processings.
|
||||
env_->GetChildren(
|
||||
dbname_, &deletion_state.candidate_files
|
||||
); // Ignore errors
|
||||
for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) {
|
||||
// set of all files in the directory. We'll exclude files that are still
|
||||
// alive in the subsequent processings.
|
||||
std::vector<std::string> files;
|
||||
env_->GetChildren(dbname_, &files); // Ignore errors
|
||||
for (std::string file : files) {
|
||||
deletion_state.candidate_files.emplace_back(file, path_id);
|
||||
}
|
||||
}
|
||||
|
||||
//Add log files in wal_dir
|
||||
if (options_.wal_dir != dbname_) {
|
||||
std::vector<std::string> log_files;
|
||||
env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors
|
||||
deletion_state.candidate_files.insert(
|
||||
deletion_state.candidate_files.end(),
|
||||
log_files.begin(),
|
||||
log_files.end()
|
||||
);
|
||||
for (std::string log_file : log_files) {
|
||||
deletion_state.candidate_files.emplace_back(log_file, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
namespace {
|
||||
bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first,
|
||||
const rocksdb::DBImpl::CandidateFileInfo& second) {
|
||||
if (first.file_name > second.file_name) {
|
||||
return true;
|
||||
} else if (first.file_name < second.file_name) {
|
||||
return false;
|
||||
} else {
|
||||
return (first.path_id > first.path_id);
|
||||
}
|
||||
}
|
||||
}; // namespace
|
||||
|
||||
// Diffs the files listed in filenames and those that do not
|
||||
// belong to live files are posibly removed. Also, removes all the
|
||||
// files in sst_delete_files and log_delete_files.
|
||||
@ -630,10 +635,12 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Now, convert live list to an unordered set, WITHOUT mutex held;
|
||||
// Now, convert live list to an unordered map, WITHOUT mutex held;
|
||||
// set is slow.
|
||||
std::unordered_set<uint64_t> sst_live(state.sst_live.begin(),
|
||||
state.sst_live.end());
|
||||
std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
|
||||
for (FileDescriptor& fd : state.sst_live) {
|
||||
sst_live_map[fd.GetNumber()] = &fd;
|
||||
}
|
||||
|
||||
auto& candidate_files = state.candidate_files;
|
||||
candidate_files.reserve(
|
||||
@ -643,26 +650,30 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
||||
// We may ignore the dbname when generating the file names.
|
||||
const char* kDumbDbName = "";
|
||||
for (auto file : state.sst_delete_files) {
|
||||
candidate_files.push_back(
|
||||
TableFileName(kDumbDbName, file->fd.GetNumber()).substr(1));
|
||||
candidate_files.emplace_back(
|
||||
MakeTableFileName(kDumbDbName, file->fd.GetNumber()),
|
||||
file->fd.GetPathId());
|
||||
delete file;
|
||||
}
|
||||
|
||||
for (auto file_num : state.log_delete_files) {
|
||||
if (file_num > 0) {
|
||||
candidate_files.push_back(LogFileName(kDumbDbName, file_num).substr(1));
|
||||
candidate_files.emplace_back(LogFileName(kDumbDbName, file_num).substr(1),
|
||||
0);
|
||||
}
|
||||
}
|
||||
|
||||
// dedup state.candidate_files so we don't try to delete the same
|
||||
// file twice
|
||||
sort(candidate_files.begin(), candidate_files.end());
|
||||
sort(candidate_files.begin(), candidate_files.end(), CompareCandidateFile);
|
||||
candidate_files.erase(unique(candidate_files.begin(), candidate_files.end()),
|
||||
candidate_files.end());
|
||||
|
||||
std::vector<std::string> old_info_log_files;
|
||||
|
||||
for (const auto& to_delete : candidate_files) {
|
||||
for (const auto& candidate_file : candidate_files) {
|
||||
std::string to_delete = candidate_file.file_name;
|
||||
uint32_t path_id = candidate_file.path_id;
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
// Ignore file if we cannot recognize it.
|
||||
@ -682,7 +693,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
||||
keep = (number >= state.manifest_file_number);
|
||||
break;
|
||||
case kTableFile:
|
||||
keep = (sst_live.find(number) != sst_live.end());
|
||||
keep = (sst_live_map.find(number) != sst_live_map.end());
|
||||
break;
|
||||
case kTempFile:
|
||||
// Any temp files that are currently being written to must
|
||||
@ -690,7 +701,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
||||
// Also, SetCurrentFile creates a temp file when writing out new
|
||||
// manifest, which is equal to state.pending_manifest_file_number. We
|
||||
// should not delete that file
|
||||
keep = (sst_live.find(number) != sst_live.end()) ||
|
||||
keep = (sst_live_map.find(number) != sst_live_map.end()) ||
|
||||
(number == state.pending_manifest_file_number);
|
||||
break;
|
||||
case kInfoLogFile:
|
||||
@ -711,13 +722,16 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
||||
continue;
|
||||
}
|
||||
|
||||
std::string fname;
|
||||
if (type == kTableFile) {
|
||||
// evict from cache
|
||||
TableCache::Evict(table_cache_.get(), number);
|
||||
fname = TableFileName(options_.db_paths, number, path_id);
|
||||
} else {
|
||||
fname =
|
||||
((type == kLogFile) ? options_.wal_dir : dbname_) + "/" + to_delete;
|
||||
}
|
||||
|
||||
std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) +
|
||||
"/" + to_delete;
|
||||
if (type == kLogFile &&
|
||||
(options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) {
|
||||
auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number);
|
||||
@ -1102,6 +1116,13 @@ Status DBImpl::Recover(
|
||||
return s;
|
||||
}
|
||||
|
||||
for (auto db_path : options_.db_paths) {
|
||||
s = env_->CreateDirIfMissing(db_path);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
}
|
||||
|
||||
s = env_->NewDirectory(dbname_, &db_directory_);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
@ -1367,8 +1388,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
|
||||
mutex_.AssertHeld();
|
||||
const uint64_t start_micros = env_->NowMicros();
|
||||
FileMetaData meta;
|
||||
meta.fd.number = versions_->NewFileNumber();
|
||||
pending_outputs_.insert(meta.fd.GetNumber());
|
||||
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
|
||||
pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file.
|
||||
Iterator* iter = mem->NewIterator(ReadOptions(), true);
|
||||
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
|
||||
const SequenceNumber earliest_seqno_in_memtable =
|
||||
@ -1399,9 +1420,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
|
||||
// should not be added to the manifest.
|
||||
int level = 0;
|
||||
if (s.ok() && meta.fd.GetFileSize() > 0) {
|
||||
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetFileSize(),
|
||||
meta.smallest, meta.largest, meta.smallest_seqno,
|
||||
meta.largest_seqno);
|
||||
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
|
||||
meta.fd.GetFileSize(), meta.smallest, meta.largest,
|
||||
meta.smallest_seqno, meta.largest_seqno);
|
||||
}
|
||||
|
||||
InternalStats::CompactionStats stats;
|
||||
@ -1420,9 +1441,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
|
||||
mutex_.AssertHeld();
|
||||
const uint64_t start_micros = env_->NowMicros();
|
||||
FileMetaData meta;
|
||||
meta.fd.number = versions_->NewFileNumber();
|
||||
|
||||
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
|
||||
*filenumber = meta.fd.GetNumber();
|
||||
pending_outputs_.insert(meta.fd.GetNumber());
|
||||
pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file.
|
||||
|
||||
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
|
||||
const SequenceNumber earliest_seqno_in_memtable =
|
||||
@ -1489,9 +1511,9 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
|
||||
cfd->options()->compaction_style == kCompactionStyleLevel) {
|
||||
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
|
||||
}
|
||||
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetFileSize(),
|
||||
meta.smallest, meta.largest, meta.smallest_seqno,
|
||||
meta.largest_seqno);
|
||||
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
|
||||
meta.fd.GetFileSize(), meta.smallest, meta.largest,
|
||||
meta.smallest_seqno, meta.largest_seqno);
|
||||
}
|
||||
|
||||
InternalStats::CompactionStats stats;
|
||||
@ -1547,7 +1569,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
|
||||
// Replace immutable memtable with the generated Table
|
||||
s = cfd->imm()->InstallMemtableFlushResults(
|
||||
cfd, mems, versions_.get(), &mutex_, options_.info_log.get(),
|
||||
file_number, pending_outputs_, &deletion_state.memtables_to_free,
|
||||
file_number, &pending_outputs_, &deletion_state.memtables_to_free,
|
||||
db_directory_.get(), log_buffer);
|
||||
}
|
||||
|
||||
@ -1691,9 +1713,9 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
|
||||
edit.SetColumnFamily(cfd->GetID());
|
||||
for (const auto& f : cfd->current()->files_[level]) {
|
||||
edit.DeleteFile(level, f->fd.GetNumber());
|
||||
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetFileSize(),
|
||||
f->smallest, f->largest, f->smallest_seqno,
|
||||
f->largest_seqno);
|
||||
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||
f->smallest_seqno, f->largest_seqno);
|
||||
}
|
||||
Log(options_.info_log, "[%s] Apply version edit:\n%s",
|
||||
cfd->GetName().c_str(), edit.DebugString().data());
|
||||
@ -2190,9 +2212,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
||||
assert(c->num_input_files(0) == 1);
|
||||
FileMetaData* f = c->input(0, 0);
|
||||
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
|
||||
c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetFileSize(),
|
||||
f->smallest, f->largest, f->smallest_seqno,
|
||||
f->largest_seqno);
|
||||
c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||
f->smallest_seqno, f->largest_seqno);
|
||||
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
|
||||
db_directory_.get());
|
||||
InstallSuperVersion(c->column_family_data(), deletion_state);
|
||||
@ -2298,7 +2320,7 @@ void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) {
|
||||
int filesNeeded = compact->compaction->num_input_files(1);
|
||||
for (int i = 0; i < std::max(filesNeeded, 1); i++) {
|
||||
uint64_t file_number = versions_->NewFileNumber();
|
||||
pending_outputs_.insert(file_number);
|
||||
pending_outputs_[file_number] = compact->compaction->GetOutputPathId();
|
||||
compact->allocated_file_numbers.push_back(file_number);
|
||||
}
|
||||
}
|
||||
@ -2324,18 +2346,20 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
|
||||
} else {
|
||||
mutex_.Lock();
|
||||
file_number = versions_->NewFileNumber();
|
||||
pending_outputs_.insert(file_number);
|
||||
pending_outputs_[file_number] = compact->compaction->GetOutputPathId();
|
||||
mutex_.Unlock();
|
||||
}
|
||||
CompactionState::Output out;
|
||||
out.number = file_number;
|
||||
out.path_id = compact->compaction->GetOutputPathId();
|
||||
out.smallest.Clear();
|
||||
out.largest.Clear();
|
||||
out.smallest_seqno = out.largest_seqno = 0;
|
||||
compact->outputs.push_back(out);
|
||||
|
||||
// Make the output file
|
||||
std::string fname = TableFileName(dbname_, file_number);
|
||||
std::string fname = TableFileName(options_.db_paths, file_number,
|
||||
compact->compaction->GetOutputPathId());
|
||||
Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_);
|
||||
|
||||
if (s.ok()) {
|
||||
@ -2343,13 +2367,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
|
||||
compact->compaction->OutputFilePreallocationSize());
|
||||
|
||||
ColumnFamilyData* cfd = compact->compaction->column_family_data();
|
||||
CompressionType compression_type =
|
||||
GetCompressionType(*cfd->options(), compact->compaction->output_level(),
|
||||
compact->compaction->enable_compression());
|
||||
|
||||
compact->builder.reset(
|
||||
NewTableBuilder(*cfd->options(), cfd->internal_comparator(),
|
||||
compact->outfile.get(), compression_type));
|
||||
compact->builder.reset(NewTableBuilder(
|
||||
*cfd->options(), cfd->internal_comparator(), compact->outfile.get(),
|
||||
compact->compaction->OutputCompressionType()));
|
||||
}
|
||||
LogFlush(options_.info_log);
|
||||
return s;
|
||||
@ -2362,6 +2382,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
|
||||
assert(compact->builder != nullptr);
|
||||
|
||||
const uint64_t output_number = compact->current_output()->number;
|
||||
const uint32_t output_path_id = compact->current_output()->path_id;
|
||||
assert(output_number != 0);
|
||||
|
||||
// Check for iterator errors
|
||||
@ -2397,9 +2418,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
|
||||
if (s.ok() && current_entries > 0) {
|
||||
// Verify that the table is usable
|
||||
ColumnFamilyData* cfd = compact->compaction->column_family_data();
|
||||
FileDescriptor meta(output_number, current_bytes);
|
||||
FileDescriptor fd(output_number, output_path_id, current_bytes);
|
||||
Iterator* iter = cfd->table_cache()->NewIterator(
|
||||
ReadOptions(), storage_options_, cfd->internal_comparator(), meta);
|
||||
ReadOptions(), storage_options_, cfd->internal_comparator(), fd);
|
||||
s = iter->status();
|
||||
delete iter;
|
||||
if (s.ok()) {
|
||||
@ -2442,9 +2463,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact,
|
||||
compact->compaction->AddInputDeletions(compact->compaction->edit());
|
||||
for (size_t i = 0; i < compact->outputs.size(); i++) {
|
||||
const CompactionState::Output& out = compact->outputs[i];
|
||||
compact->compaction->edit()->AddFile(
|
||||
compact->compaction->output_level(), out.number, out.file_size,
|
||||
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
|
||||
compact->compaction->edit()->AddFile(compact->compaction->output_level(),
|
||||
out.number, out.path_id, out.file_size,
|
||||
out.smallest, out.largest,
|
||||
out.smallest_seqno, out.largest_seqno);
|
||||
}
|
||||
return versions_->LogAndApply(compact->compaction->column_family_data(),
|
||||
compact->compaction->edit(), &mutex_,
|
||||
@ -4140,7 +4162,7 @@ Status DBImpl::MakeRoomForWrite(
|
||||
// how do we fail if we're not creating new log?
|
||||
assert(creating_new_log);
|
||||
// Avoid chewing through file number space in a tight loop.
|
||||
versions_->ReuseFileNumber(new_log_number);
|
||||
versions_->ReuseLogFileNumber(new_log_number);
|
||||
assert(!new_mem);
|
||||
assert(!new_log);
|
||||
break;
|
||||
@ -4383,14 +4405,15 @@ Status DBImpl::CheckConsistency() {
|
||||
|
||||
std::string corruption_messages;
|
||||
for (const auto& md : metadata) {
|
||||
std::string file_path = dbname_ + md.name;
|
||||
std::string file_path = md.db_path + "/" + md.name;
|
||||
|
||||
uint64_t fsize = 0;
|
||||
Status s = env_->GetFileSize(file_path, &fsize);
|
||||
if (!s.ok()) {
|
||||
corruption_messages +=
|
||||
"Can't access " + md.name + ": " + s.ToString() + "\n";
|
||||
} else if (fsize != md.size) {
|
||||
corruption_messages += "Sst file size mismatch: " + md.name +
|
||||
corruption_messages += "Sst file size mismatch: " + file_path +
|
||||
". Size recorded in manifest " +
|
||||
std::to_string(md.size) + ", actual size " +
|
||||
std::to_string(fsize) + "\n";
|
||||
@ -4488,6 +4511,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
|
||||
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
|
||||
if (db_options.db_paths.size() > 1) {
|
||||
return Status::NotSupported(
|
||||
"More than one DB paths are not supported yet. ");
|
||||
}
|
||||
|
||||
*dbptr = nullptr;
|
||||
handles->clear();
|
||||
|
||||
@ -4503,6 +4531,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
||||
|
||||
DBImpl* impl = new DBImpl(db_options, dbname);
|
||||
Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir);
|
||||
if (s.ok()) {
|
||||
for (auto path : impl->options_.db_paths) {
|
||||
s = impl->env_->CreateDirIfMissing(path);
|
||||
if (!s.ok()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!s.ok()) {
|
||||
delete impl;
|
||||
return s;
|
||||
@ -4665,6 +4702,21 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
|
||||
}
|
||||
}
|
||||
|
||||
for (auto db_path : options.db_paths) {
|
||||
env->GetChildren(db_path, &filenames);
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
for (size_t i = 0; i < filenames.size(); i++) {
|
||||
if (ParseFileName(filenames[i], &number, &type) &&
|
||||
type == kTableFile) { // Lock file will be deleted at end
|
||||
Status del = env->DeleteFile(db_path + "/" + filenames[i]);
|
||||
if (result.ok() && !del.ok()) {
|
||||
result = del;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
env->GetChildren(archivedir, &archiveFiles);
|
||||
// Delete archival files.
|
||||
for (size_t i = 0; i < archiveFiles.size(); ++i) {
|
||||
|
29
db/db_impl.h
29
db/db_impl.h
@ -198,6 +198,17 @@ class DBImpl : public DB {
|
||||
Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
|
||||
#endif // NDEBUG
|
||||
|
||||
// Structure to store information for candidate files to delete.
|
||||
struct CandidateFileInfo {
|
||||
std::string file_name;
|
||||
uint32_t path_id;
|
||||
CandidateFileInfo(std::string name, uint32_t path)
|
||||
: file_name(name), path_id(path) {}
|
||||
bool operator==(const CandidateFileInfo& other) const {
|
||||
return file_name == other.file_name && path_id == other.path_id;
|
||||
}
|
||||
};
|
||||
|
||||
// needed for CleanupIteratorState
|
||||
struct DeletionState {
|
||||
inline bool HaveSomethingToDelete() const {
|
||||
@ -209,10 +220,10 @@ class DBImpl : public DB {
|
||||
// a list of all files that we'll consider deleting
|
||||
// (every once in a while this is filled up with all files
|
||||
// in the DB directory)
|
||||
std::vector<std::string> candidate_files;
|
||||
std::vector<CandidateFileInfo> candidate_files;
|
||||
|
||||
// the list of all live sst files that cannot be deleted
|
||||
std::vector<uint64_t> sst_live;
|
||||
std::vector<FileDescriptor> sst_live;
|
||||
|
||||
// a list of sst files that we need to delete
|
||||
std::vector<FileMetaData*> sst_delete_files;
|
||||
@ -501,7 +512,8 @@ class DBImpl : public DB {
|
||||
|
||||
// Set of table files to protect from deletion because they are
|
||||
// part of ongoing compactions.
|
||||
std::set<uint64_t> pending_outputs_;
|
||||
// map from pending file number ID to their path IDs.
|
||||
FileNumToPathIdMap pending_outputs_;
|
||||
|
||||
// At least one compaction or flush job is pending but not yet scheduled
|
||||
// because of the max background thread limit.
|
||||
@ -625,15 +637,4 @@ extern Options SanitizeOptions(const std::string& db,
|
||||
const Options& src);
|
||||
extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src);
|
||||
|
||||
// Determine compression type, based on user options, level of the output
|
||||
// file and whether compression is disabled.
|
||||
// If enable_compression is false, then compression is always disabled no
|
||||
// matter what the values of the other two parameters are.
|
||||
// Otherwise, the compression type is determined based on options and level.
|
||||
CompressionType GetCompressionType(const Options& options, int level,
|
||||
const bool enable_compression);
|
||||
|
||||
// Determine compression type for L0 file written by memtable flush.
|
||||
CompressionType GetCompressionFlush(const Options& options);
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -98,7 +98,12 @@ class AtomicCounter {
|
||||
count_ = 0;
|
||||
}
|
||||
};
|
||||
} // namespace anon
|
||||
|
||||
static std::string Key(int i) {
|
||||
char buf[100];
|
||||
snprintf(buf, sizeof(buf), "key%06d", i);
|
||||
return std::string(buf);
|
||||
}
|
||||
|
||||
// Special Env used to delay background operations
|
||||
@ -355,7 +360,10 @@ class DBTest {
|
||||
|
||||
~DBTest() {
|
||||
Close();
|
||||
ASSERT_OK(DestroyDB(dbname_, Options()));
|
||||
Options options;
|
||||
options.db_paths.push_back(dbname_);
|
||||
options.db_paths.push_back(dbname_ + "_2");
|
||||
ASSERT_OK(DestroyDB(dbname_, options));
|
||||
delete env_;
|
||||
delete filter_policy_;
|
||||
}
|
||||
@ -436,7 +444,8 @@ class DBTest {
|
||||
switch (option_config_) {
|
||||
case kHashSkipList:
|
||||
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
|
||||
options.memtable_factory.reset(NewHashSkipListRepFactory());
|
||||
options.memtable_factory.reset(
|
||||
NewHashSkipListRepFactory(16));
|
||||
break;
|
||||
case kPlainTableFirstBytePrefix:
|
||||
options.table_factory.reset(new PlainTableFactory());
|
||||
@ -466,7 +475,7 @@ class DBTest {
|
||||
options.db_log_dir = test::TmpDir();
|
||||
break;
|
||||
case kWalDir:
|
||||
options.wal_dir = "/tmp/wal";
|
||||
options.wal_dir = test::TmpDir() + "/wal";
|
||||
break;
|
||||
case kManifestFileSize:
|
||||
options.max_manifest_file_size = 50; // 50 bytes
|
||||
@ -487,7 +496,8 @@ class DBTest {
|
||||
break;
|
||||
case kHashLinkList:
|
||||
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
|
||||
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0));
|
||||
options.memtable_factory.reset(
|
||||
NewHashLinkListRepFactory(4, 0, 3, true, 4));
|
||||
break;
|
||||
case kHashCuckoo:
|
||||
options.memtable_factory.reset(
|
||||
@ -895,6 +905,30 @@ class DBTest {
|
||||
return property;
|
||||
}
|
||||
|
||||
int GetSstFileCount(std::string path) {
|
||||
std::vector<std::string> files;
|
||||
env_->GetChildren(path, &files);
|
||||
|
||||
int sst_count = 0;
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
for (size_t i = 0; i < files.size(); i++) {
|
||||
if (ParseFileName(files[i], &number, &type) && type == kTableFile) {
|
||||
sst_count++;
|
||||
}
|
||||
}
|
||||
return sst_count;
|
||||
}
|
||||
|
||||
void GenerateNewFile(Random* rnd, int* key_idx) {
|
||||
for (int i = 0; i < 11; i++) {
|
||||
ASSERT_OK(Put(Key(*key_idx), RandomString(rnd, (i == 10) ? 1 : 10000)));
|
||||
(*key_idx)++;
|
||||
}
|
||||
dbfull()->TEST_WaitForFlushMemTable();
|
||||
dbfull()->TEST_WaitForCompact();
|
||||
}
|
||||
|
||||
std::string IterStatus(Iterator* iter) {
|
||||
std::string result;
|
||||
if (iter->Valid()) {
|
||||
@ -1035,12 +1069,6 @@ class DBTest {
|
||||
|
||||
};
|
||||
|
||||
static std::string Key(int i) {
|
||||
char buf[100];
|
||||
snprintf(buf, sizeof(buf), "key%06d", i);
|
||||
return std::string(buf);
|
||||
}
|
||||
|
||||
static long TestGetTickerCount(const Options& options, Tickers ticker_type) {
|
||||
return options.statistics->getTickerCount(ticker_type);
|
||||
}
|
||||
@ -3432,6 +3460,13 @@ TEST(DBTest, UniversalCompactionCompressRatio2) {
|
||||
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(),
|
||||
120000 * 12 * 0.8 + 120000 * 2);
|
||||
}
|
||||
|
||||
TEST(DBTest, FailMoreDbPaths) {
|
||||
Options options;
|
||||
options.db_paths.push_back(dbname_);
|
||||
options.db_paths.push_back(dbname_ + "_2");
|
||||
ASSERT_TRUE(TryReopen(&options).IsNotSupported());
|
||||
}
|
||||
#endif
|
||||
|
||||
TEST(DBTest, ConvertCompactionStyle) {
|
||||
@ -6691,7 +6726,7 @@ TEST(DBTest, PrefixScan) {
|
||||
options.disable_auto_compactions = true;
|
||||
options.max_background_compactions = 2;
|
||||
options.create_if_missing = true;
|
||||
options.memtable_factory.reset(NewHashSkipListRepFactory());
|
||||
options.memtable_factory.reset(NewHashSkipListRepFactory(16));
|
||||
|
||||
// 11 RAND I/Os
|
||||
DestroyAndReopen(&options);
|
||||
@ -6848,7 +6883,7 @@ TEST(DBTest, TailingIteratorPrefixSeek) {
|
||||
options.create_if_missing = true;
|
||||
options.disable_auto_compactions = true;
|
||||
options.prefix_extractor.reset(NewFixedPrefixTransform(2));
|
||||
options.memtable_factory.reset(NewHashSkipListRepFactory());
|
||||
options.memtable_factory.reset(NewHashSkipListRepFactory(16));
|
||||
DestroyAndReopen(&options);
|
||||
CreateAndReopenWithCF({"pikachu"}, &options);
|
||||
|
||||
|
@ -297,6 +297,13 @@ class IterKey {
|
||||
parsed_key_suffix.sequence, parsed_key_suffix.type);
|
||||
}
|
||||
|
||||
void EncodeLengthPrefixedKey(const Slice& key) {
|
||||
auto size = key.size();
|
||||
EnlargeBufferIfNeeded(size + VarintLength(size));
|
||||
char* ptr = EncodeVarint32(key_, size);
|
||||
memcpy(ptr, key.data(), size);
|
||||
}
|
||||
|
||||
private:
|
||||
char* key_;
|
||||
size_t buf_size_;
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
#include <ctype.h>
|
||||
#include <stdio.h>
|
||||
#include <vector>
|
||||
#include "db/dbformat.h"
|
||||
#include "rocksdb/env.h"
|
||||
#include "util/logging.h"
|
||||
@ -66,9 +67,28 @@ std::string ArchivedLogFileName(const std::string& name, uint64_t number) {
|
||||
return MakeFileName(name + "/" + ARCHIVAL_DIR, number, "log");
|
||||
}
|
||||
|
||||
std::string TableFileName(const std::string& name, uint64_t number) {
|
||||
std::string MakeTableFileName(const std::string& path, uint64_t number) {
|
||||
return MakeFileName(path, number, "sst");
|
||||
}
|
||||
|
||||
std::string TableFileName(const std::vector<std::string> db_paths,
|
||||
uint64_t number, uint32_t path_id) {
|
||||
assert(number > 0);
|
||||
return MakeFileName(name, number, "sst");
|
||||
std::string path;
|
||||
if (path_id >= db_paths.size()) {
|
||||
path = db_paths.back();
|
||||
} else {
|
||||
path = db_paths[path_id];
|
||||
}
|
||||
return MakeTableFileName(path, number);
|
||||
}
|
||||
|
||||
std::string FormatFileNumber(uint64_t number, uint32_t path_id) {
|
||||
if (path_id == 0) {
|
||||
return std::to_string(number);
|
||||
} else {
|
||||
return std::to_string(number) + "(path " + std::to_string(path_id) + ")";
|
||||
}
|
||||
}
|
||||
|
||||
std::string DescriptorFileName(const std::string& dbname, uint64_t number) {
|
||||
|
@ -11,7 +11,9 @@
|
||||
|
||||
#pragma once
|
||||
#include <stdint.h>
|
||||
#include <unordered_map>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "rocksdb/slice.h"
|
||||
#include "rocksdb/status.h"
|
||||
#include "rocksdb/transaction_log.h"
|
||||
@ -34,6 +36,9 @@ enum FileType {
|
||||
kIdentityFile
|
||||
};
|
||||
|
||||
// map from file number to path ID.
|
||||
typedef std::unordered_map<uint64_t, uint32_t> FileNumToPathIdMap;
|
||||
|
||||
// Return the name of the log file with the specified number
|
||||
// in the db named by "dbname". The result will be prefixed with
|
||||
// "dbname".
|
||||
@ -48,10 +53,15 @@ extern std::string ArchivalDirectory(const std::string& dbname);
|
||||
extern std::string ArchivedLogFileName(const std::string& dbname,
|
||||
uint64_t num);
|
||||
|
||||
extern std::string MakeTableFileName(const std::string& name, uint64_t number);
|
||||
|
||||
// Return the name of the sstable with the specified number
|
||||
// in the db named by "dbname". The result will be prefixed with
|
||||
// "dbname".
|
||||
extern std::string TableFileName(const std::string& dbname, uint64_t number);
|
||||
extern std::string TableFileName(const std::vector<std::string> db_paths,
|
||||
uint64_t number, uint32_t path_id);
|
||||
|
||||
extern std::string FormatFileNumber(uint64_t number, uint32_t path_id);
|
||||
|
||||
// Return the name of the descriptor file for the db named by
|
||||
// "dbname" and the specified incarnation number. The result will be
|
||||
|
@ -108,7 +108,9 @@ TEST(FileNameTest, Construction) {
|
||||
ASSERT_EQ(192U, number);
|
||||
ASSERT_EQ(kLogFile, type);
|
||||
|
||||
fname = TableFileName("bar", 200);
|
||||
fname = TableFileName({"bar"}, 200, 0);
|
||||
std::string fname1 = TableFileName({"foo", "bar"}, 200, 1);
|
||||
ASSERT_EQ(fname, fname1);
|
||||
ASSERT_EQ("bar/", std::string(fname.data(), 4));
|
||||
ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type));
|
||||
ASSERT_EQ(200U, number);
|
||||
|
@ -51,7 +51,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
|
||||
for (int i = 0; i < num_base_files; i++) {
|
||||
InternalKey start(MakeKey(2 * fnum), 1, kTypeValue);
|
||||
InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
|
||||
vbase.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1);
|
||||
vbase.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1);
|
||||
}
|
||||
ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu));
|
||||
}
|
||||
@ -61,7 +61,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
|
||||
vedit.DeleteFile(2, fnum);
|
||||
InternalKey start(MakeKey(2 * fnum), 1, kTypeValue);
|
||||
InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
|
||||
vedit.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1);
|
||||
vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1);
|
||||
vset->LogAndApply(default_cfd, &vedit, &mu);
|
||||
}
|
||||
}
|
||||
|
@ -55,6 +55,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
|
||||
assert(!should_flush_);
|
||||
if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
|
||||
prefix_bloom_.reset(new DynamicBloom(
|
||||
&arena_,
|
||||
options.memtable_prefix_bloom_bits, options.bloom_locality,
|
||||
options.memtable_prefix_bloom_probes, nullptr,
|
||||
options.memtable_prefix_bloom_huge_page_tlb_size,
|
||||
|
@ -140,7 +140,7 @@ void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
|
||||
|
||||
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
||||
uint64_t file_number,
|
||||
std::set<uint64_t>* pending_outputs) {
|
||||
FileNumToPathIdMap* pending_outputs) {
|
||||
assert(!mems.empty());
|
||||
|
||||
// If the flush was not successful, then just reset state.
|
||||
@ -162,7 +162,7 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
||||
Status MemTableList::InstallMemtableFlushResults(
|
||||
ColumnFamilyData* cfd, const autovector<MemTable*>& mems, VersionSet* vset,
|
||||
port::Mutex* mu, Logger* info_log, uint64_t file_number,
|
||||
std::set<uint64_t>& pending_outputs, autovector<MemTable*>* to_delete,
|
||||
FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete,
|
||||
Directory* db_directory, LogBuffer* log_buffer) {
|
||||
mu->AssertHeld();
|
||||
|
||||
@ -219,7 +219,7 @@ Status MemTableList::InstallMemtableFlushResults(
|
||||
// has been written to a committed version so that other concurrently
|
||||
// executing compaction threads do not mistakenly assume that this
|
||||
// file is not live.
|
||||
pending_outputs.erase(m->file_number_);
|
||||
pending_outputs->erase(m->file_number_);
|
||||
if (m->Unref() != nullptr) {
|
||||
to_delete->push_back(m);
|
||||
}
|
||||
@ -233,7 +233,7 @@ Status MemTableList::InstallMemtableFlushResults(
|
||||
m->flush_in_progress_ = false;
|
||||
m->edit_.Clear();
|
||||
num_flush_not_started_++;
|
||||
pending_outputs.erase(m->file_number_);
|
||||
pending_outputs->erase(m->file_number_);
|
||||
m->file_number_ = 0;
|
||||
imm_flush_needed.Release_Store((void *)1);
|
||||
}
|
||||
|
@ -15,6 +15,7 @@
|
||||
#include "rocksdb/iterator.h"
|
||||
|
||||
#include "db/dbformat.h"
|
||||
#include "db/filename.h"
|
||||
#include "db/skiplist.h"
|
||||
#include "db/memtable.h"
|
||||
#include "rocksdb/db.h"
|
||||
@ -108,17 +109,14 @@ class MemTableList {
|
||||
// they can get picked up again on the next round of flush.
|
||||
void RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
||||
uint64_t file_number,
|
||||
std::set<uint64_t>* pending_outputs);
|
||||
FileNumToPathIdMap* pending_outputs);
|
||||
|
||||
// Commit a successful flush in the manifest file
|
||||
Status InstallMemtableFlushResults(ColumnFamilyData* cfd,
|
||||
const autovector<MemTable*>& m,
|
||||
VersionSet* vset, port::Mutex* mu,
|
||||
Logger* info_log, uint64_t file_number,
|
||||
std::set<uint64_t>& pending_outputs,
|
||||
autovector<MemTable*>* to_delete,
|
||||
Directory* db_directory,
|
||||
LogBuffer* log_buffer);
|
||||
Status InstallMemtableFlushResults(
|
||||
ColumnFamilyData* cfd, const autovector<MemTable*>& m, VersionSet* vset,
|
||||
port::Mutex* mu, Logger* info_log, uint64_t file_number,
|
||||
FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete,
|
||||
Directory* db_directory, LogBuffer* log_buffer);
|
||||
|
||||
// New memtables are inserted at the front of the list.
|
||||
// Takes ownership of the referenced held on *m by the caller of Add().
|
||||
|
@ -62,7 +62,7 @@ class PlainTableDBTest {
|
||||
Options CurrentOptions() {
|
||||
Options options;
|
||||
options.table_factory.reset(NewPlainTableFactory(0, 2, 0.8, 3, 0, kPrefix));
|
||||
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true));
|
||||
options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true, 3));
|
||||
options.prefix_extractor.reset(NewFixedPrefixTransform(8));
|
||||
options.allow_mmap_reads = true;
|
||||
return options;
|
||||
|
@ -189,6 +189,10 @@ class PrefixTest {
|
||||
options.memtable_factory.reset(
|
||||
NewHashLinkListRepFactory(bucket_count, 2 * 1024 * 1024));
|
||||
return true;
|
||||
case kHashLinkListTriggerSkipList:
|
||||
options.memtable_factory.reset(
|
||||
NewHashLinkListRepFactory(bucket_count, 0, 3));
|
||||
return true;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
@ -208,6 +212,7 @@ class PrefixTest {
|
||||
kHashSkipList,
|
||||
kHashLinkList,
|
||||
kHashLinkListHugePageTlb,
|
||||
kHashLinkListTriggerSkipList,
|
||||
kEnd
|
||||
};
|
||||
int option_config_;
|
||||
|
91
db/repair.cc
91
db/repair.cc
@ -65,8 +65,8 @@ class Repairer {
|
||||
NewLRUCache(10, options_.table_cache_numshardbits,
|
||||
options_.table_cache_remove_scan_count_limit)),
|
||||
next_file_number_(1) {
|
||||
table_cache_ = new TableCache(dbname_, &options_, storage_options_,
|
||||
raw_table_cache_.get());
|
||||
table_cache_ =
|
||||
new TableCache(&options_, storage_options_, raw_table_cache_.get());
|
||||
edit_ = new VersionEdit();
|
||||
}
|
||||
|
||||
@ -116,7 +116,7 @@ class Repairer {
|
||||
VersionEdit* edit_;
|
||||
|
||||
std::vector<std::string> manifests_;
|
||||
std::vector<uint64_t> table_numbers_;
|
||||
std::vector<FileDescriptor> table_fds_;
|
||||
std::vector<uint64_t> logs_;
|
||||
std::vector<TableInfo> tables_;
|
||||
uint64_t next_file_number_;
|
||||
@ -124,35 +124,43 @@ class Repairer {
|
||||
|
||||
Status FindFiles() {
|
||||
std::vector<std::string> filenames;
|
||||
Status status = env_->GetChildren(dbname_, &filenames);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
if (filenames.empty()) {
|
||||
return Status::Corruption(dbname_, "repair found no files");
|
||||
}
|
||||
bool found_file = false;
|
||||
for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) {
|
||||
Status status = env_->GetChildren(options_.db_paths[path_id], &filenames);
|
||||
if (!status.ok()) {
|
||||
return status;
|
||||
}
|
||||
if (!filenames.empty()) {
|
||||
found_file = true;
|
||||
}
|
||||
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
for (size_t i = 0; i < filenames.size(); i++) {
|
||||
if (ParseFileName(filenames[i], &number, &type)) {
|
||||
if (type == kDescriptorFile) {
|
||||
manifests_.push_back(filenames[i]);
|
||||
} else {
|
||||
if (number + 1 > next_file_number_) {
|
||||
next_file_number_ = number + 1;
|
||||
}
|
||||
if (type == kLogFile) {
|
||||
logs_.push_back(number);
|
||||
} else if (type == kTableFile) {
|
||||
table_numbers_.push_back(number);
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
for (size_t i = 0; i < filenames.size(); i++) {
|
||||
if (ParseFileName(filenames[i], &number, &type)) {
|
||||
if (type == kDescriptorFile) {
|
||||
assert(path_id == 0);
|
||||
manifests_.push_back(filenames[i]);
|
||||
} else {
|
||||
// Ignore other files
|
||||
if (number + 1 > next_file_number_) {
|
||||
next_file_number_ = number + 1;
|
||||
}
|
||||
if (type == kLogFile) {
|
||||
assert(path_id == 0);
|
||||
logs_.push_back(number);
|
||||
} else if (type == kTableFile) {
|
||||
table_fds_.emplace_back(number, path_id, 0);
|
||||
} else {
|
||||
// Ignore other files
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return status;
|
||||
if (!found_file) {
|
||||
return Status::Corruption(dbname_, "repair found no files");
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void ConvertLogFilesToTables() {
|
||||
@ -228,7 +236,7 @@ class Repairer {
|
||||
// Do not record a version edit for this conversion to a Table
|
||||
// since ExtractMetaData() will also generate edits.
|
||||
FileMetaData meta;
|
||||
meta.fd.number = next_file_number_++;
|
||||
meta.fd = FileDescriptor(next_file_number_++, 0, 0);
|
||||
ReadOptions ro;
|
||||
Iterator* iter = mem->NewIterator(ro, true /* enforce_total_order */);
|
||||
status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_,
|
||||
@ -239,7 +247,7 @@ class Repairer {
|
||||
mem = nullptr;
|
||||
if (status.ok()) {
|
||||
if (meta.fd.GetFileSize() > 0) {
|
||||
table_numbers_.push_back(meta.fd.GetNumber());
|
||||
table_fds_.push_back(meta.fd);
|
||||
}
|
||||
}
|
||||
Log(options_.info_log,
|
||||
@ -249,14 +257,17 @@ class Repairer {
|
||||
}
|
||||
|
||||
void ExtractMetaData() {
|
||||
for (size_t i = 0; i < table_numbers_.size(); i++) {
|
||||
for (size_t i = 0; i < table_fds_.size(); i++) {
|
||||
TableInfo t;
|
||||
t.meta.fd.number = table_numbers_[i];
|
||||
t.meta.fd = table_fds_[i];
|
||||
Status status = ScanTable(&t);
|
||||
if (!status.ok()) {
|
||||
std::string fname = TableFileName(dbname_, table_numbers_[i]);
|
||||
Log(options_.info_log, "Table #%" PRIu64 ": ignoring %s",
|
||||
table_numbers_[i], status.ToString().c_str());
|
||||
std::string fname = TableFileName(
|
||||
options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId());
|
||||
Log(options_.info_log, "Table #%s: ignoring %s",
|
||||
FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId())
|
||||
.c_str(),
|
||||
status.ToString().c_str());
|
||||
ArchiveFile(fname);
|
||||
} else {
|
||||
tables_.push_back(t);
|
||||
@ -265,9 +276,13 @@ class Repairer {
|
||||
}
|
||||
|
||||
Status ScanTable(TableInfo* t) {
|
||||
std::string fname = TableFileName(dbname_, t->meta.fd.GetNumber());
|
||||
std::string fname = TableFileName(options_.db_paths, t->meta.fd.GetNumber(),
|
||||
t->meta.fd.GetPathId());
|
||||
int counter = 0;
|
||||
Status status = env_->GetFileSize(fname, &t->meta.fd.file_size);
|
||||
uint64_t file_size;
|
||||
Status status = env_->GetFileSize(fname, &file_size);
|
||||
t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
|
||||
file_size);
|
||||
if (status.ok()) {
|
||||
Iterator* iter = table_cache_->NewIterator(
|
||||
ReadOptions(), storage_options_, icmp_, t->meta.fd);
|
||||
@ -330,9 +345,9 @@ class Repairer {
|
||||
for (size_t i = 0; i < tables_.size(); i++) {
|
||||
// TODO(opt): separate out into multiple levels
|
||||
const TableInfo& t = tables_[i];
|
||||
edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetFileSize(),
|
||||
t.meta.smallest, t.meta.largest, t.min_sequence,
|
||||
t.max_sequence);
|
||||
edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
|
||||
t.meta.fd.GetFileSize(), t.meta.smallest, t.meta.largest,
|
||||
t.min_sequence, t.max_sequence);
|
||||
}
|
||||
|
||||
//fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str());
|
||||
|
@ -36,10 +36,10 @@ static Slice GetSliceForFileNumber(const uint64_t* file_number) {
|
||||
sizeof(*file_number));
|
||||
}
|
||||
|
||||
TableCache::TableCache(const std::string& dbname, const Options* options,
|
||||
TableCache::TableCache(const Options* options,
|
||||
const EnvOptions& storage_options, Cache* const cache)
|
||||
: env_(options->env),
|
||||
dbname_(dbname),
|
||||
db_paths_(options->db_paths),
|
||||
options_(options),
|
||||
storage_options_(storage_options),
|
||||
cache_(cache) {}
|
||||
@ -60,13 +60,15 @@ Status TableCache::FindTable(const EnvOptions& toptions,
|
||||
const FileDescriptor& fd, Cache::Handle** handle,
|
||||
const bool no_io) {
|
||||
Status s;
|
||||
Slice key = GetSliceForFileNumber(&fd.number);
|
||||
uint64_t number = fd.GetNumber();
|
||||
Slice key = GetSliceForFileNumber(&number);
|
||||
*handle = cache_->Lookup(key);
|
||||
if (*handle == nullptr) {
|
||||
if (no_io) { // Dont do IO and return a not-found status
|
||||
return Status::Incomplete("Table not found in table_cache, no_io is set");
|
||||
}
|
||||
std::string fname = TableFileName(dbname_, fd.GetNumber());
|
||||
std::string fname =
|
||||
TableFileName(db_paths_, fd.GetNumber(), fd.GetPathId());
|
||||
unique_ptr<RandomAccessFile> file;
|
||||
unique_ptr<TableReader> table_reader;
|
||||
s = env_->NewRandomAccessFile(fname, &file, toptions);
|
||||
|
@ -11,6 +11,7 @@
|
||||
|
||||
#pragma once
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <stdint.h>
|
||||
|
||||
#include "db/dbformat.h"
|
||||
@ -28,8 +29,8 @@ struct FileDescriptor;
|
||||
|
||||
class TableCache {
|
||||
public:
|
||||
TableCache(const std::string& dbname, const Options* options,
|
||||
const EnvOptions& storage_options, Cache* cache);
|
||||
TableCache(const Options* options, const EnvOptions& storage_options,
|
||||
Cache* cache);
|
||||
~TableCache();
|
||||
|
||||
// Return an iterator for the specified file number (the corresponding
|
||||
@ -84,7 +85,7 @@ class TableCache {
|
||||
|
||||
private:
|
||||
Env* const env_;
|
||||
const std::string dbname_;
|
||||
const std::vector<std::string> db_paths_;
|
||||
const Options* options_;
|
||||
const EnvOptions& storage_options_;
|
||||
Cache* const cache_;
|
||||
|
@ -18,25 +18,30 @@ namespace rocksdb {
|
||||
// Tag numbers for serialized VersionEdit. These numbers are written to
|
||||
// disk and should not be changed.
|
||||
enum Tag {
|
||||
kComparator = 1,
|
||||
kLogNumber = 2,
|
||||
kNextFileNumber = 3,
|
||||
kLastSequence = 4,
|
||||
kCompactPointer = 5,
|
||||
kDeletedFile = 6,
|
||||
kNewFile = 7,
|
||||
kComparator = 1,
|
||||
kLogNumber = 2,
|
||||
kNextFileNumber = 3,
|
||||
kLastSequence = 4,
|
||||
kCompactPointer = 5,
|
||||
kDeletedFile = 6,
|
||||
kNewFile = 7,
|
||||
// 8 was used for large value refs
|
||||
kPrevLogNumber = 9,
|
||||
kPrevLogNumber = 9,
|
||||
|
||||
// these are new formats divergent from open source leveldb
|
||||
kNewFile2 = 100, // store smallest & largest seqno
|
||||
|
||||
kColumnFamily = 200, // specify column family for version edit
|
||||
kColumnFamilyAdd = 201,
|
||||
kColumnFamilyDrop = 202,
|
||||
kMaxColumnFamily = 203,
|
||||
kNewFile2 = 100,
|
||||
kNewFile3 = 102,
|
||||
kColumnFamily = 200, // specify column family for version edit
|
||||
kColumnFamilyAdd = 201,
|
||||
kColumnFamilyDrop = 202,
|
||||
kMaxColumnFamily = 203,
|
||||
};
|
||||
|
||||
uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
|
||||
assert(number <= kFileNumberMask);
|
||||
return number | (path_id * (kFileNumberMask + 1));
|
||||
}
|
||||
|
||||
void VersionEdit::Clear() {
|
||||
comparator_.clear();
|
||||
max_level_ = 0;
|
||||
@ -93,9 +98,18 @@ void VersionEdit::EncodeTo(std::string* dst) const {
|
||||
|
||||
for (size_t i = 0; i < new_files_.size(); i++) {
|
||||
const FileMetaData& f = new_files_[i].second;
|
||||
PutVarint32(dst, kNewFile2);
|
||||
if (f.fd.GetPathId() == 0) {
|
||||
// Use older format to make sure user can roll back the build if they
|
||||
// don't config multiple DB paths.
|
||||
PutVarint32(dst, kNewFile2);
|
||||
} else {
|
||||
PutVarint32(dst, kNewFile3);
|
||||
}
|
||||
PutVarint32(dst, new_files_[i].first); // level
|
||||
PutVarint64(dst, f.fd.GetNumber());
|
||||
if (f.fd.GetPathId() != 0) {
|
||||
PutVarint32(dst, f.fd.GetPathId());
|
||||
}
|
||||
PutVarint64(dst, f.fd.GetFileSize());
|
||||
PutLengthPrefixedSlice(dst, f.smallest.Encode());
|
||||
PutLengthPrefixedSlice(dst, f.largest.Encode());
|
||||
@ -237,7 +251,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
GetVarint64(&input, &file_size) &&
|
||||
GetInternalKey(&input, &f.smallest) &&
|
||||
GetInternalKey(&input, &f.largest)) {
|
||||
f.fd = FileDescriptor(number, file_size);
|
||||
f.fd = FileDescriptor(number, 0, file_size);
|
||||
new_files_.push_back(std::make_pair(level, f));
|
||||
} else {
|
||||
if (!msg) {
|
||||
@ -255,7 +269,27 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
||||
GetInternalKey(&input, &f.largest) &&
|
||||
GetVarint64(&input, &f.smallest_seqno) &&
|
||||
GetVarint64(&input, &f.largest_seqno)) {
|
||||
f.fd = FileDescriptor(number, file_size);
|
||||
f.fd = FileDescriptor(number, 0, file_size);
|
||||
new_files_.push_back(std::make_pair(level, f));
|
||||
} else {
|
||||
if (!msg) {
|
||||
msg = "new-file2 entry";
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case kNewFile3: {
|
||||
uint64_t number;
|
||||
uint32_t path_id;
|
||||
uint64_t file_size;
|
||||
if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
|
||||
GetVarint32(&input, &path_id) && GetVarint64(&input, &file_size) &&
|
||||
GetInternalKey(&input, &f.smallest) &&
|
||||
GetInternalKey(&input, &f.largest) &&
|
||||
GetVarint64(&input, &f.smallest_seqno) &&
|
||||
GetVarint64(&input, &f.largest_seqno)) {
|
||||
f.fd = FileDescriptor(number, path_id, file_size);
|
||||
new_files_.push_back(std::make_pair(level, f));
|
||||
} else {
|
||||
if (!msg) {
|
||||
|
@ -19,21 +19,41 @@ namespace rocksdb {
|
||||
|
||||
class VersionSet;
|
||||
|
||||
const uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF;
|
||||
|
||||
extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id);
|
||||
|
||||
// A copyable structure contains information needed to read data from an SST
|
||||
// file. It can contains a pointer to a table reader opened for the file, or
|
||||
// file number and size, which can be used to create a new table reader for it.
|
||||
// The behavior is undefined when a copied of the structure is used when the
|
||||
// file is not in any live version any more.
|
||||
struct FileDescriptor {
|
||||
uint64_t number;
|
||||
uint64_t file_size; // File size in bytes
|
||||
// Table reader in table_reader_handle
|
||||
TableReader* table_reader;
|
||||
uint64_t packed_number_and_path_id;
|
||||
uint64_t file_size; // File size in bytes
|
||||
|
||||
FileDescriptor(uint64_t number, uint64_t file_size)
|
||||
: number(number), file_size(file_size), table_reader(nullptr) {}
|
||||
FileDescriptor() : FileDescriptor(0, 0, 0) {}
|
||||
|
||||
uint64_t GetNumber() const { return number; }
|
||||
FileDescriptor(uint64_t number, uint32_t path_id, uint64_t file_size)
|
||||
: table_reader(nullptr),
|
||||
packed_number_and_path_id(PackFileNumberAndPathId(number, path_id)),
|
||||
file_size(file_size) {}
|
||||
|
||||
FileDescriptor& operator=(const FileDescriptor& fd) {
|
||||
table_reader = fd.table_reader;
|
||||
packed_number_and_path_id = fd.packed_number_and_path_id;
|
||||
file_size = fd.file_size;
|
||||
return *this;
|
||||
}
|
||||
|
||||
uint64_t GetNumber() const {
|
||||
return packed_number_and_path_id & kFileNumberMask;
|
||||
}
|
||||
uint32_t GetPathId() const {
|
||||
return packed_number_and_path_id / (kFileNumberMask + 1);
|
||||
}
|
||||
uint64_t GetFileSize() const { return file_size; }
|
||||
};
|
||||
|
||||
@ -58,7 +78,6 @@ struct FileMetaData {
|
||||
|
||||
FileMetaData()
|
||||
: refs(0),
|
||||
fd(0, 0),
|
||||
being_compacted(false),
|
||||
table_reader_handle(nullptr),
|
||||
compensated_file_size(0),
|
||||
@ -103,15 +122,13 @@ class VersionEdit {
|
||||
// Add the specified file at the specified number.
|
||||
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
|
||||
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
|
||||
void AddFile(int level, uint64_t file,
|
||||
uint64_t file_size,
|
||||
const InternalKey& smallest,
|
||||
const InternalKey& largest,
|
||||
const SequenceNumber& smallest_seqno,
|
||||
void AddFile(int level, uint64_t file, uint64_t file_size,
|
||||
uint64_t file_path_id, const InternalKey& smallest,
|
||||
const InternalKey& largest, const SequenceNumber& smallest_seqno,
|
||||
const SequenceNumber& largest_seqno) {
|
||||
assert(smallest_seqno <= largest_seqno);
|
||||
FileMetaData f;
|
||||
f.fd = FileDescriptor(file, file_size);
|
||||
f.fd = FileDescriptor(file, file_size, file_path_id);
|
||||
f.smallest = smallest;
|
||||
f.largest = largest;
|
||||
f.smallest_seqno = smallest_seqno;
|
||||
|
@ -30,11 +30,10 @@ TEST(VersionEditTest, EncodeDecode) {
|
||||
VersionEdit edit;
|
||||
for (int i = 0; i < 4; i++) {
|
||||
TestEncodeDecode(edit);
|
||||
edit.AddFile(3, kBig + 300 + i, kBig + 400 + i,
|
||||
edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, 0,
|
||||
InternalKey("foo", kBig + 500 + i, kTypeValue),
|
||||
InternalKey("zoo", kBig + 600 + i, kTypeDeletion),
|
||||
kBig + 500 + i,
|
||||
kBig + 600 + i);
|
||||
kBig + 500 + i, kBig + 600 + i);
|
||||
edit.DeleteFile(4, kBig + 700 + i);
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include <set>
|
||||
#include <climits>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
#include <stdio.h>
|
||||
|
||||
#include "db/filename.h"
|
||||
@ -171,7 +172,7 @@ class Version::LevelFileNumIterator : public Iterator {
|
||||
: icmp_(icmp),
|
||||
flist_(flist),
|
||||
index_(flist->size()),
|
||||
current_value_(0, 0) { // Marks as invalid
|
||||
current_value_(0, 0, 0) { // Marks as invalid
|
||||
}
|
||||
virtual bool Valid() const {
|
||||
return index_ < flist_->size();
|
||||
@ -276,7 +277,8 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
|
||||
*fname, &file, vset_->storage_options_);
|
||||
} else {
|
||||
s = options->env->NewRandomAccessFile(
|
||||
TableFileName(vset_->dbname_, file_meta->fd.GetNumber()),
|
||||
TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(),
|
||||
file_meta->fd.GetPathId()),
|
||||
&file, vset_->storage_options_);
|
||||
}
|
||||
if (!s.ok()) {
|
||||
@ -303,7 +305,9 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
|
||||
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
|
||||
for (int level = 0; level < num_levels_; level++) {
|
||||
for (const auto& file_meta : files_[level]) {
|
||||
auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber());
|
||||
auto fname =
|
||||
TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(),
|
||||
file_meta->fd.GetPathId());
|
||||
// 1. If the table is already present in table cache, load table
|
||||
// properties from there.
|
||||
std::shared_ptr<const TableProperties> table_properties;
|
||||
@ -861,7 +865,6 @@ void Version::ComputeCompactionScore(
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
// Compator that is used to sort files based on their size
|
||||
// In normal mode: descending size
|
||||
bool CompareCompensatedSizeDescending(const Version::Fsize& first,
|
||||
@ -869,18 +872,6 @@ bool CompareCompensatedSizeDescending(const Version::Fsize& first,
|
||||
return (first.file->compensated_file_size >
|
||||
second.file->compensated_file_size);
|
||||
}
|
||||
// A static compator used to sort files based on their seqno
|
||||
// In universal style : descending seqno
|
||||
bool CompareSeqnoDescending(const Version::Fsize& first,
|
||||
const Version::Fsize& second) {
|
||||
if (first.file->smallest_seqno > second.file->smallest_seqno) {
|
||||
assert(first.file->largest_seqno > second.file->largest_seqno);
|
||||
return true;
|
||||
}
|
||||
assert(first.file->largest_seqno <= second.file->largest_seqno);
|
||||
return false;
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
||||
void Version::UpdateNumNonEmptyLevels() {
|
||||
@ -895,19 +886,15 @@ void Version::UpdateNumNonEmptyLevels() {
|
||||
}
|
||||
|
||||
void Version::UpdateFilesBySize() {
|
||||
if (cfd_->options()->compaction_style == kCompactionStyleFIFO) {
|
||||
if (cfd_->options()->compaction_style == kCompactionStyleFIFO ||
|
||||
cfd_->options()->compaction_style == kCompactionStyleUniversal) {
|
||||
// don't need this
|
||||
return;
|
||||
}
|
||||
// No need to sort the highest level because it is never compacted.
|
||||
int max_level =
|
||||
(cfd_->options()->compaction_style == kCompactionStyleUniversal)
|
||||
? NumberLevels()
|
||||
: NumberLevels() - 1;
|
||||
|
||||
for (int level = 0; level < max_level; level++) {
|
||||
for (int level = 0; level < NumberLevels() - 1; level++) {
|
||||
const std::vector<FileMetaData*>& files = files_[level];
|
||||
std::vector<int>& files_by_size = files_by_size_[level];
|
||||
auto& files_by_size = files_by_size_[level];
|
||||
assert(files_by_size.size() == 0);
|
||||
|
||||
// populate a temp vector for sorting based on size
|
||||
@ -918,18 +905,12 @@ void Version::UpdateFilesBySize() {
|
||||
}
|
||||
|
||||
// sort the top number_of_files_to_sort_ based on file size
|
||||
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
|
||||
int num = temp.size();
|
||||
std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
|
||||
CompareSeqnoDescending);
|
||||
} else {
|
||||
int num = Version::number_of_files_to_sort_;
|
||||
if (num > (int)temp.size()) {
|
||||
num = temp.size();
|
||||
}
|
||||
std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
|
||||
CompareCompensatedSizeDescending);
|
||||
size_t num = Version::number_of_files_to_sort_;
|
||||
if (num > temp.size()) {
|
||||
num = temp.size();
|
||||
}
|
||||
std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
|
||||
CompareCompensatedSizeDescending);
|
||||
assert(temp.size() == files.size());
|
||||
|
||||
// initialize files_by_size_
|
||||
@ -1291,11 +1272,11 @@ int64_t Version::MaxNextLevelOverlappingBytes() {
|
||||
return result;
|
||||
}
|
||||
|
||||
void Version::AddLiveFiles(std::set<uint64_t>* live) {
|
||||
void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
|
||||
for (int level = 0; level < NumberLevels(); level++) {
|
||||
const std::vector<FileMetaData*>& files = files_[level];
|
||||
for (const auto& file : files) {
|
||||
live->insert(file->fd.GetNumber());
|
||||
live->push_back(file->fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1448,7 +1429,7 @@ class VersionSet::Builder {
|
||||
#endif
|
||||
}
|
||||
|
||||
void CheckConsistencyForDeletes(VersionEdit* edit, unsigned int number,
|
||||
void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
|
||||
int level) {
|
||||
#ifndef NDEBUG
|
||||
// a file to be deleted better exist in the previous version
|
||||
@ -1490,6 +1471,9 @@ class VersionSet::Builder {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
fprintf(stderr, "not found %ld\n", number);
|
||||
}
|
||||
assert(found);
|
||||
#endif
|
||||
}
|
||||
@ -2183,17 +2167,15 @@ Status VersionSet::Recover(
|
||||
last_sequence_ = last_sequence;
|
||||
prev_log_number_ = prev_log_number;
|
||||
|
||||
Log(options_->info_log, "Recovered from manifest file:%s succeeded,"
|
||||
Log(options_->info_log,
|
||||
"Recovered from manifest file:%s succeeded,"
|
||||
"manifest_file_number is %lu, next_file_number is %lu, "
|
||||
"last_sequence is %lu, log_number is %lu,"
|
||||
"prev_log_number is %lu,"
|
||||
"max_column_family is %u\n",
|
||||
manifest_filename.c_str(),
|
||||
(unsigned long)manifest_file_number_,
|
||||
(unsigned long)next_file_number_,
|
||||
(unsigned long)last_sequence_,
|
||||
(unsigned long)log_number,
|
||||
(unsigned long)prev_log_number_,
|
||||
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
|
||||
(unsigned long)next_file_number_, (unsigned long)last_sequence_,
|
||||
(unsigned long)log_number, (unsigned long)prev_log_number_,
|
||||
column_family_set_->GetMaxColumnFamily());
|
||||
|
||||
for (auto cfd : *column_family_set_) {
|
||||
@ -2580,9 +2562,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
|
||||
|
||||
for (int level = 0; level < cfd->NumberLevels(); level++) {
|
||||
for (const auto& f : cfd->current()->files_[level]) {
|
||||
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetFileSize(),
|
||||
f->smallest, f->largest, f->smallest_seqno,
|
||||
f->largest_seqno);
|
||||
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||
f->smallest_seqno, f->largest_seqno);
|
||||
}
|
||||
}
|
||||
edit.SetLogNumber(cfd->GetLogNumber());
|
||||
@ -2664,7 +2646,7 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
|
||||
return result;
|
||||
}
|
||||
|
||||
void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
|
||||
void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
|
||||
// pre-calculate space requirement
|
||||
int64_t total_files = 0;
|
||||
for (auto cfd : *column_family_set_) {
|
||||
@ -2686,7 +2668,7 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
|
||||
v = v->next_) {
|
||||
for (int level = 0; level < v->NumberLevels(); level++) {
|
||||
for (const auto& f : v->files_[level]) {
|
||||
live_list->push_back(f->fd.GetNumber());
|
||||
live_list->push_back(f->fd);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2809,7 +2791,14 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
||||
for (const auto& file : cfd->current()->files_[level]) {
|
||||
LiveFileMetaData filemetadata;
|
||||
filemetadata.column_family_name = cfd->GetName();
|
||||
filemetadata.name = TableFileName("", file->fd.GetNumber());
|
||||
uint32_t path_id = file->fd.GetPathId();
|
||||
if (path_id < options_->db_paths.size()) {
|
||||
filemetadata.db_path = options_->db_paths[path_id];
|
||||
} else {
|
||||
assert(!options_->db_paths.empty());
|
||||
filemetadata.db_path = options_->db_paths.back();
|
||||
}
|
||||
filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
|
||||
filemetadata.level = level;
|
||||
filemetadata.size = file->fd.GetFileSize();
|
||||
filemetadata.smallestkey = file->smallest.user_key().ToString();
|
||||
|
@ -188,7 +188,7 @@ class Version {
|
||||
int64_t MaxNextLevelOverlappingBytes();
|
||||
|
||||
// Add all files listed in the current version to *live.
|
||||
void AddLiveFiles(std::set<uint64_t>* live);
|
||||
void AddLiveFiles(std::vector<FileDescriptor>* live);
|
||||
|
||||
// Return a human readable string that describes this version's contents.
|
||||
std::string DebugString(bool hex = false) const;
|
||||
@ -294,7 +294,7 @@ class Version {
|
||||
// that on a running system, we need to look at only the first
|
||||
// few largest files because a new version is created every few
|
||||
// seconds/minutes (because of concurrent compactions).
|
||||
static const int number_of_files_to_sort_ = 50;
|
||||
static const size_t number_of_files_to_sort_ = 50;
|
||||
|
||||
// Level that should be compacted next and its compaction score.
|
||||
// Score < 1 means compaction is not strictly needed. These fields
|
||||
@ -399,7 +399,7 @@ class VersionSet {
|
||||
// Arrange to reuse "file_number" unless a newer file number has
|
||||
// already been allocated.
|
||||
// REQUIRES: "file_number" was returned by a call to NewFileNumber().
|
||||
void ReuseFileNumber(uint64_t file_number) {
|
||||
void ReuseLogFileNumber(uint64_t file_number) {
|
||||
if (next_file_number_ == file_number + 1) {
|
||||
next_file_number_ = file_number;
|
||||
}
|
||||
@ -440,7 +440,7 @@ class VersionSet {
|
||||
Iterator* MakeInputIterator(Compaction* c);
|
||||
|
||||
// Add all files listed in any live version to *live.
|
||||
void AddLiveFiles(std::vector<uint64_t>* live_list);
|
||||
void AddLiveFiles(std::vector<FileDescriptor>* live_list);
|
||||
|
||||
// Return the approximate offset in the database of the data for
|
||||
// "key" as of version "v".
|
||||
|
@ -31,7 +31,7 @@ class FindFileTest {
|
||||
SequenceNumber smallest_seq = 100,
|
||||
SequenceNumber largest_seq = 100) {
|
||||
FileMetaData* f = new FileMetaData;
|
||||
f->fd = FileDescriptor(files_.size() + 1, 0);
|
||||
f->fd = FileDescriptor(files_.size() + 1, 0, 0);
|
||||
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
|
||||
f->largest = InternalKey(largest, largest_seq, kTypeValue);
|
||||
files_.push_back(f);
|
||||
|
@ -55,6 +55,7 @@ class Env;
|
||||
// Metadata associated with each SST file.
|
||||
struct LiveFileMetaData {
|
||||
std::string column_family_name; // Name of the column family
|
||||
std::string db_path;
|
||||
std::string name; // Name of the file
|
||||
int level; // Level at which this file resides.
|
||||
size_t size; // File size in bytes.
|
||||
|
@ -227,9 +227,10 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
|
||||
int32_t skiplist_branching_factor = 4
|
||||
);
|
||||
|
||||
// The factory is to create memtables with a hashed linked list:
|
||||
// it contains a fixed array of buckets, each pointing to a sorted single
|
||||
// linked list (null if the bucket is empty).
|
||||
// The factory is to create memtables based on a hash table:
|
||||
// it contains a fixed array of buckets, each pointing to either a linked list
|
||||
// or a skip list if number of entries inside the bucket exceeds
|
||||
// threshold_use_skiplist.
|
||||
// @bucket_count: number of fixed array buckets
|
||||
// @huge_page_tlb_size: if <=0, allocate the hash table bytes from malloc.
|
||||
// Otherwise from huge page TLB. The user needs to reserve
|
||||
@ -240,10 +241,13 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
|
||||
// exceeds this number, log about it.
|
||||
// @if_log_bucket_dist_when_flash: if true, log distribution of number of
|
||||
// entries when flushing.
|
||||
// @threshold_use_skiplist: a bucket switches to skip list if number of
|
||||
// entries exceed this parameter.
|
||||
extern MemTableRepFactory* NewHashLinkListRepFactory(
|
||||
size_t bucket_count = 50000, size_t huge_page_tlb_size = 0,
|
||||
int bucket_entries_logging_threshold = 4096,
|
||||
bool if_log_bucket_dist_when_flash = true);
|
||||
bool if_log_bucket_dist_when_flash = true,
|
||||
uint32_t threshold_use_skiplist = 256);
|
||||
|
||||
// This factory creates a cuckoo-hashing based mem-table representation.
|
||||
// Cuckoo-hash is a closed-hash strategy, in which all key/value pairs
|
||||
|
@ -675,6 +675,13 @@ struct DBOptions {
|
||||
// Default value is 1800 (half an hour).
|
||||
int db_stats_log_interval;
|
||||
|
||||
// A list paths where SST files can be put into. A compaction style can
|
||||
// determine which of those paths it will put the file to.
|
||||
// If left empty, only one path will be used, which is db_name passed when
|
||||
// opening the DB.
|
||||
// Default: empty
|
||||
std::vector<std::string> db_paths;
|
||||
|
||||
// This specifies the info LOG dir.
|
||||
// If it is empty, the log files will be in the same dir as data.
|
||||
// If it is non empty, the log files will be in the specified dir,
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
#include <stdint.h>
|
||||
#include <climits>
|
||||
#include <vector>
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -61,6 +62,7 @@ class CompactionOptionsUniversal {
|
||||
// well as the total size of C1...Ct as total_C, the compaction output file
|
||||
// will be compressed iff
|
||||
// total_C / total_size < this percentage
|
||||
// Default: -1
|
||||
int compression_size_percent;
|
||||
|
||||
// The algorithm used to stop picking files into a single compaction run
|
||||
@ -68,14 +70,13 @@ class CompactionOptionsUniversal {
|
||||
CompactionStopStyle stop_style;
|
||||
|
||||
// Default set of parameters
|
||||
CompactionOptionsUniversal() :
|
||||
size_ratio(1),
|
||||
min_merge_width(2),
|
||||
max_merge_width(UINT_MAX),
|
||||
max_size_amplification_percent(200),
|
||||
compression_size_percent(-1),
|
||||
stop_style(kCompactionStopStyleTotalSize) {
|
||||
}
|
||||
CompactionOptionsUniversal()
|
||||
: size_ratio(1),
|
||||
min_merge_width(2),
|
||||
max_merge_width(UINT_MAX),
|
||||
max_size_amplification_percent(200),
|
||||
compression_size_percent(-1),
|
||||
stop_style(kCompactionStopStyleTotalSize) {}
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -21,7 +21,7 @@ import org.rocksdb.util.Environment;
|
||||
public class RocksDB extends RocksObject {
|
||||
public static final int NOT_FOUND = -1;
|
||||
private static final String[] compressionLibs_ = {
|
||||
"snappy", "zlib", "bzip2", "lz4", "lz4hc"};
|
||||
"snappy", "z", "bzip2", "lz4", "lz4hc"};
|
||||
|
||||
/**
|
||||
* Loads the necessary library files.
|
||||
|
@ -463,7 +463,7 @@ public class DbBenchmark {
|
||||
if (compressionType_.equals("snappy")) {
|
||||
System.loadLibrary("snappy");
|
||||
} else if (compressionType_.equals("zlib")) {
|
||||
System.loadLibrary("zlib");
|
||||
System.loadLibrary("z");
|
||||
} else if (compressionType_.equals("bzip2")) {
|
||||
System.loadLibrary("bzip2");
|
||||
} else if (compressionType_.equals("lz4")) {
|
||||
|
@ -9,10 +9,12 @@
|
||||
|
||||
#include "port/port_posix.h"
|
||||
|
||||
#include <cstdlib>
|
||||
#include <stdio.h>
|
||||
#include <assert.h>
|
||||
#include <errno.h>
|
||||
#include <sys/time.h>
|
||||
#include <string.h>
|
||||
#include <cstdlib>
|
||||
#include "util/logging.h"
|
||||
|
||||
namespace rocksdb {
|
||||
@ -83,6 +85,27 @@ void CondVar::Wait() {
|
||||
#endif
|
||||
}
|
||||
|
||||
bool CondVar::TimedWait(uint64_t abs_time_us) {
|
||||
struct timespec ts;
|
||||
ts.tv_sec = abs_time_us / 1000000;
|
||||
ts.tv_nsec = (abs_time_us % 1000000) * 1000;
|
||||
|
||||
#ifndef NDEBUG
|
||||
mu_->locked_ = false;
|
||||
#endif
|
||||
int err = pthread_cond_timedwait(&cv_, &mu_->mu_, &ts);
|
||||
#ifndef NDEBUG
|
||||
mu_->locked_ = true;
|
||||
#endif
|
||||
if (err == ETIMEDOUT) {
|
||||
return true;
|
||||
}
|
||||
if (err != 0) {
|
||||
PthreadCall("timedwait", err);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void CondVar::Signal() {
|
||||
PthreadCall("signal", pthread_cond_signal(&cv_));
|
||||
}
|
||||
|
@ -137,6 +137,8 @@ class CondVar {
|
||||
explicit CondVar(Mutex* mu);
|
||||
~CondVar();
|
||||
void Wait();
|
||||
// Timed condition wait. Returns true if timeout occurred.
|
||||
bool TimedWait(uint64_t abs_time_us);
|
||||
void Signal();
|
||||
void SignalAll();
|
||||
private:
|
||||
|
@ -333,7 +333,7 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes,
|
||||
uint32_t bloom_total_bits = num_prefixes * bloom_bits_per_key;
|
||||
if (bloom_total_bits > 0) {
|
||||
enable_bloom_ = true;
|
||||
bloom_.SetTotalBits(bloom_total_bits, options_.bloom_locality,
|
||||
bloom_.SetTotalBits(&arena_, bloom_total_bits, options_.bloom_locality,
|
||||
huge_page_tlb_size, options_.info_log.get());
|
||||
}
|
||||
}
|
||||
@ -465,7 +465,7 @@ Status PlainTableReader::PopulateIndex(TableProperties* props,
|
||||
table_properties_->num_entries * bloom_bits_per_key;
|
||||
if (num_bloom_bits > 0) {
|
||||
enable_bloom_ = true;
|
||||
bloom_.SetTotalBits(num_bloom_bits, options_.bloom_locality,
|
||||
bloom_.SetTotalBits(&arena_, num_bloom_bits, options_.bloom_locality,
|
||||
huge_page_tlb_size, options_.info_log.get());
|
||||
}
|
||||
}
|
||||
|
@ -1591,7 +1591,7 @@ class StressTest {
|
||||
}
|
||||
switch (FLAGS_rep_factory) {
|
||||
case kHashSkipList:
|
||||
options_.memtable_factory.reset(NewHashSkipListRepFactory());
|
||||
options_.memtable_factory.reset(NewHashSkipListRepFactory(10000));
|
||||
break;
|
||||
case kSkipList:
|
||||
// no need to do anything
|
||||
|
@ -32,12 +32,13 @@ uint32_t GetTotalBitsForLocality(uint32_t total_bits) {
|
||||
}
|
||||
}
|
||||
|
||||
DynamicBloom::DynamicBloom(uint32_t total_bits, uint32_t locality,
|
||||
DynamicBloom::DynamicBloom(Arena* arena, uint32_t total_bits, uint32_t locality,
|
||||
uint32_t num_probes,
|
||||
uint32_t (*hash_func)(const Slice& key),
|
||||
size_t huge_page_tlb_size, Logger* logger)
|
||||
size_t huge_page_tlb_size,
|
||||
Logger* logger)
|
||||
: DynamicBloom(num_probes, hash_func) {
|
||||
SetTotalBits(total_bits, locality, huge_page_tlb_size, logger);
|
||||
SetTotalBits(arena, total_bits, locality, huge_page_tlb_size, logger);
|
||||
}
|
||||
|
||||
DynamicBloom::DynamicBloom(uint32_t num_probes,
|
||||
@ -47,8 +48,10 @@ DynamicBloom::DynamicBloom(uint32_t num_probes,
|
||||
kNumProbes(num_probes),
|
||||
hash_func_(hash_func == nullptr ? &BloomHash : hash_func) {}
|
||||
|
||||
void DynamicBloom::SetTotalBits(uint32_t total_bits, uint32_t locality,
|
||||
size_t huge_page_tlb_size, Logger* logger) {
|
||||
void DynamicBloom::SetTotalBits(Arena* arena,
|
||||
uint32_t total_bits, uint32_t locality,
|
||||
size_t huge_page_tlb_size,
|
||||
Logger* logger) {
|
||||
kTotalBits = (locality > 0) ? GetTotalBitsForLocality(total_bits)
|
||||
: (total_bits + 7) / 8 * 8;
|
||||
kNumBlocks = (locality > 0) ? (kTotalBits / (CACHE_LINE_SIZE * 8)) : 0;
|
||||
@ -60,8 +63,9 @@ void DynamicBloom::SetTotalBits(uint32_t total_bits, uint32_t locality,
|
||||
if (kNumBlocks > 0) {
|
||||
sz += CACHE_LINE_SIZE - 1;
|
||||
}
|
||||
assert(arena);
|
||||
raw_ = reinterpret_cast<unsigned char*>(
|
||||
arena_.AllocateAligned(sz, huge_page_tlb_size, logger));
|
||||
arena->AllocateAligned(sz, huge_page_tlb_size, logger));
|
||||
memset(raw_, 0, sz);
|
||||
if (kNumBlocks > 0 && (reinterpret_cast<uint64_t>(raw_) % CACHE_LINE_SIZE)) {
|
||||
data_ = raw_ + CACHE_LINE_SIZE -
|
||||
|
@ -18,6 +18,7 @@ class Logger;
|
||||
|
||||
class DynamicBloom {
|
||||
public:
|
||||
// arena: pass arena to bloom filter, hence trace the usage of memory
|
||||
// total_bits: fixed total bits for the bloom
|
||||
// num_probes: number of hash probes for a single key
|
||||
// locality: If positive, optimize for cache line locality, 0 otherwise.
|
||||
@ -27,7 +28,8 @@ class DynamicBloom {
|
||||
// it to be allocated, like:
|
||||
// sysctl -w vm.nr_hugepages=20
|
||||
// See linux doc Documentation/vm/hugetlbpage.txt
|
||||
explicit DynamicBloom(uint32_t total_bits, uint32_t locality = 0,
|
||||
explicit DynamicBloom(Arena* arena,
|
||||
uint32_t total_bits, uint32_t locality = 0,
|
||||
uint32_t num_probes = 6,
|
||||
uint32_t (*hash_func)(const Slice& key) = nullptr,
|
||||
size_t huge_page_tlb_size = 0,
|
||||
@ -36,7 +38,7 @@ class DynamicBloom {
|
||||
explicit DynamicBloom(uint32_t num_probes = 6,
|
||||
uint32_t (*hash_func)(const Slice& key) = nullptr);
|
||||
|
||||
void SetTotalBits(uint32_t total_bits, uint32_t locality,
|
||||
void SetTotalBits(Arena* arena, uint32_t total_bits, uint32_t locality,
|
||||
size_t huge_page_tlb_size, Logger* logger);
|
||||
|
||||
~DynamicBloom() {}
|
||||
@ -63,8 +65,6 @@ class DynamicBloom {
|
||||
uint32_t (*hash_func_)(const Slice& key);
|
||||
unsigned char* data_;
|
||||
unsigned char* raw_;
|
||||
|
||||
Arena arena_;
|
||||
};
|
||||
|
||||
inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); }
|
||||
|
@ -40,17 +40,19 @@ class DynamicBloomTest {
|
||||
};
|
||||
|
||||
TEST(DynamicBloomTest, EmptyFilter) {
|
||||
DynamicBloom bloom1(100, 0, 2);
|
||||
Arena arena;
|
||||
DynamicBloom bloom1(&arena, 100, 0, 2);
|
||||
ASSERT_TRUE(!bloom1.MayContain("hello"));
|
||||
ASSERT_TRUE(!bloom1.MayContain("world"));
|
||||
|
||||
DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2);
|
||||
DynamicBloom bloom2(&arena, CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2);
|
||||
ASSERT_TRUE(!bloom2.MayContain("hello"));
|
||||
ASSERT_TRUE(!bloom2.MayContain("world"));
|
||||
}
|
||||
|
||||
TEST(DynamicBloomTest, Small) {
|
||||
DynamicBloom bloom1(100, 0, 2);
|
||||
Arena arena;
|
||||
DynamicBloom bloom1(&arena, 100, 0, 2);
|
||||
bloom1.Add("hello");
|
||||
bloom1.Add("world");
|
||||
ASSERT_TRUE(bloom1.MayContain("hello"));
|
||||
@ -58,7 +60,7 @@ TEST(DynamicBloomTest, Small) {
|
||||
ASSERT_TRUE(!bloom1.MayContain("x"));
|
||||
ASSERT_TRUE(!bloom1.MayContain("foo"));
|
||||
|
||||
DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2);
|
||||
DynamicBloom bloom2(&arena, CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2);
|
||||
bloom2.Add("hello");
|
||||
bloom2.Add("world");
|
||||
ASSERT_TRUE(bloom2.MayContain("hello"));
|
||||
@ -94,13 +96,14 @@ TEST(DynamicBloomTest, VaryingLengths) {
|
||||
for (uint32_t enable_locality = 0; enable_locality < 2; ++enable_locality) {
|
||||
for (uint32_t num = 1; num <= 10000; num = NextNum(num)) {
|
||||
uint32_t bloom_bits = 0;
|
||||
Arena arena;
|
||||
if (enable_locality == 0) {
|
||||
bloom_bits = std::max(num * FLAGS_bits_per_key, 64U);
|
||||
} else {
|
||||
bloom_bits = std::max(num * FLAGS_bits_per_key,
|
||||
enable_locality * CACHE_LINE_SIZE * 8);
|
||||
}
|
||||
DynamicBloom bloom(bloom_bits, enable_locality, num_probes);
|
||||
DynamicBloom bloom(&arena, bloom_bits, enable_locality, num_probes);
|
||||
for (uint64_t i = 0; i < num; i++) {
|
||||
bloom.Add(Key(i, buffer));
|
||||
ASSERT_TRUE(bloom.MayContain(Key(i, buffer)));
|
||||
@ -148,10 +151,11 @@ TEST(DynamicBloomTest, perf) {
|
||||
}
|
||||
|
||||
for (uint64_t m = 1; m <= 8; ++m) {
|
||||
Arena arena;
|
||||
const uint64_t num_keys = m * 8 * 1024 * 1024;
|
||||
fprintf(stderr, "testing %" PRIu64 "M keys\n", m * 8);
|
||||
|
||||
DynamicBloom std_bloom(num_keys * 10, 0, num_probes);
|
||||
DynamicBloom std_bloom(&arena, num_keys * 10, 0, num_probes);
|
||||
|
||||
timer.Start();
|
||||
for (uint64_t i = 1; i <= num_keys; ++i) {
|
||||
@ -175,7 +179,7 @@ TEST(DynamicBloomTest, perf) {
|
||||
ASSERT_TRUE(count == num_keys);
|
||||
|
||||
// Locality enabled version
|
||||
DynamicBloom blocked_bloom(num_keys * 10, 1, num_probes);
|
||||
DynamicBloom blocked_bloom(&arena, num_keys * 10, 1, num_probes);
|
||||
|
||||
timer.Start();
|
||||
for (uint64_t i = 1; i <= num_keys; ++i) {
|
||||
|
@ -821,7 +821,7 @@ class PosixWritableFile : public WritableFile {
|
||||
}
|
||||
}
|
||||
|
||||
virtual Status RangeSync(off64_t offset, off64_t nbytes) {
|
||||
virtual Status RangeSync(off_t offset, off_t nbytes) {
|
||||
if (sync_file_range(fd_, offset, nbytes, SYNC_FILE_RANGE_WRITE) == 0) {
|
||||
return Status::OK();
|
||||
} else {
|
||||
|
@ -7,6 +7,7 @@
|
||||
#ifndef ROCKSDB_LITE
|
||||
#include "util/hash_linklist_rep.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include "rocksdb/memtablerep.h"
|
||||
#include "util/arena.h"
|
||||
#include "rocksdb/slice.h"
|
||||
@ -22,6 +23,31 @@ namespace rocksdb {
|
||||
namespace {
|
||||
|
||||
typedef const char* Key;
|
||||
typedef SkipList<Key, const MemTableRep::KeyComparator&> MemtableSkipList;
|
||||
typedef port::AtomicPointer Pointer;
|
||||
|
||||
// A data structure used as the header of a link list of a hash bucket.
|
||||
struct BucketHeader {
|
||||
Pointer next;
|
||||
uint32_t num_entries;
|
||||
|
||||
explicit BucketHeader(void* n, uint32_t count)
|
||||
: next(n), num_entries(count) {}
|
||||
|
||||
bool IsSkipListBucket() { return next.NoBarrier_Load() == this; }
|
||||
};
|
||||
|
||||
// A data structure used as the header of a skip list of a hash bucket.
|
||||
struct SkipListBucketHeader {
|
||||
BucketHeader Counting_header;
|
||||
MemtableSkipList skip_list;
|
||||
|
||||
explicit SkipListBucketHeader(const MemTableRep::KeyComparator& cmp,
|
||||
Arena* arena, uint32_t count)
|
||||
: Counting_header(this, // Pointing to itself to indicate header type.
|
||||
count),
|
||||
skip_list(cmp, arena) {}
|
||||
};
|
||||
|
||||
struct Node {
|
||||
// Accessors/mutators for links. Wrapped in methods so we can
|
||||
@ -51,12 +77,75 @@ struct Node {
|
||||
char key[0];
|
||||
};
|
||||
|
||||
// Memory structure of the mem table:
|
||||
// It is a hash table, each bucket points to one entry, a linked list or a
|
||||
// skip list. In order to track total number of records in a bucket to determine
|
||||
// whether should switch to skip list, a header is added just to indicate
|
||||
// number of entries in the bucket.
|
||||
//
|
||||
//
|
||||
// +-----> NULL Case 1. Empty bucket
|
||||
// |
|
||||
// |
|
||||
// | +---> +-------+
|
||||
// | | | Next +--> NULL
|
||||
// | | +-------+
|
||||
// +-----+ | | | | Case 2. One Entry in bucket.
|
||||
// | +-+ | | Data | next pointer points to
|
||||
// +-----+ | | | NULL. All other cases
|
||||
// | | | | | next pointer is not NULL.
|
||||
// +-----+ | +-------+
|
||||
// | +---+
|
||||
// +-----+ +-> +-------+ +> +-------+ +-> +-------+
|
||||
// | | | | Next +--+ | Next +--+ | Next +-->NULL
|
||||
// +-----+ | +-------+ +-------+ +-------+
|
||||
// | +-----+ | Count | | | | |
|
||||
// +-----+ +-------+ | Data | | Data |
|
||||
// | | | | | |
|
||||
// +-----+ Case 3. | | | |
|
||||
// | | A header +-------+ +-------+
|
||||
// +-----+ points to
|
||||
// | | a linked list. Count indicates total number
|
||||
// +-----+ of rows in this bucket.
|
||||
// | |
|
||||
// +-----+ +-> +-------+ <--+
|
||||
// | | | | Next +----+
|
||||
// +-----+ | +-------+ Case 4. A header points to a skip
|
||||
// | +----+ | Count | list and next pointer points to
|
||||
// +-----+ +-------+ itself, to distinguish case 3 or 4.
|
||||
// | | | | Count still is kept to indicates total
|
||||
// +-----+ | Skip +--> of entries in the bucket for debugging
|
||||
// | | | List | Data purpose.
|
||||
// | | | +-->
|
||||
// +-----+ | |
|
||||
// | | +-------+
|
||||
// +-----+
|
||||
//
|
||||
// We don't have data race when changing cases because:
|
||||
// (1) When changing from case 2->3, we create a new bucket header, put the
|
||||
// single node there first without changing the original node, and do a
|
||||
// release store when changing the bucket pointer. In that case, a reader
|
||||
// who sees a stale value of the bucket pointer will read this node, while
|
||||
// a reader sees the correct value because of the release store.
|
||||
// (2) When changing case 3->4, a new header is created with skip list points
|
||||
// to the data, before doing an acquire store to change the bucket pointer.
|
||||
// The old header and nodes are never changed, so any reader sees any
|
||||
// of those existing pointers will guarantee to be able to iterate to the
|
||||
// end of the linked list.
|
||||
// (3) Header's next pointer in case 3 might change, but they are never equal
|
||||
// to itself, so no matter a reader sees any stale or newer value, it will
|
||||
// be able to correctly distinguish case 3 and 4.
|
||||
//
|
||||
// The reason that we use case 2 is we want to make the format to be efficient
|
||||
// when the utilization of buckets is relatively low. If we use case 3 for
|
||||
// single entry bucket, we will need to waste 12 bytes for every entry,
|
||||
// which can be significant decrease of memory utilization.
|
||||
class HashLinkListRep : public MemTableRep {
|
||||
public:
|
||||
HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena,
|
||||
const SliceTransform* transform, size_t bucket_size,
|
||||
size_t huge_page_tlb_size, Logger* logger,
|
||||
int bucket_entries_logging_threshold,
|
||||
uint32_t threshold_use_skiplist, size_t huge_page_tlb_size,
|
||||
Logger* logger, int bucket_entries_logging_threshold,
|
||||
bool if_log_bucket_dist_when_flash);
|
||||
|
||||
virtual KeyHandle Allocate(const size_t len, char** buf) override;
|
||||
@ -80,7 +169,6 @@ class HashLinkListRep : public MemTableRep {
|
||||
|
||||
private:
|
||||
friend class DynamicIterator;
|
||||
typedef SkipList<const char*, const MemTableRep::KeyComparator&> FullList;
|
||||
|
||||
size_t bucket_size_;
|
||||
|
||||
@ -88,6 +176,8 @@ class HashLinkListRep : public MemTableRep {
|
||||
// the same transform.
|
||||
port::AtomicPointer* buckets_;
|
||||
|
||||
const uint32_t threshold_use_skiplist_;
|
||||
|
||||
// The user-supplied transform whose domain is the user keys.
|
||||
const SliceTransform* transform_;
|
||||
|
||||
@ -97,7 +187,12 @@ class HashLinkListRep : public MemTableRep {
|
||||
int bucket_entries_logging_threshold_;
|
||||
bool if_log_bucket_dist_when_flash_;
|
||||
|
||||
bool BucketContains(Node* head, const Slice& key) const;
|
||||
bool LinkListContains(Node* head, const Slice& key) const;
|
||||
|
||||
SkipListBucketHeader* GetSkipListBucketHeader(Pointer* first_next_pointer)
|
||||
const;
|
||||
|
||||
Node* GetLinkListFirstNode(Pointer* first_next_pointer) const;
|
||||
|
||||
Slice GetPrefix(const Slice& internal_key) const {
|
||||
return transform_->Transform(ExtractUserKey(internal_key));
|
||||
@ -107,11 +202,11 @@ class HashLinkListRep : public MemTableRep {
|
||||
return MurmurHash(slice.data(), slice.size(), 0) % bucket_size_;
|
||||
}
|
||||
|
||||
Node* GetBucket(size_t i) const {
|
||||
return static_cast<Node*>(buckets_[i].Acquire_Load());
|
||||
Pointer* GetBucket(size_t i) const {
|
||||
return static_cast<Pointer*>(buckets_[i].Acquire_Load());
|
||||
}
|
||||
|
||||
Node* GetBucket(const Slice& slice) const {
|
||||
Pointer* GetBucket(const Slice& slice) const {
|
||||
return GetBucket(GetHash(slice));
|
||||
}
|
||||
|
||||
@ -119,7 +214,6 @@ class HashLinkListRep : public MemTableRep {
|
||||
return (compare_(b, a) == 0);
|
||||
}
|
||||
|
||||
|
||||
bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); }
|
||||
|
||||
bool KeyIsAfterNode(const Slice& internal_key, const Node* n) const {
|
||||
@ -137,8 +231,8 @@ class HashLinkListRep : public MemTableRep {
|
||||
|
||||
class FullListIterator : public MemTableRep::Iterator {
|
||||
public:
|
||||
explicit FullListIterator(FullList* list, Arena* arena)
|
||||
: iter_(list), full_list_(list), arena_(arena) {}
|
||||
explicit FullListIterator(MemtableSkipList* list, Arena* arena)
|
||||
: iter_(list), full_list_(list), arena_(arena) {}
|
||||
|
||||
virtual ~FullListIterator() {
|
||||
}
|
||||
@ -189,22 +283,22 @@ class HashLinkListRep : public MemTableRep {
|
||||
iter_.SeekToLast();
|
||||
}
|
||||
private:
|
||||
FullList::Iterator iter_;
|
||||
MemtableSkipList::Iterator iter_;
|
||||
// To destruct with the iterator.
|
||||
std::unique_ptr<FullList> full_list_;
|
||||
std::unique_ptr<MemtableSkipList> full_list_;
|
||||
std::unique_ptr<Arena> arena_;
|
||||
std::string tmp_; // For passing to EncodeKey
|
||||
};
|
||||
|
||||
class Iterator : public MemTableRep::Iterator {
|
||||
class LinkListIterator : public MemTableRep::Iterator {
|
||||
public:
|
||||
explicit Iterator(const HashLinkListRep* const hash_link_list_rep,
|
||||
Node* head) :
|
||||
hash_link_list_rep_(hash_link_list_rep), head_(head), node_(nullptr) {
|
||||
}
|
||||
explicit LinkListIterator(const HashLinkListRep* const hash_link_list_rep,
|
||||
Node* head)
|
||||
: hash_link_list_rep_(hash_link_list_rep),
|
||||
head_(head),
|
||||
node_(nullptr) {}
|
||||
|
||||
virtual ~Iterator() {
|
||||
}
|
||||
virtual ~LinkListIterator() {}
|
||||
|
||||
// Returns true iff the iterator is positioned at a valid node.
|
||||
virtual bool Valid() const {
|
||||
@ -271,22 +365,68 @@ class HashLinkListRep : public MemTableRep {
|
||||
}
|
||||
};
|
||||
|
||||
class DynamicIterator : public HashLinkListRep::Iterator {
|
||||
class DynamicIterator : public HashLinkListRep::LinkListIterator {
|
||||
public:
|
||||
explicit DynamicIterator(HashLinkListRep& memtable_rep)
|
||||
: HashLinkListRep::Iterator(&memtable_rep, nullptr),
|
||||
memtable_rep_(memtable_rep) {}
|
||||
: HashLinkListRep::LinkListIterator(&memtable_rep, nullptr),
|
||||
memtable_rep_(memtable_rep) {}
|
||||
|
||||
// Advance to the first entry with a key >= target
|
||||
virtual void Seek(const Slice& k, const char* memtable_key) {
|
||||
auto transformed = memtable_rep_.GetPrefix(k);
|
||||
Reset(memtable_rep_.GetBucket(transformed));
|
||||
HashLinkListRep::Iterator::Seek(k, memtable_key);
|
||||
auto* bucket = memtable_rep_.GetBucket(transformed);
|
||||
|
||||
SkipListBucketHeader* skip_list_header =
|
||||
memtable_rep_.GetSkipListBucketHeader(bucket);
|
||||
if (skip_list_header != nullptr) {
|
||||
// The bucket is organized as a skip list
|
||||
if (!skip_list_iter_) {
|
||||
skip_list_iter_.reset(
|
||||
new MemtableSkipList::Iterator(&skip_list_header->skip_list));
|
||||
} else {
|
||||
skip_list_iter_->SetList(&skip_list_header->skip_list);
|
||||
}
|
||||
if (memtable_key != nullptr) {
|
||||
skip_list_iter_->Seek(memtable_key);
|
||||
} else {
|
||||
IterKey encoded_key;
|
||||
encoded_key.EncodeLengthPrefixedKey(k);
|
||||
skip_list_iter_->Seek(encoded_key.GetKey().data());
|
||||
}
|
||||
} else {
|
||||
// The bucket is organized as a linked list
|
||||
skip_list_iter_.reset();
|
||||
Reset(memtable_rep_.GetLinkListFirstNode(bucket));
|
||||
HashLinkListRep::LinkListIterator::Seek(k, memtable_key);
|
||||
}
|
||||
}
|
||||
|
||||
virtual bool Valid() const {
|
||||
if (skip_list_iter_) {
|
||||
return skip_list_iter_->Valid();
|
||||
}
|
||||
return HashLinkListRep::LinkListIterator::Valid();
|
||||
}
|
||||
|
||||
virtual const char* key() const {
|
||||
if (skip_list_iter_) {
|
||||
return skip_list_iter_->key();
|
||||
}
|
||||
return HashLinkListRep::LinkListIterator::key();
|
||||
}
|
||||
|
||||
virtual void Next() {
|
||||
if (skip_list_iter_) {
|
||||
skip_list_iter_->Next();
|
||||
} else {
|
||||
HashLinkListRep::LinkListIterator::Next();
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
// the underlying memtable
|
||||
const HashLinkListRep& memtable_rep_;
|
||||
std::unique_ptr<MemtableSkipList::Iterator> skip_list_iter_;
|
||||
};
|
||||
|
||||
class EmptyIterator : public MemTableRep::Iterator {
|
||||
@ -312,12 +452,16 @@ class HashLinkListRep : public MemTableRep {
|
||||
|
||||
HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare,
|
||||
Arena* arena, const SliceTransform* transform,
|
||||
size_t bucket_size, size_t huge_page_tlb_size,
|
||||
Logger* logger,
|
||||
size_t bucket_size,
|
||||
uint32_t threshold_use_skiplist,
|
||||
size_t huge_page_tlb_size, Logger* logger,
|
||||
int bucket_entries_logging_threshold,
|
||||
bool if_log_bucket_dist_when_flash)
|
||||
: MemTableRep(arena),
|
||||
bucket_size_(bucket_size),
|
||||
// Threshold to use skip list doesn't make sense if less than 3, so we
|
||||
// force it to be minimum of 3 to simplify implementation.
|
||||
threshold_use_skiplist_(std::max(threshold_use_skiplist, 3U)),
|
||||
transform_(transform),
|
||||
compare_(compare),
|
||||
logger_(logger),
|
||||
@ -343,53 +487,161 @@ KeyHandle HashLinkListRep::Allocate(const size_t len, char** buf) {
|
||||
return static_cast<void*>(x);
|
||||
}
|
||||
|
||||
SkipListBucketHeader* HashLinkListRep::GetSkipListBucketHeader(
|
||||
Pointer* first_next_pointer) const {
|
||||
if (first_next_pointer == nullptr) {
|
||||
return nullptr;
|
||||
}
|
||||
if (first_next_pointer->NoBarrier_Load() == nullptr) {
|
||||
// Single entry bucket
|
||||
return nullptr;
|
||||
}
|
||||
// Counting header
|
||||
BucketHeader* header = reinterpret_cast<BucketHeader*>(first_next_pointer);
|
||||
if (header->IsSkipListBucket()) {
|
||||
assert(header->num_entries > threshold_use_skiplist_);
|
||||
auto* skip_list_bucket_header =
|
||||
reinterpret_cast<SkipListBucketHeader*>(header);
|
||||
assert(skip_list_bucket_header->Counting_header.next.NoBarrier_Load() ==
|
||||
header);
|
||||
return skip_list_bucket_header;
|
||||
}
|
||||
assert(header->num_entries <= threshold_use_skiplist_);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const {
|
||||
if (first_next_pointer == nullptr) {
|
||||
return nullptr;
|
||||
}
|
||||
if (first_next_pointer->NoBarrier_Load() == nullptr) {
|
||||
// Single entry bucket
|
||||
return reinterpret_cast<Node*>(first_next_pointer);
|
||||
}
|
||||
// Counting header
|
||||
BucketHeader* header = reinterpret_cast<BucketHeader*>(first_next_pointer);
|
||||
if (!header->IsSkipListBucket()) {
|
||||
assert(header->num_entries <= threshold_use_skiplist_);
|
||||
return reinterpret_cast<Node*>(header->next.NoBarrier_Load());
|
||||
}
|
||||
assert(header->num_entries > threshold_use_skiplist_);
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void HashLinkListRep::Insert(KeyHandle handle) {
|
||||
Node* x = static_cast<Node*>(handle);
|
||||
assert(!Contains(x->key));
|
||||
Slice internal_key = GetLengthPrefixedSlice(x->key);
|
||||
auto transformed = GetPrefix(internal_key);
|
||||
auto& bucket = buckets_[GetHash(transformed)];
|
||||
Node* head = static_cast<Node*>(bucket.Acquire_Load());
|
||||
Pointer* first_next_pointer = static_cast<Pointer*>(bucket.NoBarrier_Load());
|
||||
|
||||
if (!head) {
|
||||
if (first_next_pointer == nullptr) {
|
||||
// Case 1. empty bucket
|
||||
// NoBarrier_SetNext() suffices since we will add a barrier when
|
||||
// we publish a pointer to "x" in prev[i].
|
||||
x->NoBarrier_SetNext(nullptr);
|
||||
bucket.Release_Store(static_cast<void*>(x));
|
||||
bucket.Release_Store(x);
|
||||
return;
|
||||
}
|
||||
|
||||
Node* cur = head;
|
||||
Node* prev = nullptr;
|
||||
while (true) {
|
||||
if (cur == nullptr) {
|
||||
break;
|
||||
}
|
||||
Node* next = cur->Next();
|
||||
// Make sure the lists are sorted.
|
||||
// If x points to head_ or next points nullptr, it is trivially satisfied.
|
||||
assert((cur == head) || (next == nullptr) ||
|
||||
KeyIsAfterNode(next->key, cur));
|
||||
if (KeyIsAfterNode(internal_key, cur)) {
|
||||
// Keep searching in this list
|
||||
prev = cur;
|
||||
cur = next;
|
||||
} else {
|
||||
break;
|
||||
BucketHeader* header = nullptr;
|
||||
if (first_next_pointer->NoBarrier_Load() == nullptr) {
|
||||
// Case 2. only one entry in the bucket
|
||||
// Need to convert to a Counting bucket and turn to case 4.
|
||||
Node* first = reinterpret_cast<Node*>(first_next_pointer);
|
||||
// Need to add a bucket header.
|
||||
// We have to first convert it to a bucket with header before inserting
|
||||
// the new node. Otherwise, we might need to change next pointer of first.
|
||||
// In that case, a reader might sees the next pointer is NULL and wrongly
|
||||
// think the node is a bucket header.
|
||||
auto* mem = arena_->AllocateAligned(sizeof(BucketHeader));
|
||||
header = new (mem) BucketHeader(first, 1);
|
||||
bucket.Release_Store(header);
|
||||
} else {
|
||||
header = reinterpret_cast<BucketHeader*>(first_next_pointer);
|
||||
if (header->IsSkipListBucket()) {
|
||||
// Case 4. Bucket is already a skip list
|
||||
assert(header->num_entries > threshold_use_skiplist_);
|
||||
auto* skip_list_bucket_header =
|
||||
reinterpret_cast<SkipListBucketHeader*>(header);
|
||||
skip_list_bucket_header->Counting_header.num_entries++;
|
||||
skip_list_bucket_header->skip_list.Insert(x->key);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Our data structure does not allow duplicate insertion
|
||||
assert(cur == nullptr || !Equal(x->key, cur->key));
|
||||
if (bucket_entries_logging_threshold_ > 0 &&
|
||||
header->num_entries ==
|
||||
static_cast<uint32_t>(bucket_entries_logging_threshold_)) {
|
||||
Info(logger_,
|
||||
"HashLinkedList bucket %zu has more than %d "
|
||||
"entries. Key to insert: %s",
|
||||
GetHash(transformed), header->num_entries,
|
||||
GetLengthPrefixedSlice(x->key).ToString(true).c_str());
|
||||
}
|
||||
|
||||
// NoBarrier_SetNext() suffices since we will add a barrier when
|
||||
// we publish a pointer to "x" in prev[i].
|
||||
x->NoBarrier_SetNext(cur);
|
||||
if (header->num_entries == threshold_use_skiplist_) {
|
||||
// Case 3. number of entries reaches the threshold so need to convert to
|
||||
// skip list.
|
||||
LinkListIterator bucket_iter(
|
||||
this, reinterpret_cast<Node*>(first_next_pointer->NoBarrier_Load()));
|
||||
auto mem = arena_->AllocateAligned(sizeof(SkipListBucketHeader));
|
||||
SkipListBucketHeader* new_skip_list_header = new (mem)
|
||||
SkipListBucketHeader(compare_, arena_, header->num_entries + 1);
|
||||
auto& skip_list = new_skip_list_header->skip_list;
|
||||
|
||||
if (prev) {
|
||||
prev->SetNext(x);
|
||||
// Add all current entries to the skip list
|
||||
for (bucket_iter.SeekToHead(); bucket_iter.Valid(); bucket_iter.Next()) {
|
||||
skip_list.Insert(bucket_iter.key());
|
||||
}
|
||||
|
||||
// insert the new entry
|
||||
skip_list.Insert(x->key);
|
||||
// Set the bucket
|
||||
bucket.Release_Store(new_skip_list_header);
|
||||
} else {
|
||||
bucket.Release_Store(static_cast<void*>(x));
|
||||
// Case 5. Need to insert to the sorted linked list without changing the
|
||||
// header.
|
||||
Node* first = reinterpret_cast<Node*>(header->next.NoBarrier_Load());
|
||||
assert(first != nullptr);
|
||||
// Advance counter unless the bucket needs to be advanced to skip list.
|
||||
// In that case, we need to make sure the previous count never exceeds
|
||||
// threshold_use_skiplist_ to avoid readers to cast to wrong format.
|
||||
header->num_entries++;
|
||||
|
||||
Node* cur = first;
|
||||
Node* prev = nullptr;
|
||||
while (true) {
|
||||
if (cur == nullptr) {
|
||||
break;
|
||||
}
|
||||
Node* next = cur->Next();
|
||||
// Make sure the lists are sorted.
|
||||
// If x points to head_ or next points nullptr, it is trivially satisfied.
|
||||
assert((cur == first) || (next == nullptr) ||
|
||||
KeyIsAfterNode(next->key, cur));
|
||||
if (KeyIsAfterNode(internal_key, cur)) {
|
||||
// Keep searching in this list
|
||||
prev = cur;
|
||||
cur = next;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Our data structure does not allow duplicate insertion
|
||||
assert(cur == nullptr || !Equal(x->key, cur->key));
|
||||
|
||||
// NoBarrier_SetNext() suffices since we will add a barrier when
|
||||
// we publish a pointer to "x" in prev[i].
|
||||
x->NoBarrier_SetNext(cur);
|
||||
|
||||
if (prev) {
|
||||
prev->SetNext(x);
|
||||
} else {
|
||||
header->next.Release_Store(static_cast<void*>(x));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -401,7 +653,13 @@ bool HashLinkListRep::Contains(const char* key) const {
|
||||
if (bucket == nullptr) {
|
||||
return false;
|
||||
}
|
||||
return BucketContains(bucket, internal_key);
|
||||
|
||||
SkipListBucketHeader* skip_list_header = GetSkipListBucketHeader(bucket);
|
||||
if (skip_list_header != nullptr) {
|
||||
return skip_list_header->skip_list.Contains(key);
|
||||
} else {
|
||||
return LinkListContains(GetLinkListFirstNode(bucket), internal_key);
|
||||
}
|
||||
}
|
||||
|
||||
size_t HashLinkListRep::ApproximateMemoryUsage() {
|
||||
@ -413,37 +671,53 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args,
|
||||
bool (*callback_func)(void* arg, const char* entry)) {
|
||||
auto transformed = transform_->Transform(k.user_key());
|
||||
auto bucket = GetBucket(transformed);
|
||||
if (bucket != nullptr) {
|
||||
Iterator iter(this, bucket);
|
||||
for (iter.Seek(k.internal_key(), nullptr);
|
||||
|
||||
auto* skip_list_header = GetSkipListBucketHeader(bucket);
|
||||
if (skip_list_header != nullptr) {
|
||||
// Is a skip list
|
||||
MemtableSkipList::Iterator iter(&skip_list_header->skip_list);
|
||||
for (iter.Seek(k.memtable_key().data());
|
||||
iter.Valid() && callback_func(callback_args, iter.key());
|
||||
iter.Next()) {
|
||||
}
|
||||
} else {
|
||||
auto* link_list_head = GetLinkListFirstNode(bucket);
|
||||
if (link_list_head != nullptr) {
|
||||
LinkListIterator iter(this, link_list_head);
|
||||
for (iter.Seek(k.internal_key(), nullptr);
|
||||
iter.Valid() && callback_func(callback_args, iter.key());
|
||||
iter.Next()) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) {
|
||||
// allocate a new arena of similar size to the one currently in use
|
||||
Arena* new_arena = new Arena(arena_->BlockSize());
|
||||
auto list = new FullList(compare_, new_arena);
|
||||
auto list = new MemtableSkipList(compare_, new_arena);
|
||||
HistogramImpl keys_per_bucket_hist;
|
||||
|
||||
for (size_t i = 0; i < bucket_size_; ++i) {
|
||||
int count = 0;
|
||||
bool num_entries_printed = false;
|
||||
auto bucket = GetBucket(i);
|
||||
auto* bucket = GetBucket(i);
|
||||
if (bucket != nullptr) {
|
||||
Iterator itr(this, bucket);
|
||||
for (itr.SeekToHead(); itr.Valid(); itr.Next()) {
|
||||
list->Insert(itr.key());
|
||||
if (logger_ != nullptr &&
|
||||
++count >= bucket_entries_logging_threshold_ &&
|
||||
!num_entries_printed) {
|
||||
num_entries_printed = true;
|
||||
Info(logger_, "HashLinkedList bucket %zu has more than %d "
|
||||
"entries. %dth key: %s",
|
||||
i, count, count,
|
||||
GetLengthPrefixedSlice(itr.key()).ToString(true).c_str());
|
||||
auto* skip_list_header = GetSkipListBucketHeader(bucket);
|
||||
if (skip_list_header != nullptr) {
|
||||
// Is a skip list
|
||||
MemtableSkipList::Iterator itr(&skip_list_header->skip_list);
|
||||
for (itr.SeekToFirst(); itr.Valid(); itr.Next()) {
|
||||
list->Insert(itr.key());
|
||||
count++;
|
||||
}
|
||||
} else {
|
||||
auto* link_list_head = GetLinkListFirstNode(bucket);
|
||||
if (link_list_head != nullptr) {
|
||||
LinkListIterator itr(this, link_list_head);
|
||||
for (itr.SeekToHead(); itr.Valid(); itr.Next()) {
|
||||
list->Insert(itr.key());
|
||||
count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -474,7 +748,8 @@ MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator(
|
||||
}
|
||||
}
|
||||
|
||||
bool HashLinkListRep::BucketContains(Node* head, const Slice& user_key) const {
|
||||
bool HashLinkListRep::LinkListContains(Node* head,
|
||||
const Slice& user_key) const {
|
||||
Node* x = FindGreaterOrEqualInBucket(head, user_key);
|
||||
return (x != nullptr && Equal(user_key, x->key));
|
||||
}
|
||||
@ -505,17 +780,19 @@ Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head,
|
||||
MemTableRep* HashLinkListRepFactory::CreateMemTableRep(
|
||||
const MemTableRep::KeyComparator& compare, Arena* arena,
|
||||
const SliceTransform* transform, Logger* logger) {
|
||||
return new HashLinkListRep(
|
||||
compare, arena, transform, bucket_count_, huge_page_tlb_size_, logger,
|
||||
bucket_entries_logging_threshold_, if_log_bucket_dist_when_flash_);
|
||||
return new HashLinkListRep(compare, arena, transform, bucket_count_,
|
||||
threshold_use_skiplist_, huge_page_tlb_size_,
|
||||
logger, bucket_entries_logging_threshold_,
|
||||
if_log_bucket_dist_when_flash_);
|
||||
}
|
||||
|
||||
MemTableRepFactory* NewHashLinkListRepFactory(
|
||||
size_t bucket_count, size_t huge_page_tlb_size,
|
||||
int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash) {
|
||||
return new HashLinkListRepFactory(bucket_count, huge_page_tlb_size,
|
||||
bucket_entries_logging_threshold,
|
||||
if_log_bucket_dist_when_flash);
|
||||
int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash,
|
||||
uint32_t threshold_use_skiplist) {
|
||||
return new HashLinkListRepFactory(
|
||||
bucket_count, threshold_use_skiplist, huge_page_tlb_size,
|
||||
bucket_entries_logging_threshold, if_log_bucket_dist_when_flash);
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
||||
|
@ -16,10 +16,12 @@ namespace rocksdb {
|
||||
class HashLinkListRepFactory : public MemTableRepFactory {
|
||||
public:
|
||||
explicit HashLinkListRepFactory(size_t bucket_count,
|
||||
uint32_t threshold_use_skiplist,
|
||||
size_t huge_page_tlb_size,
|
||||
int bucket_entries_logging_threshold,
|
||||
bool if_log_bucket_dist_when_flash)
|
||||
: bucket_count_(bucket_count),
|
||||
threshold_use_skiplist_(threshold_use_skiplist),
|
||||
huge_page_tlb_size_(huge_page_tlb_size),
|
||||
bucket_entries_logging_threshold_(bucket_entries_logging_threshold),
|
||||
if_log_bucket_dist_when_flash_(if_log_bucket_dist_when_flash) {}
|
||||
@ -36,6 +38,7 @@ class HashLinkListRepFactory : public MemTableRepFactory {
|
||||
|
||||
private:
|
||||
const size_t bucket_count_;
|
||||
const uint32_t threshold_use_skiplist_;
|
||||
const size_t huge_page_tlb_size_;
|
||||
int bucket_entries_logging_threshold_;
|
||||
bool if_log_bucket_dist_when_flash_;
|
||||
|
@ -229,7 +229,9 @@ HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare,
|
||||
transform_(transform),
|
||||
compare_(compare),
|
||||
arena_(arena) {
|
||||
buckets_ = new port::AtomicPointer[bucket_size];
|
||||
auto mem =
|
||||
arena->AllocateAligned(sizeof(port::AtomicPointer) * bucket_size);
|
||||
buckets_ = new (mem) port::AtomicPointer[bucket_size];
|
||||
|
||||
for (size_t i = 0; i < bucket_size_; ++i) {
|
||||
buckets_[i].NoBarrier_Store(nullptr);
|
||||
@ -237,7 +239,6 @@ HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare,
|
||||
}
|
||||
|
||||
HashSkipListRep::~HashSkipListRep() {
|
||||
delete[] buckets_;
|
||||
}
|
||||
|
||||
HashSkipListRep::Bucket* HashSkipListRep::GetInitializedBucket(
|
||||
@ -271,7 +272,7 @@ bool HashSkipListRep::Contains(const char* key) const {
|
||||
}
|
||||
|
||||
size_t HashSkipListRep::ApproximateMemoryUsage() {
|
||||
return sizeof(buckets_);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void HashSkipListRep::Get(const LookupKey& k, void* callback_args,
|
||||
|
@ -286,6 +286,10 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
|
||||
}
|
||||
}
|
||||
|
||||
if (opt.db_paths.size() == 0) {
|
||||
opt.db_paths.push_back(db_path_);
|
||||
}
|
||||
|
||||
return opt;
|
||||
}
|
||||
|
||||
|
@ -214,6 +214,7 @@ DBOptions::DBOptions(const Options& options)
|
||||
disableDataSync(options.disableDataSync),
|
||||
use_fsync(options.use_fsync),
|
||||
db_stats_log_interval(options.db_stats_log_interval),
|
||||
db_paths(options.db_paths),
|
||||
db_log_dir(options.db_log_dir),
|
||||
wal_dir(options.wal_dir),
|
||||
delete_obsolete_files_period_micros(
|
||||
|
Loading…
Reference in New Issue
Block a user