diff --git a/Makefile b/Makefile index 606a92c1e..451b69057 100644 --- a/Makefile +++ b/Makefile @@ -104,6 +104,7 @@ TESTS = \ stringappend_test \ ttl_test \ backupable_db_test \ + document_db_test \ json_document_test \ version_edit_test \ version_set_test \ @@ -345,6 +346,9 @@ prefix_test: db/prefix_test.o $(LIBOBJECTS) $(TESTHARNESS) backupable_db_test: utilities/backupable/backupable_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) utilities/backupable/backupable_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +document_db_test: utilities/document/document_db_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) utilities/document/document_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + json_document_test: utilities/document/json_document_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) utilities/document/json_document_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) diff --git a/db/write_batch.cc b/db/write_batch.cc index 734d1e376..113f8a218 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -236,6 +236,23 @@ void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); } +void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast<char>(kTypeDeletion)); + } else { + b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSliceParts(&b->rep_, key); +} + +void WriteBatch::Delete(ColumnFamilyHandle* column_family, + const SliceParts& key) { + WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); +} + void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 85e85b33d..9a191f4cb 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -71,6 +71,9 @@ class WriteBatchInternal { static void Put(WriteBatch* batch, uint32_t column_family_id, const SliceParts& key, const SliceParts& value); + static void Delete(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key); + static void Delete(WriteBatch* batch, uint32_t column_family_id, const Slice& key); diff --git a/include/rocksdb/slice.h b/include/rocksdb/slice.h index 225371571..406a8abb9 100644 --- a/include/rocksdb/slice.h +++ b/include/rocksdb/slice.h @@ -107,6 +107,7 @@ class Slice { struct SliceParts { SliceParts(const Slice* _parts, int _num_parts) : parts(_parts), num_parts(_num_parts) { } + SliceParts() : parts(nullptr), num_parts(0) {} const Slice* parts; int num_parts; diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 74ee2ad16..3272fd2f9 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -67,6 +67,10 @@ class WriteBatch { void Delete(ColumnFamilyHandle* column_family, const Slice& key); void Delete(const Slice& key) { Delete(nullptr, key); } + // variant that takes SliceParts + void Delete(ColumnFamilyHandle* column_family, const SliceParts& key); + void Delete(const SliceParts& key) { Delete(nullptr, key); } + + // Append a blob of arbitrary size to the records in this batch.
The blob will // be stored in the transaction log but not in any other file. In particular, // it will not be persisted to the SST files. When iterating over this diff --git a/include/utilities/document_db.h b/include/utilities/document_db.h new file mode 100644 index 000000000..8e072ad29 --- /dev/null +++ b/include/utilities/document_db.h @@ -0,0 +1,149 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#ifndef ROCKSDB_LITE + +#include <string> +#include <vector> + +#include "utilities/stackable_db.h" +#include "utilities/json_document.h" +#include "rocksdb/db.h" + +namespace rocksdb { + +// IMPORTANT: DocumentDB is a work in progress. It is unstable and we might +// change the API without warning. Talk to the RocksDB team before using this in +// production ;) + +// DocumentDB is a layer on top of RocksDB that provides a very simple JSON API. +// When creating a DB, you specify a list of indexes you want to keep on your +// data. You can insert a JSON document into the DB, which is automatically +// indexed. Every document added to the DB needs to have an "_id" field, which is +// automatically indexed and is a unique primary key. All other indexes are +// non-unique. + +// NOTE: field names in the JSON are NOT allowed to start with '$' or +// contain '.'. We don't currently enforce that rule, but things will start +// behaving badly if it is violated. + +// Cursor is what you get as a result of executing a query. To get all +// results from a query, call Next() on a Cursor while Valid() returns true. class Cursor { public: + Cursor() = default; + virtual ~Cursor() {} + + virtual bool Valid() const = 0; + virtual void Next() = 0; + // The returned JSONDocument is only valid until the next Next() call + virtual const JSONDocument& document() const = 0; + virtual Status status() const = 0; + + private: + // No copying allowed + Cursor(const Cursor&); + void operator=(const Cursor&); +}; + +struct DocumentDBOptions { + int background_threads = 4; + uint64_t memtable_size = 128 * 1024 * 1024; // 128 MB + uint64_t cache_size = 1 * 1024 * 1024 * 1024; // 1 GB +}; + +// TODO(icanadi) Add `JSONDocument* info` parameter to all calls that can be +// used by the caller to get more information about the call execution (number +// of dropped records, number of updated records, etc.) +class DocumentDB : public StackableDB { + public: + struct IndexDescriptor { + // Currently, you can only define an index on a single field. To specify an + // index on a field X, set the index description to the JSON "{X: 1}". + // Currently the value needs to be 1, which means ascending. + // In the future, we plan to also support indexes on multiple keys, where + // you could mix ascending sorting (1) with descending sorting indexes (-1) + JSONDocument* description; + std::string name; + }; + + // Open DocumentDB with the specified indexes. The list of indexes has to be + // complete, i.e. include all indexes present in the DB, except the primary + // key index. + // Otherwise, Open() will return an error. + static Status Open(const DocumentDBOptions& options, const std::string& name, + const std::vector<IndexDescriptor>& indexes, + DocumentDB** db, bool read_only = false); + + explicit DocumentDB(DB* db) : StackableDB(db) {} + + // Create a new index. It will stop all writes for the duration of the call.
+ // All current documents in the DB are scanned and the corresponding index + // entries are created + virtual Status CreateIndex(const WriteOptions& write_options, + const IndexDescriptor& index) = 0; + + // Drop an index. The client is responsible for making sure that the index is + // not being used by currently executing queries + virtual Status DropIndex(const std::string& name) = 0; + + // Insert a document into the DB. The document needs to have a primary key "_id" + // which can either be a string or an integer. Otherwise the write will fail + // with InvalidArgument. + virtual Status Insert(const WriteOptions& options, + const JSONDocument& document) = 0; + + // Deletes all documents matching a filter atomically + virtual Status Remove(const ReadOptions& read_options, + const WriteOptions& write_options, + const JSONDocument& query) = 0; + + // Does this sequence of operations: + // 1. Find all documents matching a filter + // 2. For all documents, atomically: + // 2.1. apply the update operators + // 2.2. update the secondary indexes + // + // Currently only the $set update operator is supported. + // Syntax is: {$set: {key1: value1, key2: value2, etc...}} + // This operator will change a document's key1 field to value1, key2 to + // value2, etc. New values will be set even if a document didn't have an entry + // for the specified key. + // + // You cannot change a document's primary key. + // + // Update example: Update({id: {$gt: 5}, $index: id}, {$set: {enabled: true}}) + virtual Status Update(const ReadOptions& read_options, + const WriteOptions& write_options, + const JSONDocument& filter, + const JSONDocument& updates) = 0; + + // The query has to be an array in which every element is an operator. Currently + // only the $filter operator is supported. Syntax of the $filter operator is: + // {$filter: {key1: condition1, key2: condition2, etc.}} where conditions can + // be either: + // 1) a single value, in which case the condition is an equality condition, or + // 2) a defined operator, like {$gt: 4}, which will match all documents whose + // key is greater than 4. + // + // Supported operators are: + // 1) $gt -- greater than + // 2) $gte -- greater than or equal + // 3) $lt -- less than + // 4) $lte -- less than or equal + // If you want the filter to use an index, you need to specify it like this: + // {$filter: {...(conditions)..., $index: index_name}} + // + // Example query: + // * [{$filter: {name: John, age: {$gte: 18}, $index: age}}] + // will return all Johns whose age is greater than or equal to 18 and it will + // use index "age" to satisfy the query. + virtual Cursor* Query(const ReadOptions& read_options, + const JSONDocument& query) = 0; +}; + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/include/utilities/json_document.h b/include/utilities/json_document.h index daac5a64f..ceb058cf9 100644 --- a/include/utilities/json_document.h +++ b/include/utilities/json_document.h @@ -99,6 +99,8 @@ class JSONDocument { bool operator==(const JSONDocument& rhs) const; + std::string DebugString() const; + private: class ItemsIteratorGenerator; diff --git a/utilities/document/document_db.cc b/utilities/document/document_db.cc new file mode 100644 index 000000000..b30bdf830 --- /dev/null +++ b/utilities/document/document_db.cc @@ -0,0 +1,1141 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree.
An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include "utilities/document_db.h" + +#include "rocksdb/cache.h" +#include "rocksdb/filter_policy.h" +#include "rocksdb/comparator.h" +#include "rocksdb/db.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "util/mutexlock.h" +#include "port/port.h" +#include "utilities/json_document.h" + +namespace rocksdb { + +// IMPORTANT NOTE: Secondary index column families should be very small and +// generally fit in memory. Assume that accessing secondary index column +// families is much faster than accessing the primary index (data heap) column +// family. Accessing a key (i.e. checking for existence) in a column family in +// RocksDB is not much faster than accessing both key and value, since they are +// kept together and loaded from storage together. + +namespace { +// < 0 <=> lhs < rhs +// == 0 <=> lhs == rhs +// > 0 <=> lhs > rhs +// TODO(icanadi) move this to JSONDocument? +int DocumentCompare(const JSONDocument& lhs, const JSONDocument& rhs) { + assert(lhs.IsObject() == false && rhs.IsObject() == false && + lhs.type() == rhs.type()); + + switch (lhs.type()) { + case JSONDocument::kNull: + return 0; + case JSONDocument::kBool: + return static_cast<int>(lhs.GetBool()) - static_cast<int>(rhs.GetBool()); + case JSONDocument::kDouble: { + double res = lhs.GetDouble() - rhs.GetDouble(); + return res == 0.0 ? 0 : (res < 0.0 ? -1 : 1); + } + case JSONDocument::kInt64: { + int64_t res = lhs.GetInt64() - rhs.GetInt64(); + return res == 0 ? 0 : (res < 0 ? -1 : 1); + } + case JSONDocument::kString: + return Slice(lhs.GetString()).compare(Slice(rhs.GetString())); + default: + assert(false); + } + return 0; +} +} // namespace + +class Filter { + public: + // returns nullptr on parse failure + static Filter* ParseFilter(const JSONDocument& filter); + + struct Interval { + const JSONDocument* upper_bound; + const JSONDocument* lower_bound; + bool upper_inclusive; + bool lower_inclusive; + Interval() + : upper_bound(nullptr), + lower_bound(nullptr), + upper_inclusive(false), + lower_inclusive(false) {} + Interval(const JSONDocument* ub, const JSONDocument* lb, bool ui, bool li) + : upper_bound(ub), + lower_bound(lb), + upper_inclusive(ui), + lower_inclusive(li) {} + void UpdateUpperBound(const JSONDocument* ub, bool inclusive); + void UpdateLowerBound(const JSONDocument* lb, bool inclusive); + }; + + bool SatisfiesFilter(const JSONDocument& document) const; + const Interval* GetInterval(const std::string& field) const; + + private: + explicit Filter(const JSONDocument& filter) : filter_(filter) {} + + // copied from the parameter + const JSONDocument filter_; + // upper_bound and lower_bound point to JSONDocuments in filter_, so no need + // to free them + // constant after construction + std::unordered_map<std::string, Interval> intervals_; +}; + +void Filter::Interval::UpdateUpperBound(const JSONDocument* ub, + bool inclusive) { + bool update = (upper_bound == nullptr); + if (!update) { + int cmp = DocumentCompare(*upper_bound, *ub); + update = (cmp > 0) || (cmp == 0 && !inclusive); + } + if (update) { + upper_bound = ub; + upper_inclusive = inclusive; + } +} + +void Filter::Interval::UpdateLowerBound(const JSONDocument* lb, + bool inclusive) { + bool update = (lower_bound == nullptr); + if (!update) { + int cmp = DocumentCompare(*lower_bound, *lb); + update = (cmp < 0) || (cmp == 0 && !inclusive); + } + if (update) { + lower_bound = lb; + lower_inclusive = inclusive; + } +} + +Filter*
Filter::ParseFilter(const JSONDocument& filter) { + if (filter.IsObject() == false) { + return nullptr; + } + + std::unique_ptr<Filter> f(new Filter(filter)); + + for (const auto& items : f->filter_.Items()) { + if (items.first.size() && items.first[0] == '$') { + // fields starting with '$' are commands + continue; + } + assert(f->intervals_.find(items.first) == f->intervals_.end()); + if (items.second->IsObject()) { + if (items.second->Count() == 0) { + // uhm...? + return nullptr; + } + Interval interval; + for (const auto& condition : items.second->Items()) { + if (condition.second->IsObject() || condition.second->IsArray()) { + // comparison operators are not defined on objects or arrays + return nullptr; + } + // comparison operators: + if (condition.first == "$gt") { + interval.UpdateLowerBound(condition.second, false); + } else if (condition.first == "$gte") { + interval.UpdateLowerBound(condition.second, true); + } else if (condition.first == "$lt") { + interval.UpdateUpperBound(condition.second, false); + } else if (condition.first == "$lte") { + interval.UpdateUpperBound(condition.second, true); + } else { + // TODO(icanadi) more logical operators + return nullptr; + } + } + f->intervals_.insert({items.first, interval}); + } else { + // equality + f->intervals_.insert( + {items.first, Interval(items.second, items.second, true, true)}); + } + } + + return f.release(); +} + +const Filter::Interval* Filter::GetInterval(const std::string& field) const { + auto itr = intervals_.find(field); + if (itr == intervals_.end()) { + return nullptr; + } + // we can do that since intervals_ is constant after construction + return &itr->second; +} + +bool Filter::SatisfiesFilter(const JSONDocument& document) const { + for (const auto& interval : intervals_) { + auto value = document.Get(interval.first); + if (value == nullptr) { + // doesn't have the value, doesn't satisfy the filter + // (we don't support null queries yet) + return false; + } + if (interval.second.upper_bound != nullptr) { + if (value->type() != interval.second.upper_bound->type()) { + // no cross-type queries yet + // TODO(icanadi) do this at least for numbers! + return false; + } + int cmp = DocumentCompare(*interval.second.upper_bound, *value); + if (cmp < 0 || (cmp == 0 && interval.second.upper_inclusive == false)) { + // value is bigger than (or equal to) the upper bound + return false; + } + } + if (interval.second.lower_bound != nullptr) { + if (value->type() != interval.second.lower_bound->type()) { + // no cross-type queries yet + return false; + } + int cmp = DocumentCompare(*interval.second.lower_bound, *value); + if (cmp > 0 || (cmp == 0 && interval.second.lower_inclusive == false)) { + // value is smaller than (or equal to) the lower bound + return false; + } + } + } + return true; +} + +class Index { + public: + Index() = default; + virtual ~Index() {} + + virtual const char* Name() const = 0; + + // Functions that are executed during write time + // --------------------------------------------- + // GetIndexKey() generates a key that will be used to index the document and + // returns the key through the second std::string* parameter + virtual void GetIndexKey(const JSONDocument& document, + std::string* key) const = 0; + // Keys generated with GetIndexKey() will be compared using this comparator.
+ // It should be assumed that there will be a suffix added to the index key + // according to the IndexKey implementation + virtual const Comparator* GetComparator() const = 0; + + // Functions that are executed during query time + // --------------------------------------------- + enum Direction { + kForwards, + kBackwards, + }; + // Returns true if this index can provide some optimization for satisfying the + // filter; false otherwise + virtual bool UsefulIndex(const Filter& filter) const = 0; + // For every filter (assuming UsefulIndex()) there is a continuous interval of + // keys in the index that satisfy the index conditions. That interval can be + // three things: + // * [A, B] + // * [A, infinity> + // * <-infinity, B] + // + // A query engine that uses this Index for optimization will access the interval + // by first calling Position() and then iterating in the Direction (returned + // by Position()) while ShouldContinueLooking() is true. + // * For the [A, B] interval Position() will Seek() to A and return kForwards. + // ShouldContinueLooking() will be true until the iterator value gets beyond B + // -- then it will return false + // * For [A, infinity> Position() will Seek() to A and return kForwards. + // ShouldContinueLooking() will always return true + // * For <-infinity, B] Position() will Seek() to B and return kBackwards. + // ShouldContinueLooking() will always return true (given that the iterator is + // advanced by calling Prev()) + virtual Direction Position(const Filter& filter, + Iterator* iterator) const = 0; + virtual bool ShouldContinueLooking(const Filter& filter, + const Slice& secondary_key, + Direction direction) const = 0; + + // Static function that is executed when Index is created + // --------------------------------------------- + // Create Index from a user-supplied description. Return nullptr on parse + // failure.
+ static Index* CreateIndexFromDescription(const JSONDocument& description, + const std::string& name); + + private: + // No copying allowed + Index(const Index&); + void operator=(const Index&); +}; + +// Encoding helper function +namespace { +std::string InternalSecondaryIndexName(const std::string& user_name) { + return "index_" + user_name; +} + +// Don't change these, they are persisted in secondary indexes +enum JSONPrimitivesEncoding : char { + kNull = 0x1, + kBool = 0x2, + kDouble = 0x3, + kInt64 = 0x4, + kString = 0x5, +}; + +// encodes simple JSON primitives (strings, integers, etc.) +// the encoded results will be compared lexicographically with each other +bool EncodeJSONPrimitive(const JSONDocument& json, std::string* dst) { + // TODO(icanadi) revise this at some point, have a custom comparator + switch (json.type()) { + case JSONDocument::kNull: + dst->push_back(kNull); + break; + case JSONDocument::kBool: + dst->push_back(kBool); + dst->push_back(static_cast<char>(json.GetBool())); + break; + case JSONDocument::kDouble: + dst->push_back(kDouble); + PutFixed64(dst, static_cast<uint64_t>(json.GetDouble())); + break; + case JSONDocument::kInt64: + dst->push_back(kInt64); + // TODO(icanadi) oops, this will not work correctly for negative numbers + PutFixed64(dst, static_cast<uint64_t>(json.GetInt64())); + break; + case JSONDocument::kString: + dst->push_back(kString); + dst->append(json.GetString()); + break; + default: + return false; + } + return true; +} + +} // namespace + +// format of the secondary key is: +// <secondary_key><primary_key><primary_key_offset (4-byte fixed32)> +class IndexKey { + public: + IndexKey() : ok_(false) {} + explicit IndexKey(const Slice& slice) { + if (slice.size() < sizeof(uint32_t)) { + ok_ = false; + return; + } + uint32_t primary_key_offset = + DecodeFixed32(slice.data() + slice.size() - sizeof(uint32_t)); + if (primary_key_offset >= slice.size() - sizeof(uint32_t)) { + ok_ = false; + return; + } + parts_[0] = Slice(slice.data(), primary_key_offset); + parts_[1] = Slice(slice.data() + primary_key_offset, + slice.size() - primary_key_offset - sizeof(uint32_t)); + ok_ = true; + } + IndexKey(const Slice& secondary_key, const Slice& primary_key) : ok_(true) { + parts_[0] = secondary_key; + parts_[1] = primary_key; + } + + SliceParts GetSliceParts() { + uint32_t primary_key_offset = static_cast<uint32_t>(parts_[0].size()); + EncodeFixed32(primary_key_offset_buf_, primary_key_offset); + parts_[2] = Slice(primary_key_offset_buf_, sizeof(uint32_t)); + return SliceParts(parts_, 3); + } + + const Slice& GetPrimaryKey() const { return parts_[1]; } + const Slice& GetSecondaryKey() const { return parts_[0]; } + + bool ok() const { return ok_; } + + private: + bool ok_; + // 0 -- secondary key + // 1 -- primary key + // 2 -- primary key offset + Slice parts_[3]; + char primary_key_offset_buf_[sizeof(uint32_t)]; +}; + +class SimpleSortedIndex : public Index { + public: + SimpleSortedIndex(const std::string& field, const std::string& name) + : field_(field), name_(name) {} + + virtual const char* Name() const override { return name_.c_str(); } + + virtual void GetIndexKey(const JSONDocument& document, std::string* key) const + override { + auto value = document.Get(field_); + if (value == nullptr) { + // null + if (!EncodeJSONPrimitive(JSONDocument(JSONDocument::kNull), key)) { + assert(false); + } + return; // don't dereference the missing value below + } + if (!EncodeJSONPrimitive(*value, key)) { + assert(false); + } + } + virtual const Comparator* GetComparator() const override { + return BytewiseComparator(); + } + + virtual bool UsefulIndex(const Filter& filter) const override { + return filter.GetInterval(field_) != nullptr; + }
+ // REQUIRES: UsefulIndex(filter) == true + virtual Direction Position(const Filter& filter, Iterator* iterator) const override { + auto interval = filter.GetInterval(field_); + assert(interval != nullptr); // because index is useful + Direction direction; + + const JSONDocument* limit; + if (interval->lower_bound != nullptr) { + limit = interval->lower_bound; + direction = kForwards; + } else { + limit = interval->upper_bound; + direction = kBackwards; + } + + std::string encoded_limit; + if (!EncodeJSONPrimitive(*limit, &encoded_limit)) { + assert(false); + } + iterator->Seek(Slice(encoded_limit)); + + return direction; + } + // REQUIRES: UsefulIndex(filter) == true + virtual bool ShouldContinueLooking(const Filter& filter, + const Slice& secondary_key, + Index::Direction direction) const override { + auto interval = filter.GetInterval(field_); + assert(interval != nullptr); // because index is useful + + if (direction == kForwards) { + if (interval->upper_bound == nullptr) { + // continue looking, no upper bound + return true; + } + std::string encoded_upper_bound; + if (!EncodeJSONPrimitive(*interval->upper_bound, &encoded_upper_bound)) { + // uhm...? + // TODO(icanadi) store encoded upper and lower bounds in Filter*? + assert(false); + } + // TODO(icanadi) we need to somehow decode this and use DocumentCompare() + int compare = secondary_key.compare(Slice(encoded_upper_bound)); + // if (current key is bigger than upper bound) OR (current key is equal to + // upper bound, but inclusive is false) THEN stop looking. otherwise, + // continue + return (compare > 0 || + (compare == 0 && interval->upper_inclusive == false)) + ? false + : true; + } else { + assert(direction == kBackwards); + if (interval->lower_bound == nullptr) { + // continue looking, no lower bound + return true; + } + std::string encoded_lower_bound; + if (!EncodeJSONPrimitive(*interval->lower_bound, &encoded_lower_bound)) { + // uhm...? + // TODO(icanadi) store encoded upper and lower bounds in Filter*? + assert(false); + } + // TODO(icanadi) we need to somehow decode this and use DocumentCompare() + int compare = secondary_key.compare(Slice(encoded_lower_bound)); + // if (current key is smaller than lower bound) OR (current key is equal + // to lower bound, but inclusive is false) THEN stop looking. otherwise, + // continue + return (compare < 0 || + (compare == 0 && interval->lower_inclusive == false)) + ?
false + : true; + } + + assert(false); + // this is here just so compiler doesn't complain + return false; + } + + private: + std::string field_; + std::string name_; +}; + +Index* Index::CreateIndexFromDescription(const JSONDocument& description, + const std::string& name) { + if (!description.IsObject() || description.Count() != 1) { + // not supported yet + return nullptr; + } + const auto& field = *description.Items().begin(); + if (field.second->IsInt64() == false || field.second->GetInt64() != 1) { + // not supported yet + return nullptr; + } + return new SimpleSortedIndex(field.first, name); +} + +class CursorWithFilterIndexed : public Cursor { + public: + CursorWithFilterIndexed(Iterator* primary_index_iter, + Iterator* secondary_index_iter, const Index* index, + const Filter* filter) + : primary_index_iter_(primary_index_iter), + secondary_index_iter_(secondary_index_iter), + index_(index), + filter_(filter), + valid_(true), + current_json_document_(nullptr) { + assert(filter_.get() != nullptr); + direction_ = index->Position(*filter_.get(), secondary_index_iter_.get()); + UpdateIndexKey(); + AdvanceUntilSatisfies(); + } + + virtual bool Valid() const override { + return valid_ && secondary_index_iter_->Valid(); + } + virtual void Next() override { + assert(Valid()); + Advance(); + AdvanceUntilSatisfies(); + } + // temporary object. copy it if you want to use it + virtual const JSONDocument& document() const override { + assert(Valid()); + return *current_json_document_; + } + virtual Status status() const override { + if (!status_.ok()) { + return status_; + } + if (!primary_index_iter_->status().ok()) { + return primary_index_iter_->status(); + } + return secondary_index_iter_->status(); + } + + private: + void Advance() { + if (direction_ == Index::kForwards) { + secondary_index_iter_->Next(); + } else { + secondary_index_iter_->Prev(); + } + UpdateIndexKey(); + } + void AdvanceUntilSatisfies() { + bool found = false; + while (secondary_index_iter_->Valid() && + index_->ShouldContinueLooking( + *filter_.get(), index_key_.GetSecondaryKey(), direction_)) { + if (!UpdateJSONDocument()) { + // corruption happened + return; + } + if (filter_->SatisfiesFilter(*current_json_document_)) { + // found a document that satisfies the filter!
+ found = true; + break; + } else { + // doesn't satisfy :( + Advance(); + } + } + if (!found) { + valid_ = false; + } + } + + bool UpdateJSONDocument() { + assert(secondary_index_iter_->Valid()); + primary_index_iter_->Seek(index_key_.GetPrimaryKey()); + if (!primary_index_iter_->Valid()) { + status_ = Status::Corruption( + "Inconsistency between primary and secondary index"); + valid_ = false; + return false; + } + current_json_document_.reset( + JSONDocument::Deserialize(primary_index_iter_->value())); + if (current_json_document_.get() == nullptr) { + status_ = Status::Corruption("JSON deserialization failed"); + valid_ = false; + return false; + } + return true; + } + void UpdateIndexKey() { + if (secondary_index_iter_->Valid()) { + index_key_ = IndexKey(secondary_index_iter_->key()); + if (!index_key_.ok()) { + status_ = Status::Corruption("Invalid index key"); + valid_ = false; + } + } + } + std::unique_ptr<Iterator> primary_index_iter_; + std::unique_ptr<Iterator> secondary_index_iter_; + // we don't own index_ + const Index* index_; + Index::Direction direction_; + std::unique_ptr<const Filter> filter_; + bool valid_; + IndexKey index_key_; + std::unique_ptr<JSONDocument> current_json_document_; + Status status_; +}; + +class CursorFromIterator : public Cursor { + public: + explicit CursorFromIterator(Iterator* iter) + : iter_(iter), current_json_document_(nullptr) { + iter_->SeekToFirst(); + UpdateCurrentJSON(); + } + + virtual bool Valid() const override { return status_.ok() && iter_->Valid(); } + virtual void Next() override { + iter_->Next(); + UpdateCurrentJSON(); + } + virtual const JSONDocument& document() const override { + assert(Valid()); + return *current_json_document_; + } + virtual Status status() const override { + if (!status_.ok()) { + return status_; + } + return iter_->status(); + } + + // not part of public Cursor interface + Slice key() const { return iter_->key(); } + + private: + void UpdateCurrentJSON() { + if (Valid()) { + current_json_document_.reset(JSONDocument::Deserialize(iter_->value())); + if (current_json_document_.get() == nullptr) { + status_ = Status::Corruption("JSON deserialization failed"); + } + } + } + + Status status_; + std::unique_ptr<Iterator> iter_; + std::unique_ptr<JSONDocument> current_json_document_; +}; + +class CursorWithFilter : public Cursor { + public: + CursorWithFilter(Cursor* base_cursor, const Filter* filter) + : base_cursor_(base_cursor), filter_(filter) { + assert(filter_.get() != nullptr); + SeekToNextSatisfies(); + } + virtual bool Valid() const override { return base_cursor_->Valid(); } + virtual void Next() override { + assert(Valid()); + base_cursor_->Next(); + SeekToNextSatisfies(); + } + virtual const JSONDocument& document() const override { + assert(Valid()); + return base_cursor_->document(); + } + virtual Status status() const override { return base_cursor_->status(); } + + private: + void SeekToNextSatisfies() { + for (; base_cursor_->Valid(); base_cursor_->Next()) { + if (filter_->SatisfiesFilter(base_cursor_->document())) { + break; + } + } + } + std::unique_ptr<Cursor> base_cursor_; + std::unique_ptr<const Filter> filter_; +}; + +class CursorError : public Cursor { + public: + explicit CursorError(Status s) : s_(s) { assert(!s.ok()); } + virtual Status status() const override { return s_; } + virtual bool Valid() const override { return false; } + virtual void Next() override {} + virtual const JSONDocument& document() const override { + assert(false); + // compiler complains otherwise + return trash_; + } + + private: + Status s_; + JSONDocument trash_; +}; + +class DocumentDBImpl : public
DocumentDB { + public: + DocumentDBImpl( + DB* db, ColumnFamilyHandle* primary_key_column_family, + const std::vector<std::pair<Index*, ColumnFamilyHandle*>>& indexes, + const Options& rocksdb_options) + : DocumentDB(db), + primary_key_column_family_(primary_key_column_family), + rocksdb_options_(rocksdb_options) { + for (const auto& index : indexes) { + name_to_index_.insert( + {index.first->Name(), IndexColumnFamily(index.first, index.second)}); + } + } + + ~DocumentDBImpl() { + for (auto& iter : name_to_index_) { + delete iter.second.index; + delete iter.second.column_family; + } + delete primary_key_column_family_; + } + + virtual Status CreateIndex(const WriteOptions& write_options, + const IndexDescriptor& index) override { + auto index_obj = + Index::CreateIndexFromDescription(*index.description, index.name); + if (index_obj == nullptr) { + return Status::InvalidArgument("Failed parsing index description"); + } + + ColumnFamilyHandle* cf_handle; + Status s = + CreateColumnFamily(ColumnFamilyOptions(rocksdb_options_), + InternalSecondaryIndexName(index.name), &cf_handle); + if (!s.ok()) { + return s; + } + + MutexLock l(&write_mutex_); + + std::unique_ptr<CursorFromIterator> cursor(new CursorFromIterator( + DocumentDB::NewIterator(ReadOptions(), primary_key_column_family_))); + + WriteBatch batch; + for (; cursor->Valid(); cursor->Next()) { + std::string secondary_index_key; + index_obj->GetIndexKey(cursor->document(), &secondary_index_key); + IndexKey index_key(Slice(secondary_index_key), cursor->key()); + batch.Put(cf_handle, index_key.GetSliceParts(), SliceParts()); + } + + if (!cursor->status().ok()) { + delete index_obj; + return cursor->status(); + } + + { + MutexLock l_nti(&name_to_index_mutex_); + name_to_index_.insert( + {index.name, IndexColumnFamily(index_obj, cf_handle)}); + } + + return DocumentDB::Write(write_options, &batch); + } + + virtual Status DropIndex(const std::string& name) override { + MutexLock l(&write_mutex_); + + auto index_iter = name_to_index_.find(name); + if (index_iter == name_to_index_.end()) { + return Status::InvalidArgument("No such index"); + } + + Status s = DropColumnFamily(index_iter->second.column_family); + if (!s.ok()) { + return s; + } + + delete index_iter->second.index; + delete index_iter->second.column_family; + + // remove from name_to_index_ + { + MutexLock l_nti(&name_to_index_mutex_); + name_to_index_.erase(index_iter); + } + + return Status::OK(); + } + + virtual Status Insert(const WriteOptions& options, + const JSONDocument& document) override { + WriteBatch batch; + + if (!document.IsObject()) { + return Status::InvalidArgument("Document not an object"); + } + auto primary_key = document.Get(kPrimaryKey); + if (primary_key == nullptr || primary_key->IsNull() || + (!primary_key->IsString() && !primary_key->IsInt64())) { + return Status::InvalidArgument( + "No primary key or primary key format error"); + } + std::string encoded_document; + document.Serialize(&encoded_document); + std::string primary_key_encoded; + if (!EncodeJSONPrimitive(*primary_key, &primary_key_encoded)) { + // previous call should be guaranteed to pass because of all primary_key + // conditions checked before + assert(false); + } + Slice primary_key_slice(primary_key_encoded); + + // Lock now, since we're starting DB operations + MutexLock l(&write_mutex_); + // check if there is already a document with the same primary key + std::string value; + Status s = DocumentDB::Get(ReadOptions(), primary_key_column_family_, + primary_key_slice, &value); + if (!s.IsNotFound()) { + return s.ok() ?
Status::InvalidArgument("Duplicate primary key!") : s; + } + + batch.Put(primary_key_column_family_, primary_key_slice, encoded_document); + + for (const auto& iter : name_to_index_) { + std::string secondary_index_key; + iter.second.index->GetIndexKey(document, &secondary_index_key); + IndexKey index_key(Slice(secondary_index_key), primary_key_slice); + batch.Put(iter.second.column_family, index_key.GetSliceParts(), + SliceParts()); + } + + return DocumentDB::Write(options, &batch); + } + + virtual Status Remove(const ReadOptions& read_options, + const WriteOptions& write_options, + const JSONDocument& query) override { + MutexLock l(&write_mutex_); + std::unique_ptr cursor( + ConstructFilterCursor(read_options, nullptr, query)); + + WriteBatch batch; + for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) { + const auto& document = cursor->document(); + if (!document.IsObject()) { + return Status::Corruption("Document corruption"); + } + auto primary_key = document.Get(kPrimaryKey); + if (primary_key == nullptr || primary_key->IsNull() || + (!primary_key->IsString() && !primary_key->IsInt64())) { + return Status::Corruption("Document corruption"); + } + + // TODO(icanadi) Instead of doing this, just get primary key encoding from + // cursor, as it already has this information + std::string primary_key_encoded; + if (!EncodeJSONPrimitive(*primary_key, &primary_key_encoded)) { + // previous call should be guaranteed to pass because of all primary_key + // conditions checked before + assert(false); + } + Slice primary_key_slice(primary_key_encoded); + batch.Delete(primary_key_column_family_, primary_key_slice); + + for (const auto& iter : name_to_index_) { + std::string secondary_index_key; + iter.second.index->GetIndexKey(document, &secondary_index_key); + IndexKey index_key(Slice(secondary_index_key), primary_key_slice); + batch.Delete(iter.second.column_family, index_key.GetSliceParts()); + } + } + + if (!cursor->status().ok()) { + return cursor->status(); + } + + return DocumentDB::Write(write_options, &batch); + } + + virtual Status Update(const ReadOptions& read_options, + const WriteOptions& write_options, + const JSONDocument& filter, + const JSONDocument& updates) { + MutexLock l(&write_mutex_); + std::unique_ptr cursor( + ConstructFilterCursor(read_options, nullptr, filter)); + + WriteBatch batch; + for (; cursor->status().ok() && cursor->Valid(); cursor->Next()) { + const auto& old_document = cursor->document(); + JSONDocument new_document(old_document); + if (!new_document.IsObject()) { + return Status::Corruption("Document corruption"); + } + // TODO(icanadi) Make this nicer, something like class Filter + for (const auto& update : updates.Items()) { + if (update.first == "$set") { + for (const auto& itr : update.second->Items()) { + if (itr.first == kPrimaryKey) { + return Status::NotSupported("Please don't change primary key"); + } + new_document.Set(itr.first, *itr.second); + } + } else { + // TODO(icanadi) more commands + return Status::InvalidArgument("Can't understand update command"); + } + } + + // TODO(icanadi) reuse some of this code + auto primary_key = new_document.Get(kPrimaryKey); + if (primary_key == nullptr || primary_key->IsNull() || + (!primary_key->IsString() && !primary_key->IsInt64())) { + // This will happen when document on storage doesn't have primary key, + // since we don't support any update operations on primary key. 
That's + // why this is a corruption error. + return Status::Corruption("Corrupted document -- primary key missing"); + } + std::string encoded_document; + new_document.Serialize(&encoded_document); + std::string primary_key_encoded; + if (!EncodeJSONPrimitive(*primary_key, &primary_key_encoded)) { + // previous call should be guaranteed to pass because of all primary_key + // conditions checked before + assert(false); + } + Slice primary_key_slice(primary_key_encoded); + batch.Put(primary_key_column_family_, primary_key_slice, + encoded_document); + + for (const auto& iter : name_to_index_) { + std::string old_key, new_key; + iter.second.index->GetIndexKey(old_document, &old_key); + iter.second.index->GetIndexKey(new_document, &new_key); + if (old_key == new_key) { + // don't need to update this secondary index + continue; + } + + IndexKey old_index_key(Slice(old_key), primary_key_slice); + IndexKey new_index_key(Slice(new_key), primary_key_slice); + + batch.Delete(iter.second.column_family, old_index_key.GetSliceParts()); + batch.Put(iter.second.column_family, new_index_key.GetSliceParts(), + SliceParts()); + } + } + + if (!cursor->status().ok()) { + return cursor->status(); + } + + return DocumentDB::Write(write_options, &batch); + } + + virtual Cursor* Query(const ReadOptions& read_options, + const JSONDocument& query) override { + Cursor* cursor = nullptr; + + if (!query.IsArray()) { + return new CursorError( + Status::InvalidArgument("Query has to be an array")); + } + + // TODO(icanadi) support index "_id" + for (size_t i = 0; i < query.Count(); ++i) { + const auto& command_doc = query[i]; + if (command_doc.Count() != 1) { + // there can be only one key-value pair in each of the array elements. + // the key is the command and the value holds the parameters + delete cursor; + return new CursorError(Status::InvalidArgument("Invalid query")); + } + const auto& command = *command_doc.Items().begin(); + + if (command.first == "$filter") { + cursor = ConstructFilterCursor(read_options, cursor, *command.second); + } else { + // only filter is supported for now + delete cursor; + return new CursorError(Status::InvalidArgument("Invalid query")); + } + } + + if (cursor == nullptr) { + cursor = new CursorFromIterator( + DocumentDB::NewIterator(read_options, primary_key_column_family_)); + } + + return cursor; + } + + // RocksDB functions + virtual Status Get(const ReadOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + std::string* value) override { + return Status::NotSupported(""); + } + virtual Status Write(const WriteOptions& options, + WriteBatch* updates) override { + return Status::NotSupported(""); + } + virtual Iterator* NewIterator(const ReadOptions& options, + ColumnFamilyHandle* column_family) override { + return nullptr; + } + + private: + Cursor* ConstructFilterCursor(ReadOptions read_options, Cursor* cursor, + const JSONDocument& query) { + std::unique_ptr<Filter> filter(Filter::ParseFilter(query)); + if (filter.get() == nullptr) { + return new CursorError(Status::InvalidArgument("Invalid query")); + } + + IndexColumnFamily tmp_storage(nullptr, nullptr); + + if (cursor == nullptr) { + auto index_name = query.Get("$index"); + IndexColumnFamily* index_column_family = nullptr; + if (index_name != nullptr && index_name->IsString()) { + { + MutexLock l(&name_to_index_mutex_); + auto index_iter = name_to_index_.find(index_name->GetString()); + if (index_iter != name_to_index_.end()) { + tmp_storage = index_iter->second; + index_column_family = &tmp_storage; + } else { + return new
CursorError( + Status::InvalidArgument("Index does not exist")); + } + } + } + + if (index_column_family != nullptr && + index_column_family->index->UsefulIndex(*filter.get())) { + std::vector<Iterator*> iterators; + Status s = DocumentDB::NewIterators( + read_options, + {primary_key_column_family_, index_column_family->column_family}, + &iterators); + if (!s.ok()) { + delete cursor; + return new CursorError(s); + } + assert(iterators.size() == 2); + return new CursorWithFilterIndexed(iterators[0], iterators[1], + index_column_family->index, + filter.release()); + } else { + return new CursorWithFilter( + new CursorFromIterator(DocumentDB::NewIterator( + read_options, primary_key_column_family_)), + filter.release()); + } + } else { + return new CursorWithFilter(cursor, filter.release()); + } + assert(false); + return nullptr; + } + + // currently, we lock and serialize all writes to rocksdb. reads are not + // locked and always get a consistent view of the database. we should + // optimize locking in the future + port::Mutex write_mutex_; + port::Mutex name_to_index_mutex_; + const char* kPrimaryKey = "_id"; + struct IndexColumnFamily { + IndexColumnFamily(Index* _index, ColumnFamilyHandle* _column_family) + : index(_index), column_family(_column_family) {} + Index* index; + ColumnFamilyHandle* column_family; + }; + + + // name_to_index_ protected: + // 1) when writing -- 1. lock write_mutex_, 2. lock name_to_index_mutex_ + // 2) when reading -- lock name_to_index_mutex_ OR write_mutex_ + std::unordered_map<std::string, IndexColumnFamily> name_to_index_; + ColumnFamilyHandle* primary_key_column_family_; + Options rocksdb_options_; +}; + +namespace { +Options GetRocksDBOptionsFromOptions(const DocumentDBOptions& options) { + Options rocksdb_options; + rocksdb_options.max_background_compactions = options.background_threads - 1; + rocksdb_options.max_background_flushes = 1; + rocksdb_options.write_buffer_size = options.memtable_size; + rocksdb_options.max_write_buffer_number = 6; + rocksdb_options.block_cache = NewLRUCache(options.cache_size); + return rocksdb_options; +} +} // namespace + +Status DocumentDB::Open(const DocumentDBOptions& options, + const std::string& name, + const std::vector<IndexDescriptor>& indexes, + DocumentDB** db, bool read_only) { + Options rocksdb_options = GetRocksDBOptionsFromOptions(options); + rocksdb_options.create_if_missing = true; + + std::vector<ColumnFamilyDescriptor> column_families; + column_families.push_back(ColumnFamilyDescriptor( + kDefaultColumnFamilyName, ColumnFamilyOptions(rocksdb_options))); + for (const auto& index : indexes) { + column_families.emplace_back(InternalSecondaryIndexName(index.name), + ColumnFamilyOptions(rocksdb_options)); + } + std::vector<ColumnFamilyHandle*> handles; + DB* base_db; + Status s; + if (read_only) { + s = DB::OpenForReadOnly(DBOptions(rocksdb_options), name, column_families, + &handles, &base_db); + } else { + s = DB::Open(DBOptions(rocksdb_options), name, column_families, &handles, + &base_db); + } + if (!s.ok()) { + return s; + } + + std::vector<std::pair<Index*, ColumnFamilyHandle*>> index_cf(indexes.size()); + assert(handles.size() == indexes.size() + 1); + for (size_t i = 0; i < indexes.size(); ++i) { + auto index = Index::CreateIndexFromDescription(*indexes[i].description, + indexes[i].name); + index_cf[i] = {index, handles[i + 1]}; + } + *db = new DocumentDBImpl(base_db, handles[0], index_cf, rocksdb_options); + return Status::OK(); +} + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/utilities/document/document_db_test.cc b/utilities/document/document_db_test.cc new file mode 100644 index 000000000..25d5effc2 --- /dev/null +++
b/utilities/document/document_db_test.cc @@ -0,0 +1,262 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include <algorithm> + +#include "utilities/json_document.h" +#include "utilities/document_db.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace rocksdb { + +class DocumentDBTest { + public: + DocumentDBTest() { + dbname_ = test::TmpDir() + "/document_db_test"; + DestroyDB(dbname_, Options()); + } + ~DocumentDBTest() { + delete db_; + DestroyDB(dbname_, Options()); + } + + void AssertCursorIDs(Cursor* cursor, std::vector<int64_t> expected) { + std::vector<int64_t> got; + while (cursor->Valid()) { + ASSERT_TRUE(cursor->Valid()); + ASSERT_TRUE(cursor->document().Contains("_id")); + got.push_back(cursor->document()["_id"].GetInt64()); + cursor->Next(); + } + std::sort(expected.begin(), expected.end()); + std::sort(got.begin(), got.end()); + ASSERT_TRUE(got == expected); + } + + // converts ' to ", so that we don't have to escape " all over the place + std::string ConvertQuotes(const std::string& input) { + std::string output; + for (auto x : input) { + if (x == '\'') { + output.push_back('\"'); + } else { + output.push_back(x); + } + } + return output; + } + + void CreateIndexes(std::vector<DocumentDB::IndexDescriptor> indexes) { + for (auto i : indexes) { + ASSERT_OK(db_->CreateIndex(WriteOptions(), i)); + } + } + + JSONDocument* Parse(const std::string& doc) { + return JSONDocument::ParseJSON(ConvertQuotes(doc).c_str()); + } + + std::string dbname_; + DocumentDB* db_; +}; + +TEST(DocumentDBTest, SimpleQueryTest) { + DocumentDBOptions options; + DocumentDB::IndexDescriptor index; + index.description = Parse("{'name': 1}"); + index.name = "name_index"; + + ASSERT_OK(DocumentDB::Open(options, dbname_, {}, &db_)); + CreateIndexes({index}); + delete db_; + // now there is an index present + ASSERT_OK(DocumentDB::Open(options, dbname_, {index}, &db_)); + delete index.description; + + std::vector<std::string> json_objects = { + "{'_id': 1, 'name': 'One'}", "{'_id': 2, 'name': 'Two'}", + "{'_id': 3, 'name': 'Three'}", "{'_id': 4, 'name': 'Four'}"}; + + for (auto& json : json_objects) { + std::unique_ptr<JSONDocument> document(Parse(json)); + ASSERT_TRUE(document.get() != nullptr); + ASSERT_OK(db_->Insert(WriteOptions(), *document)); + } + + // inserting a document with an existing primary key should return failure + { + std::unique_ptr<JSONDocument> document(Parse(json_objects[0])); + ASSERT_TRUE(document.get() != nullptr); + Status s = db_->Insert(WriteOptions(), *document); + ASSERT_TRUE(s.IsInvalidArgument()); + } + + // find equal to "Two" + { + std::unique_ptr<JSONDocument> query( + Parse("[{'$filter': {'name': 'Two', '$index': 'name_index'}}]")); + std::unique_ptr<Cursor> cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {2}); + } + + // find less than "Three" + { + std::unique_ptr<JSONDocument> query(Parse( + "[{'$filter': {'name': {'$lt': 'Three'}, '$index': " + "'name_index'}}]")); + std::unique_ptr<Cursor> cursor(db_->Query(ReadOptions(), *query)); + + AssertCursorIDs(cursor.get(), {1, 4}); + } + + // find less than "Three" without index + { + std::unique_ptr<JSONDocument> query( + Parse("[{'$filter': {'name': {'$lt': 'Three'} }}]")); + std::unique_ptr<Cursor> cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {1, 4}); + } + + // remove less than or equal to "Three" + { + std::unique_ptr<JSONDocument> query( + Parse("{'name': {'$lte': 'Three'}, '$index':
'name_index'}")); + ASSERT_OK(db_->Remove(ReadOptions(), WriteOptions(), *query)); + } + + // find all -- only "Two" left, everything else should be deleted + { + std::unique_ptr query(Parse("[]")); + std::unique_ptr cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {2}); + } +} + +TEST(DocumentDBTest, ComplexQueryTest) { + DocumentDBOptions options; + DocumentDB::IndexDescriptor priority_index; + priority_index.description = Parse("{'priority': 1}"); + priority_index.name = "priority"; + DocumentDB::IndexDescriptor job_name_index; + job_name_index.description = Parse("{'job_name': 1}"); + job_name_index.name = "job_name"; + DocumentDB::IndexDescriptor progress_index; + progress_index.description = Parse("{'progress': 1}"); + progress_index.name = "progress"; + + ASSERT_OK(DocumentDB::Open(options, dbname_, {}, &db_)); + CreateIndexes({priority_index, progress_index}); + delete priority_index.description; + delete progress_index.description; + + std::vector json_objects = { + "{'_id': 1, 'job_name': 'play', 'priority': 10, 'progress': 14.2}", + "{'_id': 2, 'job_name': 'white', 'priority': 2, 'progress': 45.1}", + "{'_id': 3, 'job_name': 'straw', 'priority': 5, 'progress': 83.2}", + "{'_id': 4, 'job_name': 'temporary', 'priority': 3, 'progress': 14.9}", + "{'_id': 5, 'job_name': 'white', 'priority': 4, 'progress': 44.2}", + "{'_id': 6, 'job_name': 'tea', 'priority': 1, 'progress': 12.4}", + "{'_id': 7, 'job_name': 'delete', 'priority': 2, 'progress': 77.54}", + "{'_id': 8, 'job_name': 'rock', 'priority': 3, 'progress': 93.24}", + "{'_id': 9, 'job_name': 'steady', 'priority': 3, 'progress': 9.1}", + "{'_id': 10, 'job_name': 'white', 'priority': 1, 'progress': 61.4}", + "{'_id': 11, 'job_name': 'who', 'priority': 4, 'progress': 39.41}", }; + + // add index on the fly! 
+ CreateIndexes({job_name_index}); + delete job_name_index.description; + + for (auto& json : json_objects) { + std::unique_ptr<JSONDocument> document(Parse(json)); + ASSERT_TRUE(document != nullptr); + ASSERT_OK(db_->Insert(WriteOptions(), *document)); + } + + // 2 < priority < 4 AND progress > 10.0, index priority + { + std::unique_ptr<JSONDocument> query(Parse( + "[{'$filter': {'priority': {'$lt': 4, '$gt': 2}, 'progress': {'$gt': " + "10.0}, '$index': 'priority'}}]")); + std::unique_ptr<Cursor> cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {4, 8}); + } + + // 2 < priority < 4 AND progress > 10.0, index progress + { + std::unique_ptr<JSONDocument> query(Parse( + "[{'$filter': {'priority': {'$lt': 4, '$gt': 2}, 'progress': {'$gt': " + "10.0}, '$index': 'progress'}}]")); + std::unique_ptr<Cursor> cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {4, 8}); + } + + // job_name == 'white' AND priority >= 2, index job_name + { + std::unique_ptr<JSONDocument> query(Parse( + "[{'$filter': {'job_name': 'white', 'priority': {'$gte': " + "2}, '$index': 'job_name'}}]")); + std::unique_ptr<Cursor> cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {2, 5}); + } + + // 35.0 <= progress < 65.5, index progress + { + std::unique_ptr<JSONDocument> query(Parse( + "[{'$filter': {'progress': {'$gt': 5.0, '$gte': 35.0, '$lt': 65.5}, " + "'$index': 'progress'}}]")); + std::unique_ptr<Cursor> cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {2, 5, 10, 11}); + } + + // 2 < priority <= 4, index priority + { + std::unique_ptr<JSONDocument> query(Parse( + "[{'$filter': {'priority': {'$gt': 2, '$lt': 8, '$lte': 4}, " + "'$index': 'priority'}}]")); + std::unique_ptr<Cursor> cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {4, 5, 8, 9, 11}); + } + + // Delete all whose progress is bigger than 50% + { + std::unique_ptr<JSONDocument> query( + Parse("{'progress': {'$gt': 50.0}, '$index': 'progress'}")); + ASSERT_OK(db_->Remove(ReadOptions(), WriteOptions(), *query)); + } + + // 2 < priority < 6, index priority + { + std::unique_ptr<JSONDocument> query(Parse( + "[{'$filter': {'priority': {'$gt': 2, '$lt': 6}, " + "'$index': 'priority'}}]")); + std::unique_ptr<Cursor> cursor(db_->Query(ReadOptions(), *query)); + AssertCursorIDs(cursor.get(), {4, 5, 9, 11}); + } + + // update set priority to 10 where job_name is 'white' + { + std::unique_ptr<JSONDocument> query(Parse("{'job_name': 'white'}")); + std::unique_ptr<JSONDocument> update(Parse("{'$set': {'priority': 10}}")); + ASSERT_OK(db_->Update(ReadOptions(), WriteOptions(), *query, *update)); + } + + // 4 < priority + { + std::unique_ptr<JSONDocument> query( + Parse("[{'$filter': {'priority': {'$gt': 4}, '$index': 'priority'}}]")); + std::unique_ptr<Cursor> cursor(db_->Query(ReadOptions(), *query)); + ASSERT_OK(cursor->status()); + AssertCursorIDs(cursor.get(), {1, 2, 5}); + } + + Status s = db_->DropIndex("doesnt-exist"); + ASSERT_TRUE(!s.ok()); + ASSERT_OK(db_->DropIndex("priority")); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); } diff --git a/utilities/document/json_document.cc b/utilities/document/json_document.cc index 46da6f8ba..f3ccc3884 100644 --- a/utilities/document/json_document.cc +++ b/utilities/document/json_document.cc @@ -6,6 +6,8 @@ #include "utilities/json_document.h" +#define __STDC_FORMAT_MACROS +#include <inttypes.h> #include #include #include @@ -267,6 +269,58 @@ bool JSONDocument::operator==(const JSONDocument& rhs) const { return false; } +std::string JSONDocument::DebugString() const { + std::string ret; + switch (type_) { + case kNull: + ret = "null"; + break; + case kArray: + ret =
"["; + for (size_t i = 0; i < data_.a.size(); ++i) { + if (i) { + ret += ", "; + } + ret += data_.a[i]->DebugString(); + } + ret += "]"; + break; + case kBool: + ret = data_.b ? "true" : "false"; + break; + case kDouble: { + char buf[100]; + snprintf(buf, sizeof(buf), "%lf", data_.d); + ret = buf; + break; + } + case kInt64: { + char buf[100]; + snprintf(buf, sizeof(buf), "%" PRIi64, data_.i); + ret = buf; + break; + } + case kObject: { + bool first = true; + ret = "{"; + for (const auto& iter : data_.o) { + ret += first ? "" : ", "; + first = false; + ret += iter.first + ": "; + ret += iter.second->DebugString(); + } + ret += "}"; + break; + } + case kString: + ret = "\"" + data_.s + "\""; + break; + default: + assert(false); + } + return ret; +} + JSONDocument::ItemsIteratorGenerator JSONDocument::Items() const { assert(type_ == kObject); return data_.o;