From 6b2e7a2a01836c8338a4fb11b1790917f72cd080 Mon Sep 17 00:00:00 2001 From: sdong Date: Thu, 20 Mar 2014 17:32:55 -0700 Subject: [PATCH 01/23] When Options.max_num_files=-1, non level0 files also by pass table cache Summary: This is the part that was not finished when doing the Options.max_num_files=-1 feature. For iterating non level0 SST files (which was done using two level iterator), table cache is not bypassed. With this patch, the leftover feature is done. Test Plan: make all check; change Options.max_num_files=-1 in one of the tests to cover the codes. Reviewers: haobo, igor, dhruba, ljin, yhchiang Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D17001 --- db/db_test.cc | 1 + db/table_cache.cc | 26 ++++++++++++++++++-------- db/table_cache.h | 2 +- db/version_set.cc | 26 ++++++++++++-------------- 4 files changed, 32 insertions(+), 23 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index d3bbf8382..ef39a014b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3330,6 +3330,7 @@ TEST(DBTest, InPlaceUpdateCallbackNoAction) { TEST(DBTest, CompactionFilter) { Options options = CurrentOptions(); + options.max_open_files = -1; options.num_levels = 3; options.max_mem_compaction_level = 0; options.compaction_filter_factory = std::make_shared(); diff --git a/db/table_cache.cc b/db/table_cache.cc index 222e60eda..7058221e0 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -192,17 +192,27 @@ Status TableCache::GetTableProperties( bool TableCache::PrefixMayMatch(const ReadOptions& options, const InternalKeyComparator& icomparator, - uint64_t file_number, uint64_t file_size, + const FileMetaData& file_meta, const Slice& internal_prefix, bool* table_io) { - Cache::Handle* handle = nullptr; - Status s = FindTable(storage_options_, icomparator, file_number, file_size, - &handle, table_io); bool may_match = true; - if (s.ok()) { - TableReader* t = GetTableReaderFromHandle(handle); - may_match = 
t->PrefixMayMatch(internal_prefix); - ReleaseHandle(handle); + auto table_handle = file_meta.table_reader_handle; + if (table_handle == nullptr) { + // Need to get table handle from file number + Status s = FindTable(storage_options_, icomparator, file_meta.number, + file_meta.file_size, &table_handle, table_io); + if (!s.ok()) { + return may_match; + } } + + auto table = GetTableReaderFromHandle(table_handle); + may_match = table->PrefixMayMatch(internal_prefix); + + if (file_meta.table_reader_handle == nullptr) { + // Need to release handle if it is generated from here. + ReleaseHandle(table_handle); + } + return may_match; } diff --git a/db/table_cache.h b/db/table_cache.h index 38f08031d..42dee2f0f 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -58,7 +58,7 @@ class TableCache { // the table index or blooms are not in memory, this may cause an I/O bool PrefixMayMatch(const ReadOptions& options, const InternalKeyComparator& internal_comparator, - uint64_t file_number, uint64_t file_size, + const FileMetaData& file_meta, const Slice& internal_prefix, bool* table_io); // Evict any entry for the specified file number diff --git a/db/version_set.cc b/db/version_set.cc index 3d9b0f128..913263ee0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -181,18 +181,14 @@ class Version::LevelFileNumIterator : public Iterator { } Slice value() const { assert(Valid()); - EncodeFixed64(value_buf_, (*flist_)[index_]->number); - EncodeFixed64(value_buf_+8, (*flist_)[index_]->file_size); - return Slice(value_buf_, sizeof(value_buf_)); + return Slice(reinterpret_cast((*flist_)[index_]), + sizeof(FileMetaData)); } virtual Status status() const { return Status::OK(); } private: const InternalKeyComparator icmp_; const std::vector* const flist_; uint32_t index_; - - // Backing store for value(). Holds the file number and size. 
- mutable char value_buf_[16]; }; static Iterator* GetFileIterator(void* arg, const ReadOptions& options, @@ -200,7 +196,7 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options, const InternalKeyComparator& icomparator, const Slice& file_value, bool for_compaction) { TableCache* cache = reinterpret_cast(arg); - if (file_value.size() != 16) { + if (file_value.size() != sizeof(FileMetaData)) { return NewErrorIterator( Status::Corruption("FileReader invoked with unexpected value")); } else { @@ -211,11 +207,12 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options, options_copy = options; options_copy.prefix = nullptr; } - FileMetaData meta(DecodeFixed64(file_value.data()), - DecodeFixed64(file_value.data() + 8)); + + const FileMetaData* meta_file = + reinterpret_cast(file_value.data()); return cache->NewIterator( - options.prefix ? options_copy : options, soptions, icomparator, meta, - nullptr /* don't need reference to table*/, for_compaction); + options.prefix ? options_copy : options, soptions, icomparator, + *meta_file, nullptr /* don't need reference to table*/, for_compaction); } } @@ -234,10 +231,11 @@ bool Version::PrefixMayMatch(const ReadOptions& options, // key() will always be the biggest value for this SST? may_match = true; } else { + const FileMetaData* meta_file = + reinterpret_cast(level_iter->value().data()); + may_match = vset_->table_cache_->PrefixMayMatch( - options, vset_->icmp_, DecodeFixed64(level_iter->value().data()), - DecodeFixed64(level_iter->value().data() + 8), internal_prefix, - nullptr); + options, vset_->icmp_, *meta_file, internal_prefix, nullptr); } return may_match; } From ad9a39c9b49b6df97c165130a843899535548826 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 26 Mar 2014 09:37:53 -0700 Subject: [PATCH 02/23] [RocksDB] Preallocate new MANIFEST files Summary: We don't preallocate MANIFEST file, even though we have an option for that. 
This diff preallocates manifest file every time we create it Test Plan: make check Reviewers: dhruba, haobo Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D17163 --- db/version_set.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/db/version_set.cc b/db/version_set.cc index 913263ee0..a97686cda 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1554,6 +1554,8 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, DescriptorFileName(dbname_, pending_manifest_file_number_), &descriptor_file, env_->OptimizeForManifestWrite(storage_options_)); if (s.ok()) { + descriptor_file->SetPreallocationBlockSize( + options_->manifest_preallocation_size); descriptor_log_.reset(new log::Writer(std::move(descriptor_file))); s = WriteSnapshot(descriptor_log_.get()); } From 954679bb0f96c103376cff954c93f0d2bc4ef437 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 26 Mar 2014 11:24:52 -0700 Subject: [PATCH 03/23] AssertHeld() should do things Summary: AssertHeld() was a no-op before. Now it does things. Also, this change caught a bad bug in SuperVersion::Init(). The method is calling db->mutex.AssertHeld(), but db variable is not initialized yet! I also fixed that issue. 
Test Plan: make check Reviewers: dhruba, haobo, ljin, sdong, yhchiang Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D17193 --- db/db_impl.cc | 2 +- port/port_posix.cc | 27 +++++++++++++++++++++++++-- port/port_posix.h | 7 ++++++- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index d1003bfdc..9cc957f85 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3331,10 +3331,10 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { DBImpl::SuperVersion* DBImpl::InstallSuperVersion( SuperVersion* new_superversion) { mutex_.AssertHeld(); + new_superversion->db = this; new_superversion->Init(mem_, imm_.current(), versions_->current()); SuperVersion* old_superversion = super_version_; super_version_ = new_superversion; - super_version_->db = this; ++super_version_number_; super_version_->version_number = super_version_number_; diff --git a/port/port_posix.cc b/port/port_posix.cc index f7025f461..911cebdf2 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -11,6 +11,7 @@ #include #include +#include #include #include "util/logging.h" @@ -45,9 +46,25 @@ Mutex::Mutex(bool adaptive) { Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); } -void Mutex::Lock() { PthreadCall("lock", pthread_mutex_lock(&mu_)); } +void Mutex::Lock() { + PthreadCall("lock", pthread_mutex_lock(&mu_)); +#ifndef NDEBUG + locked_ = true; +#endif +} -void Mutex::Unlock() { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } +void Mutex::Unlock() { +#ifndef NDEBUG + locked_ = false; +#endif + PthreadCall("unlock", pthread_mutex_unlock(&mu_)); +} + +void Mutex::AssertHeld() { +#ifndef NDEBUG + assert(locked_); +#endif +} CondVar::CondVar(Mutex* mu) : mu_(mu) { @@ -57,7 +74,13 @@ CondVar::CondVar(Mutex* mu) CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); } void CondVar::Wait() { +#ifndef NDEBUG + mu_->locked_ = false; +#endif PthreadCall("wait", 
pthread_cond_wait(&cv_, &mu_->mu_)); +#ifndef NDEBUG + mu_->locked_ = true; +#endif } void CondVar::Signal() { diff --git a/port/port_posix.h b/port/port_posix.h index aaea0b574..d393af6da 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -97,11 +97,16 @@ class Mutex { void Lock(); void Unlock(); - void AssertHeld() { } + // this will assert if the mutex is not locked + // it does NOT verify that mutex is held by a calling thread + void AssertHeld(); private: friend class CondVar; pthread_mutex_t mu_; +#ifndef NDEBUG + bool locked_; +#endif // No copying Mutex(const Mutex&); From 1e9621d4e50a0301fa17fc6471bfa60e30768ef9 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 26 Mar 2014 13:30:14 -0700 Subject: [PATCH 04/23] Sort files correctly in Builder::SaveTo Summary: Previously, we used to sort all files by BySmallestFirst comparator and then re-sort level0 files in the Finalize() (recently moved to end of SaveTo). In this diff, I chose the correct comparator at the beginning and sort the files correctly in Builder::SaveTo. I also added a verification that all files are sorted correctly in CheckConsistency() NOTE: This diff depends on D17037 Test Plan: make check. 
Will also run db_stress Reviewers: dhruba, haobo, sdong, ljin Reviewed By: ljin CC: leveldb Differential Revision: https://reviews.facebook.net/D17049 --- db/version_set.cc | 105 ++++++++++++++++++++++++++++++---------------- 1 file changed, 68 insertions(+), 37 deletions(-) diff --git a/db/version_set.cc b/db/version_set.cc index a97686cda..bda1b56ab 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -430,17 +430,30 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, return false; } -static bool NewestFirst(FileMetaData* a, FileMetaData* b) { +namespace { +bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } -static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { - if (a->smallest_seqno > b->smallest_seqno) { - assert(a->largest_seqno > b->largest_seqno); - return true; +bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { + if (a->smallest_seqno != b->smallest_seqno) { + return a->smallest_seqno > b->smallest_seqno; } - assert(a->largest_seqno <= b->largest_seqno); - return false; + if (a->largest_seqno != b->largest_seqno) { + return a->largest_seqno > b->largest_seqno; + } + // Break ties by file number + return NewestFirst(a, b); } +bool BySmallestKey(FileMetaData* a, FileMetaData* b, + const InternalKeyComparator* cmp) { + int r = cmp->Compare(a->smallest, b->smallest); + if (r != 0) { + return (r < 0); + } + // Break ties by file number + return (a->number < b->number); +} +} // anonymous namespace Version::Version(VersionSet* vset, uint64_t version_number) : vset_(vset), @@ -1172,22 +1185,32 @@ struct VersionSet::ManifestWriter { // Versions that contain full copies of the intermediate state. 
class VersionSet::Builder { private: - // Helper to sort by v->files_[file_number].smallest - struct BySmallestKey { + // Helper to sort v->files_ + // kLevel0LevelCompaction -- NewestFirst + // kLevel0UniversalCompaction -- NewestFirstBySeqNo + // kLevelNon0 -- BySmallestKey + struct FileComparator { + enum SortMethod { + kLevel0LevelCompaction = 0, + kLevel0UniversalCompaction = 1, + kLevelNon0 = 2, + } sort_method; const InternalKeyComparator* internal_comparator; bool operator()(FileMetaData* f1, FileMetaData* f2) const { - int r = internal_comparator->Compare(f1->smallest, f2->smallest); - if (r != 0) { - return (r < 0); - } else { - // Break ties by file number - return (f1->number < f2->number); + switch (sort_method) { + case kLevel0LevelCompaction: + return NewestFirst(f1, f2); + case kLevel0UniversalCompaction: + return NewestFirstBySeqNo(f1, f2); + case kLevelNon0: + return BySmallestKey(f1, f2, internal_comparator); } + assert(false); } }; - typedef std::set FileSet; + typedef std::set FileSet; struct LevelState { std::set deleted_files; FileSet* added_files; @@ -1196,16 +1219,24 @@ class VersionSet::Builder { VersionSet* vset_; Version* base_; LevelState* levels_; + FileComparator level_zero_cmp_; + FileComparator level_nonzero_cmp_; public: // Initialize a builder with the files from *base and other info from *vset Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) { base_->Ref(); levels_ = new LevelState[base->NumberLevels()]; - BySmallestKey cmp; - cmp.internal_comparator = &vset_->icmp_; - for (int level = 0; level < base->NumberLevels(); level++) { - levels_[level].added_files = new FileSet(cmp); + level_zero_cmp_.sort_method = + (vset_->options_->compaction_style == kCompactionStyleUniversal) + ? 
FileComparator::kLevel0UniversalCompaction + : FileComparator::kLevel0LevelCompaction; + level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0; + level_nonzero_cmp_.internal_comparator = &vset_->icmp_; + + levels_[0].added_files = new FileSet(level_zero_cmp_); + for (int level = 1; level < base->NumberLevels(); level++) { + levels_[level].added_files = new FileSet(level_nonzero_cmp_); } } @@ -1239,16 +1270,24 @@ class VersionSet::Builder { void CheckConsistency(Version* v) { #ifndef NDEBUG + // make sure the files are sorted correctly for (int level = 0; level < v->NumberLevels(); level++) { - // Make sure there is no overlap in levels > 0 - if (level > 0) { - for (uint32_t i = 1; i < v->files_[level].size(); i++) { - const InternalKey& prev_end = v->files_[level][i-1]->largest; - const InternalKey& this_begin = v->files_[level][i]->smallest; - if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) { + for (size_t i = 1; i < v->files_[level].size(); i++) { + auto f1 = v->files_[level][i - 1]; + auto f2 = v->files_[level][i]; + if (level == 0) { + assert(level_zero_cmp_(f1, f2)); + if (vset_->options_->compaction_style == kCompactionStyleUniversal) { + assert(f1->largest_seqno > f2->largest_seqno); + } + } else { + assert(level_nonzero_cmp_(f1, f2)); + + // Make sure there is no overlap in levels > 0 + if (vset_->icmp_.Compare(f1->largest, f2->smallest) >= 0) { fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", - prev_end.DebugString().c_str(), - this_begin.DebugString().c_str()); + (f1->largest).DebugString().c_str(), + (f2->smallest).DebugString().c_str()); abort(); } } @@ -1347,9 +1386,8 @@ class VersionSet::Builder { void SaveTo(Version* v) { CheckConsistency(base_); CheckConsistency(v); - BySmallestKey cmp; - cmp.internal_comparator = &vset_->icmp_; for (int level = 0; level < base_->NumberLevels(); level++) { + const auto& cmp = (level == 0) ? 
level_zero_cmp_ : level_nonzero_cmp_; // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. const auto& base_files = base_->files_[level]; @@ -1375,13 +1413,6 @@ class VersionSet::Builder { } } - // TODO(icanadi) do it in the loop above, which already sorts the files - // Pre-sort level0 for Get() - if (v->vset_->options_->compaction_style == kCompactionStyleUniversal) { - std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirstBySeqNo); - } else { - std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirst); - } CheckConsistency(v); } From 6a08bc042a99112204e1675ee98d1e289599aa42 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 26 Mar 2014 14:46:07 -0700 Subject: [PATCH 05/23] Fix no return warning in FileComparator --- db/version_set.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/db/version_set.cc b/db/version_set.cc index bda1b56ab..17ba77922 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1207,6 +1207,7 @@ class VersionSet::Builder { return BySmallestKey(f1, f2, internal_comparator); } assert(false); + return false; } }; From d5562002645cc741104c2f18bce7a721202ada76 Mon Sep 17 00:00:00 2001 From: sdong Date: Wed, 26 Mar 2014 17:53:39 -0700 Subject: [PATCH 06/23] Some small cleaning up to make some compiling environment happy Summary: Compiler complains some errors when building using our internal build settings. Fix them. 
Test Plan: rebuild Reviewers: haobo, dhruba, igor, yhchiang, ljin Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D17199 --- db/corruption_test.cc | 2 +- db/merge_operator.cc | 2 +- util/env_test.cc | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 2e630c11b..f0397b6f5 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -387,7 +387,7 @@ TEST(CorruptionTest, FileSystemStateCorrupted) { DBImpl* dbi = reinterpret_cast(db_); std::vector metadata; dbi->GetLiveFilesMetaData(&metadata); - ASSERT_GT(metadata.size(), 0); + ASSERT_GT(metadata.size(), size_t(0)); std::string filename = dbname_ + metadata[0].name; delete db_; diff --git a/db/merge_operator.cc b/db/merge_operator.cc index a01d389e9..a14df8a87 100644 --- a/db/merge_operator.cc +++ b/db/merge_operator.cc @@ -23,7 +23,7 @@ bool MergeOperator::PartialMergeMulti(const Slice& key, std::string temp_value; Slice temp_slice(operand_list[0]); - for (int i = 1; i < operand_list.size(); ++i) { + for (size_t i = 1; i < operand_list.size(); ++i) { auto& operand = operand_list[i]; if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) { return false; diff --git a/util/env_test.cc b/util/env_test.cc index b7009bf5d..0a83037c3 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -481,9 +481,9 @@ class TestLogger : public Logger { if (new_format[0] == '[') { // "[DEBUG] " - ASSERT_TRUE(n <= 56 + (512 - sizeof(struct timeval))); + ASSERT_TRUE(n <= 56 + (512 - static_cast(sizeof(struct timeval)))); } else { - ASSERT_TRUE(n <= 48 + (512 - sizeof(struct timeval))); + ASSERT_TRUE(n <= 48 + (512 - static_cast(sizeof(struct timeval)))); } va_end(backup_ap); } From 1c9f8f0884b0b7ee1cf01a9ae42e32049d94b8a8 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 27 Mar 2014 08:22:59 -0700 Subject: [PATCH 07/23] Fix valgrind issues Summary: NewFixedPrefixTransform is leaked in default options. 
Broken by https://github.com/facebook/rocksdb/commit/b47812fba601e23872349407d565d15f0b41a2fe Also included in the diff some code cleanup Test Plan: valgrind env_test also make check Reviewers: haobo, danguo, yhchiang Reviewed By: danguo CC: leveldb Differential Revision: https://reviews.facebook.net/D17211 --- db/db_impl.cc | 2 -- include/rocksdb/compaction_filter.h | 6 +++--- include/rocksdb/options.h | 2 ++ util/options.cc | 4 +--- 4 files changed, 6 insertions(+), 8 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 9cc957f85..2c24df79d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -70,7 +70,6 @@ namespace rocksdb { int DBImpl::SuperVersion::dummy = 0; void* const DBImpl::SuperVersion::kSVInUse = &DBImpl::SuperVersion::dummy; void* const DBImpl::SuperVersion::kSVObsolete = nullptr; -const std::string kNullString = "NULL"; void DumpLeveldbBuildVersion(Logger * log); @@ -2897,7 +2896,6 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, assert(compact); compact->CleanupBatchBuffer(); compact->CleanupMergedBuffer(); - compact->cur_prefix_ = kNullString; bool prefix_initialized = false; int64_t imm_micros = 0; // Micros spent doing imm_ compactions diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index 9576bf2ca..f54ee620c 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -139,6 +139,7 @@ class DefaultCompactionFilterFactory : public CompactionFilterFactory { // class CompactionFilterFactoryV2 { public: + // NOTE: CompactionFilterFactoryV2 will not delete prefix_extractor explicit CompactionFilterFactoryV2(const SliceTransform* prefix_extractor) : prefix_extractor_(prefix_extractor) { } @@ -169,9 +170,8 @@ class CompactionFilterFactoryV2 { // return any filter class DefaultCompactionFilterFactoryV2 : public CompactionFilterFactoryV2 { public: - explicit DefaultCompactionFilterFactoryV2( - const SliceTransform* prefix_extractor) - : 
CompactionFilterFactoryV2(prefix_extractor) { } + explicit DefaultCompactionFilterFactoryV2() + : CompactionFilterFactoryV2(nullptr) { } virtual std::unique_ptr CreateCompactionFilterV2( diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 28b2d58bc..b7723ff59 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -126,6 +126,8 @@ struct Options { // Version TWO of the compaction_filter_factory // It supports rolling compaction + // + // Default: a factory that doesn't provide any object std::shared_ptr compaction_filter_factory_v2; // If true, the database will be created if it is missing. diff --git a/util/options.cc b/util/options.cc index bc325e9c0..7997aa969 100644 --- a/util/options.cc +++ b/util/options.cc @@ -32,9 +32,7 @@ Options::Options() compaction_filter(nullptr), compaction_filter_factory(std::shared_ptr( new DefaultCompactionFilterFactory())), - compaction_filter_factory_v2( - new DefaultCompactionFilterFactoryV2( - NewFixedPrefixTransform(8))), + compaction_filter_factory_v2(new DefaultCompactionFilterFactoryV2()), create_if_missing(false), error_if_exists(false), paranoid_checks(false), From 5826f9528f6c12c50b9b157defbf049c0cd20b9a Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 27 Mar 2014 11:53:05 -0700 Subject: [PATCH 08/23] Make rate limiting unit test more robust --- util/options.cc | 2 +- utilities/backupable/backupable_db_test.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/util/options.cc b/util/options.cc index 7997aa969..c995877da 100644 --- a/util/options.cc +++ b/util/options.cc @@ -91,7 +91,7 @@ Options::Options() purge_redundant_kvs_while_flush(true), allow_os_buffer(true), allow_mmap_reads(false), - allow_mmap_writes(true), + allow_mmap_writes(false), is_fd_close_on_exec(true), skip_log_error_on_recovery(false), stats_dump_period_sec(3600), diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 
62717f5dc..5a0b6928b 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -826,7 +826,7 @@ TEST(BackupableDBTest, RateLimiting) { auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / backupable_options_->backup_rate_limit; ASSERT_GT(backup_time, 0.9 * rate_limited_backup_time); - ASSERT_LT(backup_time, 1.3 * rate_limited_backup_time); + ASSERT_LT(backup_time, 1.5 * rate_limited_backup_time); CloseBackupableDB(); @@ -838,7 +838,7 @@ TEST(BackupableDBTest, RateLimiting) { auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / backupable_options_->restore_rate_limit; ASSERT_GT(restore_time, 0.9 * rate_limited_restore_time); - ASSERT_LT(restore_time, 1.3 * rate_limited_restore_time); + ASSERT_LT(restore_time, 1.5 * rate_limited_restore_time); AssertBackupConsistency(0, 0, 100000, 100010); } From b14c1f995b496a464deb89ef779e287e15de76a8 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 27 Mar 2014 12:00:38 -0700 Subject: [PATCH 09/23] allow mmap writes --- util/options.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/options.cc b/util/options.cc index c995877da..7997aa969 100644 --- a/util/options.cc +++ b/util/options.cc @@ -91,7 +91,7 @@ Options::Options() purge_redundant_kvs_while_flush(true), allow_os_buffer(true), allow_mmap_reads(false), - allow_mmap_writes(false), + allow_mmap_writes(true), is_fd_close_on_exec(true), skip_log_error_on_recovery(false), stats_dump_period_sec(3600), From a92194e5b2cd555a2318e74074db5583bdfd7678 Mon Sep 17 00:00:00 2001 From: Haobo Xu Date: Thu, 27 Mar 2014 11:59:37 -0700 Subject: [PATCH 10/23] [RocksDB] Add db property "rocksdb.cur-size-active-mem-table" Summary: as title Test Plan: db_test Reviewers: sdong Reviewed By: sdong CC: leveldb Differential Revision: https://reviews.facebook.net/D17217 --- db/db_impl.cc | 3 +-- db/db_impl.h | 1 + db/db_test.cc | 7 +++++++ db/internal_stats.cc | 14 ++++++++++---- db/internal_stats.h 
| 5 +++-- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 2c24df79d..67daecb48 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4089,8 +4089,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { value->clear(); DBPropertyType property_type = GetPropertyType(property); MutexLock l(&mutex_); - return internal_stats_.GetProperty(property_type, property, value, - versions_.get(), imm_); + return internal_stats_.GetProperty(property_type, property, value, this); } void DBImpl::GetApproximateSizes( diff --git a/db/db_impl.h b/db/db_impl.h index 6165f93d3..4cfb6ecaf 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -297,6 +297,7 @@ class DBImpl : public DB { private: friend class DB; + friend class InternalStats; friend class TailingIterator; friend struct SuperVersion; struct CompactionState; diff --git a/db/db_test.cc b/db/db_test.cc index ef39a014b..ee70871a2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2036,6 +2036,8 @@ TEST(DBTest, NumImmutableMemTable) { ASSERT_EQ(1, (int) perf_context.get_from_memtable_count); ASSERT_OK(dbfull()->Put(writeOpt, "k3", big_value)); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.cur-size-active-mem-table", + &num)); ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num)); ASSERT_EQ(num, "2"); perf_context.Reset(); @@ -2051,6 +2053,11 @@ TEST(DBTest, NumImmutableMemTable) { dbfull()->Flush(FlushOptions()); ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num)); ASSERT_EQ(num, "0"); + ASSERT_TRUE(dbfull()->GetProperty("rocksdb.cur-size-active-mem-table", + &num)); + // "208" is the size of the metadata of an empty skiplist, this would + // break if we change the default skiplist implementation + ASSERT_EQ(num, "208"); SetPerfLevel(kDisable); } while (ChangeCompactOptions()); } diff --git a/db/internal_stats.cc b/db/internal_stats.cc index 629941c88..4cc049965 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ 
-7,6 +7,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/internal_stats.h" +#include "db/db_impl.h" #include "db/memtable_list.h" #include @@ -35,15 +36,18 @@ DBPropertyType GetPropertyType(const Slice& property) { return kCompactionPending; } else if (in == "background-errors") { return kBackgroundErrors; + } else if (in == "cur-size-active-mem-table") { + return kCurSizeActiveMemTable; } return kUnknown; } bool InternalStats::GetProperty(DBPropertyType property_type, const Slice& property, std::string* value, - VersionSet* version_set, - const MemTableList& imm) { + DBImpl* db) { + VersionSet* version_set = db->versions_.get(); Version* current = version_set->current(); + const MemTableList& imm = db->imm_; Slice in = property; switch (property_type) { @@ -341,12 +345,14 @@ bool InternalStats::GetProperty(DBPropertyType property_type, // 0 otherwise, *value = std::to_string(current->NeedsCompaction() ? 1 : 0); return true; - ///////////// case kBackgroundErrors: // Accumulated number of errors in background flushes or compactions. *value = std::to_string(GetBackgroundErrorCount()); return true; - ///////// + case kCurSizeActiveMemTable: + // Current size of the active memtable + *value = std::to_string(db->mem_->ApproximateMemoryUsage()); + return true; default: return false; } diff --git a/db/internal_stats.h b/db/internal_stats.h index b6032d014..e140e7280 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -19,6 +19,7 @@ namespace rocksdb { class MemTableList; +class DBImpl; enum DBPropertyType { kNumFilesAtLevel, // Number of files at a specific level @@ -31,6 +32,7 @@ enum DBPropertyType { // 0. kCompactionPending, // Return 1 if a compaction is pending. Otherwise 0. kBackgroundErrors, // Return accumulated background errors encountered. 
+ kCurSizeActiveMemTable, // Return current size of the active memtable kUnknown, }; @@ -124,8 +126,7 @@ class InternalStats { uint64_t BumpAndGetBackgroundErrorCount() { return ++bg_error_count_; } bool GetProperty(DBPropertyType property_type, const Slice& property, - std::string* value, VersionSet* version_set, - const MemTableList& imm); + std::string* value, DBImpl* db); private: std::vector compaction_stats_; From 10cebec79efa43a6f3e02183049ee091ae335cd1 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 27 Mar 2014 16:15:25 -0700 Subject: [PATCH 11/23] Fix the bug in MergeUtil which causes mixing values of different keys. Summary: Fix the bug in MergeUtil which causes mixing values of different keys. Test Plan: stringappend_test make all check Reviewers: haobo, igor Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D17235 --- db/merge_helper.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/merge_helper.cc b/db/merge_helper.cc index f5244498d..0e36f6ae0 100644 --- a/db/merge_helper.cc +++ b/db/merge_helper.cc @@ -40,12 +40,12 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, ParseInternalKey(keys_.back(), &orig_ikey); bool hit_the_next_user_key = false; - ParsedInternalKey ikey; std::string merge_result; // Temporary value for merge results if (steps) { ++(*steps); } for (iter->Next(); iter->Valid(); iter->Next()) { + ParsedInternalKey ikey; assert(operands_.size() >= 1); // Should be invariants! assert(keys_.size() == operands_.size()); @@ -194,7 +194,7 @@ void MergeHelper::MergeUntil(Iterator* iter, SequenceNumber stop_before, if (operands_.size() >= 2 && operands_.size() >= min_partial_merge_operands_ && user_merge_operator_->PartialMergeMulti( - ikey.user_key, + orig_ikey.user_key, std::deque(operands_.begin(), operands_.end()), &merge_result, logger_)) { // Merging of operands (associative merge) was successful. 
From 0d755fff14258bf8496bd019b93a1f0d5771757d Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Fri, 28 Mar 2014 09:21:20 -0700 Subject: [PATCH 12/23] cache friendly blocked bloomfilter Summary: By constraining the probes within cache line(s), we can improve the cache miss rate thus performance. This probably only makes sense for in-memory workload so defaults the option to off. Numbers and comparision can be found in wiki: https://our.intern.facebook.com/intern/wiki/index.php/Ljin/rocksdb_perf/2014_03_17#Bloom_Filter_Study Test Plan: benchmarked this change substantially. Will run make all check as well Reviewers: haobo, igor, dhruba, sdong, yhchiang Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D17133 --- db/db_bench.cc | 13 ++- db/memtable.cc | 1 + include/rocksdb/options.h | 11 ++ port/port_posix.h | 2 + table/plain_table_reader.cc | 4 +- util/dynamic_bloom.cc | 38 ++++--- util/dynamic_bloom.h | 61 ++++++++--- util/dynamic_bloom_test.cc | 195 ++++++++++++++++++++++++++---------- util/options.cc | 1 + 9 files changed, 238 insertions(+), 88 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 6d7c0898a..14d886f5c 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -134,6 +134,8 @@ DEFINE_int64(read_range, 1, "When ==1 reads use ::Get, when >1 reads use" DEFINE_bool(use_prefix_blooms, false, "Whether to place prefixes in blooms"); +DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality"); + DEFINE_bool(use_prefix_api, false, "Whether to set ReadOptions.prefix for" " prefixscanrandom. 
If true, use_prefix_blooms must also be true."); @@ -1543,6 +1545,7 @@ class Benchmark { NewFixedPrefixTransform(FLAGS_prefix_size)); } options.memtable_prefix_bloom_bits = FLAGS_memtable_bloom_bits; + options.bloom_locality = FLAGS_bloom_locality; options.max_open_files = FLAGS_open_files; options.statistics = dbstats; options.env = FLAGS_env; @@ -1916,7 +1919,7 @@ class Benchmark { Duration duration(FLAGS_duration, reads_); int64_t found = 0; - + int64_t read = 0; if (FLAGS_use_multiget) { // MultiGet const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group long keys_left = reads_; @@ -1924,6 +1927,7 @@ class Benchmark { // Recalculate number of keys per group, and call MultiGet until done long num_keys; while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) { + read += num_keys; found += MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, ""); thread->stats.FinishedSingleOp(db_); @@ -1937,8 +1941,9 @@ class Benchmark { std::string key = GenerateKeyFromInt(k, FLAGS_num); iter->Seek(key); + read++; if (iter->Valid() && iter->key().compare(Slice(key)) == 0) { - ++found; + found++; } thread->stats.FinishedSingleOp(db_); @@ -1957,6 +1962,7 @@ class Benchmark { } if (FLAGS_read_range < 2) { + read++; if (db_->Get(options, key, &value).ok()) { found++; } @@ -1972,6 +1978,7 @@ class Benchmark { db_->GetApproximateSizes(&range, 1, &sizes); } + read += FLAGS_read_range; for (iter->Seek(key); iter->Valid() && count <= FLAGS_read_range; ++count, iter->Next()) { @@ -1992,7 +1999,7 @@ class Benchmark { char msg[100]; snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", - found, reads_); + found, read); thread->stats.AddMessage(msg); diff --git a/db/memtable.cc b/db/memtable.cc index 5fefab04b..41dd66cb9 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -52,6 +52,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) assert(!should_flush_); if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 
0) { prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits, + options.bloom_locality, options.memtable_prefix_bloom_probes)); } } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b7723ff59..bd51669a2 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -719,6 +719,17 @@ struct Options { // number of hash probes per key uint32_t memtable_prefix_bloom_probes; + // Control locality of bloom filter probes to improve cache miss rate. + // This option only applies to memtable prefix bloom and plaintable + // prefix bloom. It essentially limits the max number of cache lines each + // bloom filter check can touch. + // This optimization is turned off when set to 0. The number should never + // be greater than number of probes. This option can boost performance + // for in-memory workload but should use with care since it can cause + // higher false positive rate. + // Default: 0 + uint32_t bloom_locality; + // Maximum number of successive merge operations on a key in the memtable. 
// // When a merge operation is added to the memtable and the maximum number of diff --git a/port/port_posix.h b/port/port_posix.h index d393af6da..6a7382926 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -480,6 +480,8 @@ inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) { return false; } +#define CACHE_LINE_SIZE 64U + } // namespace port } // namespace rocksdb diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 46886291e..d521446f8 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -270,7 +270,7 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes) { if (options_.prefix_extractor != nullptr) { uint32_t bloom_total_bits = num_prefixes * kBloomBitsPerKey; if (bloom_total_bits > 0) { - bloom_.reset(new DynamicBloom(bloom_total_bits)); + bloom_.reset(new DynamicBloom(bloom_total_bits, options_.bloom_locality)); } } @@ -388,7 +388,7 @@ Status PlainTableReader::PopulateIndex() { if (IsTotalOrderMode()) { uint32_t num_bloom_bits = table_properties_->num_entries * kBloomBitsPerKey; if (num_bloom_bits > 0) { - bloom_.reset(new DynamicBloom(num_bloom_bits)); + bloom_.reset(new DynamicBloom(num_bloom_bits, options_.bloom_locality)); } } diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc index 94df660ef..5d3d30f4e 100644 --- a/util/dynamic_bloom.cc +++ b/util/dynamic_bloom.cc @@ -5,6 +5,9 @@ #include "dynamic_bloom.h" +#include + +#include "port/port.h" #include "rocksdb/slice.h" #include "util/hash.h" @@ -17,20 +20,31 @@ static uint32_t BloomHash(const Slice& key) { } DynamicBloom::DynamicBloom(uint32_t total_bits, - uint32_t (*hash_func)(const Slice& key), - uint32_t num_probes) - : hash_func_(hash_func), - kTotalBits((total_bits + 7) / 8 * 8), - kNumProbes(num_probes) { - assert(hash_func_); + uint32_t cl_per_block, + uint32_t num_probes, + uint32_t (*hash_func)(const Slice& key)) + : kBlocked(cl_per_block > 0), + kBitsPerBlock(std::min(cl_per_block, num_probes) 
* CACHE_LINE_SIZE * 8), + kTotalBits((kBlocked ? (total_bits + kBitsPerBlock - 1) / kBitsPerBlock + * kBitsPerBlock : + total_bits + 7) / 8 * 8), + kNumBlocks(kBlocked ? kTotalBits / kBitsPerBlock : 1), + kNumProbes(num_probes), + hash_func_(hash_func == nullptr ? &BloomHash : hash_func) { + assert(kBlocked ? kTotalBits > 0 : kTotalBits >= kBitsPerBlock); assert(kNumProbes > 0); - assert(kTotalBits > 0); - data_.reset(new unsigned char[kTotalBits / 8]()); -} -DynamicBloom::DynamicBloom(uint32_t total_bits, - uint32_t num_probes) - : DynamicBloom(total_bits, &BloomHash, num_probes) { + uint32_t sz = kTotalBits / 8; + if (kBlocked) { + sz += CACHE_LINE_SIZE - 1; + } + raw_ = new unsigned char[sz](); + if (kBlocked) { + data_ = raw_ + CACHE_LINE_SIZE - + reinterpret_cast(raw_) % CACHE_LINE_SIZE; + } else { + data_ = raw_; + } } } // rocksdb diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h index 0851becbf..efc461cf9 100644 --- a/util/dynamic_bloom.h +++ b/util/dynamic_bloom.h @@ -15,13 +15,17 @@ class Slice; class DynamicBloom { public: // total_bits: fixed total bits for the bloom - // hash_func: customized hash function // num_probes: number of hash probes for a single key - DynamicBloom(uint32_t total_bits, - uint32_t (*hash_func)(const Slice& key), - uint32_t num_probes = 6); + // cl_per_block: block size in cache lines. When this is non-zero, a + // query/set is done within a block to improve cache locality. + // hash_func: customized hash function + explicit DynamicBloom(uint32_t total_bits, uint32_t cl_per_block = 0, + uint32_t num_probes = 6, + uint32_t (*hash_func)(const Slice& key) = nullptr); - explicit DynamicBloom(uint32_t total_bits, uint32_t num_probes = 6); + ~DynamicBloom() { + delete[] raw_; + } // Assuming single threaded access to this function. 
void Add(const Slice& key); @@ -36,10 +40,15 @@ class DynamicBloom { bool MayContainHash(uint32_t hash); private: - uint32_t (*hash_func_)(const Slice& key); + const bool kBlocked; + const uint32_t kBitsPerBlock; const uint32_t kTotalBits; + const uint32_t kNumBlocks; const uint32_t kNumProbes; - std::unique_ptr data_; + + uint32_t (*hash_func_)(const Slice& key); + unsigned char* data_; + unsigned char* raw_; }; inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); } @@ -50,22 +59,42 @@ inline bool DynamicBloom::MayContain(const Slice& key) { inline bool DynamicBloom::MayContainHash(uint32_t h) { const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits - for (uint32_t i = 0; i < kNumProbes; i++) { - const uint32_t bitpos = h % kTotalBits; - if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { - return false; + if (kBlocked) { + uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock; + for (uint32_t i = 0; i < kNumProbes; ++i) { + const uint32_t bitpos = b + h % kBitsPerBlock; + if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { + return false; + } + h += delta; + } + } else { + for (uint32_t i = 0; i < kNumProbes; ++i) { + const uint32_t bitpos = h % kTotalBits; + if (((data_[bitpos / 8]) & (1 << (bitpos % 8))) == 0) { + return false; + } + h += delta; } - h += delta; } return true; } inline void DynamicBloom::AddHash(uint32_t h) { const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits - for (uint32_t i = 0; i < kNumProbes; i++) { - const uint32_t bitpos = h % kTotalBits; - data_[bitpos / 8] |= (1 << (bitpos % 8)); - h += delta; + if (kBlocked) { + uint32_t b = ((h >> 11 | (h << 21)) % kNumBlocks) * kBitsPerBlock; + for (uint32_t i = 0; i < kNumProbes; ++i) { + const uint32_t bitpos = b + h % kBitsPerBlock; + data_[bitpos / 8] |= (1 << (bitpos % 8)); + h += delta; + } + } else { + for (uint32_t i = 0; i < kNumProbes; ++i) { + const uint32_t bitpos = h % kTotalBits; + data_[bitpos / 8] |= (1 << 
(bitpos % 8)); + h += delta; + } } } diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc index 58f05ae50..d9ececa5a 100644 --- a/util/dynamic_bloom_test.cc +++ b/util/dynamic_bloom_test.cc @@ -3,19 +3,23 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#include #include #include "dynamic_bloom.h" +#include "port/port.h" #include "util/logging.h" #include "util/testharness.h" #include "util/testutil.h" +#include "util/stop_watch.h" DEFINE_int32(bits_per_key, 10, ""); DEFINE_int32(num_probes, 6, ""); +DEFINE_bool(enable_perf, false, ""); namespace rocksdb { -static Slice Key(int i, char* buffer) { +static Slice Key(uint64_t i, char* buffer) { memcpy(buffer, &i, sizeof(i)); return Slice(buffer, sizeof(i)); } @@ -24,32 +28,44 @@ class DynamicBloomTest { }; TEST(DynamicBloomTest, EmptyFilter) { - DynamicBloom bloom(100, 2); - ASSERT_TRUE(! bloom.MayContain("hello")); - ASSERT_TRUE(! bloom.MayContain("world")); + DynamicBloom bloom1(100, 0, 2); + ASSERT_TRUE(!bloom1.MayContain("hello")); + ASSERT_TRUE(!bloom1.MayContain("world")); + + DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); + ASSERT_TRUE(!bloom2.MayContain("hello")); + ASSERT_TRUE(!bloom2.MayContain("world")); } TEST(DynamicBloomTest, Small) { - DynamicBloom bloom(100, 2); - bloom.Add("hello"); - bloom.Add("world"); - ASSERT_TRUE(bloom.MayContain("hello")); - ASSERT_TRUE(bloom.MayContain("world")); - ASSERT_TRUE(! bloom.MayContain("x")); - ASSERT_TRUE(! 
bloom.MayContain("foo")); + DynamicBloom bloom1(100, 0, 2); + bloom1.Add("hello"); + bloom1.Add("world"); + ASSERT_TRUE(bloom1.MayContain("hello")); + ASSERT_TRUE(bloom1.MayContain("world")); + ASSERT_TRUE(!bloom1.MayContain("x")); + ASSERT_TRUE(!bloom1.MayContain("foo")); + + DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); + bloom2.Add("hello"); + bloom2.Add("world"); + ASSERT_TRUE(bloom2.MayContain("hello")); + ASSERT_TRUE(bloom2.MayContain("world")); + ASSERT_TRUE(!bloom2.MayContain("x")); + ASSERT_TRUE(!bloom2.MayContain("foo")); } -static int NextLength(int length) { - if (length < 10) { - length += 1; - } else if (length < 100) { - length += 10; - } else if (length < 1000) { - length += 100; +static uint32_t NextNum(uint32_t num) { + if (num < 10) { + num += 1; + } else if (num < 100) { + num += 10; + } else if (num < 1000) { + num += 100; } else { - length += 1000; + num += 1000; } - return length; + return num; } TEST(DynamicBloomTest, VaryingLengths) { @@ -62,47 +78,116 @@ TEST(DynamicBloomTest, VaryingLengths) { fprintf(stderr, "bits_per_key: %d num_probes: %d\n", FLAGS_bits_per_key, FLAGS_num_probes); - for (int length = 1; length <= 10000; length = NextLength(length)) { - uint32_t bloom_bits = std::max(length * FLAGS_bits_per_key, 64); - DynamicBloom bloom(bloom_bits, FLAGS_num_probes); - for (int i = 0; i < length; i++) { - bloom.Add(Key(i, buffer)); - ASSERT_TRUE(bloom.MayContain(Key(i, buffer))); - } - - // All added keys must match - for (int i = 0; i < length; i++) { - ASSERT_TRUE(bloom.MayContain(Key(i, buffer))) - << "Length " << length << "; key " << i; - } - - // Check false positive rate - - int result = 0; - for (int i = 0; i < 10000; i++) { - if (bloom.MayContain(Key(i + 1000000000, buffer))) { - result++; + for (uint32_t cl_per_block = 0; cl_per_block < FLAGS_num_probes; + ++cl_per_block) { + for (uint32_t num = 1; num <= 10000; num = NextNum(num)) { + uint32_t bloom_bits = 0; + if (cl_per_block == 0) { + bloom_bits = std::max(num * 
FLAGS_bits_per_key, 64U); + } else { + bloom_bits = std::max(num * FLAGS_bits_per_key, + cl_per_block * CACHE_LINE_SIZE * 8); } + DynamicBloom bloom(bloom_bits, cl_per_block, FLAGS_num_probes); + for (uint64_t i = 0; i < num; i++) { + bloom.Add(Key(i, buffer)); + ASSERT_TRUE(bloom.MayContain(Key(i, buffer))); + } + + // All added keys must match + for (uint64_t i = 0; i < num; i++) { + ASSERT_TRUE(bloom.MayContain(Key(i, buffer))) + << "Num " << num << "; key " << i; + } + + // Check false positive rate + + int result = 0; + for (uint64_t i = 0; i < 10000; i++) { + if (bloom.MayContain(Key(i + 1000000000, buffer))) { + result++; + } + } + double rate = result / 10000.0; + + fprintf(stderr, "False positives: %5.2f%% @ num = %6u, bloom_bits = %6u, " + "cl per block = %u\n", rate*100.0, num, bloom_bits, cl_per_block); + + if (rate > 0.0125) + mediocre_filters++; // Allowed, but not too often + else + good_filters++; } - double rate = result / 10000.0; - fprintf(stderr, "False positives: %5.2f%% @ length = %6d ; \n", - rate*100.0, length); - - //ASSERT_LE(rate, 0.02); // Must not be over 2% - if (rate > 0.0125) - mediocre_filters++; // Allowed, but not too often - else - good_filters++; + fprintf(stderr, "Filters: %d good, %d mediocre\n", + good_filters, mediocre_filters); + ASSERT_LE(mediocre_filters, good_filters/5); } - - fprintf(stderr, "Filters: %d good, %d mediocre\n", - good_filters, mediocre_filters); - - ASSERT_LE(mediocre_filters, good_filters/5); } -// Different bits-per-byte +TEST(DynamicBloomTest, perf) { + StopWatchNano timer(Env::Default()); + + if (!FLAGS_enable_perf) { + return; + } + + for (uint64_t m = 1; m <= 8; ++m) { + const uint64_t num_keys = m * 8 * 1024 * 1024; + fprintf(stderr, "testing %luM keys\n", m * 8); + + DynamicBloom std_bloom(num_keys * 10, 0, FLAGS_num_probes); + + timer.Start(); + for (uint64_t i = 1; i <= num_keys; ++i) { + std_bloom.Add(Slice(reinterpret_cast(&i), 8)); + } + + uint64_t elapsed = timer.ElapsedNanos(); + 
fprintf(stderr, "standard bloom, avg add latency %lu\n", + elapsed / num_keys); + + uint64_t count = 0; + timer.Start(); + for (uint64_t i = 1; i <= num_keys; ++i) { + if (std_bloom.MayContain(Slice(reinterpret_cast(&i), 8))) { + ++count; + } + } + elapsed = timer.ElapsedNanos(); + fprintf(stderr, "standard bloom, avg query latency %lu\n", + elapsed / count); + ASSERT_TRUE(count == num_keys); + + for (int cl_per_block = 1; cl_per_block <= FLAGS_num_probes; + ++cl_per_block) { + DynamicBloom blocked_bloom(num_keys * 10, cl_per_block, FLAGS_num_probes); + + timer.Start(); + for (uint64_t i = 1; i <= num_keys; ++i) { + blocked_bloom.Add(Slice(reinterpret_cast(&i), 8)); + } + + uint64_t elapsed = timer.ElapsedNanos(); + fprintf(stderr, "blocked bloom(%d), avg add latency %lu\n", + cl_per_block, elapsed / num_keys); + + uint64_t count = 0; + timer.Start(); + for (uint64_t i = 1; i <= num_keys; ++i) { + if (blocked_bloom.MayContain( + Slice(reinterpret_cast(&i), 8))) { + ++count; + } + } + + elapsed = timer.ElapsedNanos(); + fprintf(stderr, "blocked bloom(%d), avg query latency %lu\n", + cl_per_block, elapsed / count); + ASSERT_TRUE(count == num_keys); + } + } +} } // namespace rocksdb diff --git a/util/options.cc b/util/options.cc index 7997aa969..aa1a4c64e 100644 --- a/util/options.cc +++ b/util/options.cc @@ -112,6 +112,7 @@ Options::Options() inplace_callback(nullptr), memtable_prefix_bloom_bits(0), memtable_prefix_bloom_probes(6), + bloom_locality(0), max_successive_merges(0), min_partial_merge_operands(2), allow_thread_local(true) { From 0d463a36859fe3a2006c8dbb5c1ab0818c5d7c2b Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Fri, 28 Mar 2014 14:19:21 -0700 Subject: [PATCH 13/23] Add a jni library for rocksdb which supports Open, Get, Put, and Close. Summary: This diff contains a simple jni library for rocksdb which supports open, get, put and close using default options (including Options, ReadOptions, and WriteOptions.)
In the usual case, Java developers can use the c++ rocksdb library in a way similar to the following: RocksDB db = RocksDB.open(path_to_db); ... db.put("hello".getBytes(), "world".getBytes()); byte[] value = db.get("hello".getBytes()); ... db.close(); Specifically, this diff has the following major classes: * RocksDB: a Java wrapper class which forwards the operations from the java side to c++ rocksdb library. * RocksDBException: encapsulates the error of an operation. This exception type is used to describe an internal error from the c++ rocksdb library. This diff also includes a simple java sample code calling c++ rocksdb library. To build the rocksdb jni library, simply run make jni, and make jtest will try to build and run the sample code. Note that if the rocksdb is not built with the default glibc that Java uses, java will try to load the wrong glibc during the run time. As a result, the sample code might not work properly during the run time. Test Plan: * make jni * make jtest Reviewers: haobo, dhruba, sdong, igor, ljin Reviewed By: dhruba CC: leveldb, xjin Differential Revision: https://reviews.facebook.net/D17109 --- Makefile | 29 ++++ java/Makefile | 17 +++ java/RocksDBSample.java | 79 +++++++++ java/org/rocksdb/RocksDB.java | 101 ++++++++++++++ java/org/rocksdb/RocksDBException.java | 24 ++++ java/rocksjni/portal.h | 81 +++++++++++ java/rocksjni/rocksjni.cc | 177 +++++++++++++++++++++++++ 7 files changed, 508 insertions(+) create mode 100644 java/Makefile create mode 100644 java/RocksDBSample.java create mode 100644 java/org/rocksdb/RocksDB.java create mode 100644 java/org/rocksdb/RocksDBException.java create mode 100644 java/rocksjni/portal.h create mode 100644 java/rocksjni/rocksjni.cc diff --git a/Makefile b/Makefile index 2e05e6513..63a3646e4 100644 --- a/Makefile +++ b/Makefile @@ -394,6 +394,31 @@ sst_dump: tools/sst_dump.o $(LIBOBJECTS) ldb: tools/ldb.o $(LIBOBJECTS) $(CXX) tools/ldb.o $(LIBOBJECTS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
$(COVERAGEFLAGS) +# --------------------------------------------------------------------------- +# Jni stuff +# --------------------------------------------------------------------------- +JNI_NATIVE_SOURCES = ./java/rocksjni/rocksjni.cc + +JAVA_INCLUDE = -I/usr/lib/jvm/java-openjdk/include/ -I/usr/lib/jvm/java-openjdk/include/linux +ROCKSDBJNILIB = ./java/librocksdbjni.so + +ifeq ($(PLATFORM), OS_MACOSX) +ROCKSDBJNILIB = ./java/librocksdbjni.jnilib +JAVA_INCLUDE = -I/System/Library/Frameworks/JavaVM.framework/Headers/ +endif + +jni: clean + OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j32 + cd java;$(MAKE) java; + $(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o $(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(LIBOBJECTS) $(LDFLAGS) $(COVERAGEFLAGS) + +jclean: + cd java;$(MAKE) clean; + rm -f $(ROCKSDBJNILIB) + +jtest: + cd java;$(MAKE) sample; + # --------------------------------------------------------------------------- # Platform-specific compilation # --------------------------------------------------------------------------- @@ -457,6 +482,10 @@ depend: $(DEPFILES) # working solution. ifneq ($(MAKECMDGOALS),clean) ifneq ($(MAKECMDGOALS),format) +ifneq ($(MAKECMDGOALS),jclean) +ifneq ($(MAKECMDGOALS),jtest) -include $(DEPFILES) endif endif +endif +endif diff --git a/java/Makefile b/java/Makefile new file mode 100644 index 000000000..794ec1439 --- /dev/null +++ b/java/Makefile @@ -0,0 +1,17 @@ +NATIVE_JAVA_CLASSES = org.rocksdb.RocksDB +NATIVE_INCLUDE = ./include +ROCKSDB_JAR = rocksdbjni.jar + +clean: + -find . -name "*.class" -exec rm {} \; + -find . 
-name "hs*.log" -exec rm {} \; + rm -f $(ROCKSDB_JAR) + +java: + javac org/rocksdb/*.java + jar -cf $(ROCKSDB_JAR) org/rocksdb/*.class + javah -d $(NATIVE_INCLUDE) -jni $(NATIVE_JAVA_CLASSES) + +sample: + javac -cp $(ROCKSDB_JAR) RocksDBSample.java + java -ea -Djava.library.path=.:../ -cp ".:./*" RocksDBSample /tmp/rocksdbjni/ diff --git a/java/RocksDBSample.java b/java/RocksDBSample.java new file mode 100644 index 000000000..dc2ae6cd9 --- /dev/null +++ b/java/RocksDBSample.java @@ -0,0 +1,79 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +import java.util.*; +import java.lang.*; +import org.rocksdb.*; +import java.io.IOException; + +public class RocksDBSample { + static { + System.loadLibrary("rocksdbjni"); + } + + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("usage: RocksDBSample db_path"); + return; + } + String db_path = args[0]; + + System.out.println("RocksDBSample"); + + try { + RocksDB db = RocksDB.open(db_path); + db.put("hello".getBytes(), "world".getBytes()); + byte[] value = db.get("hello".getBytes()); + System.out.format("Get('hello') = %s\n", + new String(value)); + + for (int i = 1; i <= 9; ++i) { + for (int j = 1; j <= 9; ++j) { + db.put(String.format("%dx%d", i, j).getBytes(), + String.format("%d", i * j).getBytes()); + } + } + + for (int i = 1; i <= 9; ++i) { + for (int j = 1; j <= 9; ++j) { + System.out.format("%s ", new String(db.get( + String.format("%dx%d", i, j).getBytes()))); + } + System.out.println(""); + } + + value = db.get("1x1".getBytes()); + assert(value != null); + value = db.get("world".getBytes()); + assert(value == null); + + byte[] testKey = "asdf".getBytes(); + byte[] testValue = + "asdfghjkl;'?> testKey.length); + len = 
db.get("asdfjkl;".getBytes(), enoughArray); + assert(len == RocksDB.NOT_FOUND); + len = db.get(testKey, enoughArray); + assert(len == testValue.length); + try { + db.close(); + } catch (IOException e) { + System.err.println(e); + } + } catch (RocksDBException e) { + System.err.println(e); + } + } +} diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java new file mode 100644 index 000000000..cd43cf4e5 --- /dev/null +++ b/java/org/rocksdb/RocksDB.java @@ -0,0 +1,101 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +import java.lang.*; +import java.util.*; +import java.io.Closeable; +import java.io.IOException; + +/** + * A RocksDB is a persistent ordered map from keys to values. It is safe for + * concurrent access from multiple threads without any external synchronization. + * All methods of this class could potentially throw RocksDBException, which + * indicates sth wrong at the rocksdb library side and the call failed. + */ +public class RocksDB implements Closeable { + public static final int NOT_FOUND = -1; + /** + * The factory constructor of RocksDB that opens a RocksDB instance given + * the path to the database. + * + * @param path the path to the rocksdb. + * @param status an out value indicating the status of the Open(). + * @return a rocksdb instance on success, null if the specified rocksdb can + * not be opened. + */ + public static RocksDB open(String path) throws RocksDBException { + RocksDB db = new RocksDB(); + db.open0(path); + return db; + } + + @Override public void close() throws IOException { + close0(); + } + + /** + * Set the database entry for "key" to "value". + * + * @param key the specified key to be inserted. 
+ * @param value the value associated with the specified key. + */ + public void put(byte[] key, byte[] value) throws RocksDBException { + put(key, key.length, value, value.length); + } + + /** + * Get the value associated with the specified key. + * + * @param key the key to retrieve the value. + * @param value the out-value to receive the retrieved value. + * @return The size of the actual value that matches the specified + * {@code key} in byte. If the return value is greater than the + * length of {@code value}, then it indicates that the size of the + * input buffer {@code value} is insufficient and partial result will + * be returned. RocksDB.NOT_FOUND will be returned if the value not + * found. + */ + public int get(byte[] key, byte[] value) throws RocksDBException { + return get(key, key.length, value, value.length); + } + + /** + * The simplified version of get which returns a new byte array storing + * the value associated with the specified input key if any. null will be + * returned if the specified key is not found. + * + * @param key the key retrieve the value. + * @return a byte array storing the value associated with the input key if + * any. null if it does not find the specified key. + * + * @see RocksDBException + */ + public byte[] get(byte[] key) throws RocksDBException { + return get(key, key.length); + } + + /** + * Private constructor. 
+ */ + private RocksDB() { + nativeHandle = -1; + } + + // native methods + private native void open0(String path) throws RocksDBException; + private native void put( + byte[] key, int keyLen, + byte[] value, int valueLen) throws RocksDBException; + private native int get( + byte[] key, int keyLen, + byte[] value, int valueLen) throws RocksDBException; + private native byte[] get( + byte[] key, int keyLen) throws RocksDBException; + private native void close0(); + + private long nativeHandle; +} diff --git a/java/org/rocksdb/RocksDBException.java b/java/org/rocksdb/RocksDBException.java new file mode 100644 index 000000000..e426e03ee --- /dev/null +++ b/java/org/rocksdb/RocksDBException.java @@ -0,0 +1,24 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb; + +import java.lang.*; +import java.util.*; + +/** + * A RocksDBException encapsulates the error of an operation. This exception + * type is used to describe an internal error from the c++ rocksdb library. + */ +public class RocksDBException extends Exception { + /** + * The private construct used by a set of public static factory method. + * + * @param msg the specified error message. + */ + public RocksDBException(String msg) { + super(msg); + } +} diff --git a/java/rocksjni/portal.h b/java/rocksjni/portal.h new file mode 100644 index 000000000..d51ea2059 --- /dev/null +++ b/java/rocksjni/portal.h @@ -0,0 +1,81 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +// This file is designed for caching those frequently used IDs and provide +// efficient portal (i.e, a set of static functions) to access java code +// from c++. + +#ifndef JAVA_ROCKSJNI_PORTAL_H_ +#define JAVA_ROCKSJNI_PORTAL_H_ + +#include +#include "rocksdb/db.h" + +namespace rocksdb { + +// The portal class for org.rocksdb.RocksDB +class RocksDBJni { + public: + // Get the java class id of org.rocksdb.RocksDB. + static jclass getJClass(JNIEnv* env) { + static jclass jclazz = env->FindClass("org/rocksdb/RocksDB"); + assert(jclazz != nullptr); + return jclazz; + } + + // Get the field id of the member variable of org.rocksdb.RocksDB + // that stores the pointer to rocksdb::DB. + static jfieldID getHandleFieldID(JNIEnv* env) { + static jfieldID fid = env->GetFieldID( + getJClass(env), "nativeHandle", "J"); + assert(fid != nullptr); + return fid; + } + + // Get the pointer to rocksdb::DB of the specified org.rocksdb.RocksDB. + static rocksdb::DB* getHandle(JNIEnv* env, jobject jdb) { + return reinterpret_cast( + env->GetLongField(jdb, getHandleFieldID(env))); + } + + // Pass the rocksdb::DB pointer to the java side. + static void setHandle(JNIEnv* env, jobject jdb, rocksdb::DB* db) { + env->SetLongField( + jdb, getHandleFieldID(env), + reinterpret_cast(db)); + } +}; + +// The portal class for org.rocksdb.RocksDBException +class RocksDBExceptionJni { + public: + // Get the jclass of org.rocksdb.RocksDBException + static jclass getJClass(JNIEnv* env) { + static jclass jclazz = env->FindClass("org/rocksdb/RocksDBException"); + assert(jclazz != nullptr); + return jclazz; + } + + // Create and throw a java exception by converting the input + // Status to an RocksDBException. + // + // In case s.ok() is true, then this function will not throw any + // exception. 
+ static void ThrowNew(JNIEnv* env, Status s) { + if (s.ok()) { + return; + } + jstring msg = env->NewStringUTF(s.ToString().c_str()); + // get the constructor id of org.rocksdb.RocksDBException + static jmethodID mid = env->GetMethodID( + getJClass(env), "", "(Ljava/lang/String;)V"); + assert(mid != nullptr); + + env->Throw((jthrowable)env->NewObject(getJClass(env), mid, msg)); + } +}; + +} // namespace rocksdb +#endif // JAVA_ROCKSJNI_PORTAL_H_ diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc new file mode 100644 index 000000000..6c992bc00 --- /dev/null +++ b/java/rocksjni/rocksjni.cc @@ -0,0 +1,177 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// This file implements the "bridge" between Java and C++ and enables +// calling c++ rocksdb::DB methods from Java side. 
+ +#include +#include +#include +#include + +#include "include/org_rocksdb_RocksDB.h" +#include "rocksjni/portal.h" +#include "rocksdb/db.h" + +/* + * Class: org_rocksdb_RocksDB + * Method: open0 + * Signature: (Ljava/lang/String;)V + */ +void Java_org_rocksdb_RocksDB_open0( + JNIEnv* env, jobject java_db, jstring jdb_path) { + rocksdb::DB* db; + rocksdb::Options options; + options.create_if_missing = true; + + jboolean isCopy = false; + const char* db_path = env->GetStringUTFChars(jdb_path, &isCopy); + rocksdb::Status s = rocksdb::DB::Open(options, db_path, &db); + env->ReleaseStringUTFChars(jdb_path, db_path); + + if (s.ok()) { + rocksdb::RocksDBJni::setHandle(env, java_db, db); + return; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: put + * Signature: ([BI[BI)V + */ +void Java_org_rocksdb_RocksDB_put( + JNIEnv* env, jobject jdb, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); + + jboolean isCopy; + jbyte* key = env->GetByteArrayElements(jkey, &isCopy); + jbyte* value = env->GetByteArrayElements(jvalue, &isCopy); + rocksdb::Slice key_slice( + reinterpret_cast(key), jkey_len); + rocksdb::Slice value_slice( + reinterpret_cast(value), jvalue_len); + + rocksdb::Status s = db->Put( + rocksdb::WriteOptions(), key_slice, value_slice); + + // trigger java unref on key and value. + // by passing JNI_ABORT, it will simply release the reference without + // copying the result back to the java byte array. 
+ env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); + + if (s.ok()) { + return; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); +} + +/* + * Class: org_rocksdb_RocksDB + * Method: get + * Signature: ([BI)[B + */ +jbyteArray Java_org_rocksdb_RocksDB_get___3BI( + JNIEnv* env, jobject jdb, jbyteArray jkey, jint jkey_len) { + rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); + + jboolean isCopy; + jbyte* key = env->GetByteArrayElements(jkey, &isCopy); + rocksdb::Slice key_slice( + reinterpret_cast(key), jkey_len); + + std::string value; + rocksdb::Status s = db->Get( + rocksdb::ReadOptions(), + key_slice, &value); + + // trigger java unref on key. + // by passing JNI_ABORT, it will simply release the reference without + // copying the result back to the java byte array. + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + + if (s.IsNotFound()) { + return nullptr; + } + + if (s.ok()) { + jbyteArray jvalue = env->NewByteArray(value.size()); + env->SetByteArrayRegion( + jvalue, 0, value.size(), + reinterpret_cast(value.c_str())); + return jvalue; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + + return nullptr; +} + +/* + * Class: org_rocksdb_RocksDB + * Method: get + * Signature: ([BI[BI)I + */ +jint Java_org_rocksdb_RocksDB_get___3BI_3BI( + JNIEnv* env, jobject jdb, + jbyteArray jkey, jint jkey_len, + jbyteArray jvalue, jint jvalue_len) { + rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); + + jboolean isCopy; + jbyte* key = env->GetByteArrayElements(jkey, &isCopy); + jbyte* value = env->GetByteArrayElements(jvalue, &isCopy); + rocksdb::Slice key_slice( + reinterpret_cast(key), jkey_len); + + // TODO(yhchiang): we might save one memory allocation here by adding + // a DB::Get() function which takes preallocated jbyte* as input. + std::string cvalue; + rocksdb::Status s = db->Get( + rocksdb::ReadOptions(), key_slice, &cvalue); + + // trigger java unref on key. 
+ // by passing JNI_ABORT, it will simply release the reference without + // copying the result back to the java byte array. + env->ReleaseByteArrayElements(jkey, key, JNI_ABORT); + + if (s.IsNotFound()) { + env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); + return -1; + } else if (s.ok()) { + int cvalue_len = static_cast(cvalue.size()); + int length = cvalue_len; + // currently we prevent overflowing. + if (length > jvalue_len) { + length = jvalue_len; + } + memcpy(value, cvalue.c_str(), length); + env->ReleaseByteArrayElements(jvalue, value, JNI_COMMIT); + if (cvalue_len > length) { + return static_cast(cvalue.size()); + } + return length; + } + rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + + return -1; +} + +/* + * Class: org_rocksdb_RocksDB + * Method: close0 + * Signature: ()V + */ +void Java_org_rocksdb_RocksDB_close0( + JNIEnv* env, jobject java_db) { + rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, java_db); + delete db; + db = nullptr; + + rocksdb::RocksDBJni::setHandle(env, java_db, db); +} From 64ae6e9eb94228c11498edd8756b5f439338ff8b Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 28 Mar 2014 15:04:11 -0700 Subject: [PATCH 14/23] Don't preallocate log files --- util/env_posix.cc | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index 237038fcb..da65d7374 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1363,7 +1363,10 @@ class PosixEnv : public Env { EnvOptions OptimizeForLogWrite(const EnvOptions& env_options) const { EnvOptions optimized = env_options; optimized.use_mmap_writes = false; - optimized.fallocate_with_keep_size = false; + // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it + // breaks TransactionLogIteratorStallAtLastRecord unit test. 
Fix the unit + // test and make this false + optimized.fallocate_with_keep_size = true; return optimized; } From 4031b98373c8770a13a6cca168495fd7c5c12af0 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Fri, 20 Dec 2013 17:17:00 -0800 Subject: [PATCH 15/23] A GIS implementation for rocksdb. Summary: This patch stores gps locations in rocksdb. Each object is uniquely identified by an id. Each object has a gps (latitude, longitude) associated with it. The geodb supports looking up an object either by its gps location or by its id. There is a method to retrieve all objects within a circular radius centered at a specified gps location. Test Plan: Simple unit-test attached. Reviewers: leveldb, haobo Reviewed By: haobo CC: leveldb, tecbot, haobo Differential Revision: https://reviews.facebook.net/D15567 --- Makefile | 6 +- include/utilities/geo_db.h | 103 ++++++++ utilities/geodb/geodb_impl.cc | 427 ++++++++++++++++++++++++++++++++++ utilities/geodb/geodb_impl.h | 187 +++++++++++++++ utilities/geodb/geodb_test.cc | 123 ++++++++++ 5 files changed, 845 insertions(+), 1 deletion(-) create mode 100644 include/utilities/geo_db.h create mode 100644 utilities/geodb/geodb_impl.cc create mode 100644 utilities/geodb/geodb_impl.h create mode 100644 utilities/geodb/geodb_test.cc diff --git a/Makefile b/Makefile index 63a3646e4..5e7d87e58 100644 --- a/Makefile +++ b/Makefile @@ -93,7 +93,8 @@ TESTS = \ write_batch_test\ deletefile_test \ table_test \ - thread_local_test + thread_local_test \ + geodb_test TOOLS = \ sst_dump \ @@ -366,6 +367,9 @@ merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) deletefile_test: db/deletefile_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/deletefile_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) +geodb_test: utilities/geodb/geodb_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) utilities/geodb/geodb_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + $(MEMENVLIBRARY) : $(MEMENVOBJECTS) rm 
-f $@ $(AR) -rs $@ $(MEMENVOBJECTS) diff --git a/include/utilities/geo_db.h b/include/utilities/geo_db.h new file mode 100644 index 000000000..8b3e44b06 --- /dev/null +++ b/include/utilities/geo_db.h @@ -0,0 +1,103 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// + +#pragma once +#include +#include + +#include "utilities/stackable_db.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +// +// Configurable options needed for setting up a Geo database +// +struct GeoDBOptions { + // Backup info and error messages will be written to info_log + // if non-nullptr. + // Default: nullptr + Logger* info_log; + + explicit GeoDBOptions(Logger* _info_log = nullptr):info_log(_info_log) { } +}; + +// +// A position in the earth's geoid +// +class GeoPosition { + public: + double latitude; + double longitude; + + explicit GeoPosition(double la = 0, double lo = 0) : + latitude(la), longitude(lo) { + } +}; + +// +// Description of an object on the Geoid. It is located by a GPS location, +// and is identified by the id. The value associated with this object is +// an opaque string 'value'. Different objects identified by unique id's +// can have the same gps-location associated with them. +// +class GeoObject { + public: + GeoPosition position; + std::string id; + std::string value; + + GeoObject() {} + + GeoObject(const GeoPosition& pos, const std::string& i, + const std::string& val) : + position(pos), id(i), value(val) { + } +}; + +// +// Stack your DB with GeoDB to be able to get geo-spatial support +// +class GeoDB : public StackableDB { + public: + // GeoDBOptions have to be the same as the ones used in a previous + // incarnation of the DB + // + // GeoDB owns the pointer `DB* db` now. 
You should not delete it or + // use it after the invocation of GeoDB + // GeoDB(DB* db, const GeoDBOptions& options) : StackableDB(db) {} + GeoDB(DB* db, const GeoDBOptions& options) : StackableDB(db) {} + virtual ~GeoDB() {} + + // Insert a new object into the location database. The object is + // uniquely identified by the id. If an object with the same id already + // exists in the db, then the old one is overwritten by the new + // object being inserted here. + virtual Status Insert(const GeoObject& object) = 0; + + // Retrieve the value of the object located at the specified GPS + // location and is identified by the 'id'. + virtual Status GetByPosition(const GeoPosition& pos, + const Slice& id, std::string* value) = 0; + + // Retrieve the value of the object identified by the 'id'. This method + // could be potentially slower than GetByPosition + virtual Status GetById(const Slice& id, GeoObject* object) = 0; + + // Delete the specified object + virtual Status Remove(const Slice& id) = 0; + + // Returns a list of all items within a circular radius from the + // specified gps location. If 'number_of_values' is specified, + // then this call returns at most that many number of objects. + // The radius is specified in 'meters'. + virtual Status SearchRadial(const GeoPosition& pos, + double radius, + std::vector* values, + int number_of_values = INT_MAX) = 0; +}; + +} // namespace rocksdb diff --git a/utilities/geodb/geodb_impl.cc b/utilities/geodb/geodb_impl.cc new file mode 100644 index 000000000..095ecf8ab --- /dev/null +++ b/utilities/geodb/geodb_impl.cc @@ -0,0 +1,427 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+// +#include "utilities/geodb/geodb_impl.h" + +#define __STDC_FORMAT_MACROS + +#include +#include +#include +#include +#include "db/filename.h" +#include "util/coding.h" + +// +// There are two types of keys. The first type of key-values +// maps a geo location to the set of object ids and their values. +// Table 1 +// key : p + : + $quadkey + : + $id + +// : + $latitude + : + $longitude +// value : value of the object +// This table can be used to find all objects that reside near +// a specified geolocation. +// +// Table 2 +// key : 'k' + : + $id +// value: $quadkey + +namespace rocksdb { + +GeoDBImpl::GeoDBImpl(DB* db, const GeoDBOptions& options) : + GeoDB(db, options), db_(db), options_(options) { +} + +GeoDBImpl::~GeoDBImpl() { +} + +Status GeoDBImpl::Insert(const GeoObject& obj) { + WriteBatch batch; + + // It is possible that this id is already associated with + // with a different position. We first have to remove that + // association before we can insert the new one. + + // remove existing object, if it exists + GeoObject old; + Status status = GetById(obj.id, &old); + if (status.ok()) { + assert(obj.id.compare(old.id) == 0); + std::string quadkey = PositionToQuad(old.position, Detail); + std::string key1 = MakeKey1(old.position, old.id, quadkey); + std::string key2 = MakeKey2(old.id); + batch.Delete(Slice(key1)); + batch.Delete(Slice(key2)); + } else if (status.IsNotFound()) { + // What if another thread is trying to insert the same ID concurrently? 
+ } else { + return status; + } + + // insert new object + std::string quadkey = PositionToQuad(obj.position, Detail); + std::string key1 = MakeKey1(obj.position, obj.id, quadkey); + std::string key2 = MakeKey2(obj.id); + batch.Put(Slice(key1), Slice(obj.value)); + batch.Put(Slice(key2), Slice(quadkey)); + return db_->Write(woptions_, &batch); +} + +Status GeoDBImpl::GetByPosition(const GeoPosition& pos, + const Slice& id, + std::string* value) { + std::string quadkey = PositionToQuad(pos, Detail); + std::string key1 = MakeKey1(pos, id, quadkey); + return db_->Get(roptions_, Slice(key1), value); +} + +Status GeoDBImpl::GetById(const Slice& id, GeoObject* object) { + Status status; + Slice quadkey; + + // create an iterator so that we can get a consistent picture + // of the database. + Iterator* iter = db_->NewIterator(roptions_); + + // create key for table2 + std::string kt = MakeKey2(id); + Slice key2(kt); + + iter->Seek(key2); + if (iter->Valid() && iter->status().ok()) { + if (iter->key().compare(key2) == 0) { + quadkey = iter->value(); + } + } + if (quadkey.size() == 0) { + delete iter; + return Status::NotFound(key2); + } + + // + // Seek to the quadkey + id prefix + // + std::string prefix = MakeKey1Prefix(quadkey.ToString(), id); + iter->Seek(Slice(prefix)); + assert(iter->Valid()); + if (!iter->Valid() || !iter->status().ok()) { + delete iter; + return Status::NotFound(); + } + + // split the key into p + quadkey + id + lat + lon + std::vector parts; + Slice key = iter->key(); + StringSplit(&parts, key.ToString(), ':'); + assert(parts.size() == 5); + assert(parts[0] == "p"); + assert(parts[1] == quadkey); + assert(parts[2] == id); + + // fill up output parameters + object->position.latitude = atof(parts[3].c_str()); + object->position.longitude = atof(parts[4].c_str()); + object->id = id.ToString(); // this is redundant + object->value = iter->value().ToString(); + delete iter; + return Status::OK(); +} + + +Status GeoDBImpl::Remove(const Slice& id) { + 
// Read the object from the database + GeoObject obj; + Status status = GetById(id, &obj); + if (!status.ok()) { + return status; + } + + // remove the object by atomically deleting it from both tables + std::string quadkey = PositionToQuad(obj.position, Detail); + std::string key1 = MakeKey1(obj.position, obj.id, quadkey); + std::string key2 = MakeKey2(obj.id); + WriteBatch batch; + batch.Delete(Slice(key1)); + batch.Delete(Slice(key2)); + return db_->Write(woptions_, &batch); +} + +Status GeoDBImpl::SearchRadial(const GeoPosition& pos, + double radius, + std::vector* values, + int number_of_values) { + // Gather all bounding quadkeys + std::vector qids; + Status s = searchQuadIds(pos, radius, &qids); + if (!s.ok()) { + return s; + } + + // create an iterator + Iterator* iter = db_->NewIterator(ReadOptions()); + + // Process each prospective quadkey + for (std::string qid : qids) { + // The user is interested in only these many objects. + if (number_of_values == 0) { + break; + } + + // convert quadkey to db key prefix + std::string dbkey = MakeQuadKeyPrefix(qid); + + for (iter->Seek(dbkey); + number_of_values > 0 && iter->Valid() && iter->status().ok(); + iter->Next()) { + // split the key into p + quadkey + id + lat + lon + std::vector parts; + Slice key = iter->key(); + StringSplit(&parts, key.ToString(), ':'); + assert(parts.size() == 5); + assert(parts[0] == "p"); + std::string* quadkey = &parts[1]; + + // If the key we are looking for is a prefix of the key + // we found from the database, then this is one of the keys + // we are looking for. 
+ auto res = std::mismatch(qid.begin(), qid.end(), quadkey->begin()); + if (res.first == qid.end()) { + GeoPosition pos(atof(parts[3].c_str()), atof(parts[4].c_str())); + GeoObject obj(pos, parts[4], iter->value().ToString()); + values->push_back(obj); + number_of_values--; + } else { + break; + } + } + } + delete iter; + return Status::OK(); +} + +std::string GeoDBImpl::MakeKey1(const GeoPosition& pos, Slice id, + std::string quadkey) { + std::string lat = std::to_string(pos.latitude); + std::string lon = std::to_string(pos.longitude); + std::string key = "p:"; + key.reserve(5 + quadkey.size() + id.size() + lat.size() + lon.size()); + key.append(quadkey); + key.append(":"); + key.append(id.ToString()); + key.append(":"); + key.append(lat); + key.append(":"); + key.append(lon); + return key; +} + +std::string GeoDBImpl::MakeKey2(Slice id) { + std::string key = "k:"; + key.append(id.ToString()); + return key; +} + +std::string GeoDBImpl::MakeKey1Prefix(std::string quadkey, + Slice id) { + std::string key = "p:"; + key.reserve(3 + quadkey.size() + id.size()); + key.append(quadkey); + key.append(":"); + key.append(id.ToString()); + return key; +} + +std::string GeoDBImpl::MakeQuadKeyPrefix(std::string quadkey) { + std::string key = "p:"; + key.append(quadkey); + return key; +} + +void GeoDBImpl::StringSplit(std::vector* tokens, + const std::string &text, char sep) { + std::size_t start = 0, end = 0; + while ((end = text.find(sep, start)) != std::string::npos) { + tokens->push_back(text.substr(start, end - start)); + start = end + 1; + } + tokens->push_back(text.substr(start)); +} + +// convert degrees to radians +double GeoDBImpl::radians(double x) { + return (x * PI) / 180; +} + +// convert radians to degrees +double GeoDBImpl::degrees(double x) { + return (x * 180) / PI; +} + +// convert a gps location to quad coordinate +std::string GeoDBImpl::PositionToQuad(const GeoPosition& pos, + int levelOfDetail) { + Pixel p = PositionToPixel(pos, levelOfDetail); + Tile tile 
= PixelToTile(p); + return TileToQuadKey(tile, levelOfDetail); +} + +GeoPosition GeoDBImpl::displaceLatLon(double lat, double lon, + double deltay, double deltax) { + double dLat = deltay / EarthRadius; + double dLon = deltax / (EarthRadius * cos(radians(lat))); + return GeoPosition(lat + degrees(dLat), + lon + degrees(dLon)); +} + +// +// Return the distance between two positions on the earth +// +double GeoDBImpl::distance(double lat1, double lon1, + double lat2, double lon2) { + double lon = radians(lon2 - lon1); + double lat = radians(lat2 - lat1); + + double a = (sin(lat / 2) * sin(lat / 2)) + + cos(radians(lat1)) * cos(radians(lat2)) * + (sin(lon / 2) * sin(lon / 2)); + double angle = 2 * atan2(sqrt(a), sqrt(1 - a)); + return angle * EarthRadius; +} + +// +// Returns all the quadkeys inside the search range +// +Status GeoDBImpl::searchQuadIds(const GeoPosition& position, + double radius, + std::vector* quadKeys) { + // get the outline of the search square + GeoPosition topLeftPos = boundingTopLeft(position, radius); + GeoPosition bottomRightPos = boundingBottomRight(position, radius); + + Pixel topLeft = PositionToPixel(topLeftPos, Detail); + Pixel bottomRight = PositionToPixel(bottomRightPos, Detail); + + // how many level of details to look for + int numberOfTilesAtMaxDepth = floor((bottomRight.x - topLeft.x) / 256); + int zoomLevelsToRise = floor(log(numberOfTilesAtMaxDepth) / log(2)); + zoomLevelsToRise++; + int levels = std::max(0, Detail - zoomLevelsToRise); + + quadKeys->push_back(PositionToQuad(GeoPosition(topLeftPos.latitude, + topLeftPos.longitude), + levels)); + quadKeys->push_back(PositionToQuad(GeoPosition(topLeftPos.latitude, + bottomRightPos.longitude), + levels)); + quadKeys->push_back(PositionToQuad(GeoPosition(bottomRightPos.latitude, + topLeftPos.longitude), + levels)); + quadKeys->push_back(PositionToQuad(GeoPosition(bottomRightPos.latitude, + bottomRightPos.longitude), + levels)); + return Status::OK(); +} + +// Determines the ground 
resolution (in meters per pixel) at a specified +// latitude and level of detail. +// Latitude (in degrees) at which to measure the ground resolution. +// Level of detail, from 1 (lowest detail) to 23 (highest detail). +// Returns the ground resolution, in meters per pixel. +double GeoDBImpl::GroundResolution(double latitude, int levelOfDetail) { + latitude = clip(latitude, MinLatitude, MaxLatitude); + return cos(latitude * PI / 180) * 2 * PI * EarthRadius / + MapSize(levelOfDetail); +} + +// Converts a point from latitude/longitude WGS-84 coordinates (in degrees) +// into pixel XY coordinates at a specified level of detail. +GeoDBImpl::Pixel GeoDBImpl::PositionToPixel(const GeoPosition& pos, + int levelOfDetail) { + double latitude = clip(pos.latitude, MinLatitude, MaxLatitude); + double x = (pos.longitude + 180) / 360; + double sinLatitude = sin(latitude * PI / 180); + double y = 0.5 - log((1 + sinLatitude) / (1 - sinLatitude)) / (4 * PI); + double mapSize = MapSize(levelOfDetail); + double X = floor(clip(x * mapSize + 0.5, 0, mapSize - 1)); + double Y = floor(clip(y * mapSize + 0.5, 0, mapSize - 1)); + return Pixel((unsigned int)X, (unsigned int)Y); +} + +GeoPosition GeoDBImpl::PixelToPosition(const Pixel& pixel, int levelOfDetail) { + double mapSize = MapSize(levelOfDetail); + double x = (clip(pixel.x, 0, mapSize - 1) / mapSize) - 0.5; + double y = 0.5 - (clip(pixel.y, 0, mapSize - 1) / mapSize); + double latitude = 90 - 360 * atan(exp(-y * 2 * PI)) / PI; + double longitude = 360 * x; + return GeoPosition(latitude, longitude); +} + +// Converts a Pixel to a Tile +GeoDBImpl::Tile GeoDBImpl::PixelToTile(const Pixel& pixel) { + unsigned int tileX = floor(pixel.x / 256); + unsigned int tileY = floor(pixel.y / 256); + return Tile(tileX, tileY); +} + +GeoDBImpl::Pixel GeoDBImpl::TileToPixel(const Tile& tile) { + unsigned int pixelX = tile.x * 256; + unsigned int pixelY = tile.y * 256; + return Pixel(pixelX, pixelY); +} + +// Convert a Tile to a quadkey +std::string 
GeoDBImpl::TileToQuadKey(const Tile& tile, int levelOfDetail) { + std::stringstream quadKey; + for (int i = levelOfDetail; i > 0; i--) { + char digit = '0'; + int mask = 1 << (i - 1); + if ((tile.x & mask) != 0) { + digit++; + } + if ((tile.y & mask) != 0) { + digit++; + digit++; + } + quadKey << digit; + } + return quadKey.str(); +} + +// +// Convert a quadkey to a tile and its level of detail +// +void GeoDBImpl::QuadKeyToTile(std::string quadkey, Tile* tile, + int *levelOfDetail) { + tile->x = tile->y = 0; + *levelOfDetail = quadkey.size(); + const char* key = reinterpret_cast(quadkey.c_str()); + for (int i = *levelOfDetail; i > 0; i--) { + int mask = 1 << (i - 1); + switch (key[*levelOfDetail - i]) { + case '0': + break; + + case '1': + tile->x |= mask; + break; + + case '2': + tile->y |= mask; + break; + + case '3': + tile->x |= mask; + tile->y |= mask; + break; + + default: + std::stringstream msg; + msg << quadkey; + msg << " Invalid QuadKey."; + throw std::runtime_error(msg.str()); + } + } +} +} // namespace rocksdb diff --git a/utilities/geodb/geodb_impl.h b/utilities/geodb/geodb_impl.h new file mode 100644 index 000000000..376a211c6 --- /dev/null +++ b/utilities/geodb/geodb_impl.h @@ -0,0 +1,187 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// + +#pragma once +#include +#include +#include +#include +#include +#include + +#include "utilities/geo_db.h" +#include "utilities/stackable_db.h" +#include "rocksdb/env.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +// A specific implementation of GeoDB + +class GeoDBImpl : public GeoDB { + public: + GeoDBImpl(DB* db, const GeoDBOptions& options); + ~GeoDBImpl(); + + // Associate the GPS location with the identified by 'id'. 
The value + // is a blob that is associated with this object. + virtual Status Insert(const GeoObject& object); + + // Retrieve the value of the object located at the specified GPS + // location and is identified by the 'id'. + virtual Status GetByPosition(const GeoPosition& pos, + const Slice& id, + std::string* value); + + // Retrieve the value of the object identified by the 'id'. This method + // could be potentially slower than GetByPosition + virtual Status GetById(const Slice& id, GeoObject* object); + + // Delete the specified object + virtual Status Remove(const Slice& id); + + // Returns a list of all items within a circular radius from the + // specified gps location + virtual Status SearchRadial(const GeoPosition& pos, + double radius, + std::vector* values, + int number_of_values); + + private: + DB* db_; + const GeoDBOptions options_; + const WriteOptions woptions_; + const ReadOptions roptions_; + + // The value of PI + static constexpr double PI = 3.141592653589793; + + // convert degrees to radians + static double radians(double x); + + // convert radians to degrees + static double degrees(double x); + + // A pixel class that captures X and Y coordinates + class Pixel { + public: + unsigned int x; + unsigned int y; + Pixel(unsigned int a, unsigned int b) : + x(a), y(b) { + } + }; + + // A Tile in the geoid + class Tile { + public: + unsigned int x; + unsigned int y; + Tile(unsigned int a, unsigned int b) : + x(a), y(b) { + } + }; + + // convert a gps location to quad coordinate + static std::string PositionToQuad(const GeoPosition& pos, int levelOfDetail); + + // arbitrary constant use for WGS84 via + // http://en.wikipedia.org/wiki/World_Geodetic_System + // http://mathforum.org/library/drmath/view/51832.html + // http://msdn.microsoft.com/en-us/library/bb259689.aspx + // http://www.tuicool.com/articles/NBrE73 + // + const int Detail = 23; + static constexpr double EarthRadius = 6378137; + static constexpr double MinLatitude = -85.05112878; + 
static constexpr double MaxLatitude = 85.05112878; + static constexpr double MinLongitude = -180; + static constexpr double MaxLongitude = 180; + + // clips a number to the specified minimum and maximum values. + static double clip(double n, double minValue, double maxValue) { + return fmin(fmax(n, minValue), maxValue); + } + + // Determines the map width and height (in pixels) at a specified level + // of detail, from 1 (lowest detail) to 23 (highest detail). + // Returns the map width and height in pixels. + static unsigned int MapSize(int levelOfDetail) { + return (unsigned int)(256 << levelOfDetail); + } + + // Determines the ground resolution (in meters per pixel) at a specified + // latitude and level of detail. + // Latitude (in degrees) at which to measure the ground resolution. + // Level of detail, from 1 (lowest detail) to 23 (highest detail). + // Returns the ground resolution, in meters per pixel. + static double GroundResolution(double latitude, int levelOfDetail); + + // Converts a point from latitude/longitude WGS-84 coordinates (in degrees) + // into pixel XY coordinates at a specified level of detail. 
+ static Pixel PositionToPixel(const GeoPosition& pos, int levelOfDetail); + + static GeoPosition PixelToPosition(const Pixel& pixel, int levelOfDetail); + + // Converts a Pixel to a Tile + static Tile PixelToTile(const Pixel& pixel); + + static Pixel TileToPixel(const Tile& tile); + + // Convert a Tile to a quadkey + static std::string TileToQuadKey(const Tile& tile, int levelOfDetail); + + // Convert a quadkey to a tile and its level of detail + static void QuadKeyToTile(std::string quadkey, Tile* tile, + int *levelOfDetail); + + // Return the distance between two positions on the earth + static double distance(double lat1, double lon1, + double lat2, double lon2); + static GeoPosition displaceLatLon(double lat, double lon, + double deltay, double deltax); + + // + // Returns the top left position after applying the delta to + // the specified position + // + static GeoPosition boundingTopLeft(const GeoPosition& in, double radius) { + return displaceLatLon(in.latitude, in.longitude, -radius, -radius); + } + + // + // Returns the bottom right position after applying the delta to + // the specified position + static GeoPosition boundingBottomRight(const GeoPosition& in, + double radius) { + return displaceLatLon(in.latitude, in.longitude, radius, radius); + } + + // + // Get all quadkeys within a radius of a specified position + // + Status searchQuadIds(const GeoPosition& position, + double radius, + std::vector* quadKeys); + + // splits a string into its components + static void StringSplit(std::vector* tokens, + const std::string &text, + char sep); + + // + // Create keys for accessing rocksdb table(s) + // + static std::string MakeKey1(const GeoPosition& pos, + Slice id, + std::string quadkey); + static std::string MakeKey2(Slice id); + static std::string MakeKey1Prefix(std::string quadkey, + Slice id); + static std::string MakeQuadKeyPrefix(std::string quadkey); +}; + +} // namespace rocksdb diff --git a/utilities/geodb/geodb_test.cc 
b/utilities/geodb/geodb_test.cc new file mode 100644 index 000000000..d7af6c32b --- /dev/null +++ b/utilities/geodb/geodb_test.cc @@ -0,0 +1,123 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// +#include "utilities/geodb/geodb_impl.h" + +#include +#include "util/testharness.h" + +namespace rocksdb { + +class GeoDBTest { + public: + static const std::string kDefaultDbName; + static Options options; + DB* db; + GeoDB* geodb; + + GeoDBTest() { + GeoDBOptions geodb_options; + ASSERT_OK(DestroyDB(kDefaultDbName, options)); + options.create_if_missing = true; + Status status = DB::Open(options, kDefaultDbName, &db); + geodb = new GeoDBImpl(db, geodb_options); + } + + ~GeoDBTest() { + delete geodb; + } + + GeoDB* getdb() { + return geodb; + } +}; + +const std::string GeoDBTest::kDefaultDbName = "/tmp/geodefault/"; +Options GeoDBTest::options = Options(); + +// Insert, Get and Remove +TEST(GeoDBTest, SimpleTest) { + GeoPosition pos1(100, 101); + std::string id1("id1"); + std::string value1("value1"); + + // insert first object into database + GeoObject obj1(pos1, id1, value1); + Status status = getdb()->Insert(obj1); + ASSERT_TRUE(status.ok()); + + // insert second object into database + GeoPosition pos2(200, 201); + std::string id2("id2"); + std::string value2 = "value2"; + GeoObject obj2(pos2, id2, value2); + status = getdb()->Insert(obj2); + ASSERT_TRUE(status.ok()); + + // retrieve first object using position + std::string value; + status = getdb()->GetByPosition(pos1, Slice(id1), &value); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(value, value1); + + // retrieve first object using id + GeoObject obj; + status = getdb()->GetById(Slice(id1), &obj); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(obj.position.latitude, 100); + 
ASSERT_EQ(obj.position.longitude, 101); + ASSERT_EQ(obj.id.compare(id1), 0); + ASSERT_EQ(obj.value, value1); + + // delete first object + status = getdb()->Remove(Slice(id1)); + ASSERT_TRUE(status.ok()); + status = getdb()->GetByPosition(pos1, Slice(id1), &value); + ASSERT_TRUE(status.IsNotFound()); + status = getdb()->GetById(id1, &obj); + ASSERT_TRUE(status.IsNotFound()); + + // check that we can still find second object + status = getdb()->GetByPosition(pos2, id2, &value); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(value, value2); + status = getdb()->GetById(id2, &obj); + ASSERT_TRUE(status.ok()); +} + +// Search. +// Verify distances via http://www.stevemorse.org/nearest/distance.php +TEST(GeoDBTest, Search) { + GeoPosition pos1(45, 45); + std::string id1("mid1"); + std::string value1 = "midvalue1"; + + // insert object at 45 degree latitude + GeoObject obj1(pos1, id1, value1); + Status status = getdb()->Insert(obj1); + ASSERT_TRUE(status.ok()); + + // search all objects centered at 46 degree latitude with + // a radius of 200 kilometers. We should find the one object that + // we inserted earlier. + std::vector values; + status = getdb()->SearchRadial(GeoPosition(46, 46), 200000, &values); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(values.size(), 1); + + // search all objects centered at 46 degree latitude with + // a radius of 2 kilometers. There should be none. + values.clear(); + status = getdb()->SearchRadial(GeoPosition(46, 46), 2, &values); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(values.size(), 0); +} + +} // namespace rocksdb + +int main(int argc, char* argv[]) { + return rocksdb::test::RunAllTests(); +} From 96e2c2c03164d5d7b7dd44bdc712a402d4868eed Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Fri, 28 Mar 2014 16:12:50 -0700 Subject: [PATCH 16/23] Geo spatial support. 
Summary: Test Plan: Reviewers: CC: Task ID: # Blame Rev: --- HISTORY.md | 1 + 1 file changed, 1 insertion(+) diff --git a/HISTORY.md b/HISTORY.md index 905901853..f6ee32a60 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -22,6 +22,7 @@ we will ignore it. We assume that writers of these records were interrupted and that we can safely ignore it. * Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, process them in batches, and return the batched results back to DB. +* Geo-spatial support for locations and radial-search. ## 2.7.0 (01/28/2014) From c8bb79978e287d991897e98a539a92e6b744f14e Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Fri, 28 Mar 2014 16:21:42 -0700 Subject: [PATCH 17/23] fix the buffer overflow in dynamic_bloom_test Summary: int -> uint64_t Test Plan: it think it is pretty obvious will run asan_check before committing Reviewers: igor, haobo Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D17241 --- util/dynamic_bloom_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc index d9ececa5a..e8bbc38e1 100644 --- a/util/dynamic_bloom_test.cc +++ b/util/dynamic_bloom_test.cc @@ -69,7 +69,7 @@ static uint32_t NextNum(uint32_t num) { } TEST(DynamicBloomTest, VaryingLengths) { - char buffer[sizeof(int)]; + char buffer[sizeof(uint64_t)]; // Count number of filters that significantly exceed the false positive rate int mediocre_filters = 0; From 2d3468c2939b86eaa8b68c9c5c163504dccc0de0 Mon Sep 17 00:00:00 2001 From: sdong Date: Fri, 28 Mar 2014 14:38:10 -0700 Subject: [PATCH 18/23] MemTableIterator not to reference Memtable Summary: In one of the perf, I shows 10%-25% CPU costs of MemTableIterator.Seek(), when doing dynamic prefix seek, are spent on checking whether we need to do bloom filter check or finding out the prefix extractor. Seems that more level of pointer checking makes CPU cache miss more likely. 
This patch makes things slightly simpler by copying pointer of bloom of prefix extractor into the iterator. Test Plan: make all check Reviewers: haobo, ljin Reviewed By: ljin CC: igor, dhruba, yhchiang, leveldb Differential Revision: https://reviews.facebook.net/D17247 --- db/memtable.cc | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/db/memtable.cc b/db/memtable.cc index 41dd66cb9..b520fe25d 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -155,22 +155,24 @@ const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: MemTableIterator(const MemTable& mem, const ReadOptions& options) - : mem_(mem), iter_(), dynamic_prefix_seek_(false), valid_(false) { + : bloom_(nullptr), + prefix_extractor_(mem.prefix_extractor_), + iter_(), + valid_(false) { if (options.prefix) { - iter_.reset(mem_.table_->GetPrefixIterator(*options.prefix)); + iter_.reset(mem.table_->GetPrefixIterator(*options.prefix)); } else if (options.prefix_seek) { - dynamic_prefix_seek_ = true; - iter_.reset(mem_.table_->GetDynamicPrefixIterator()); + bloom_ = mem.prefix_bloom_.get(); + iter_.reset(mem.table_->GetDynamicPrefixIterator()); } else { - iter_.reset(mem_.table_->GetIterator()); + iter_.reset(mem.table_->GetIterator()); } } virtual bool Valid() const { return valid_; } virtual void Seek(const Slice& k) { - if (dynamic_prefix_seek_ && mem_.prefix_bloom_ && - !mem_.prefix_bloom_->MayContain( - mem_.prefix_extractor_->Transform(ExtractUserKey(k)))) { + if (bloom_ != nullptr && + !bloom_->MayContain(prefix_extractor_->Transform(ExtractUserKey(k)))) { valid_ = false; return; } @@ -208,9 +210,9 @@ class MemTableIterator: public Iterator { virtual Status status() const { return Status::OK(); } private: - const MemTable& mem_; + DynamicBloom* bloom_; + const SliceTransform* const prefix_extractor_; std::shared_ptr iter_; - bool dynamic_prefix_seek_; bool valid_; // No copying allowed From 
43a593a6d921acbf79ccac0abad35c093faac175 Mon Sep 17 00:00:00 2001 From: sdong Date: Fri, 28 Mar 2014 16:57:04 -0700 Subject: [PATCH 19/23] Change default value of some Options Summary: Since we are optimizing for server workloads, some default values are not optimized any more. We change some of those values that I feel it's less prone to regression bugs. Test Plan: make all check Reviewers: dhruba, haobo, ljin, igor, yhchiang Reviewed By: igor CC: leveldb, MarkCallaghan Differential Revision: https://reviews.facebook.net/D16995 --- db/db_test.cc | 5 +++++ include/rocksdb/options.h | 8 ++++---- util/options.cc | 14 +++++++------- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index ee70871a2..0f007f791 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -306,6 +306,7 @@ class DBTest { DBTest() : option_config_(kDefault), env_(new SpecialEnv(Env::Default())) { + last_options_.max_background_flushes = 0; filter_policy_ = NewBloomFilterPolicy(10); dbname_ = test::TmpDir() + "/db_test"; ASSERT_OK(DestroyDB(dbname_, Options())); @@ -371,6 +372,8 @@ class DBTest { // Return the current option configuration. 
Options CurrentOptions() { Options options; + options.paranoid_checks = false; + options.max_background_flushes = 0; switch (option_config_) { case kHashSkipList: options.prefix_extractor.reset(NewFixedPrefixTransform(1)); @@ -431,6 +434,7 @@ class DBTest { options.compaction_style = kCompactionStyleUniversal; break; case kCompressedBlockCache: + options.allow_mmap_writes = true; options.block_cache_compressed = NewLRUCache(8*1024*1024); break; case kInfiniteMaxOpenFiles: @@ -5306,6 +5310,7 @@ TEST(DBTest, ReadCompaction) { options.filter_policy = nullptr; options.block_size = 4096; options.no_block_cache = true; + options.disable_seek_compaction = false; Reopen(&options); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index bd51669a2..54b4ef38f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -146,7 +146,7 @@ struct Options { // If any of the writes to the database fails (Put, Delete, Merge, Write), // the database will switch to read-only mode and fail all other // Write operations. - // Default: false + // Default: true bool paranoid_checks; // Use the specified object to interact with the environment, @@ -199,7 +199,7 @@ struct Options { // on target_file_size_base and target_file_size_multiplier for level-based // compaction. For universal-style compaction, you can usually set it to -1. // - // Default: 1000 + // Default: 5000 int max_open_files; // Control over blocks (user data is stored in a set of blocks, and @@ -436,7 +436,7 @@ struct Options { // Without a separate pool, long running major compaction jobs could // potentially block memtable flush jobs of other db instances, leading to // unnecessary Put stalls. - // Default: 0 + // Default: 1 int max_background_flushes; // Specify the maximal size of the info log file. If the log file @@ -562,7 +562,7 @@ struct Options { // Allow the OS to mmap file for reading sst tables. Default: false bool allow_mmap_reads; - // Allow the OS to mmap file for writing. 
Default: true + // Allow the OS to mmap file for writing. Default: false bool allow_mmap_writes; // Disable child process inherit open files. Default: true diff --git a/util/options.cc b/util/options.cc index aa1a4c64e..5c5bab557 100644 --- a/util/options.cc +++ b/util/options.cc @@ -35,14 +35,14 @@ Options::Options() compaction_filter_factory_v2(new DefaultCompactionFilterFactoryV2()), create_if_missing(false), error_if_exists(false), - paranoid_checks(false), + paranoid_checks(true), env(Env::Default()), info_log(nullptr), info_log_level(INFO), write_buffer_size(4 << 20), max_write_buffer_number(2), min_write_buffer_number_to_merge(1), - max_open_files(1000), + max_open_files(5000), block_cache(nullptr), block_cache_compressed(nullptr), block_size(4096), @@ -53,8 +53,8 @@ Options::Options() whole_key_filtering(true), num_levels(7), level0_file_num_compaction_trigger(4), - level0_slowdown_writes_trigger(8), - level0_stop_writes_trigger(12), + level0_slowdown_writes_trigger(20), + level0_stop_writes_trigger(24), max_mem_compaction_level(2), target_file_size_base(2 * 1048576), target_file_size_multiplier(1), @@ -69,10 +69,10 @@ Options::Options() db_stats_log_interval(1800), db_log_dir(""), wal_dir(""), - disable_seek_compaction(false), + disable_seek_compaction(true), delete_obsolete_files_period_micros(6 * 60 * 60 * 1000000UL), max_background_compactions(1), - max_background_flushes(0), + max_background_flushes(1), max_log_file_size(0), log_file_time_to_roll(0), keep_log_file_num(1000), @@ -91,7 +91,7 @@ Options::Options() purge_redundant_kvs_while_flush(true), allow_os_buffer(true), allow_mmap_reads(false), - allow_mmap_writes(true), + allow_mmap_writes(false), is_fd_close_on_exec(true), skip_log_error_on_recovery(false), stats_dump_period_sec(3600), From 550cca719274540ffde1a63cc8046189af5ad54b Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Fri, 28 Mar 2014 17:30:20 -0700 Subject: [PATCH 20/23] dynamicbloom fix: don't offset address when it is already aligned 
Summary: this causes overflow and asan failure Test Plan: make asan_check Reviewers: igor Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D17301 --- util/dynamic_bloom.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc index 5d3d30f4e..a4c8e11cb 100644 --- a/util/dynamic_bloom.cc +++ b/util/dynamic_bloom.cc @@ -39,7 +39,7 @@ DynamicBloom::DynamicBloom(uint32_t total_bits, sz += CACHE_LINE_SIZE - 1; } raw_ = new unsigned char[sz](); - if (kBlocked) { + if (kBlocked && (reinterpret_cast(raw_) % CACHE_LINE_SIZE)) { data_ = raw_ + CACHE_LINE_SIZE - reinterpret_cast(raw_) % CACHE_LINE_SIZE; } else { From 8a139a054c4c194368fb010245a407e9e12b6be3 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Sat, 29 Mar 2014 10:34:47 -0700 Subject: [PATCH 21/23] More valgrind issues! Summary: Fix some more CompactionFilterV2 valgrind issues. Maybe it would make sense for CompactionFilterV2 to delete its prefix_extractor? Test Plan: ran CompactionFilterV2* tests with valgrind. 
issues before patch -> no issues after Reviewers: haobo, sdong, ljin, dhruba Reviewed By: dhruba CC: leveldb, danguo Differential Revision: https://reviews.facebook.net/D17337 --- db/db_test.cc | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 0f007f791..8df456dfb 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3707,9 +3707,11 @@ TEST(DBTest, CompactionFilterV2) { options.num_levels = 3; options.max_mem_compaction_level = 0; // extract prefix - auto prefix_extractor = NewFixedPrefixTransform(8); + std::unique_ptr prefix_extractor; + prefix_extractor.reset(NewFixedPrefixTransform(8)); + options.compaction_filter_factory_v2 - = std::make_shared(prefix_extractor); + = std::make_shared(prefix_extractor.get()); // In a testing environment, we can only flush the application // compaction filter buffer using universal compaction option_config_ = kUniversalCompaction; @@ -3757,7 +3759,7 @@ TEST(DBTest, CompactionFilterV2) { // create a new database with the compaction // filter in such a way that it deletes all keys options.compaction_filter_factory_v2 = - std::make_shared(prefix_extractor); + std::make_shared(prefix_extractor.get()); options.create_if_missing = true; DestroyAndReopen(&options); @@ -3792,9 +3794,10 @@ TEST(DBTest, CompactionFilterV2WithValueChange) { Options options = CurrentOptions(); options.num_levels = 3; options.max_mem_compaction_level = 0; - auto prefix_extractor = NewFixedPrefixTransform(8); + std::unique_ptr prefix_extractor; + prefix_extractor.reset(NewFixedPrefixTransform(8)); options.compaction_filter_factory_v2 = - std::make_shared(prefix_extractor); + std::make_shared(prefix_extractor.get()); // In a testing environment, we can only flush the application // compaction filter buffer using universal compaction option_config_ = kUniversalCompaction; @@ -3832,9 +3835,10 @@ TEST(DBTest, CompactionFilterV2NULLPrefix) { Options options = CurrentOptions(); options.num_levels = 
3; options.max_mem_compaction_level = 0; - auto prefix_extractor = NewFixedPrefixTransform(8); + std::unique_ptr prefix_extractor; + prefix_extractor.reset(NewFixedPrefixTransform(8)); options.compaction_filter_factory_v2 = - std::make_shared(prefix_extractor); + std::make_shared(prefix_extractor.get()); // In a testing environment, we can only flush the application // compaction filter buffer using universal compaction option_config_ = kUniversalCompaction; From 5ec38c3d3ecfec75b66b0a900c819ea6c2da8a4d Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Sat, 29 Mar 2014 22:00:52 -0700 Subject: [PATCH 22/23] Minor fix in rocksdb jni library. Summary: Fix a bug in RocksDBSample.java and minor code improvement in rocksjni.cc. Test Plan: make jni make jtest Reviewers: haobo, sdong Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D17325 --- java/RocksDBSample.java | 2 +- java/org/rocksdb/RocksDB.java | 4 +++- java/rocksjni/rocksjni.cc | 42 +++++++++++++++++++++-------------- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/java/RocksDBSample.java b/java/RocksDBSample.java index dc2ae6cd9..b574c23e5 100644 --- a/java/RocksDBSample.java +++ b/java/RocksDBSample.java @@ -62,7 +62,7 @@ public class RocksDBSample { byte[] enoughArray = new byte[50]; int len; len = db.get(testKey, insufficientArray); - assert(len > testKey.length); + assert(len > insufficientArray.length); len = db.get("asdfjkl;".getBytes(), enoughArray); assert(len == RocksDB.NOT_FOUND); len = db.get(testKey, enoughArray); diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index cd43cf4e5..7e96eff28 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -34,7 +34,9 @@ public class RocksDB implements Closeable { } @Override public void close() throws IOException { - close0(); + if (nativeHandle != 0) { + close0(); + } } /** diff --git a/java/rocksjni/rocksjni.cc b/java/rocksjni/rocksjni.cc index 
6c992bc00..3ae53834e 100644 --- a/java/rocksjni/rocksjni.cc +++ b/java/rocksjni/rocksjni.cc @@ -121,6 +121,9 @@ jint Java_org_rocksdb_RocksDB_get___3BI_3BI( JNIEnv* env, jobject jdb, jbyteArray jkey, jint jkey_len, jbyteArray jvalue, jint jvalue_len) { + static const int kNotFound = -1; + static const int kStatusError = -2; + rocksdb::DB* db = rocksdb::RocksDBJni::getHandle(env, jdb); jboolean isCopy; @@ -142,24 +145,29 @@ jint Java_org_rocksdb_RocksDB_get___3BI_3BI( if (s.IsNotFound()) { env->ReleaseByteArrayElements(jvalue, value, JNI_ABORT); - return -1; - } else if (s.ok()) { - int cvalue_len = static_cast(cvalue.size()); - int length = cvalue_len; - // currently we prevent overflowing. - if (length > jvalue_len) { - length = jvalue_len; - } - memcpy(value, cvalue.c_str(), length); - env->ReleaseByteArrayElements(jvalue, value, JNI_COMMIT); - if (cvalue_len > length) { - return static_cast(cvalue.size()); - } - return length; - } - rocksdb::RocksDBExceptionJni::ThrowNew(env, s); + return kNotFound; + } else if (!s.ok()) { + // Here since we are throwing a Java exception from c++ side. + // As a result, c++ does not know calling this function will in fact + // throwing an exception. As a result, the execution flow will + // not stop here, and codes after this throw will still be + // executed. 
+ rocksdb::RocksDBExceptionJni::ThrowNew(env, s); - return -1; + // Return a dummy const value to avoid compilation error, although + // java side might not have a chance to get the return value :) + return kStatusError; + } + + int cvalue_len = static_cast(cvalue.size()); + int length = std::min(jvalue_len, cvalue_len); + + memcpy(value, cvalue.c_str(), length); + env->ReleaseByteArrayElements(jvalue, value, JNI_COMMIT); + if (cvalue_len > length) { + return static_cast(cvalue_len); + } + return length; } /* From 577556d5f98a905c3cce2cf6684b11cc3e73fa23 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 31 Mar 2014 11:33:09 -0700 Subject: [PATCH 23/23] Don't store version number in MANIFEST Summary: Talked to folks and they found it really scary that they won't be able to roll back once they upgrade to 2.8. We should fix this. Test Plan: make check Reviewers: haobo, ljin Reviewed By: ljin CC: leveldb Differential Revision: https://reviews.facebook.net/D17343 --- HISTORY.md | 2 +- db/db_impl.cc | 1 - db/version_edit.cc | 15 --------------- db/version_edit.h | 10 ---------- db/version_set.cc | 25 +++---------------------- 5 files changed, 4 insertions(+), 49 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index f6ee32a60..44ff73632 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -10,7 +10,7 @@ * Added "virtual void WaitForJoin()" in class Env. Default operation is no-op. 
* Removed BackupEngine::DeleteBackupsNewerThan() function * Added new option -- verify_checksums_in_compaction -* Chagned Options.prefix_extractor from raw pointer to shared_ptr (take ownership) +* Changed Options.prefix_extractor from raw pointer to shared_ptr (take ownership) Changed HashSkipListRepFactory and HashLinkListRepFactory constructor to not take SliceTransform object (use Options.prefix_extractor implicitly) * Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools * Added a command "checkconsistency" in ldb tool, which checks diff --git a/db/db_impl.cc b/db/db_impl.cc index 67daecb48..e532d5b17 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -568,7 +568,6 @@ uint64_t DBImpl::TEST_Current_Manifest_FileNo() { Status DBImpl::NewDB() { VersionEdit new_db; - new_db.SetVersionNumber(); new_db.SetComparatorName(user_comparator()->Name()); new_db.SetLogNumber(0); new_db.SetNextFile(2); diff --git a/db/version_edit.cc b/db/version_edit.cc index f949a32ba..5c532b138 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -29,18 +29,15 @@ enum Tag { // these are new formats divergent from open source leveldb kNewFile2 = 100, // store smallest & largest seqno - kVersionNumber = 101, // manifest version number, available after 2.8 }; void VersionEdit::Clear() { - version_number_ = 0; comparator_.clear(); max_level_ = 0; log_number_ = 0; prev_log_number_ = 0; last_sequence_ = 0; next_file_number_ = 0; - has_version_number_ = false; has_comparator_ = false; has_log_number_ = false; has_prev_log_number_ = false; @@ -51,10 +48,6 @@ void VersionEdit::Clear() { } void VersionEdit::EncodeTo(std::string* dst) const { - if (has_version_number_) { - PutVarint32(dst, kVersionNumber); - PutVarint32(dst, version_number_); - } if (has_comparator_) { PutVarint32(dst, kComparator); PutLengthPrefixedSlice(dst, comparator_); @@ -133,14 +126,6 @@ Status VersionEdit::DecodeFrom(const Slice& src) { while (msg == nullptr && GetVarint32(&input, 
&tag)) { switch (tag) { - case kVersionNumber: - if (GetVarint32(&input, &version_number_)) { - has_version_number_ = true; - } else { - msg = "version number"; - } - break; - case kComparator: if (GetLengthPrefixedSlice(&input, &str)) { comparator_ = str.ToString(); diff --git a/db/version_edit.h b/db/version_edit.h index c1a3799f4..f54949fbf 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -46,10 +46,6 @@ class VersionEdit { void Clear(); - void SetVersionNumber() { - has_version_number_ = true; - version_number_ = kManifestVersion; - } void SetComparatorName(const Slice& name) { has_comparator_ = true; comparator_ = name.ToString(); @@ -114,13 +110,11 @@ class VersionEdit { bool GetLevel(Slice* input, int* level, const char** msg); int max_level_; - uint32_t version_number_; std::string comparator_; uint64_t log_number_; uint64_t prev_log_number_; uint64_t next_file_number_; SequenceNumber last_sequence_; - bool has_version_number_; bool has_comparator_; bool has_log_number_; bool has_prev_log_number_; @@ -129,10 +123,6 @@ class VersionEdit { DeletedFileSet deleted_files_; std::vector > new_files_; - - enum { - kManifestVersion = 1 - }; }; } // namespace rocksdb diff --git a/db/version_set.cc b/db/version_set.cc index 17ba77922..77275bdd8 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1768,8 +1768,6 @@ Status VersionSet::Recover() { return s; } - bool have_version_number = false; - bool log_number_decrease = false; bool have_log_number = false; bool have_prev_log_number = false; bool have_next_file = false; @@ -1810,17 +1808,15 @@ Status VersionSet::Recover() { builder.Apply(&edit); - if (edit.has_version_number_) { - have_version_number = true; - } - // Only a flush's edit or a new snapshot can write log number during // LogAndApply. Since memtables are flushed and inserted into // manifest_writers_ queue in order, the log number in MANIFEST file // should be monotonically increasing. 
if (edit.has_log_number_) { if (have_log_number && log_number >= edit.log_number_) { - log_number_decrease = true; + Log(options_->info_log, + "decreasing of log_number is detected " + "in MANIFEST\n"); } else { log_number = edit.log_number_; have_log_number = true; @@ -1842,20 +1838,6 @@ Status VersionSet::Recover() { have_last_sequence = true; } } - - if (s.ok() && log_number_decrease) { - // Since release 2.8, version number is added into MANIFEST file. - // Prior release 2.8, a bug in LogAndApply() can cause log_number - // to be smaller than the one from previous edit. To ensure backward - // compatibility, only fail for MANIFEST genearated by release 2.8 - // and after. - if (have_version_number) { - s = Status::Corruption("log number decreases"); - } else { - Log(options_->info_log, "decreasing of log_number is detected " - "in MANIFEST\n"); - } - } } if (s.ok()) { @@ -2125,7 +2107,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { // Save metadata VersionEdit edit; - edit.SetVersionNumber(); edit.SetComparatorName(icmp_.user_comparator()->Name()); // Save files