Merge branch 'master' into jni

This commit is contained in:
Yueh-Hsuan Chiang 2014-04-03 17:14:10 -07:00
commit 3699fda6c7
10 changed files with 83 additions and 42 deletions

View File

@ -15,13 +15,12 @@
* Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools * Added Env::GetThreadPoolQueueLen(), which returns the waiting queue length of thread pools
* Added a command "checkconsistency" in ldb tool, which checks * Added a command "checkconsistency" in ldb tool, which checks
if file system state matches DB state (file existence and file sizes) if file system state matches DB state (file existence and file sizes)
* CompactionFilter::Context is now CompactionFilterContext. It is shared by CompactionFilter and CompactionFilterV2
### New Features ### New Features
* If we find one truncated record at the end of the MANIFEST or WAL files, * If we find one truncated record at the end of the MANIFEST or WAL files,
we will ignore it. We assume that writers of these records were interrupted we will ignore it. We assume that writers of these records were interrupted
and that we can safely ignore it. and that we can safely ignore it.
* Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, process them in batches, and return the batched results back to DB. * Now compaction filter has a V2 interface. It buffers the kv-pairs sharing the same key prefix, process them in batches, and return the batched results back to DB. The new interface uses a new structure CompactionFilterContext for the same purpose as CompactionFilter::Context in V1.
* Geo-spatial support for locations and radial-search. * Geo-spatial support for locations and radial-search.
## 2.7.0 (01/28/2014) ## 2.7.0 (01/28/2014)

View File

@ -439,7 +439,8 @@ int main(int argc, char** argv) {
rocksdb_close(db); rocksdb_close(db);
rocksdb_destroy_db(options, dbname, &err); rocksdb_destroy_db(options, dbname, &err);
rocksdb_options_set_filter_policy(options, rocksdb_filterpolicy_create_bloom(10)); rocksdb_filterpolicy_t* policy = rocksdb_filterpolicy_create_bloom(10);
rocksdb_options_set_filter_policy(options, policy);
rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3)); rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3));
rocksdb_options_set_hash_skip_list_rep(options, 50000, 4, 4); rocksdb_options_set_hash_skip_list_rep(options, 50000, 4, 4);
@ -477,6 +478,7 @@ int main(int argc, char** argv) {
rocksdb_iter_get_error(iter, &err); rocksdb_iter_get_error(iter, &err);
CheckNoError(err); CheckNoError(err);
rocksdb_iter_destroy(iter); rocksdb_iter_destroy(iter);
rocksdb_filterpolicy_destroy(policy);
} }
StartPhase("cleanup"); StartPhase("cleanup");

View File

@ -117,6 +117,14 @@ struct DBImpl::CompactionState {
total_bytes(0) { total_bytes(0) {
} }
// Create a client visible context of this compaction
CompactionFilter::Context GetFilterContextV1() {
CompactionFilter::Context context;
context.is_full_compaction = compaction->IsFullCompaction();
context.is_manual_compaction = compaction->IsManualCompaction();
return context;
}
// Create a client visible context of this compaction // Create a client visible context of this compaction
CompactionFilterContext GetFilterContext() { CompactionFilterContext GetFilterContext() {
CompactionFilterContext context; CompactionFilterContext context;
@ -2545,7 +2553,7 @@ Status DBImpl::ProcessKeyValueCompaction(
auto compaction_filter = options_.compaction_filter; auto compaction_filter = options_.compaction_filter;
std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr; std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
if (!compaction_filter) { if (!compaction_filter) {
auto context = compact->GetFilterContext(); auto context = compact->GetFilterContextV1();
compaction_filter_from_factory = compaction_filter_from_factory =
options_.compaction_filter_factory->CreateCompactionFilter(context); options_.compaction_filter_factory->CreateCompactionFilter(context);
compaction_filter = compaction_filter_from_factory.get(); compaction_filter = compaction_filter_from_factory.get();
@ -4026,6 +4034,9 @@ Status DBImpl::MakeRoomForWrite(bool force,
new_mem = new MemTable(internal_comparator_, options_); new_mem = new MemTable(internal_comparator_, options_);
new_superversion = new SuperVersion(); new_superversion = new SuperVersion();
} }
Log(options_.info_log,
"New memtable created with log file: #%lu\n",
(unsigned long)new_log_number);
} }
mutex_.Lock(); mutex_.Lock();
if (!s.ok()) { if (!s.ok()) {
@ -4043,9 +4054,6 @@ Status DBImpl::MakeRoomForWrite(bool force,
} }
mem_ = new_mem; mem_ = new_mem;
mem_->Ref(); mem_->Ref();
Log(options_.info_log,
"New memtable created with log file: #%lu\n",
(unsigned long)logfile_number_);
mem_->SetLogNumber(logfile_number_); mem_->SetLogNumber(logfile_number_);
force = false; // Do not force another compaction if have room force = false; // Do not force another compaction if have room
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();

View File

@ -2483,7 +2483,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
: check_context_(check_context) {} : check_context_(check_context) {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilterContext& context) override { const CompactionFilter::Context& context) override {
if (check_context_) { if (check_context_) {
ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction); ASSERT_EQ(expect_full_compaction_.load(), context.is_full_compaction);
ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction); ASSERT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction);
@ -2500,7 +2500,7 @@ class KeepFilterFactory : public CompactionFilterFactory {
class DeleteFilterFactory : public CompactionFilterFactory { class DeleteFilterFactory : public CompactionFilterFactory {
public: public:
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilterContext& context) override { const CompactionFilter::Context& context) override {
if (context.is_manual_compaction) { if (context.is_manual_compaction) {
return std::unique_ptr<CompactionFilter>(new DeleteFilter()); return std::unique_ptr<CompactionFilter>(new DeleteFilter());
} else { } else {
@ -2516,7 +2516,7 @@ class ChangeFilterFactory : public CompactionFilterFactory {
explicit ChangeFilterFactory() {} explicit ChangeFilterFactory() {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilterContext& context) override { const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(new ChangeFilter()); return std::unique_ptr<CompactionFilter>(new ChangeFilter());
} }

View File

@ -25,6 +25,8 @@ namespace rocksdb {
class Env; class Env;
struct FileMetaData; struct FileMetaData;
// TODO(sdong): try to come up with a better API to pass the file information
// other than simply passing FileMetaData.
class TableCache { class TableCache {
public: public:
TableCache(const std::string& dbname, const Options* options, TableCache(const std::string& dbname, const Options* options,

View File

@ -140,6 +140,18 @@ bool SomeFileOverlapsRange(
return !BeforeFile(ucmp, largest_user_key, files[index]); return !BeforeFile(ucmp, largest_user_key, files[index]);
} }
namespace {
// Used for LevelFileNumIterator to pass "block handle" value,
// which actually means file information in this iterator.
// It contains subset of fields of FileMetaData, that is sufficient
// for table cache to use.
struct EncodedFileMetaData {
uint64_t number; // file number
uint64_t file_size; // file size
Cache::Handle* table_reader_handle; // cached table reader's handler
};
} // namespace
// An internal iterator. For a given version/level pair, yields // An internal iterator. For a given version/level pair, yields
// information about the files in the level. For a given entry, key() // information about the files in the level. For a given entry, key()
// is the largest key that occurs in the file, and value() is an // is the largest key that occurs in the file, and value() is an
@ -181,14 +193,19 @@ class Version::LevelFileNumIterator : public Iterator {
} }
Slice value() const { Slice value() const {
assert(Valid()); assert(Valid());
return Slice(reinterpret_cast<const char*>((*flist_)[index_]), auto* file_meta = (*flist_)[index_];
sizeof(FileMetaData)); current_value_.number = file_meta->number;
current_value_.file_size = file_meta->file_size;
current_value_.table_reader_handle = file_meta->table_reader_handle;
return Slice(reinterpret_cast<const char*>(&current_value_),
sizeof(EncodedFileMetaData));
} }
virtual Status status() const { return Status::OK(); } virtual Status status() const { return Status::OK(); }
private: private:
const InternalKeyComparator icmp_; const InternalKeyComparator icmp_;
const std::vector<FileMetaData*>* const flist_; const std::vector<FileMetaData*>* const flist_;
uint32_t index_; uint32_t index_;
mutable EncodedFileMetaData current_value_;
}; };
static Iterator* GetFileIterator(void* arg, const ReadOptions& options, static Iterator* GetFileIterator(void* arg, const ReadOptions& options,
@ -196,7 +213,7 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options,
const InternalKeyComparator& icomparator, const InternalKeyComparator& icomparator,
const Slice& file_value, bool for_compaction) { const Slice& file_value, bool for_compaction) {
TableCache* cache = reinterpret_cast<TableCache*>(arg); TableCache* cache = reinterpret_cast<TableCache*>(arg);
if (file_value.size() != sizeof(FileMetaData)) { if (file_value.size() != sizeof(EncodedFileMetaData)) {
return NewErrorIterator( return NewErrorIterator(
Status::Corruption("FileReader invoked with unexpected value")); Status::Corruption("FileReader invoked with unexpected value"));
} else { } else {
@ -208,11 +225,13 @@ static Iterator* GetFileIterator(void* arg, const ReadOptions& options,
options_copy.prefix = nullptr; options_copy.prefix = nullptr;
} }
const FileMetaData* meta_file = const EncodedFileMetaData* encoded_meta =
reinterpret_cast<const FileMetaData*>(file_value.data()); reinterpret_cast<const EncodedFileMetaData*>(file_value.data());
FileMetaData meta(encoded_meta->number, encoded_meta->file_size);
meta.table_reader_handle = encoded_meta->table_reader_handle;
return cache->NewIterator( return cache->NewIterator(
options.prefix ? options_copy : options, soptions, icomparator, options.prefix ? options_copy : options, soptions, icomparator, meta,
*meta_file, nullptr /* don't need reference to table*/, for_compaction); nullptr /* don't need reference to table*/, for_compaction);
} }
} }
@ -231,11 +250,13 @@ bool Version::PrefixMayMatch(const ReadOptions& options,
// key() will always be the biggest value for this SST? // key() will always be the biggest value for this SST?
may_match = true; may_match = true;
} else { } else {
const FileMetaData* meta_file = const EncodedFileMetaData* encoded_meta =
reinterpret_cast<const FileMetaData*>(level_iter->value().data()); reinterpret_cast<const EncodedFileMetaData*>(
level_iter->value().data());
may_match = vset_->table_cache_->PrefixMayMatch( FileMetaData meta(encoded_meta->number, encoded_meta->file_size);
options, vset_->icmp_, *meta_file, internal_prefix, nullptr); meta.table_reader_handle = encoded_meta->table_reader_handle;
may_match = vset_->table_cache_->PrefixMayMatch(options, vset_->icmp_, meta,
internal_prefix, nullptr);
} }
return may_match; return may_match;
} }

View File

@ -31,6 +31,15 @@ struct CompactionFilterContext {
class CompactionFilter { class CompactionFilter {
public: public:
// Context information of a compaction run
struct Context {
// Does this compaction run include all data files
bool is_full_compaction;
// Is this compaction requested by the client (true),
// or is it occurring as an automatic compaction process
bool is_manual_compaction;
};
virtual ~CompactionFilter() {} virtual ~CompactionFilter() {}
// The compaction process invokes this // The compaction process invokes this
@ -105,7 +114,7 @@ class CompactionFilterFactory {
virtual ~CompactionFilterFactory() { } virtual ~CompactionFilterFactory() { }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilterContext& context) = 0; const CompactionFilter::Context& context) = 0;
// Returns a name that identifies this compaction filter factory. // Returns a name that identifies this compaction filter factory.
virtual const char* Name() const = 0; virtual const char* Name() const = 0;
@ -115,8 +124,8 @@ class CompactionFilterFactory {
// return any filter // return any filter
class DefaultCompactionFilterFactory : public CompactionFilterFactory { class DefaultCompactionFilterFactory : public CompactionFilterFactory {
public: public:
virtual std::unique_ptr<CompactionFilter> virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
CreateCompactionFilter(const CompactionFilterContext& context) override { const CompactionFilter::Context& context) override {
return std::unique_ptr<CompactionFilter>(nullptr); return std::unique_ptr<CompactionFilter>(nullptr);
} }

View File

@ -334,19 +334,8 @@ static bool isSSE42() {
#endif #endif
} }
typedef void (*Function)(uint64_t*, uint8_t const**); template<void (*CRC32)(uint64_t*, uint8_t const**)>
uint32_t ExtendImpl(uint32_t crc, const char* buf, size_t size) {
static inline Function Choose_CRC32() {
return isSSE42() ? Fast_CRC32 : Slow_CRC32;
}
static Function func = Choose_CRC32();
static inline void CRC32(uint64_t* l, uint8_t const **p) {
func(l, p);
}
uint32_t Extend(uint32_t crc, const char* buf, size_t size) {
const uint8_t *p = reinterpret_cast<const uint8_t *>(buf); const uint8_t *p = reinterpret_cast<const uint8_t *>(buf);
const uint8_t *e = p + size; const uint8_t *e = p + size;
uint64_t l = crc ^ 0xffffffffu; uint64_t l = crc ^ 0xffffffffu;
@ -388,5 +377,17 @@ uint32_t Extend(uint32_t crc, const char* buf, size_t size) {
return l ^ 0xffffffffu; return l ^ 0xffffffffu;
} }
typedef uint32_t (*Function)(uint32_t, const char*, size_t);
static inline Function Choose_Extend() {
return isSSE42() ? ExtendImpl<Fast_CRC32> : ExtendImpl<Slow_CRC32>;
}
Function ChosenExtend = Choose_Extend();
uint32_t Extend(uint32_t crc, const char* buf, size_t size) {
return ChosenExtend(crc, buf, size);
}
} // namespace crc32c } // namespace crc32c
} // namespace rocksdb } // namespace rocksdb

View File

@ -192,7 +192,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
user_comp_filter_factory_(comp_filter_factory) { } user_comp_filter_factory_(comp_filter_factory) { }
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilterContext& context) { const CompactionFilter::Context& context) {
return std::unique_ptr<TtlCompactionFilter>( return std::unique_ptr<TtlCompactionFilter>(
new TtlCompactionFilter( new TtlCompactionFilter(
ttl_, ttl_,

View File

@ -283,9 +283,8 @@ class TtlTest {
kNewValue_(kNewValue) { kNewValue_(kNewValue) {
} }
virtual std::unique_ptr<CompactionFilter> virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
CreateCompactionFilter( const CompactionFilter::Context& context) override {
const CompactionFilterContext& context) override {
return std::unique_ptr<CompactionFilter>( return std::unique_ptr<CompactionFilter>(
new TestFilter(kSampleSize_, kNewValue_)); new TestFilter(kSampleSize_, kNewValue_));
} }