// Copyright (c) 2013, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. #include "rocksdb/utilities/write_batch_with_index.h" #include #include "rocksdb/comparator.h" #include "rocksdb/iterator.h" #include "db/column_family.h" #include "db/skiplist.h" #include "util/arena.h" namespace rocksdb { // when direction == forward // * current_at_base_ <=> base_iterator > delta_iterator // when direction == backwards // * current_at_base_ <=> base_iterator < delta_iterator // always: // * equal_keys_ <=> base_iterator == delta_iterator class BaseDeltaIterator : public Iterator { public: BaseDeltaIterator(Iterator* base_iterator, WBWIIterator* delta_iterator, const Comparator* comparator) : forward_(true), current_at_base_(true), equal_keys_(false), status_(Status::OK()), base_iterator_(base_iterator), delta_iterator_(delta_iterator), comparator_(comparator) {} virtual ~BaseDeltaIterator() {} bool Valid() const override { return current_at_base_ ? BaseValid() : DeltaValid(); } void SeekToFirst() override { forward_ = true; base_iterator_->SeekToFirst(); delta_iterator_->SeekToFirst(); UpdateCurrent(); } void SeekToLast() override { forward_ = false; base_iterator_->SeekToLast(); delta_iterator_->SeekToLast(); UpdateCurrent(); } void Seek(const Slice& k) override { forward_ = true; base_iterator_->Seek(k); delta_iterator_->Seek(k); UpdateCurrent(); } void Next() override { if (!Valid()) { status_ = Status::NotSupported("Next() on invalid iterator"); } if (!forward_) { // Need to change direction // if our direction was backward and we're not equal, we have two states: // * both iterators are valid: we're already in a good state (current // shows to smaller) // * only one iterator is valid: we need to advance that iterator forward_ = true; equal_keys_ = false; if (!BaseValid()) { assert(DeltaValid()); base_iterator_->SeekToFirst(); } else if (!DeltaValid()) { delta_iterator_->SeekToFirst(); } else if (current_at_base_) { // Change delta from larger than base to smaller AdvanceDelta(); } else { // Change base from larger than delta to smaller AdvanceBase(); } if (DeltaValid() && BaseValid()) { if (Compare() == 0) { equal_keys_ = true; } } } Advance(); } void Prev() override { if (!Valid()) { status_ = Status::NotSupported("Prev() on invalid iterator"); } if (forward_) { // Need to change direction // if our direction was backward and we're not equal, we have two states: // * both iterators are valid: we're already in a good state (current // shows to smaller) // * only one iterator is valid: we need to advance that iterator forward_ = false; equal_keys_ = false; if (!BaseValid()) { assert(DeltaValid()); base_iterator_->SeekToLast(); } else if (!DeltaValid()) { delta_iterator_->SeekToLast(); } else if (current_at_base_) { // Change delta from less advanced than base to more advanced AdvanceDelta(); } else { // Change base from less advanced than delta to more advanced AdvanceBase(); } if (DeltaValid() && BaseValid()) { if (Compare() == 0) { equal_keys_ = true; } } } Advance(); } Slice key() const override { return current_at_base_ ? base_iterator_->key() : delta_iterator_->Entry().key; } Slice value() const override { return current_at_base_ ? base_iterator_->value() : delta_iterator_->Entry().value; } Status status() const { if (!status_.ok()) { return status_; } if (!base_iterator_->status().ok()) { return base_iterator_->status(); } return delta_iterator_->status(); } private: // -1 -- delta less advanced than base // 0 -- delta == base // 1 -- delta more advanced than base int Compare() const { assert(delta_iterator_->Valid() && base_iterator_->Valid()); int cmp = comparator_->Compare(delta_iterator_->Entry().key, base_iterator_->key()); if (forward_) { return cmp; } else { return -cmp; } } bool IsDeltaDelete() { assert(DeltaValid()); return delta_iterator_->Entry().type == kDeleteRecord; } void AssertInvariants() { #ifndef NDEBUG if (!Valid()) { return; } if (!BaseValid()) { assert(!current_at_base_ && delta_iterator_->Valid()); return; } if (!DeltaValid()) { assert(current_at_base_ && base_iterator_->Valid()); return; } // we don't support those yet assert(delta_iterator_->Entry().type != kMergeRecord && delta_iterator_->Entry().type != kLogDataRecord); int compare = comparator_->Compare(delta_iterator_->Entry().key, base_iterator_->key()); if (forward_) { // current_at_base -> compare < 0 assert(!current_at_base_ || compare < 0); // !current_at_base -> compare <= 0 assert(current_at_base_ && compare >= 0); } else { // current_at_base -> compare > 0 assert(!current_at_base_ || compare > 0); // !current_at_base -> compare <= 0 assert(current_at_base_ && compare <= 0); } // equal_keys_ <=> compare == 0 assert((equal_keys_ || compare != 0) && (!equal_keys_ || compare == 0)); #endif } void Advance() { if (equal_keys_) { assert(BaseValid() && DeltaValid()); AdvanceBase(); AdvanceDelta(); } else { if (current_at_base_) { assert(BaseValid()); AdvanceBase(); } else { assert(DeltaValid()); AdvanceDelta(); } } UpdateCurrent(); } void AdvanceDelta() { if (forward_) { delta_iterator_->Next(); } else { delta_iterator_->Prev(); } } void AdvanceBase() { if (forward_) { base_iterator_->Next(); } else { base_iterator_->Prev(); } } bool BaseValid() const { return base_iterator_->Valid(); } bool DeltaValid() const { return delta_iterator_->Valid(); } void UpdateCurrent() { while (true) { equal_keys_ = false; if (!BaseValid()) { // Base has finished. if (!DeltaValid()) { // Finished return; } if (IsDeltaDelete()) { AdvanceDelta(); } else { current_at_base_ = false; return; } } else if (!DeltaValid()) { // Delta has finished. current_at_base_ = true; return; } else { int compare = Compare(); if (compare <= 0) { // delta bigger or equal if (compare == 0) { equal_keys_ = true; } if (!IsDeltaDelete()) { current_at_base_ = false; return; } // Delta is less advanced and is delete. AdvanceDelta(); if (equal_keys_) { AdvanceBase(); } } else { current_at_base_ = true; return; } } } AssertInvariants(); } bool forward_; bool current_at_base_; bool equal_keys_; Status status_; std::unique_ptr base_iterator_; std::unique_ptr delta_iterator_; const Comparator* comparator_; // not owned }; class ReadableWriteBatch : public WriteBatch { public: explicit ReadableWriteBatch(size_t reserved_bytes = 0) : WriteBatch(reserved_bytes) {} // Retrieve some information from a write entry in the write batch, given // the start offset of the write entry. Status GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, Slice* value, Slice* blob) const; }; // Key used by skip list, as the binary searchable index of WriteBatchWithIndex. struct WriteBatchIndexEntry { WriteBatchIndexEntry(size_t o, uint32_t c) : offset(o), column_family(c), search_key(nullptr) {} WriteBatchIndexEntry(const Slice* sk, uint32_t c) : offset(0), column_family(c), search_key(sk) {} // If this flag appears in the offset, it indicates a key that is smaller // than any other entry for the same column family static const size_t kFlagMin = std::numeric_limits::max(); size_t offset; // offset of an entry in write batch's string buffer. uint32_t column_family; // column family of the entry const Slice* search_key; // if not null, instead of reading keys from // write batch, use it to compare. This is used // for lookup key. }; class WriteBatchEntryComparator { public: WriteBatchEntryComparator(const Comparator* default_comparator, const ReadableWriteBatch* write_batch) : default_comparator_(default_comparator), write_batch_(write_batch) {} // Compare a and b. Return a negative value if a is less than b, 0 if they // are equal, and a positive value if a is greater than b int operator()(const WriteBatchIndexEntry* entry1, const WriteBatchIndexEntry* entry2) const; int CompareKey(uint32_t column_family, const Slice& key1, const Slice& key2) const; void SetComparatorForCF(uint32_t column_family_id, const Comparator* comparator) { cf_comparator_map_[column_family_id] = comparator; } private: const Comparator* default_comparator_; std::unordered_map cf_comparator_map_; const ReadableWriteBatch* write_batch_; }; typedef SkipList WriteBatchEntrySkipList; class WBWIIteratorImpl : public WBWIIterator { public: WBWIIteratorImpl(uint32_t column_family_id, WriteBatchEntrySkipList* skip_list, const ReadableWriteBatch* write_batch) : column_family_id_(column_family_id), skip_list_iter_(skip_list), write_batch_(write_batch), valid_(false) {} virtual ~WBWIIteratorImpl() {} virtual bool Valid() const override { return valid_; } virtual void SeekToFirst() { valid_ = true; WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, column_family_id_); skip_list_iter_.Seek(&search_entry); ReadEntry(); } virtual void SeekToLast() { valid_ = true; WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin, column_family_id_ + 1); skip_list_iter_.Seek(&search_entry); if (!skip_list_iter_.Valid()) { skip_list_iter_.SeekToLast(); } else { skip_list_iter_.Prev(); } ReadEntry(); } virtual void Seek(const Slice& key) override { valid_ = true; WriteBatchIndexEntry search_entry(&key, column_family_id_); skip_list_iter_.Seek(&search_entry); ReadEntry(); } virtual void Next() override { skip_list_iter_.Next(); ReadEntry(); } virtual void Prev() override { skip_list_iter_.Prev(); ReadEntry(); } virtual const WriteEntry& Entry() const override { return current_; } virtual Status status() const override { return status_; } const WriteBatchIndexEntry* GetRawEntry() const { return skip_list_iter_.key(); } private: uint32_t column_family_id_; WriteBatchEntrySkipList::Iterator skip_list_iter_; const ReadableWriteBatch* write_batch_; Status status_; bool valid_; WriteEntry current_; void ReadEntry() { if (!status_.ok() || !skip_list_iter_.Valid()) { valid_ = false; return; } const WriteBatchIndexEntry* iter_entry = skip_list_iter_.key(); if (iter_entry == nullptr || iter_entry->column_family != column_family_id_) { valid_ = false; return; } Slice blob; status_ = write_batch_->GetEntryFromDataOffset( iter_entry->offset, ¤t_.type, ¤t_.key, ¤t_.value, &blob); if (!status_.ok()) { valid_ = false; } else if (current_.type != kPutRecord && current_.type != kDeleteRecord && current_.type != kMergeRecord) { valid_ = false; status_ = Status::Corruption("write batch index is corrupted"); } } }; struct WriteBatchWithIndex::Rep { Rep(const Comparator* index_comparator, size_t reserved_bytes = 0, bool _overwrite_key = false) : write_batch(reserved_bytes), comparator(index_comparator, &write_batch), skip_list(comparator, &arena), overwrite_key(_overwrite_key), last_entry_offset(0) {} ReadableWriteBatch write_batch; WriteBatchEntryComparator comparator; Arena arena; WriteBatchEntrySkipList skip_list; bool overwrite_key; size_t last_entry_offset; // Remember current offset of internal write batch, which is used as // the starting offset of the next record. void SetLastEntryOffset() { last_entry_offset = write_batch.GetDataSize(); } // In overwrite mode, find the existing entry for the same key and update it // to point to the current entry. // Return true if the key is found and updated. bool UpdateExistingEntry(ColumnFamilyHandle* column_family, const Slice& key); bool UpdateExistingEntryWithCfId(uint32_t column_family_id, const Slice& key); // Add the recent entry to the update. // In overwrite mode, if key already exists in the index, update it. void AddOrUpdateIndex(ColumnFamilyHandle* column_family, const Slice& key); void AddOrUpdateIndex(const Slice& key); // Allocate an index entry pointing to the last entry in the write batch and // put it to skip list. void AddNewEntry(uint32_t column_family_id); }; bool WriteBatchWithIndex::Rep::UpdateExistingEntry( ColumnFamilyHandle* column_family, const Slice& key) { uint32_t cf_id = GetColumnFamilyID(column_family); return UpdateExistingEntryWithCfId(cf_id, key); } bool WriteBatchWithIndex::Rep::UpdateExistingEntryWithCfId( uint32_t column_family_id, const Slice& key) { if (!overwrite_key) { return false; } WBWIIteratorImpl iter(column_family_id, &skip_list, &write_batch); iter.Seek(key); if (!iter.Valid()) { return false; } if (comparator.CompareKey(column_family_id, key, iter.Entry().key) != 0) { return false; } WriteBatchIndexEntry* non_const_entry = const_cast(iter.GetRawEntry()); non_const_entry->offset = last_entry_offset; return true; } void WriteBatchWithIndex::Rep::AddOrUpdateIndex( ColumnFamilyHandle* column_family, const Slice& key) { if (!UpdateExistingEntry(column_family, key)) { uint32_t cf_id = GetColumnFamilyID(column_family); const auto* cf_cmp = GetColumnFamilyUserComparator(column_family); if (cf_cmp != nullptr) { comparator.SetComparatorForCF(cf_id, cf_cmp); } AddNewEntry(cf_id); } } void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) { if (!UpdateExistingEntryWithCfId(0, key)) { AddNewEntry(0); } } void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) { auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry)); auto* index_entry = new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id); skip_list.Insert(index_entry); } Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset, WriteType* type, Slice* Key, Slice* value, Slice* blob) const { if (type == nullptr || Key == nullptr || value == nullptr || blob == nullptr) { return Status::InvalidArgument("Output parameters cannot be null"); } if (data_offset >= GetDataSize()) { return Status::InvalidArgument("data offset exceed write batch size"); } Slice input = Slice(rep_.data() + data_offset, rep_.size() - data_offset); char tag; uint32_t column_family; Status s = ReadRecordFromWriteBatch(&input, &tag, &column_family, Key, value, blob); switch (tag) { case kTypeColumnFamilyValue: case kTypeValue: *type = kPutRecord; break; case kTypeColumnFamilyDeletion: case kTypeDeletion: *type = kDeleteRecord; break; case kTypeColumnFamilyMerge: case kTypeMerge: *type = kMergeRecord; break; case kTypeLogData: *type = kLogDataRecord; break; default: return Status::Corruption("unknown WriteBatch tag"); } return Status::OK(); } WriteBatchWithIndex::WriteBatchWithIndex( const Comparator* default_index_comparator, size_t reserved_bytes, bool overwrite_key) : rep(new Rep(default_index_comparator, reserved_bytes, overwrite_key)) {} WriteBatchWithIndex::~WriteBatchWithIndex() { delete rep; } WriteBatch* WriteBatchWithIndex::GetWriteBatch() { return &rep->write_batch; } WBWIIterator* WriteBatchWithIndex::NewIterator() { return new WBWIIteratorImpl(0, &(rep->skip_list), &rep->write_batch); } WBWIIterator* WriteBatchWithIndex::NewIterator( ColumnFamilyHandle* column_family) { return new WBWIIteratorImpl(GetColumnFamilyID(column_family), &(rep->skip_list), &rep->write_batch); } Iterator* WriteBatchWithIndex::NewIteratorWithBase( ColumnFamilyHandle* column_family, Iterator* base_iterator) { if (rep->overwrite_key == false) { assert(false); return nullptr; } return new BaseDeltaIterator(base_iterator, NewIterator(column_family), GetColumnFamilyUserComparator(column_family)); } void WriteBatchWithIndex::Put(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); rep->write_batch.Put(column_family, key, value); rep->AddOrUpdateIndex(column_family, key); } void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); rep->write_batch.Put(key, value); rep->AddOrUpdateIndex(key); } void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); rep->write_batch.Merge(column_family, key, value); rep->AddOrUpdateIndex(column_family, key); } void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) { rep->SetLastEntryOffset(); rep->write_batch.Merge(key, value); rep->AddOrUpdateIndex(key); } void WriteBatchWithIndex::PutLogData(const Slice& blob) { rep->write_batch.PutLogData(blob); } void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family, const Slice& key) { rep->SetLastEntryOffset(); rep->write_batch.Delete(column_family, key); rep->AddOrUpdateIndex(column_family, key); } void WriteBatchWithIndex::Delete(const Slice& key) { rep->SetLastEntryOffset(); rep->write_batch.Delete(key); rep->AddOrUpdateIndex(key); } int WriteBatchEntryComparator::operator()( const WriteBatchIndexEntry* entry1, const WriteBatchIndexEntry* entry2) const { if (entry1->column_family > entry2->column_family) { return 1; } else if (entry1->column_family < entry2->column_family) { return -1; } if (entry1->offset == WriteBatchIndexEntry::kFlagMin) { return -1; } else if (entry2->offset == WriteBatchIndexEntry::kFlagMin) { return 1; } Status s; Slice key1, key2; if (entry1->search_key == nullptr) { Slice value, blob; WriteType write_type; s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1, &value, &blob); if (!s.ok()) { return 1; } } else { key1 = *(entry1->search_key); } if (entry2->search_key == nullptr) { Slice value, blob; WriteType write_type; s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2, &value, &blob); if (!s.ok()) { return -1; } } else { key2 = *(entry2->search_key); } int cmp = CompareKey(entry1->column_family, key1, key2); if (cmp != 0) { return cmp; } else if (entry1->offset > entry2->offset) { return 1; } else if (entry1->offset < entry2->offset) { return -1; } return 0; } int WriteBatchEntryComparator::CompareKey(uint32_t column_family, const Slice& key1, const Slice& key2) const { auto comparator_for_cf = cf_comparator_map_.find(column_family); if (comparator_for_cf != cf_comparator_map_.end()) { return comparator_for_cf->second->Compare(key1, key2); } else { return default_comparator_->Compare(key1, key2); } } } // namespace rocksdb