Support Multiple DB paths (without having an interface to expose to users)
Summary: This patch lets RocksDB support multiple DB paths internally. No user-facing interface is exposed yet, so the change is invisible to users.
Test Plan: make all check
Reviewers: igor, haobo, ljin, yhchiang
Reviewed By: yhchiang
Subscribers: dhruba, leveldb
Differential Revision: https://reviews.facebook.net/D18921
This commit is contained in:
parent
f146cab261
commit
2459f7ec4e
@ -54,7 +54,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options,
|
|||||||
purge = false;
|
purge = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string fname = TableFileName(dbname, meta->fd.GetNumber());
|
std::string fname = TableFileName(options.db_paths, meta->fd.GetNumber(),
|
||||||
|
meta->fd.GetPathId());
|
||||||
if (iter->Valid()) {
|
if (iter->Valid()) {
|
||||||
unique_ptr<WritableFile> file;
|
unique_ptr<WritableFile> file;
|
||||||
s = env->NewWritableFile(fname, &file, soptions);
|
s = env->NewWritableFile(fname, &file, soptions);
|
||||||
|
@ -224,8 +224,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
|
|||||||
if (dummy_versions != nullptr) {
|
if (dummy_versions != nullptr) {
|
||||||
internal_stats_.reset(new InternalStats(
|
internal_stats_.reset(new InternalStats(
|
||||||
options_.num_levels, db_options->env, db_options->statistics.get()));
|
options_.num_levels, db_options->env, db_options->statistics.get()));
|
||||||
table_cache_.reset(
|
table_cache_.reset(new TableCache(&options_, storage_options, table_cache));
|
||||||
new TableCache(dbname, &options_, storage_options, table_cache));
|
|
||||||
if (options_.compaction_style == kCompactionStyleUniversal) {
|
if (options_.compaction_style == kCompactionStyleUniversal) {
|
||||||
compaction_picker_.reset(
|
compaction_picker_.reset(
|
||||||
new UniversalCompactionPicker(&options_, &internal_comparator_));
|
new UniversalCompactionPicker(&options_, &internal_comparator_));
|
||||||
|
@ -29,6 +29,7 @@ static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
|
|||||||
Compaction::Compaction(Version* input_version, int level, int out_level,
|
Compaction::Compaction(Version* input_version, int level, int out_level,
|
||||||
uint64_t target_file_size,
|
uint64_t target_file_size,
|
||||||
uint64_t max_grandparent_overlap_bytes,
|
uint64_t max_grandparent_overlap_bytes,
|
||||||
|
uint32_t output_path_id,
|
||||||
CompressionType output_compression, bool seek_compaction,
|
CompressionType output_compression, bool seek_compaction,
|
||||||
bool deletion_compaction)
|
bool deletion_compaction)
|
||||||
: level_(level),
|
: level_(level),
|
||||||
@ -38,6 +39,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
|
|||||||
input_version_(input_version),
|
input_version_(input_version),
|
||||||
number_levels_(input_version_->NumberLevels()),
|
number_levels_(input_version_->NumberLevels()),
|
||||||
cfd_(input_version_->cfd_),
|
cfd_(input_version_->cfd_),
|
||||||
|
output_path_id_(output_path_id),
|
||||||
output_compression_(output_compression),
|
output_compression_(output_compression),
|
||||||
seek_compaction_(seek_compaction),
|
seek_compaction_(seek_compaction),
|
||||||
deletion_compaction_(deletion_compaction),
|
deletion_compaction_(deletion_compaction),
|
||||||
|
@ -50,6 +50,9 @@ class Compaction {
|
|||||||
// What compression for output
|
// What compression for output
|
||||||
CompressionType OutputCompressionType() const { return output_compression_; }
|
CompressionType OutputCompressionType() const { return output_compression_; }
|
||||||
|
|
||||||
|
// ID of the DB path that this compaction's output files are written to.
|
||||||
|
uint32_t GetOutputPathId() const { return output_path_id_; }
|
||||||
|
|
||||||
// Is this a trivial compaction that can be implemented by just
|
// Is this a trivial compaction that can be implemented by just
|
||||||
// moving a single input file to the next level (no merging or splitting)
|
// moving a single input file to the next level (no merging or splitting)
|
||||||
bool IsTrivialMove() const;
|
bool IsTrivialMove() const;
|
||||||
@ -104,8 +107,8 @@ class Compaction {
|
|||||||
|
|
||||||
Compaction(Version* input_version, int level, int out_level,
|
Compaction(Version* input_version, int level, int out_level,
|
||||||
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
|
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
|
||||||
CompressionType output_compression, bool seek_compaction = false,
|
uint32_t output_path_id, CompressionType output_compression,
|
||||||
bool deletion_compaction = false);
|
bool seek_compaction = false, bool deletion_compaction = false);
|
||||||
|
|
||||||
int level_;
|
int level_;
|
||||||
int out_level_; // levels to which output files are stored
|
int out_level_; // levels to which output files are stored
|
||||||
@ -116,6 +119,7 @@ class Compaction {
|
|||||||
int number_levels_;
|
int number_levels_;
|
||||||
ColumnFamilyData* cfd_;
|
ColumnFamilyData* cfd_;
|
||||||
|
|
||||||
|
uint32_t output_path_id_;
|
||||||
CompressionType output_compression_;
|
CompressionType output_compression_;
|
||||||
bool seek_compaction_;
|
bool seek_compaction_;
|
||||||
// if true, just delete files in inputs_[0]
|
// if true, just delete files in inputs_[0]
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
#define __STDC_FORMAT_MACROS
|
#define __STDC_FORMAT_MACROS
|
||||||
#include <inttypes.h>
|
#include <inttypes.h>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
|
#include "db/filename.h"
|
||||||
#include "util/log_buffer.h"
|
#include "util/log_buffer.h"
|
||||||
#include "util/statistics.h"
|
#include "util/statistics.h"
|
||||||
|
|
||||||
@ -370,7 +371,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
|
|||||||
}
|
}
|
||||||
Compaction* c = new Compaction(version, input_level, output_level,
|
Compaction* c = new Compaction(version, input_level, output_level,
|
||||||
MaxFileSizeForLevel(output_level),
|
MaxFileSizeForLevel(output_level),
|
||||||
MaxGrandParentOverlapBytes(input_level),
|
MaxGrandParentOverlapBytes(input_level), 0,
|
||||||
GetCompressionType(*options_, output_level));
|
GetCompressionType(*options_, output_level));
|
||||||
|
|
||||||
c->inputs_[0] = inputs;
|
c->inputs_[0] = inputs;
|
||||||
@ -491,7 +492,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
|
|||||||
assert(level >= 0);
|
assert(level >= 0);
|
||||||
assert(level + 1 < NumberLevels());
|
assert(level + 1 < NumberLevels());
|
||||||
c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1),
|
c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1),
|
||||||
MaxGrandParentOverlapBytes(level),
|
MaxGrandParentOverlapBytes(level), 0,
|
||||||
GetCompressionType(*options_, level + 1));
|
GetCompressionType(*options_, level + 1));
|
||||||
c->score_ = score;
|
c->score_ = score;
|
||||||
|
|
||||||
@ -684,9 +685,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
|||||||
// first candidate to be compacted.
|
// first candidate to be compacted.
|
||||||
uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0;
|
uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0;
|
||||||
if (f != nullptr) {
|
if (f != nullptr) {
|
||||||
LogToBuffer(log_buffer,
|
LogToBuffer(
|
||||||
"[%s] Universal: Possible candidate file %" PRIu64 "[%d].",
|
log_buffer, "[%s] Universal: Possible candidate file %s[%d].",
|
||||||
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop);
|
version->cfd_->GetName().c_str(),
|
||||||
|
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), loop);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the succeeding files need compaction.
|
// Check if the succeeding files need compaction.
|
||||||
@ -764,7 +766,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Compaction* c = new Compaction(
|
Compaction* c = new Compaction(
|
||||||
version, level, level, MaxFileSizeForLevel(level), LLONG_MAX,
|
version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, 0,
|
||||||
GetCompressionType(*options_, level, enable_compression));
|
GetCompressionType(*options_, level, enable_compression));
|
||||||
c->score_ = score;
|
c->score_ = score;
|
||||||
|
|
||||||
@ -772,11 +774,11 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
|
|||||||
FileMetaData* f = c->input_version_->files_[level][i];
|
FileMetaData* f = c->input_version_->files_[level][i];
|
||||||
c->inputs_[0].push_back(f);
|
c->inputs_[0].push_back(f);
|
||||||
LogToBuffer(log_buffer,
|
LogToBuffer(log_buffer,
|
||||||
"[%s] Universal: Picking file %" PRIu64 "[%d] "
|
"[%s] Universal: Picking file %s[%d] "
|
||||||
"with size %" PRIu64 " (compensated size %" PRIu64 ")\n",
|
"with size %" PRIu64 " (compensated size %" PRIu64 ")\n",
|
||||||
version->cfd_->GetName().c_str(),
|
version->cfd_->GetName().c_str(),
|
||||||
f->fd.GetNumber(), i,
|
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(),
|
||||||
f->fd.GetFileSize(), f->compensated_file_size);
|
i, f->fd.GetFileSize(), f->compensated_file_size);
|
||||||
}
|
}
|
||||||
return c;
|
return c;
|
||||||
}
|
}
|
||||||
@ -810,29 +812,29 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
|||||||
start_index = loop; // Consider this as the first candidate.
|
start_index = loop; // Consider this as the first candidate.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
LogToBuffer(log_buffer,
|
LogToBuffer(log_buffer, "[%s] Universal: skipping file %s[%d] compacted %s",
|
||||||
"[%s] Universal: skipping file %" PRIu64 "[%d] compacted %s",
|
version->cfd_->GetName().c_str(),
|
||||||
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop,
|
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(),
|
||||||
" cannot be a candidate to reduce size amp.\n");
|
loop, " cannot be a candidate to reduce size amp.\n");
|
||||||
f = nullptr;
|
f = nullptr;
|
||||||
}
|
}
|
||||||
if (f == nullptr) {
|
if (f == nullptr) {
|
||||||
return nullptr; // no candidate files
|
return nullptr; // no candidate files
|
||||||
}
|
}
|
||||||
|
|
||||||
LogToBuffer(log_buffer,
|
LogToBuffer(log_buffer, "[%s] Universal: First candidate file %s[%d] %s",
|
||||||
"[%s] Universal: First candidate file %" PRIu64 "[%d] %s",
|
version->cfd_->GetName().c_str(),
|
||||||
version->cfd_->GetName().c_str(), f->fd.GetNumber(), start_index,
|
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(),
|
||||||
" to reduce size amp.\n");
|
start_index, " to reduce size amp.\n");
|
||||||
|
|
||||||
// keep adding up all the remaining files
|
// keep adding up all the remaining files
|
||||||
for (unsigned int loop = start_index; loop < files.size() - 1; loop++) {
|
for (unsigned int loop = start_index; loop < files.size() - 1; loop++) {
|
||||||
f = files[loop];
|
f = files[loop];
|
||||||
if (f->being_compacted) {
|
if (f->being_compacted) {
|
||||||
LogToBuffer(
|
LogToBuffer(
|
||||||
log_buffer,
|
log_buffer, "[%s] Universal: Possible candidate file %s[%d] %s.",
|
||||||
"[%s] Universal: Possible candidate file %" PRIu64 "[%d] %s.",
|
version->cfd_->GetName().c_str(),
|
||||||
version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop,
|
FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), loop,
|
||||||
" is already being compacted. No size amp reduction possible.\n");
|
" is already being compacted. No size amp reduction possible.\n");
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
@ -867,7 +869,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
|
|||||||
// We always compact all the files, so always compress.
|
// We always compact all the files, so always compress.
|
||||||
Compaction* c =
|
Compaction* c =
|
||||||
new Compaction(version, level, level, MaxFileSizeForLevel(level),
|
new Compaction(version, level, level, MaxFileSizeForLevel(level),
|
||||||
LLONG_MAX, GetCompressionType(*options_, level));
|
LLONG_MAX, 0, GetCompressionType(*options_, level));
|
||||||
c->score_ = score;
|
c->score_ = score;
|
||||||
for (unsigned int loop = start_index; loop < files.size(); loop++) {
|
for (unsigned int loop = start_index; loop < files.size(); loop++) {
|
||||||
f = c->input_version_->files_[level][loop];
|
f = c->input_version_->files_[level][loop];
|
||||||
@ -909,7 +911,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
|
|||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
Compaction* c = new Compaction(version, 0, 0, 0, 0, kNoCompression, false,
|
Compaction* c = new Compaction(version, 0, 0, 0, 0, 0, kNoCompression, false,
|
||||||
true /* is deletion compaction */);
|
true /* is deletion compaction */);
|
||||||
// delete old files (FIFO)
|
// delete old files (FIFO)
|
||||||
for (auto ritr = version->files_[0].rbegin();
|
for (auto ritr = version->files_[0].rbegin();
|
||||||
|
@ -98,7 +98,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Make a set of all of the live *.sst files
|
// Make a set of all of the live *.sst files
|
||||||
std::set<uint64_t> live;
|
std::vector<FileDescriptor> live;
|
||||||
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
for (auto cfd : *versions_->GetColumnFamilySet()) {
|
||||||
cfd->current()->AddLiveFiles(&live);
|
cfd->current()->AddLiveFiles(&live);
|
||||||
}
|
}
|
||||||
@ -109,7 +109,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
|
|||||||
// create names of the live files. The names are not absolute
|
// create names of the live files. The names are not absolute
|
||||||
// paths, instead they are relative to dbname_;
|
// paths, instead they are relative to dbname_;
|
||||||
for (auto live_file : live) {
|
for (auto live_file : live) {
|
||||||
ret.push_back(TableFileName("", live_file));
|
ret.push_back(MakeTableFileName("", live_file.GetNumber()));
|
||||||
}
|
}
|
||||||
|
|
||||||
ret.push_back(CurrentFileName(""));
|
ret.push_back(CurrentFileName(""));
|
||||||
|
178
db/db_impl.cc
178
db/db_impl.cc
@ -98,6 +98,7 @@ struct DBImpl::CompactionState {
|
|||||||
// Files produced by compaction
|
// Files produced by compaction
|
||||||
struct Output {
|
struct Output {
|
||||||
uint64_t number;
|
uint64_t number;
|
||||||
|
uint32_t path_id;
|
||||||
uint64_t file_size;
|
uint64_t file_size;
|
||||||
InternalKey smallest, largest;
|
InternalKey smallest, largest;
|
||||||
SequenceNumber smallest_seqno, largest_seqno;
|
SequenceNumber smallest_seqno, largest_seqno;
|
||||||
@ -294,6 +295,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
|
|||||||
result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
|
result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (result.db_paths.size() == 0) {
|
||||||
|
result.db_paths.push_back(dbname);
|
||||||
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -573,30 +578,48 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// don't delete live files
|
// don't delete live files
|
||||||
deletion_state.sst_live.assign(pending_outputs_.begin(),
|
for (auto pair : pending_outputs_) {
|
||||||
pending_outputs_.end());
|
deletion_state.sst_live.emplace_back(pair.first, pair.second, 0);
|
||||||
|
}
|
||||||
|
/* deletion_state.sst_live.insert(pending_outputs_.begin(),
|
||||||
|
pending_outputs_.end());*/
|
||||||
versions_->AddLiveFiles(&deletion_state.sst_live);
|
versions_->AddLiveFiles(&deletion_state.sst_live);
|
||||||
|
|
||||||
if (doing_the_full_scan) {
|
if (doing_the_full_scan) {
|
||||||
// set of all files in the directory. We'll exclude files that are still
|
for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) {
|
||||||
// alive in the subsequent processings.
|
// set of all files in the directory. We'll exclude files that are still
|
||||||
env_->GetChildren(
|
// alive in the subsequent processings.
|
||||||
dbname_, &deletion_state.candidate_files
|
std::vector<std::string> files;
|
||||||
); // Ignore errors
|
env_->GetChildren(dbname_, &files); // Ignore errors
|
||||||
|
for (std::string file : files) {
|
||||||
|
deletion_state.candidate_files.emplace_back(file, path_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//Add log files in wal_dir
|
//Add log files in wal_dir
|
||||||
if (options_.wal_dir != dbname_) {
|
if (options_.wal_dir != dbname_) {
|
||||||
std::vector<std::string> log_files;
|
std::vector<std::string> log_files;
|
||||||
env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors
|
env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors
|
||||||
deletion_state.candidate_files.insert(
|
for (std::string log_file : log_files) {
|
||||||
deletion_state.candidate_files.end(),
|
deletion_state.candidate_files.emplace_back(log_file, 0);
|
||||||
log_files.begin(),
|
}
|
||||||
log_files.end()
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
// Ordering predicate for candidate files, passed to sort() in
// PurgeObsoleteFiles before the unique()-based dedup.
//
// Entries are ordered by descending file_name; entries with the same
// file_name are ordered by descending path_id. Returns true when `first`
// must be placed before `second`.
//
// The tie-break has to compare the two *different* operands: comparing
// first.path_id with itself always yields false, which leaves same-named
// files from different paths in arbitrary relative order and can keep
// equal entries from ending up adjacent for the dedup pass.
bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first,
                          const rocksdb::DBImpl::CandidateFileInfo& second) {
  if (first.file_name > second.file_name) {
    return true;
  }
  if (first.file_name < second.file_name) {
    return false;
  }
  // Same file name: fall back to the path id.
  return first.path_id > second.path_id;
}
|
||||||
|
}; // namespace
|
||||||
|
|
||||||
// Diffs the files listed in filenames and those that do not
|
// Diffs the files listed in filenames and those that do not
|
||||||
// belong to live files are possibly removed. Also, removes all the
|
// belong to live files are possibly removed. Also, removes all the
|
||||||
// files in sst_delete_files and log_delete_files.
|
// files in sst_delete_files and log_delete_files.
|
||||||
@ -612,10 +635,12 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now, convert live list to an unordered set, WITHOUT mutex held;
|
// Now, convert live list to an unordered map, WITHOUT mutex held;
|
||||||
// set is slow.
|
// set is slow.
|
||||||
std::unordered_set<uint64_t> sst_live(state.sst_live.begin(),
|
std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
|
||||||
state.sst_live.end());
|
for (FileDescriptor& fd : state.sst_live) {
|
||||||
|
sst_live_map[fd.GetNumber()] = &fd;
|
||||||
|
}
|
||||||
|
|
||||||
auto& candidate_files = state.candidate_files;
|
auto& candidate_files = state.candidate_files;
|
||||||
candidate_files.reserve(
|
candidate_files.reserve(
|
||||||
@ -625,26 +650,30 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
|||||||
// We may ignore the dbname when generating the file names.
|
// We may ignore the dbname when generating the file names.
|
||||||
const char* kDumbDbName = "";
|
const char* kDumbDbName = "";
|
||||||
for (auto file : state.sst_delete_files) {
|
for (auto file : state.sst_delete_files) {
|
||||||
candidate_files.push_back(
|
candidate_files.emplace_back(
|
||||||
TableFileName(kDumbDbName, file->fd.GetNumber()).substr(1));
|
MakeTableFileName(kDumbDbName, file->fd.GetNumber()),
|
||||||
|
file->fd.GetPathId());
|
||||||
delete file;
|
delete file;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto file_num : state.log_delete_files) {
|
for (auto file_num : state.log_delete_files) {
|
||||||
if (file_num > 0) {
|
if (file_num > 0) {
|
||||||
candidate_files.push_back(LogFileName(kDumbDbName, file_num).substr(1));
|
candidate_files.emplace_back(LogFileName(kDumbDbName, file_num).substr(1),
|
||||||
|
0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// dedup state.candidate_files so we don't try to delete the same
|
// dedup state.candidate_files so we don't try to delete the same
|
||||||
// file twice
|
// file twice
|
||||||
sort(candidate_files.begin(), candidate_files.end());
|
sort(candidate_files.begin(), candidate_files.end(), CompareCandidateFile);
|
||||||
candidate_files.erase(unique(candidate_files.begin(), candidate_files.end()),
|
candidate_files.erase(unique(candidate_files.begin(), candidate_files.end()),
|
||||||
candidate_files.end());
|
candidate_files.end());
|
||||||
|
|
||||||
std::vector<std::string> old_info_log_files;
|
std::vector<std::string> old_info_log_files;
|
||||||
|
|
||||||
for (const auto& to_delete : candidate_files) {
|
for (const auto& candidate_file : candidate_files) {
|
||||||
|
std::string to_delete = candidate_file.file_name;
|
||||||
|
uint32_t path_id = candidate_file.path_id;
|
||||||
uint64_t number;
|
uint64_t number;
|
||||||
FileType type;
|
FileType type;
|
||||||
// Ignore file if we cannot recognize it.
|
// Ignore file if we cannot recognize it.
|
||||||
@ -664,7 +693,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
|||||||
keep = (number >= state.manifest_file_number);
|
keep = (number >= state.manifest_file_number);
|
||||||
break;
|
break;
|
||||||
case kTableFile:
|
case kTableFile:
|
||||||
keep = (sst_live.find(number) != sst_live.end());
|
keep = (sst_live_map.find(number) != sst_live_map.end());
|
||||||
break;
|
break;
|
||||||
case kTempFile:
|
case kTempFile:
|
||||||
// Any temp files that are currently being written to must
|
// Any temp files that are currently being written to must
|
||||||
@ -672,7 +701,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
|||||||
// Also, SetCurrentFile creates a temp file when writing out new
|
// Also, SetCurrentFile creates a temp file when writing out new
|
||||||
// manifest, which is equal to state.pending_manifest_file_number. We
|
// manifest, which is equal to state.pending_manifest_file_number. We
|
||||||
// should not delete that file
|
// should not delete that file
|
||||||
keep = (sst_live.find(number) != sst_live.end()) ||
|
keep = (sst_live_map.find(number) != sst_live_map.end()) ||
|
||||||
(number == state.pending_manifest_file_number);
|
(number == state.pending_manifest_file_number);
|
||||||
break;
|
break;
|
||||||
case kInfoLogFile:
|
case kInfoLogFile:
|
||||||
@ -693,13 +722,16 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::string fname;
|
||||||
if (type == kTableFile) {
|
if (type == kTableFile) {
|
||||||
// evict from cache
|
// evict from cache
|
||||||
TableCache::Evict(table_cache_.get(), number);
|
TableCache::Evict(table_cache_.get(), number);
|
||||||
|
fname = TableFileName(options_.db_paths, number, path_id);
|
||||||
|
} else {
|
||||||
|
fname =
|
||||||
|
((type == kLogFile) ? options_.wal_dir : dbname_) + "/" + to_delete;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) +
|
|
||||||
"/" + to_delete;
|
|
||||||
if (type == kLogFile &&
|
if (type == kLogFile &&
|
||||||
(options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) {
|
(options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) {
|
||||||
auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number);
|
auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number);
|
||||||
@ -1084,6 +1116,13 @@ Status DBImpl::Recover(
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (auto db_path : options_.db_paths) {
|
||||||
|
s = env_->CreateDirIfMissing(db_path);
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
s = env_->NewDirectory(dbname_, &db_directory_);
|
s = env_->NewDirectory(dbname_, &db_directory_);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
@ -1349,8 +1388,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
|
|||||||
mutex_.AssertHeld();
|
mutex_.AssertHeld();
|
||||||
const uint64_t start_micros = env_->NowMicros();
|
const uint64_t start_micros = env_->NowMicros();
|
||||||
FileMetaData meta;
|
FileMetaData meta;
|
||||||
meta.fd.number = versions_->NewFileNumber();
|
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
|
||||||
pending_outputs_.insert(meta.fd.GetNumber());
|
pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file.
|
||||||
Iterator* iter = mem->NewIterator(ReadOptions(), true);
|
Iterator* iter = mem->NewIterator(ReadOptions(), true);
|
||||||
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
|
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
|
||||||
const SequenceNumber earliest_seqno_in_memtable =
|
const SequenceNumber earliest_seqno_in_memtable =
|
||||||
@ -1381,9 +1420,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
|
|||||||
// should not be added to the manifest.
|
// should not be added to the manifest.
|
||||||
int level = 0;
|
int level = 0;
|
||||||
if (s.ok() && meta.fd.GetFileSize() > 0) {
|
if (s.ok() && meta.fd.GetFileSize() > 0) {
|
||||||
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetFileSize(),
|
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
|
||||||
meta.smallest, meta.largest, meta.smallest_seqno,
|
meta.fd.GetFileSize(), meta.smallest, meta.largest,
|
||||||
meta.largest_seqno);
|
meta.smallest_seqno, meta.largest_seqno);
|
||||||
}
|
}
|
||||||
|
|
||||||
InternalStats::CompactionStats stats;
|
InternalStats::CompactionStats stats;
|
||||||
@ -1402,9 +1441,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
|
|||||||
mutex_.AssertHeld();
|
mutex_.AssertHeld();
|
||||||
const uint64_t start_micros = env_->NowMicros();
|
const uint64_t start_micros = env_->NowMicros();
|
||||||
FileMetaData meta;
|
FileMetaData meta;
|
||||||
meta.fd.number = versions_->NewFileNumber();
|
|
||||||
|
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
|
||||||
*filenumber = meta.fd.GetNumber();
|
*filenumber = meta.fd.GetNumber();
|
||||||
pending_outputs_.insert(meta.fd.GetNumber());
|
pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file.
|
||||||
|
|
||||||
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
|
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
|
||||||
const SequenceNumber earliest_seqno_in_memtable =
|
const SequenceNumber earliest_seqno_in_memtable =
|
||||||
@ -1471,9 +1511,9 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
|
|||||||
cfd->options()->compaction_style == kCompactionStyleLevel) {
|
cfd->options()->compaction_style == kCompactionStyleLevel) {
|
||||||
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
|
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
|
||||||
}
|
}
|
||||||
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetFileSize(),
|
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
|
||||||
meta.smallest, meta.largest, meta.smallest_seqno,
|
meta.fd.GetFileSize(), meta.smallest, meta.largest,
|
||||||
meta.largest_seqno);
|
meta.smallest_seqno, meta.largest_seqno);
|
||||||
}
|
}
|
||||||
|
|
||||||
InternalStats::CompactionStats stats;
|
InternalStats::CompactionStats stats;
|
||||||
@ -1529,7 +1569,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
|
|||||||
// Replace immutable memtable with the generated Table
|
// Replace immutable memtable with the generated Table
|
||||||
s = cfd->imm()->InstallMemtableFlushResults(
|
s = cfd->imm()->InstallMemtableFlushResults(
|
||||||
cfd, mems, versions_.get(), &mutex_, options_.info_log.get(),
|
cfd, mems, versions_.get(), &mutex_, options_.info_log.get(),
|
||||||
file_number, pending_outputs_, &deletion_state.memtables_to_free,
|
file_number, &pending_outputs_, &deletion_state.memtables_to_free,
|
||||||
db_directory_.get(), log_buffer);
|
db_directory_.get(), log_buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1673,9 +1713,9 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
|
|||||||
edit.SetColumnFamily(cfd->GetID());
|
edit.SetColumnFamily(cfd->GetID());
|
||||||
for (const auto& f : cfd->current()->files_[level]) {
|
for (const auto& f : cfd->current()->files_[level]) {
|
||||||
edit.DeleteFile(level, f->fd.GetNumber());
|
edit.DeleteFile(level, f->fd.GetNumber());
|
||||||
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetFileSize(),
|
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||||
f->smallest, f->largest, f->smallest_seqno,
|
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||||
f->largest_seqno);
|
f->smallest_seqno, f->largest_seqno);
|
||||||
}
|
}
|
||||||
Log(options_.info_log, "[%s] Apply version edit:\n%s",
|
Log(options_.info_log, "[%s] Apply version edit:\n%s",
|
||||||
cfd->GetName().c_str(), edit.DebugString().data());
|
cfd->GetName().c_str(), edit.DebugString().data());
|
||||||
@ -2172,9 +2212,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
|
|||||||
assert(c->num_input_files(0) == 1);
|
assert(c->num_input_files(0) == 1);
|
||||||
FileMetaData* f = c->input(0, 0);
|
FileMetaData* f = c->input(0, 0);
|
||||||
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
|
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
|
||||||
c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetFileSize(),
|
c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||||
f->smallest, f->largest, f->smallest_seqno,
|
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||||
f->largest_seqno);
|
f->smallest_seqno, f->largest_seqno);
|
||||||
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
|
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
|
||||||
db_directory_.get());
|
db_directory_.get());
|
||||||
InstallSuperVersion(c->column_family_data(), deletion_state);
|
InstallSuperVersion(c->column_family_data(), deletion_state);
|
||||||
@ -2280,7 +2320,7 @@ void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) {
|
|||||||
int filesNeeded = compact->compaction->num_input_files(1);
|
int filesNeeded = compact->compaction->num_input_files(1);
|
||||||
for (int i = 0; i < std::max(filesNeeded, 1); i++) {
|
for (int i = 0; i < std::max(filesNeeded, 1); i++) {
|
||||||
uint64_t file_number = versions_->NewFileNumber();
|
uint64_t file_number = versions_->NewFileNumber();
|
||||||
pending_outputs_.insert(file_number);
|
pending_outputs_[file_number] = compact->compaction->GetOutputPathId();
|
||||||
compact->allocated_file_numbers.push_back(file_number);
|
compact->allocated_file_numbers.push_back(file_number);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2306,18 +2346,20 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
|
|||||||
} else {
|
} else {
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
file_number = versions_->NewFileNumber();
|
file_number = versions_->NewFileNumber();
|
||||||
pending_outputs_.insert(file_number);
|
pending_outputs_[file_number] = compact->compaction->GetOutputPathId();
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
}
|
}
|
||||||
CompactionState::Output out;
|
CompactionState::Output out;
|
||||||
out.number = file_number;
|
out.number = file_number;
|
||||||
|
out.path_id = compact->compaction->GetOutputPathId();
|
||||||
out.smallest.Clear();
|
out.smallest.Clear();
|
||||||
out.largest.Clear();
|
out.largest.Clear();
|
||||||
out.smallest_seqno = out.largest_seqno = 0;
|
out.smallest_seqno = out.largest_seqno = 0;
|
||||||
compact->outputs.push_back(out);
|
compact->outputs.push_back(out);
|
||||||
|
|
||||||
// Make the output file
|
// Make the output file
|
||||||
std::string fname = TableFileName(dbname_, file_number);
|
std::string fname = TableFileName(options_.db_paths, file_number,
|
||||||
|
compact->compaction->GetOutputPathId());
|
||||||
Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_);
|
Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_);
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
@ -2340,6 +2382,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
|
|||||||
assert(compact->builder != nullptr);
|
assert(compact->builder != nullptr);
|
||||||
|
|
||||||
const uint64_t output_number = compact->current_output()->number;
|
const uint64_t output_number = compact->current_output()->number;
|
||||||
|
const uint32_t output_path_id = compact->current_output()->path_id;
|
||||||
assert(output_number != 0);
|
assert(output_number != 0);
|
||||||
|
|
||||||
// Check for iterator errors
|
// Check for iterator errors
|
||||||
@ -2375,9 +2418,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
|
|||||||
if (s.ok() && current_entries > 0) {
|
if (s.ok() && current_entries > 0) {
|
||||||
// Verify that the table is usable
|
// Verify that the table is usable
|
||||||
ColumnFamilyData* cfd = compact->compaction->column_family_data();
|
ColumnFamilyData* cfd = compact->compaction->column_family_data();
|
||||||
FileDescriptor meta(output_number, current_bytes);
|
FileDescriptor fd(output_number, output_path_id, current_bytes);
|
||||||
Iterator* iter = cfd->table_cache()->NewIterator(
|
Iterator* iter = cfd->table_cache()->NewIterator(
|
||||||
ReadOptions(), storage_options_, cfd->internal_comparator(), meta);
|
ReadOptions(), storage_options_, cfd->internal_comparator(), fd);
|
||||||
s = iter->status();
|
s = iter->status();
|
||||||
delete iter;
|
delete iter;
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
@ -2420,9 +2463,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact,
|
|||||||
compact->compaction->AddInputDeletions(compact->compaction->edit());
|
compact->compaction->AddInputDeletions(compact->compaction->edit());
|
||||||
for (size_t i = 0; i < compact->outputs.size(); i++) {
|
for (size_t i = 0; i < compact->outputs.size(); i++) {
|
||||||
const CompactionState::Output& out = compact->outputs[i];
|
const CompactionState::Output& out = compact->outputs[i];
|
||||||
compact->compaction->edit()->AddFile(
|
compact->compaction->edit()->AddFile(compact->compaction->output_level(),
|
||||||
compact->compaction->output_level(), out.number, out.file_size,
|
out.number, out.path_id, out.file_size,
|
||||||
out.smallest, out.largest, out.smallest_seqno, out.largest_seqno);
|
out.smallest, out.largest,
|
||||||
|
out.smallest_seqno, out.largest_seqno);
|
||||||
}
|
}
|
||||||
return versions_->LogAndApply(compact->compaction->column_family_data(),
|
return versions_->LogAndApply(compact->compaction->column_family_data(),
|
||||||
compact->compaction->edit(), &mutex_,
|
compact->compaction->edit(), &mutex_,
|
||||||
@ -4118,7 +4162,7 @@ Status DBImpl::MakeRoomForWrite(
|
|||||||
// how do we fail if we're not creating new log?
|
// how do we fail if we're not creating new log?
|
||||||
assert(creating_new_log);
|
assert(creating_new_log);
|
||||||
// Avoid chewing through file number space in a tight loop.
|
// Avoid chewing through file number space in a tight loop.
|
||||||
versions_->ReuseFileNumber(new_log_number);
|
versions_->ReuseLogFileNumber(new_log_number);
|
||||||
assert(!new_mem);
|
assert(!new_mem);
|
||||||
assert(!new_log);
|
assert(!new_log);
|
||||||
break;
|
break;
|
||||||
@ -4361,14 +4405,15 @@ Status DBImpl::CheckConsistency() {
|
|||||||
|
|
||||||
std::string corruption_messages;
|
std::string corruption_messages;
|
||||||
for (const auto& md : metadata) {
|
for (const auto& md : metadata) {
|
||||||
std::string file_path = dbname_ + md.name;
|
std::string file_path = md.db_path + "/" + md.name;
|
||||||
|
|
||||||
uint64_t fsize = 0;
|
uint64_t fsize = 0;
|
||||||
Status s = env_->GetFileSize(file_path, &fsize);
|
Status s = env_->GetFileSize(file_path, &fsize);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
corruption_messages +=
|
corruption_messages +=
|
||||||
"Can't access " + md.name + ": " + s.ToString() + "\n";
|
"Can't access " + md.name + ": " + s.ToString() + "\n";
|
||||||
} else if (fsize != md.size) {
|
} else if (fsize != md.size) {
|
||||||
corruption_messages += "Sst file size mismatch: " + md.name +
|
corruption_messages += "Sst file size mismatch: " + file_path +
|
||||||
". Size recorded in manifest " +
|
". Size recorded in manifest " +
|
||||||
std::to_string(md.size) + ", actual size " +
|
std::to_string(md.size) + ", actual size " +
|
||||||
std::to_string(fsize) + "\n";
|
std::to_string(fsize) + "\n";
|
||||||
@ -4466,6 +4511,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
|
|||||||
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
||||||
const std::vector<ColumnFamilyDescriptor>& column_families,
|
const std::vector<ColumnFamilyDescriptor>& column_families,
|
||||||
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
|
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
|
||||||
|
if (db_options.db_paths.size() > 1) {
|
||||||
|
return Status::NotSupported(
|
||||||
|
"More than one DB paths are not supported yet. ");
|
||||||
|
}
|
||||||
|
|
||||||
*dbptr = nullptr;
|
*dbptr = nullptr;
|
||||||
handles->clear();
|
handles->clear();
|
||||||
|
|
||||||
@ -4481,6 +4531,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
|
|||||||
|
|
||||||
DBImpl* impl = new DBImpl(db_options, dbname);
|
DBImpl* impl = new DBImpl(db_options, dbname);
|
||||||
Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir);
|
Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir);
|
||||||
|
if (s.ok()) {
|
||||||
|
for (auto path : impl->options_.db_paths) {
|
||||||
|
s = impl->env_->CreateDirIfMissing(path);
|
||||||
|
if (!s.ok()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
delete impl;
|
delete impl;
|
||||||
return s;
|
return s;
|
||||||
@ -4643,6 +4702,21 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (auto db_path : options.db_paths) {
|
||||||
|
env->GetChildren(db_path, &filenames);
|
||||||
|
uint64_t number;
|
||||||
|
FileType type;
|
||||||
|
for (size_t i = 0; i < filenames.size(); i++) {
|
||||||
|
if (ParseFileName(filenames[i], &number, &type) &&
|
||||||
|
type == kTableFile) { // Lock file will be deleted at end
|
||||||
|
Status del = env->DeleteFile(db_path + "/" + filenames[i]);
|
||||||
|
if (result.ok() && !del.ok()) {
|
||||||
|
result = del;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
env->GetChildren(archivedir, &archiveFiles);
|
env->GetChildren(archivedir, &archiveFiles);
|
||||||
// Delete archival files.
|
// Delete archival files.
|
||||||
for (size_t i = 0; i < archiveFiles.size(); ++i) {
|
for (size_t i = 0; i < archiveFiles.size(); ++i) {
|
||||||
|
18
db/db_impl.h
18
db/db_impl.h
@ -198,6 +198,17 @@ class DBImpl : public DB {
|
|||||||
Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
|
Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence);
|
||||||
#endif // NDEBUG
|
#endif // NDEBUG
|
||||||
|
|
||||||
|
// Structure to store information for candidate files to delete.
//
// file_name: name of the candidate file.
// path_id:   numeric path ID recorded alongside the file name (presumably an
//            index into the DB's db_paths list -- TODO confirm with callers).
struct CandidateFileInfo {
  std::string file_name;
  uint32_t path_id;

  // `name` is taken by const reference so that it is copied exactly once,
  // into `file_name`; taking it by value and then copy-initializing the
  // member would copy the string a second time.
  CandidateFileInfo(const std::string& name, uint32_t path)
      : file_name(name), path_id(path) {}

  // Two candidates are equal only when both the file name and the path ID
  // match.
  bool operator==(const CandidateFileInfo& other) const {
    return file_name == other.file_name && path_id == other.path_id;
  }
};
|
||||||
|
|
||||||
// needed for CleanupIteratorState
|
// needed for CleanupIteratorState
|
||||||
struct DeletionState {
|
struct DeletionState {
|
||||||
inline bool HaveSomethingToDelete() const {
|
inline bool HaveSomethingToDelete() const {
|
||||||
@ -209,10 +220,10 @@ class DBImpl : public DB {
|
|||||||
// a list of all files that we'll consider deleting
|
// a list of all files that we'll consider deleting
|
||||||
// (every once in a while this is filled up with all files
|
// (every once in a while this is filled up with all files
|
||||||
// in the DB directory)
|
// in the DB directory)
|
||||||
std::vector<std::string> candidate_files;
|
std::vector<CandidateFileInfo> candidate_files;
|
||||||
|
|
||||||
// the list of all live sst files that cannot be deleted
|
// the list of all live sst files that cannot be deleted
|
||||||
std::vector<uint64_t> sst_live;
|
std::vector<FileDescriptor> sst_live;
|
||||||
|
|
||||||
// a list of sst files that we need to delete
|
// a list of sst files that we need to delete
|
||||||
std::vector<FileMetaData*> sst_delete_files;
|
std::vector<FileMetaData*> sst_delete_files;
|
||||||
@ -501,7 +512,8 @@ class DBImpl : public DB {
|
|||||||
|
|
||||||
// Set of table files to protect from deletion because they are
|
// Set of table files to protect from deletion because they are
|
||||||
// part of ongoing compactions.
|
// part of ongoing compactions.
|
||||||
std::set<uint64_t> pending_outputs_;
|
// map from pending file number ID to their path IDs.
|
||||||
|
FileNumToPathIdMap pending_outputs_;
|
||||||
|
|
||||||
// At least one compaction or flush job is pending but not yet scheduled
|
// At least one compaction or flush job is pending but not yet scheduled
|
||||||
// because of the max background thread limit.
|
// because of the max background thread limit.
|
||||||
|
@ -98,7 +98,12 @@ class AtomicCounter {
|
|||||||
count_ = 0;
|
count_ = 0;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
} // namespace anon
|
||||||
|
|
||||||
|
// Builds the test key for index `i`: the literal prefix "key" followed by
// `i` printed with "%06d", i.e. zero-padded to at least six characters.
static std::string Key(int i) {
  char formatted[100];
  snprintf(formatted, sizeof(formatted), "key%06d", i);
  return formatted;
}
|
||||||
|
|
||||||
// Special Env used to delay background operations
|
// Special Env used to delay background operations
|
||||||
@ -355,7 +360,10 @@ class DBTest {
|
|||||||
|
|
||||||
// Closes the database, destroys its on-disk state and releases the objects
// this fixture deletes (env_ and filter_policy_).
~DBTest() {
  Close();

  // DestroyDB() is told about both directories a test may have used: the
  // primary one (dbname_) and the secondary one (dbname_ + "_2").
  Options destroy_options;
  destroy_options.db_paths.emplace_back(dbname_);
  destroy_options.db_paths.emplace_back(dbname_ + "_2");
  ASSERT_OK(DestroyDB(dbname_, destroy_options));

  delete env_;
  delete filter_policy_;
}
|
||||||
@ -897,6 +905,30 @@ class DBTest {
|
|||||||
return property;
|
return property;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int GetSstFileCount(std::string path) {
|
||||||
|
std::vector<std::string> files;
|
||||||
|
env_->GetChildren(path, &files);
|
||||||
|
|
||||||
|
int sst_count = 0;
|
||||||
|
uint64_t number;
|
||||||
|
FileType type;
|
||||||
|
for (size_t i = 0; i < files.size(); i++) {
|
||||||
|
if (ParseFileName(files[i], &number, &type) && type == kTableFile) {
|
||||||
|
sst_count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sst_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Issues 11 consecutive Put()s starting at key index *key_idx, advancing
// *key_idx after each one, then calls TEST_WaitForFlushMemTable() and
// TEST_WaitForCompact() on the DB.
//
// The first ten values are requested from RandomString() with size 10000 and
// the last one with size 1 (presumably the size is the value length in
// bytes -- TODO confirm against RandomString()).
void GenerateNewFile(Random* rnd, int* key_idx) {
  const int kNumWrites = 11;
  for (int written = 0; written < kNumWrites; written++) {
    const bool is_last_write = (written == kNumWrites - 1);
    const int value_size = is_last_write ? 1 : 10000;
    ASSERT_OK(Put(Key(*key_idx), RandomString(rnd, value_size)));
    ++(*key_idx);
  }
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
}
|
||||||
|
|
||||||
std::string IterStatus(Iterator* iter) {
|
std::string IterStatus(Iterator* iter) {
|
||||||
std::string result;
|
std::string result;
|
||||||
if (iter->Valid()) {
|
if (iter->Valid()) {
|
||||||
@ -1037,12 +1069,6 @@ class DBTest {
|
|||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
static std::string Key(int i) {
|
|
||||||
char buf[100];
|
|
||||||
snprintf(buf, sizeof(buf), "key%06d", i);
|
|
||||||
return std::string(buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
static long TestGetTickerCount(const Options& options, Tickers ticker_type) {
|
static long TestGetTickerCount(const Options& options, Tickers ticker_type) {
|
||||||
return options.statistics->getTickerCount(ticker_type);
|
return options.statistics->getTickerCount(ticker_type);
|
||||||
}
|
}
|
||||||
@ -3434,6 +3460,13 @@ TEST(DBTest, UniversalCompactionCompressRatio2) {
|
|||||||
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(),
|
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(),
|
||||||
120000 * 12 * 0.8 + 120000 * 2);
|
120000 * 12 * 0.8 + 120000 * 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reopening with two entries in Options::db_paths must be rejected with a
// NotSupported status.
TEST(DBTest, FailMoreDbPaths) {
  Options options;
  options.db_paths.emplace_back(dbname_);
  options.db_paths.emplace_back(dbname_ + "_2");

  Status reopen_status = TryReopen(&options);
  ASSERT_TRUE(reopen_status.IsNotSupported());
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
TEST(DBTest, ConvertCompactionStyle) {
|
TEST(DBTest, ConvertCompactionStyle) {
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
#include <ctype.h>
|
#include <ctype.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
#include <vector>
|
||||||
#include "db/dbformat.h"
|
#include "db/dbformat.h"
|
||||||
#include "rocksdb/env.h"
|
#include "rocksdb/env.h"
|
||||||
#include "util/logging.h"
|
#include "util/logging.h"
|
||||||
@ -66,9 +67,28 @@ std::string ArchivedLogFileName(const std::string& name, uint64_t number) {
|
|||||||
return MakeFileName(name + "/" + ARCHIVAL_DIR, number, "log");
|
return MakeFileName(name + "/" + ARCHIVAL_DIR, number, "log");
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string TableFileName(const std::string& name, uint64_t number) {
|
std::string MakeTableFileName(const std::string& path, uint64_t number) {
|
||||||
|
return MakeFileName(path, number, "sst");
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string TableFileName(const std::vector<std::string> db_paths,
|
||||||
|
uint64_t number, uint32_t path_id) {
|
||||||
assert(number > 0);
|
assert(number > 0);
|
||||||
return MakeFileName(name, number, "sst");
|
std::string path;
|
||||||
|
if (path_id >= db_paths.size()) {
|
||||||
|
path = db_paths.back();
|
||||||
|
} else {
|
||||||
|
path = db_paths[path_id];
|
||||||
|
}
|
||||||
|
return MakeTableFileName(path, number);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Renders a file number for log messages. For path ID 0 the result is just
// the decimal number; for any other path ID it is "<number>(path <path_id>)".
std::string FormatFileNumber(uint64_t number, uint32_t path_id) {
  std::string text = std::to_string(number);
  if (path_id != 0) {
    text += "(path ";
    text += std::to_string(path_id);
    text += ")";
  }
  return text;
}
|
||||||
|
|
||||||
std::string DescriptorFileName(const std::string& dbname, uint64_t number) {
|
std::string DescriptorFileName(const std::string& dbname, uint64_t number) {
|
||||||
|
@ -11,7 +11,9 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
#include <unordered_map>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
#include "rocksdb/slice.h"
|
#include "rocksdb/slice.h"
|
||||||
#include "rocksdb/status.h"
|
#include "rocksdb/status.h"
|
||||||
#include "rocksdb/transaction_log.h"
|
#include "rocksdb/transaction_log.h"
|
||||||
@ -34,6 +36,9 @@ enum FileType {
|
|||||||
kIdentityFile
|
kIdentityFile
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// map from file number to path ID.
|
||||||
|
typedef std::unordered_map<uint64_t, uint32_t> FileNumToPathIdMap;
|
||||||
|
|
||||||
// Return the name of the log file with the specified number
|
// Return the name of the log file with the specified number
|
||||||
// in the db named by "dbname". The result will be prefixed with
|
// in the db named by "dbname". The result will be prefixed with
|
||||||
// "dbname".
|
// "dbname".
|
||||||
@ -48,10 +53,15 @@ extern std::string ArchivalDirectory(const std::string& dbname);
|
|||||||
extern std::string ArchivedLogFileName(const std::string& dbname,
|
extern std::string ArchivedLogFileName(const std::string& dbname,
|
||||||
uint64_t num);
|
uint64_t num);
|
||||||
|
|
||||||
|
extern std::string MakeTableFileName(const std::string& name, uint64_t number);
|
||||||
|
|
||||||
// Return the name of the sstable with the specified number
|
// Return the name of the sstable with the specified number
|
||||||
// in the db named by "dbname". The result will be prefixed with
|
// in the db named by "dbname". The result will be prefixed with
|
||||||
// "dbname".
|
// "dbname".
|
||||||
extern std::string TableFileName(const std::string& dbname, uint64_t number);
|
extern std::string TableFileName(const std::vector<std::string> db_paths,
|
||||||
|
uint64_t number, uint32_t path_id);
|
||||||
|
|
||||||
|
extern std::string FormatFileNumber(uint64_t number, uint32_t path_id);
|
||||||
|
|
||||||
// Return the name of the descriptor file for the db named by
|
// Return the name of the descriptor file for the db named by
|
||||||
// "dbname" and the specified incarnation number. The result will be
|
// "dbname" and the specified incarnation number. The result will be
|
||||||
|
@ -108,7 +108,9 @@ TEST(FileNameTest, Construction) {
|
|||||||
ASSERT_EQ(192U, number);
|
ASSERT_EQ(192U, number);
|
||||||
ASSERT_EQ(kLogFile, type);
|
ASSERT_EQ(kLogFile, type);
|
||||||
|
|
||||||
fname = TableFileName("bar", 200);
|
fname = TableFileName({"bar"}, 200, 0);
|
||||||
|
std::string fname1 = TableFileName({"foo", "bar"}, 200, 1);
|
||||||
|
ASSERT_EQ(fname, fname1);
|
||||||
ASSERT_EQ("bar/", std::string(fname.data(), 4));
|
ASSERT_EQ("bar/", std::string(fname.data(), 4));
|
||||||
ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type));
|
ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type));
|
||||||
ASSERT_EQ(200U, number);
|
ASSERT_EQ(200U, number);
|
||||||
|
@ -51,7 +51,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
|
|||||||
for (int i = 0; i < num_base_files; i++) {
|
for (int i = 0; i < num_base_files; i++) {
|
||||||
InternalKey start(MakeKey(2 * fnum), 1, kTypeValue);
|
InternalKey start(MakeKey(2 * fnum), 1, kTypeValue);
|
||||||
InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
|
InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
|
||||||
vbase.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1);
|
vbase.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1);
|
||||||
}
|
}
|
||||||
ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu));
|
ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu));
|
||||||
}
|
}
|
||||||
@ -61,7 +61,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
|
|||||||
vedit.DeleteFile(2, fnum);
|
vedit.DeleteFile(2, fnum);
|
||||||
InternalKey start(MakeKey(2 * fnum), 1, kTypeValue);
|
InternalKey start(MakeKey(2 * fnum), 1, kTypeValue);
|
||||||
InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
|
InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
|
||||||
vedit.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1);
|
vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1);
|
||||||
vset->LogAndApply(default_cfd, &vedit, &mu);
|
vset->LogAndApply(default_cfd, &vedit, &mu);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -140,7 +140,7 @@ void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) {
|
|||||||
|
|
||||||
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
||||||
uint64_t file_number,
|
uint64_t file_number,
|
||||||
std::set<uint64_t>* pending_outputs) {
|
FileNumToPathIdMap* pending_outputs) {
|
||||||
assert(!mems.empty());
|
assert(!mems.empty());
|
||||||
|
|
||||||
// If the flush was not successful, then just reset state.
|
// If the flush was not successful, then just reset state.
|
||||||
@ -162,7 +162,7 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
|||||||
Status MemTableList::InstallMemtableFlushResults(
|
Status MemTableList::InstallMemtableFlushResults(
|
||||||
ColumnFamilyData* cfd, const autovector<MemTable*>& mems, VersionSet* vset,
|
ColumnFamilyData* cfd, const autovector<MemTable*>& mems, VersionSet* vset,
|
||||||
port::Mutex* mu, Logger* info_log, uint64_t file_number,
|
port::Mutex* mu, Logger* info_log, uint64_t file_number,
|
||||||
std::set<uint64_t>& pending_outputs, autovector<MemTable*>* to_delete,
|
FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete,
|
||||||
Directory* db_directory, LogBuffer* log_buffer) {
|
Directory* db_directory, LogBuffer* log_buffer) {
|
||||||
mu->AssertHeld();
|
mu->AssertHeld();
|
||||||
|
|
||||||
@ -219,7 +219,7 @@ Status MemTableList::InstallMemtableFlushResults(
|
|||||||
// has been written to a committed version so that other concurrently
|
// has been written to a committed version so that other concurrently
|
||||||
// executing compaction threads do not mistakenly assume that this
|
// executing compaction threads do not mistakenly assume that this
|
||||||
// file is not live.
|
// file is not live.
|
||||||
pending_outputs.erase(m->file_number_);
|
pending_outputs->erase(m->file_number_);
|
||||||
if (m->Unref() != nullptr) {
|
if (m->Unref() != nullptr) {
|
||||||
to_delete->push_back(m);
|
to_delete->push_back(m);
|
||||||
}
|
}
|
||||||
@ -233,7 +233,7 @@ Status MemTableList::InstallMemtableFlushResults(
|
|||||||
m->flush_in_progress_ = false;
|
m->flush_in_progress_ = false;
|
||||||
m->edit_.Clear();
|
m->edit_.Clear();
|
||||||
num_flush_not_started_++;
|
num_flush_not_started_++;
|
||||||
pending_outputs.erase(m->file_number_);
|
pending_outputs->erase(m->file_number_);
|
||||||
m->file_number_ = 0;
|
m->file_number_ = 0;
|
||||||
imm_flush_needed.Release_Store((void *)1);
|
imm_flush_needed.Release_Store((void *)1);
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
#include "rocksdb/iterator.h"
|
#include "rocksdb/iterator.h"
|
||||||
|
|
||||||
#include "db/dbformat.h"
|
#include "db/dbformat.h"
|
||||||
|
#include "db/filename.h"
|
||||||
#include "db/skiplist.h"
|
#include "db/skiplist.h"
|
||||||
#include "db/memtable.h"
|
#include "db/memtable.h"
|
||||||
#include "rocksdb/db.h"
|
#include "rocksdb/db.h"
|
||||||
@ -108,17 +109,14 @@ class MemTableList {
|
|||||||
// they can get picked up again on the next round of flush.
|
// they can get picked up again on the next round of flush.
|
||||||
void RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
void RollbackMemtableFlush(const autovector<MemTable*>& mems,
|
||||||
uint64_t file_number,
|
uint64_t file_number,
|
||||||
std::set<uint64_t>* pending_outputs);
|
FileNumToPathIdMap* pending_outputs);
|
||||||
|
|
||||||
// Commit a successful flush in the manifest file
|
// Commit a successful flush in the manifest file
|
||||||
Status InstallMemtableFlushResults(ColumnFamilyData* cfd,
|
Status InstallMemtableFlushResults(
|
||||||
const autovector<MemTable*>& m,
|
ColumnFamilyData* cfd, const autovector<MemTable*>& m, VersionSet* vset,
|
||||||
VersionSet* vset, port::Mutex* mu,
|
port::Mutex* mu, Logger* info_log, uint64_t file_number,
|
||||||
Logger* info_log, uint64_t file_number,
|
FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete,
|
||||||
std::set<uint64_t>& pending_outputs,
|
Directory* db_directory, LogBuffer* log_buffer);
|
||||||
autovector<MemTable*>* to_delete,
|
|
||||||
Directory* db_directory,
|
|
||||||
LogBuffer* log_buffer);
|
|
||||||
|
|
||||||
// New memtables are inserted at the front of the list.
|
// New memtables are inserted at the front of the list.
|
||||||
// Takes ownership of the referenced held on *m by the caller of Add().
|
// Takes ownership of the referenced held on *m by the caller of Add().
|
||||||
|
91
db/repair.cc
91
db/repair.cc
@ -65,8 +65,8 @@ class Repairer {
|
|||||||
NewLRUCache(10, options_.table_cache_numshardbits,
|
NewLRUCache(10, options_.table_cache_numshardbits,
|
||||||
options_.table_cache_remove_scan_count_limit)),
|
options_.table_cache_remove_scan_count_limit)),
|
||||||
next_file_number_(1) {
|
next_file_number_(1) {
|
||||||
table_cache_ = new TableCache(dbname_, &options_, storage_options_,
|
table_cache_ =
|
||||||
raw_table_cache_.get());
|
new TableCache(&options_, storage_options_, raw_table_cache_.get());
|
||||||
edit_ = new VersionEdit();
|
edit_ = new VersionEdit();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,7 +116,7 @@ class Repairer {
|
|||||||
VersionEdit* edit_;
|
VersionEdit* edit_;
|
||||||
|
|
||||||
std::vector<std::string> manifests_;
|
std::vector<std::string> manifests_;
|
||||||
std::vector<uint64_t> table_numbers_;
|
std::vector<FileDescriptor> table_fds_;
|
||||||
std::vector<uint64_t> logs_;
|
std::vector<uint64_t> logs_;
|
||||||
std::vector<TableInfo> tables_;
|
std::vector<TableInfo> tables_;
|
||||||
uint64_t next_file_number_;
|
uint64_t next_file_number_;
|
||||||
@ -124,35 +124,43 @@ class Repairer {
|
|||||||
|
|
||||||
// Lists every directory in options_.db_paths and sorts the recognized files
// into the repairer's work lists:
//   - descriptor (manifest) file names go to manifests_,
//   - log file numbers go to logs_,
//   - table files go to table_fds_ as (number, path_id, 0) descriptors.
// next_file_number_ is raised above the number of every recognized
// non-descriptor file. Descriptor and log files are asserted to appear only
// under path 0.
//
// Returns the first failing GetChildren() status, Corruption if no directory
// listing contained any entry, and OK otherwise.
Status FindFiles() {
  std::vector<std::string> children;
  bool saw_any_entry = false;

  for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) {
    Status status = env_->GetChildren(options_.db_paths[path_id], &children);
    if (!status.ok()) {
      return status;
    }
    saw_any_entry = saw_any_entry || !children.empty();

    uint64_t number;
    FileType type;
    for (const std::string& child : children) {
      if (!ParseFileName(child, &number, &type)) {
        continue;  // Not a file name this DB understands.
      }

      if (type == kDescriptorFile) {
        assert(path_id == 0);
        manifests_.push_back(child);
        continue;
      }

      // Keep newly allocated file numbers above everything already on disk.
      if (next_file_number_ < number + 1) {
        next_file_number_ = number + 1;
      }

      switch (type) {
        case kLogFile:
          assert(path_id == 0);
          logs_.push_back(number);
          break;
        case kTableFile:
          table_fds_.emplace_back(number, path_id, 0);
          break;
        default:
          // Ignore other files
          break;
      }
    }
  }

  if (!saw_any_entry) {
    return Status::Corruption(dbname_, "repair found no files");
  }
  return Status::OK();
}
|
||||||
|
|
||||||
void ConvertLogFilesToTables() {
|
void ConvertLogFilesToTables() {
|
||||||
@ -228,7 +236,7 @@ class Repairer {
|
|||||||
// Do not record a version edit for this conversion to a Table
|
// Do not record a version edit for this conversion to a Table
|
||||||
// since ExtractMetaData() will also generate edits.
|
// since ExtractMetaData() will also generate edits.
|
||||||
FileMetaData meta;
|
FileMetaData meta;
|
||||||
meta.fd.number = next_file_number_++;
|
meta.fd = FileDescriptor(next_file_number_++, 0, 0);
|
||||||
ReadOptions ro;
|
ReadOptions ro;
|
||||||
Iterator* iter = mem->NewIterator(ro, true /* enforce_total_order */);
|
Iterator* iter = mem->NewIterator(ro, true /* enforce_total_order */);
|
||||||
status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_,
|
status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_,
|
||||||
@ -239,7 +247,7 @@ class Repairer {
|
|||||||
mem = nullptr;
|
mem = nullptr;
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
if (meta.fd.GetFileSize() > 0) {
|
if (meta.fd.GetFileSize() > 0) {
|
||||||
table_numbers_.push_back(meta.fd.GetNumber());
|
table_fds_.push_back(meta.fd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Log(options_.info_log,
|
Log(options_.info_log,
|
||||||
@ -249,14 +257,17 @@ class Repairer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void ExtractMetaData() {
|
void ExtractMetaData() {
|
||||||
for (size_t i = 0; i < table_numbers_.size(); i++) {
|
for (size_t i = 0; i < table_fds_.size(); i++) {
|
||||||
TableInfo t;
|
TableInfo t;
|
||||||
t.meta.fd.number = table_numbers_[i];
|
t.meta.fd = table_fds_[i];
|
||||||
Status status = ScanTable(&t);
|
Status status = ScanTable(&t);
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
std::string fname = TableFileName(dbname_, table_numbers_[i]);
|
std::string fname = TableFileName(
|
||||||
Log(options_.info_log, "Table #%" PRIu64 ": ignoring %s",
|
options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId());
|
||||||
table_numbers_[i], status.ToString().c_str());
|
Log(options_.info_log, "Table #%s: ignoring %s",
|
||||||
|
FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId())
|
||||||
|
.c_str(),
|
||||||
|
status.ToString().c_str());
|
||||||
ArchiveFile(fname);
|
ArchiveFile(fname);
|
||||||
} else {
|
} else {
|
||||||
tables_.push_back(t);
|
tables_.push_back(t);
|
||||||
@ -265,9 +276,13 @@ class Repairer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Status ScanTable(TableInfo* t) {
|
Status ScanTable(TableInfo* t) {
|
||||||
std::string fname = TableFileName(dbname_, t->meta.fd.GetNumber());
|
std::string fname = TableFileName(options_.db_paths, t->meta.fd.GetNumber(),
|
||||||
|
t->meta.fd.GetPathId());
|
||||||
int counter = 0;
|
int counter = 0;
|
||||||
Status status = env_->GetFileSize(fname, &t->meta.fd.file_size);
|
uint64_t file_size;
|
||||||
|
Status status = env_->GetFileSize(fname, &file_size);
|
||||||
|
t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
|
||||||
|
file_size);
|
||||||
if (status.ok()) {
|
if (status.ok()) {
|
||||||
Iterator* iter = table_cache_->NewIterator(
|
Iterator* iter = table_cache_->NewIterator(
|
||||||
ReadOptions(), storage_options_, icmp_, t->meta.fd);
|
ReadOptions(), storage_options_, icmp_, t->meta.fd);
|
||||||
@ -330,9 +345,9 @@ class Repairer {
|
|||||||
for (size_t i = 0; i < tables_.size(); i++) {
|
for (size_t i = 0; i < tables_.size(); i++) {
|
||||||
// TODO(opt): separate out into multiple levels
|
// TODO(opt): separate out into multiple levels
|
||||||
const TableInfo& t = tables_[i];
|
const TableInfo& t = tables_[i];
|
||||||
edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetFileSize(),
|
edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetPathId(),
|
||||||
t.meta.smallest, t.meta.largest, t.min_sequence,
|
t.meta.fd.GetFileSize(), t.meta.smallest, t.meta.largest,
|
||||||
t.max_sequence);
|
t.min_sequence, t.max_sequence);
|
||||||
}
|
}
|
||||||
|
|
||||||
//fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str());
|
//fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str());
|
||||||
|
@ -36,10 +36,10 @@ static Slice GetSliceForFileNumber(const uint64_t* file_number) {
|
|||||||
sizeof(*file_number));
|
sizeof(*file_number));
|
||||||
}
|
}
|
||||||
|
|
||||||
TableCache::TableCache(const std::string& dbname, const Options* options,
|
TableCache::TableCache(const Options* options,
|
||||||
const EnvOptions& storage_options, Cache* const cache)
|
const EnvOptions& storage_options, Cache* const cache)
|
||||||
: env_(options->env),
|
: env_(options->env),
|
||||||
dbname_(dbname),
|
db_paths_(options->db_paths),
|
||||||
options_(options),
|
options_(options),
|
||||||
storage_options_(storage_options),
|
storage_options_(storage_options),
|
||||||
cache_(cache) {}
|
cache_(cache) {}
|
||||||
@ -60,13 +60,15 @@ Status TableCache::FindTable(const EnvOptions& toptions,
|
|||||||
const FileDescriptor& fd, Cache::Handle** handle,
|
const FileDescriptor& fd, Cache::Handle** handle,
|
||||||
const bool no_io) {
|
const bool no_io) {
|
||||||
Status s;
|
Status s;
|
||||||
Slice key = GetSliceForFileNumber(&fd.number);
|
uint64_t number = fd.GetNumber();
|
||||||
|
Slice key = GetSliceForFileNumber(&number);
|
||||||
*handle = cache_->Lookup(key);
|
*handle = cache_->Lookup(key);
|
||||||
if (*handle == nullptr) {
|
if (*handle == nullptr) {
|
||||||
if (no_io) { // Dont do IO and return a not-found status
|
if (no_io) { // Dont do IO and return a not-found status
|
||||||
return Status::Incomplete("Table not found in table_cache, no_io is set");
|
return Status::Incomplete("Table not found in table_cache, no_io is set");
|
||||||
}
|
}
|
||||||
std::string fname = TableFileName(dbname_, fd.GetNumber());
|
std::string fname =
|
||||||
|
TableFileName(db_paths_, fd.GetNumber(), fd.GetPathId());
|
||||||
unique_ptr<RandomAccessFile> file;
|
unique_ptr<RandomAccessFile> file;
|
||||||
unique_ptr<TableReader> table_reader;
|
unique_ptr<TableReader> table_reader;
|
||||||
s = env_->NewRandomAccessFile(fname, &file, toptions);
|
s = env_->NewRandomAccessFile(fname, &file, toptions);
|
||||||
|
@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
#include "db/dbformat.h"
|
#include "db/dbformat.h"
|
||||||
@ -28,8 +29,8 @@ struct FileDescriptor;
|
|||||||
|
|
||||||
class TableCache {
|
class TableCache {
|
||||||
public:
|
public:
|
||||||
TableCache(const std::string& dbname, const Options* options,
|
TableCache(const Options* options, const EnvOptions& storage_options,
|
||||||
const EnvOptions& storage_options, Cache* cache);
|
Cache* cache);
|
||||||
~TableCache();
|
~TableCache();
|
||||||
|
|
||||||
// Return an iterator for the specified file number (the corresponding
|
// Return an iterator for the specified file number (the corresponding
|
||||||
@ -84,7 +85,7 @@ class TableCache {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
Env* const env_;
|
Env* const env_;
|
||||||
const std::string dbname_;
|
const std::vector<std::string> db_paths_;
|
||||||
const Options* options_;
|
const Options* options_;
|
||||||
const EnvOptions& storage_options_;
|
const EnvOptions& storage_options_;
|
||||||
Cache* const cache_;
|
Cache* const cache_;
|
||||||
|
@ -18,25 +18,30 @@ namespace rocksdb {
|
|||||||
// Tag numbers for serialized VersionEdit. These numbers are written to
|
// Tag numbers for serialized VersionEdit. These numbers are written to
|
||||||
// disk and should not be changed.
|
// disk and should not be changed.
|
||||||
enum Tag {
|
enum Tag {
|
||||||
kComparator = 1,
|
kComparator = 1,
|
||||||
kLogNumber = 2,
|
kLogNumber = 2,
|
||||||
kNextFileNumber = 3,
|
kNextFileNumber = 3,
|
||||||
kLastSequence = 4,
|
kLastSequence = 4,
|
||||||
kCompactPointer = 5,
|
kCompactPointer = 5,
|
||||||
kDeletedFile = 6,
|
kDeletedFile = 6,
|
||||||
kNewFile = 7,
|
kNewFile = 7,
|
||||||
// 8 was used for large value refs
|
// 8 was used for large value refs
|
||||||
kPrevLogNumber = 9,
|
kPrevLogNumber = 9,
|
||||||
|
|
||||||
// these are new formats divergent from open source leveldb
|
// these are new formats divergent from open source leveldb
|
||||||
kNewFile2 = 100, // store smallest & largest seqno
|
kNewFile2 = 100,
|
||||||
|
kNewFile3 = 102,
|
||||||
kColumnFamily = 200, // specify column family for version edit
|
kColumnFamily = 200, // specify column family for version edit
|
||||||
kColumnFamilyAdd = 201,
|
kColumnFamilyAdd = 201,
|
||||||
kColumnFamilyDrop = 202,
|
kColumnFamilyDrop = 202,
|
||||||
kMaxColumnFamily = 203,
|
kMaxColumnFamily = 203,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
|
||||||
|
assert(number <= kFileNumberMask);
|
||||||
|
return number | (path_id * (kFileNumberMask + 1));
|
||||||
|
}
|
||||||
|
|
||||||
void VersionEdit::Clear() {
|
void VersionEdit::Clear() {
|
||||||
comparator_.clear();
|
comparator_.clear();
|
||||||
max_level_ = 0;
|
max_level_ = 0;
|
||||||
@ -93,9 +98,18 @@ void VersionEdit::EncodeTo(std::string* dst) const {
|
|||||||
|
|
||||||
for (size_t i = 0; i < new_files_.size(); i++) {
|
for (size_t i = 0; i < new_files_.size(); i++) {
|
||||||
const FileMetaData& f = new_files_[i].second;
|
const FileMetaData& f = new_files_[i].second;
|
||||||
PutVarint32(dst, kNewFile2);
|
if (f.fd.GetPathId() == 0) {
|
||||||
|
// Use older format to make sure user can roll back the build if they
|
||||||
|
// don't config multiple DB paths.
|
||||||
|
PutVarint32(dst, kNewFile2);
|
||||||
|
} else {
|
||||||
|
PutVarint32(dst, kNewFile3);
|
||||||
|
}
|
||||||
PutVarint32(dst, new_files_[i].first); // level
|
PutVarint32(dst, new_files_[i].first); // level
|
||||||
PutVarint64(dst, f.fd.GetNumber());
|
PutVarint64(dst, f.fd.GetNumber());
|
||||||
|
if (f.fd.GetPathId() != 0) {
|
||||||
|
PutVarint32(dst, f.fd.GetPathId());
|
||||||
|
}
|
||||||
PutVarint64(dst, f.fd.GetFileSize());
|
PutVarint64(dst, f.fd.GetFileSize());
|
||||||
PutLengthPrefixedSlice(dst, f.smallest.Encode());
|
PutLengthPrefixedSlice(dst, f.smallest.Encode());
|
||||||
PutLengthPrefixedSlice(dst, f.largest.Encode());
|
PutLengthPrefixedSlice(dst, f.largest.Encode());
|
||||||
@ -237,7 +251,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
|||||||
GetVarint64(&input, &file_size) &&
|
GetVarint64(&input, &file_size) &&
|
||||||
GetInternalKey(&input, &f.smallest) &&
|
GetInternalKey(&input, &f.smallest) &&
|
||||||
GetInternalKey(&input, &f.largest)) {
|
GetInternalKey(&input, &f.largest)) {
|
||||||
f.fd = FileDescriptor(number, file_size);
|
f.fd = FileDescriptor(number, 0, file_size);
|
||||||
new_files_.push_back(std::make_pair(level, f));
|
new_files_.push_back(std::make_pair(level, f));
|
||||||
} else {
|
} else {
|
||||||
if (!msg) {
|
if (!msg) {
|
||||||
@ -255,7 +269,27 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
|
|||||||
GetInternalKey(&input, &f.largest) &&
|
GetInternalKey(&input, &f.largest) &&
|
||||||
GetVarint64(&input, &f.smallest_seqno) &&
|
GetVarint64(&input, &f.smallest_seqno) &&
|
||||||
GetVarint64(&input, &f.largest_seqno)) {
|
GetVarint64(&input, &f.largest_seqno)) {
|
||||||
f.fd = FileDescriptor(number, file_size);
|
f.fd = FileDescriptor(number, 0, file_size);
|
||||||
|
new_files_.push_back(std::make_pair(level, f));
|
||||||
|
} else {
|
||||||
|
if (!msg) {
|
||||||
|
msg = "new-file2 entry";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case kNewFile3: {
|
||||||
|
uint64_t number;
|
||||||
|
uint32_t path_id;
|
||||||
|
uint64_t file_size;
|
||||||
|
if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
|
||||||
|
GetVarint32(&input, &path_id) && GetVarint64(&input, &file_size) &&
|
||||||
|
GetInternalKey(&input, &f.smallest) &&
|
||||||
|
GetInternalKey(&input, &f.largest) &&
|
||||||
|
GetVarint64(&input, &f.smallest_seqno) &&
|
||||||
|
GetVarint64(&input, &f.largest_seqno)) {
|
||||||
|
f.fd = FileDescriptor(number, path_id, file_size);
|
||||||
new_files_.push_back(std::make_pair(level, f));
|
new_files_.push_back(std::make_pair(level, f));
|
||||||
} else {
|
} else {
|
||||||
if (!msg) {
|
if (!msg) {
|
||||||
|
@ -19,21 +19,41 @@ namespace rocksdb {
|
|||||||
|
|
||||||
class VersionSet;
|
class VersionSet;
|
||||||
|
|
||||||
|
const uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF;
|
||||||
|
|
||||||
|
extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id);
|
||||||
|
|
||||||
// A copyable structure contains information needed to read data from an SST
|
// A copyable structure contains information needed to read data from an SST
|
||||||
// file. It can contains a pointer to a table reader opened for the file, or
|
// file. It can contains a pointer to a table reader opened for the file, or
|
||||||
// file number and size, which can be used to create a new table reader for it.
|
// file number and size, which can be used to create a new table reader for it.
|
||||||
// The behavior is undefined when a copied of the structure is used when the
|
// The behavior is undefined when a copied of the structure is used when the
|
||||||
// file is not in any live version any more.
|
// file is not in any live version any more.
|
||||||
struct FileDescriptor {
|
struct FileDescriptor {
|
||||||
uint64_t number;
|
|
||||||
uint64_t file_size; // File size in bytes
|
|
||||||
// Table reader in table_reader_handle
|
// Table reader in table_reader_handle
|
||||||
TableReader* table_reader;
|
TableReader* table_reader;
|
||||||
|
uint64_t packed_number_and_path_id;
|
||||||
|
uint64_t file_size; // File size in bytes
|
||||||
|
|
||||||
FileDescriptor(uint64_t number, uint64_t file_size)
|
FileDescriptor() : FileDescriptor(0, 0, 0) {}
|
||||||
: number(number), file_size(file_size), table_reader(nullptr) {}
|
|
||||||
|
|
||||||
uint64_t GetNumber() const { return number; }
|
FileDescriptor(uint64_t number, uint32_t path_id, uint64_t file_size)
|
||||||
|
: table_reader(nullptr),
|
||||||
|
packed_number_and_path_id(PackFileNumberAndPathId(number, path_id)),
|
||||||
|
file_size(file_size) {}
|
||||||
|
|
||||||
|
FileDescriptor& operator=(const FileDescriptor& fd) {
|
||||||
|
table_reader = fd.table_reader;
|
||||||
|
packed_number_and_path_id = fd.packed_number_and_path_id;
|
||||||
|
file_size = fd.file_size;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t GetNumber() const {
|
||||||
|
return packed_number_and_path_id & kFileNumberMask;
|
||||||
|
}
|
||||||
|
uint32_t GetPathId() const {
|
||||||
|
return packed_number_and_path_id / (kFileNumberMask + 1);
|
||||||
|
}
|
||||||
uint64_t GetFileSize() const { return file_size; }
|
uint64_t GetFileSize() const { return file_size; }
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -58,7 +78,6 @@ struct FileMetaData {
|
|||||||
|
|
||||||
FileMetaData()
|
FileMetaData()
|
||||||
: refs(0),
|
: refs(0),
|
||||||
fd(0, 0),
|
|
||||||
being_compacted(false),
|
being_compacted(false),
|
||||||
table_reader_handle(nullptr),
|
table_reader_handle(nullptr),
|
||||||
compensated_file_size(0),
|
compensated_file_size(0),
|
||||||
@ -103,15 +122,13 @@ class VersionEdit {
|
|||||||
// Add the specified file at the specified number.
|
// Add the specified file at the specified number.
|
||||||
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
|
// REQUIRES: This version has not been saved (see VersionSet::SaveTo)
|
||||||
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
|
// REQUIRES: "smallest" and "largest" are smallest and largest keys in file
|
||||||
void AddFile(int level, uint64_t file,
|
void AddFile(int level, uint64_t file, uint64_t file_size,
|
||||||
uint64_t file_size,
|
uint64_t file_path_id, const InternalKey& smallest,
|
||||||
const InternalKey& smallest,
|
const InternalKey& largest, const SequenceNumber& smallest_seqno,
|
||||||
const InternalKey& largest,
|
|
||||||
const SequenceNumber& smallest_seqno,
|
|
||||||
const SequenceNumber& largest_seqno) {
|
const SequenceNumber& largest_seqno) {
|
||||||
assert(smallest_seqno <= largest_seqno);
|
assert(smallest_seqno <= largest_seqno);
|
||||||
FileMetaData f;
|
FileMetaData f;
|
||||||
f.fd = FileDescriptor(file, file_size);
|
f.fd = FileDescriptor(file, file_size, file_path_id);
|
||||||
f.smallest = smallest;
|
f.smallest = smallest;
|
||||||
f.largest = largest;
|
f.largest = largest;
|
||||||
f.smallest_seqno = smallest_seqno;
|
f.smallest_seqno = smallest_seqno;
|
||||||
|
@ -30,11 +30,10 @@ TEST(VersionEditTest, EncodeDecode) {
|
|||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
TestEncodeDecode(edit);
|
TestEncodeDecode(edit);
|
||||||
edit.AddFile(3, kBig + 300 + i, kBig + 400 + i,
|
edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, 0,
|
||||||
InternalKey("foo", kBig + 500 + i, kTypeValue),
|
InternalKey("foo", kBig + 500 + i, kTypeValue),
|
||||||
InternalKey("zoo", kBig + 600 + i, kTypeDeletion),
|
InternalKey("zoo", kBig + 600 + i, kTypeDeletion),
|
||||||
kBig + 500 + i,
|
kBig + 500 + i, kBig + 600 + i);
|
||||||
kBig + 600 + i);
|
|
||||||
edit.DeleteFile(4, kBig + 700 + i);
|
edit.DeleteFile(4, kBig + 700 + i);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
#include <set>
|
#include <set>
|
||||||
#include <climits>
|
#include <climits>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
#include <vector>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
|
||||||
#include "db/filename.h"
|
#include "db/filename.h"
|
||||||
@ -171,7 +172,7 @@ class Version::LevelFileNumIterator : public Iterator {
|
|||||||
: icmp_(icmp),
|
: icmp_(icmp),
|
||||||
flist_(flist),
|
flist_(flist),
|
||||||
index_(flist->size()),
|
index_(flist->size()),
|
||||||
current_value_(0, 0) { // Marks as invalid
|
current_value_(0, 0, 0) { // Marks as invalid
|
||||||
}
|
}
|
||||||
virtual bool Valid() const {
|
virtual bool Valid() const {
|
||||||
return index_ < flist_->size();
|
return index_ < flist_->size();
|
||||||
@ -276,7 +277,8 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
|
|||||||
*fname, &file, vset_->storage_options_);
|
*fname, &file, vset_->storage_options_);
|
||||||
} else {
|
} else {
|
||||||
s = options->env->NewRandomAccessFile(
|
s = options->env->NewRandomAccessFile(
|
||||||
TableFileName(vset_->dbname_, file_meta->fd.GetNumber()),
|
TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(),
|
||||||
|
file_meta->fd.GetPathId()),
|
||||||
&file, vset_->storage_options_);
|
&file, vset_->storage_options_);
|
||||||
}
|
}
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
@ -303,7 +305,9 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
|
|||||||
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
|
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
|
||||||
for (int level = 0; level < num_levels_; level++) {
|
for (int level = 0; level < num_levels_; level++) {
|
||||||
for (const auto& file_meta : files_[level]) {
|
for (const auto& file_meta : files_[level]) {
|
||||||
auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber());
|
auto fname =
|
||||||
|
TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(),
|
||||||
|
file_meta->fd.GetPathId());
|
||||||
// 1. If the table is already present in table cache, load table
|
// 1. If the table is already present in table cache, load table
|
||||||
// properties from there.
|
// properties from there.
|
||||||
std::shared_ptr<const TableProperties> table_properties;
|
std::shared_ptr<const TableProperties> table_properties;
|
||||||
@ -1268,11 +1272,11 @@ int64_t Version::MaxNextLevelOverlappingBytes() {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Version::AddLiveFiles(std::set<uint64_t>* live) {
|
void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
|
||||||
for (int level = 0; level < NumberLevels(); level++) {
|
for (int level = 0; level < NumberLevels(); level++) {
|
||||||
const std::vector<FileMetaData*>& files = files_[level];
|
const std::vector<FileMetaData*>& files = files_[level];
|
||||||
for (const auto& file : files) {
|
for (const auto& file : files) {
|
||||||
live->insert(file->fd.GetNumber());
|
live->push_back(file->fd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1425,7 +1429,7 @@ class VersionSet::Builder {
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void CheckConsistencyForDeletes(VersionEdit* edit, unsigned int number,
|
void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
|
||||||
int level) {
|
int level) {
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
// a file to be deleted better exist in the previous version
|
// a file to be deleted better exist in the previous version
|
||||||
@ -1467,6 +1471,9 @@ class VersionSet::Builder {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!found) {
|
||||||
|
fprintf(stderr, "not found %ld\n", number);
|
||||||
|
}
|
||||||
assert(found);
|
assert(found);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
@ -2160,17 +2167,15 @@ Status VersionSet::Recover(
|
|||||||
last_sequence_ = last_sequence;
|
last_sequence_ = last_sequence;
|
||||||
prev_log_number_ = prev_log_number;
|
prev_log_number_ = prev_log_number;
|
||||||
|
|
||||||
Log(options_->info_log, "Recovered from manifest file:%s succeeded,"
|
Log(options_->info_log,
|
||||||
|
"Recovered from manifest file:%s succeeded,"
|
||||||
"manifest_file_number is %lu, next_file_number is %lu, "
|
"manifest_file_number is %lu, next_file_number is %lu, "
|
||||||
"last_sequence is %lu, log_number is %lu,"
|
"last_sequence is %lu, log_number is %lu,"
|
||||||
"prev_log_number is %lu,"
|
"prev_log_number is %lu,"
|
||||||
"max_column_family is %u\n",
|
"max_column_family is %u\n",
|
||||||
manifest_filename.c_str(),
|
manifest_filename.c_str(), (unsigned long)manifest_file_number_,
|
||||||
(unsigned long)manifest_file_number_,
|
(unsigned long)next_file_number_, (unsigned long)last_sequence_,
|
||||||
(unsigned long)next_file_number_,
|
(unsigned long)log_number, (unsigned long)prev_log_number_,
|
||||||
(unsigned long)last_sequence_,
|
|
||||||
(unsigned long)log_number,
|
|
||||||
(unsigned long)prev_log_number_,
|
|
||||||
column_family_set_->GetMaxColumnFamily());
|
column_family_set_->GetMaxColumnFamily());
|
||||||
|
|
||||||
for (auto cfd : *column_family_set_) {
|
for (auto cfd : *column_family_set_) {
|
||||||
@ -2557,9 +2562,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
|
|||||||
|
|
||||||
for (int level = 0; level < cfd->NumberLevels(); level++) {
|
for (int level = 0; level < cfd->NumberLevels(); level++) {
|
||||||
for (const auto& f : cfd->current()->files_[level]) {
|
for (const auto& f : cfd->current()->files_[level]) {
|
||||||
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetFileSize(),
|
edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
|
||||||
f->smallest, f->largest, f->smallest_seqno,
|
f->fd.GetFileSize(), f->smallest, f->largest,
|
||||||
f->largest_seqno);
|
f->smallest_seqno, f->largest_seqno);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
edit.SetLogNumber(cfd->GetLogNumber());
|
edit.SetLogNumber(cfd->GetLogNumber());
|
||||||
@ -2641,7 +2646,7 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
|
void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
|
||||||
// pre-calculate space requirement
|
// pre-calculate space requirement
|
||||||
int64_t total_files = 0;
|
int64_t total_files = 0;
|
||||||
for (auto cfd : *column_family_set_) {
|
for (auto cfd : *column_family_set_) {
|
||||||
@ -2663,7 +2668,7 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
|
|||||||
v = v->next_) {
|
v = v->next_) {
|
||||||
for (int level = 0; level < v->NumberLevels(); level++) {
|
for (int level = 0; level < v->NumberLevels(); level++) {
|
||||||
for (const auto& f : v->files_[level]) {
|
for (const auto& f : v->files_[level]) {
|
||||||
live_list->push_back(f->fd.GetNumber());
|
live_list->push_back(f->fd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2786,7 +2791,14 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
|
|||||||
for (const auto& file : cfd->current()->files_[level]) {
|
for (const auto& file : cfd->current()->files_[level]) {
|
||||||
LiveFileMetaData filemetadata;
|
LiveFileMetaData filemetadata;
|
||||||
filemetadata.column_family_name = cfd->GetName();
|
filemetadata.column_family_name = cfd->GetName();
|
||||||
filemetadata.name = TableFileName("", file->fd.GetNumber());
|
uint32_t path_id = file->fd.GetPathId();
|
||||||
|
if (path_id < options_->db_paths.size()) {
|
||||||
|
filemetadata.db_path = options_->db_paths[path_id];
|
||||||
|
} else {
|
||||||
|
assert(!options_->db_paths.empty());
|
||||||
|
filemetadata.db_path = options_->db_paths.back();
|
||||||
|
}
|
||||||
|
filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
|
||||||
filemetadata.level = level;
|
filemetadata.level = level;
|
||||||
filemetadata.size = file->fd.GetFileSize();
|
filemetadata.size = file->fd.GetFileSize();
|
||||||
filemetadata.smallestkey = file->smallest.user_key().ToString();
|
filemetadata.smallestkey = file->smallest.user_key().ToString();
|
||||||
|
@ -188,7 +188,7 @@ class Version {
|
|||||||
int64_t MaxNextLevelOverlappingBytes();
|
int64_t MaxNextLevelOverlappingBytes();
|
||||||
|
|
||||||
// Add all files listed in the current version to *live.
|
// Add all files listed in the current version to *live.
|
||||||
void AddLiveFiles(std::set<uint64_t>* live);
|
void AddLiveFiles(std::vector<FileDescriptor>* live);
|
||||||
|
|
||||||
// Return a human readable string that describes this version's contents.
|
// Return a human readable string that describes this version's contents.
|
||||||
std::string DebugString(bool hex = false) const;
|
std::string DebugString(bool hex = false) const;
|
||||||
@ -399,7 +399,7 @@ class VersionSet {
|
|||||||
// Arrange to reuse "file_number" unless a newer file number has
|
// Arrange to reuse "file_number" unless a newer file number has
|
||||||
// already been allocated.
|
// already been allocated.
|
||||||
// REQUIRES: "file_number" was returned by a call to NewFileNumber().
|
// REQUIRES: "file_number" was returned by a call to NewFileNumber().
|
||||||
void ReuseFileNumber(uint64_t file_number) {
|
void ReuseLogFileNumber(uint64_t file_number) {
|
||||||
if (next_file_number_ == file_number + 1) {
|
if (next_file_number_ == file_number + 1) {
|
||||||
next_file_number_ = file_number;
|
next_file_number_ = file_number;
|
||||||
}
|
}
|
||||||
@ -440,7 +440,7 @@ class VersionSet {
|
|||||||
Iterator* MakeInputIterator(Compaction* c);
|
Iterator* MakeInputIterator(Compaction* c);
|
||||||
|
|
||||||
// Add all files listed in any live version to *live.
|
// Add all files listed in any live version to *live.
|
||||||
void AddLiveFiles(std::vector<uint64_t>* live_list);
|
void AddLiveFiles(std::vector<FileDescriptor>* live_list);
|
||||||
|
|
||||||
// Return the approximate offset in the database of the data for
|
// Return the approximate offset in the database of the data for
|
||||||
// "key" as of version "v".
|
// "key" as of version "v".
|
||||||
|
@ -31,7 +31,7 @@ class FindFileTest {
|
|||||||
SequenceNumber smallest_seq = 100,
|
SequenceNumber smallest_seq = 100,
|
||||||
SequenceNumber largest_seq = 100) {
|
SequenceNumber largest_seq = 100) {
|
||||||
FileMetaData* f = new FileMetaData;
|
FileMetaData* f = new FileMetaData;
|
||||||
f->fd = FileDescriptor(files_.size() + 1, 0);
|
f->fd = FileDescriptor(files_.size() + 1, 0, 0);
|
||||||
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
|
f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
|
||||||
f->largest = InternalKey(largest, largest_seq, kTypeValue);
|
f->largest = InternalKey(largest, largest_seq, kTypeValue);
|
||||||
files_.push_back(f);
|
files_.push_back(f);
|
||||||
|
@ -55,6 +55,7 @@ class Env;
|
|||||||
// Metadata associated with each SST file.
|
// Metadata associated with each SST file.
|
||||||
struct LiveFileMetaData {
|
struct LiveFileMetaData {
|
||||||
std::string column_family_name; // Name of the column family
|
std::string column_family_name; // Name of the column family
|
||||||
|
std::string db_path;
|
||||||
std::string name; // Name of the file
|
std::string name; // Name of the file
|
||||||
int level; // Level at which this file resides.
|
int level; // Level at which this file resides.
|
||||||
size_t size; // File size in bytes.
|
size_t size; // File size in bytes.
|
||||||
|
@ -675,6 +675,13 @@ struct DBOptions {
|
|||||||
// Default value is 1800 (half an hour).
|
// Default value is 1800 (half an hour).
|
||||||
int db_stats_log_interval;
|
int db_stats_log_interval;
|
||||||
|
|
||||||
|
// A list paths where SST files can be put into. A compaction style can
|
||||||
|
// determine which of those paths it will put the file to.
|
||||||
|
// If left empty, only one path will be used, which is db_name passed when
|
||||||
|
// opening the DB.
|
||||||
|
// Default: empty
|
||||||
|
std::vector<std::string> db_paths;
|
||||||
|
|
||||||
// This specifies the info LOG dir.
|
// This specifies the info LOG dir.
|
||||||
// If it is empty, the log files will be in the same dir as data.
|
// If it is empty, the log files will be in the same dir as data.
|
||||||
// If it is non empty, the log files will be in the specified dir,
|
// If it is non empty, the log files will be in the specified dir,
|
||||||
|
@ -8,6 +8,7 @@
|
|||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <climits>
|
#include <climits>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
@ -61,6 +62,7 @@ class CompactionOptionsUniversal {
|
|||||||
// well as the total size of C1...Ct as total_C, the compaction output file
|
// well as the total size of C1...Ct as total_C, the compaction output file
|
||||||
// will be compressed iff
|
// will be compressed iff
|
||||||
// total_C / total_size < this percentage
|
// total_C / total_size < this percentage
|
||||||
|
// Default: -1
|
||||||
int compression_size_percent;
|
int compression_size_percent;
|
||||||
|
|
||||||
// The algorithm used to stop picking files into a single compaction run
|
// The algorithm used to stop picking files into a single compaction run
|
||||||
@ -68,14 +70,13 @@ class CompactionOptionsUniversal {
|
|||||||
CompactionStopStyle stop_style;
|
CompactionStopStyle stop_style;
|
||||||
|
|
||||||
// Default set of parameters
|
// Default set of parameters
|
||||||
CompactionOptionsUniversal() :
|
CompactionOptionsUniversal()
|
||||||
size_ratio(1),
|
: size_ratio(1),
|
||||||
min_merge_width(2),
|
min_merge_width(2),
|
||||||
max_merge_width(UINT_MAX),
|
max_merge_width(UINT_MAX),
|
||||||
max_size_amplification_percent(200),
|
max_size_amplification_percent(200),
|
||||||
compression_size_percent(-1),
|
compression_size_percent(-1),
|
||||||
stop_style(kCompactionStopStyleTotalSize) {
|
stop_style(kCompactionStopStyleTotalSize) {}
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
@ -286,6 +286,10 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (opt.db_paths.size() == 0) {
|
||||||
|
opt.db_paths.push_back(db_path_);
|
||||||
|
}
|
||||||
|
|
||||||
return opt;
|
return opt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,6 +214,7 @@ DBOptions::DBOptions(const Options& options)
|
|||||||
disableDataSync(options.disableDataSync),
|
disableDataSync(options.disableDataSync),
|
||||||
use_fsync(options.use_fsync),
|
use_fsync(options.use_fsync),
|
||||||
db_stats_log_interval(options.db_stats_log_interval),
|
db_stats_log_interval(options.db_stats_log_interval),
|
||||||
|
db_paths(options.db_paths),
|
||||||
db_log_dir(options.db_log_dir),
|
db_log_dir(options.db_log_dir),
|
||||||
wal_dir(options.wal_dir),
|
wal_dir(options.wal_dir),
|
||||||
delete_obsolete_files_period_micros(
|
delete_obsolete_files_period_micros(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user