rocksdb/utilities/redis/redis_list_iterator.h

309 lines
11 KiB
C
Raw Normal View History

/**
* RedisListIterator:
* An abstraction over the "list" concept (e.g.: for redis lists).
* Provides functionality to read, traverse, edit, and write these lists.
*
* Upon construction, the RedisListIterator is given a block of list data.
* Internally, it stores a pointer to the data and a pointer to current item.
* It also stores a "result" list that will be mutated over time.
*
* Traversal and mutation are done by "forward iteration".
* The Push() and Skip() methods will advance the iterator to the next item.
* However, Push() will also "write the current item to the result".
* Skip() will simply move to next item, causing current item to be dropped.
*
* Upon completion, the result (accessible by WriteResult()) will be saved.
* All "skipped" items will be gone; all "pushed" items will remain.
*
* @throws Any of the operations may throw a RedisListException if an invalid
* operation is performed or if the data is found to be corrupt.
*
* @notes By default, if WriteResult() is called part-way through iteration,
* it will automatically advance the iterator to the end, and keep
* (as if by Push()) all items that haven't been traversed yet. This
* may be subject to review.
*
* @notes Can access the "current" item via GetCurrent(), and other
* list-specific information such as Length().
*
* @notes The internal representation is due to change at any time. Presently,
* the list is represented as follows:
* - 32-bit integer header: the number of items in the list
* - For each item:
* - 32-bit int (n): the number of bytes representing this item
* - n bytes of data: the actual data.
*
* @author Deon Nicholas (dnicholas@fb.com)
* Copyright 2013 Facebook
*/
#pragma once
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

#include "redis_list_exception.h"
#include "rocksdb/slice.h"
#include "util/coding.h"
namespace leveldb {
/// An abstraction over the "list" concept (e.g.: a redis list).
///
/// The iterator walks a serialized list front-to-back. Push() copies the
/// current element into an internal result buffer, Skip() drops it, and
/// InsertElement() adds a new element to the result. WriteResult() finalizes
/// the header and returns the edited serialization.
///
/// Serialized layout (as read/written by this class):
///   - 4 bytes (fixed32): number of elements
///   - per element: 4 bytes (fixed32) payload length n, then n payload bytes
///
/// All operations may throw a RedisListException.
class RedisListIterator {
 public:
  /// Construct a redis-list-iterator based on data.
  /// If the data is non-empty, it must be formatted as described above.
  ///
  /// Only a pointer into list_data is stored (no copy is made), so list_data
  /// must outlive this iterator and must not be modified while it is in use.
  ///
  /// If the data is valid, we can assume the following invariant(s):
  ///  a) length_, num_bytes_ are set correctly.
  ///  b) cur_byte_ always refers to the start of the current element,
  ///     just before the bytes that specify element length.
  ///  c) cur_elem_ is always the index of the current element.
  ///  d) cur_elem_length_ is always the number of bytes in current element,
  ///     excluding the 4-byte header itself.
  ///  e) result_ will always contain data_[0..cur_byte_) and a header
  ///  f) Whenever corrupt data is encountered or an invalid operation is
  ///     attempted, a RedisListException will immediately be thrown.
  ///
  /// @throws RedisListException if the data is non-empty but too short to
  ///         hold the list header, or too short to hold the first element's
  ///         length prefix when the header announces at least one element.
  RedisListIterator(const std::string& list_data)
      : data_(list_data.data()),
        num_bytes_(static_cast<uint32_t>(list_data.size())),
        cur_byte_(0),
        cur_elem_(0),
        cur_elem_length_(0),
        length_(0),
        result_() {
    // Initialize the result_ (reserve enough space for header)
    InitializeResult();

    // An empty string is a valid, empty list: nothing to parse.
    if (num_bytes_ == 0) {
      return;
    }

    // If non-empty, but less than 4 bytes, data must be corrupt
    if (num_bytes_ < sizeof(length_)) {
      ThrowError("Corrupt header.");  // Will break control flow
    }

    // Good. The first bytes specify the number of elements
    length_ = DecodeFixed32(data_);
    cur_byte_ = sizeof(length_);

    // If we have at least one element, point to that element and read its
    // length prefix (the payload itself is validated lazily by CheckErrors()).
    if (length_ > 0) {
      if (cur_byte_ + sizeof(cur_elem_length_) > num_bytes_) {
        ThrowError("Corrupt data for first element.");
      }
      cur_elem_length_ = DecodeFixed32(data_ + cur_byte_);
    }

    // At this point, we are fully set-up.
    // The invariants described in the header should now be true.
  }

  /// Reserve some space for the result_.
  /// Equivalent to result_.reserve(bytes) for positive values.
  /// Non-positive values are ignored: this is only a capacity hint, and a
  /// negative int would otherwise be converted to an enormous unsigned size.
  void Reserve(int bytes) {
    if (bytes > 0) {
      result_.reserve(static_cast<size_t>(bytes));
    }
  }

  /// Go to next element in data file.
  /// Also writes the current element to result_.
  /// @return *this, so calls can be chained.
  /// @throws RedisListException if already at the end or data is corrupt.
  RedisListIterator& Push() {
    WriteCurrentElement();
    MoveNext();
    return *this;
  }

  /// Go to next element in data file.
  /// Drops/skips the current element. It will not be written to result_.
  /// @return *this, so calls can be chained.
  /// @throws RedisListException if already at the end or data is corrupt.
  RedisListIterator& Skip() {
    MoveNext();
    --length_;    // One less item
    --cur_elem_;  // We moved one forward, but index did not change
    return *this;
  }

  /// Insert elem into the result_ (just BEFORE the current element / byte)
  /// Note: if Done() (i.e.: iterator points to end), this will append elem.
  /// @throws RedisListException if the iterator is in a corrupt state.
  void InsertElement(const Slice& elem) {
    // Ensure we are in a valid state
    CheckErrors();

    // Grow the result by [length prefix . payload] and fill it in place.
    const size_t orig_size = result_.size();
    result_.resize(orig_size + SizeOf(elem));
    char* const dst = result_.data() + orig_size;
    EncodeFixed32(dst, static_cast<uint32_t>(elem.size()));
    memcpy(dst + sizeof(uint32_t), elem.data(), elem.size());

    // The list grew by one, and the current element's index shifted by one.
    ++length_;
    ++cur_elem_;
  }

  /// Access the current element, and save the result into *curElem.
  /// The resulting Slice points into the input data given at construction.
  /// @throws RedisListException if Done() or the data is corrupt.
  void GetCurrent(Slice* curElem) {
    // Ensure we are in a valid state
    CheckErrors();

    // Ensure that we are not past the last element.
    if (Done()) {
      ThrowError("Invalid dereferencing.");
    }

    // Dereference the element (skip over its 4-byte length prefix)
    *curElem = Slice(data_ + cur_byte_ + sizeof(cur_elem_length_),
                     cur_elem_length_);
  }

  /// Number of elements
  int Length() const {
    return static_cast<int>(length_);
  }

  /// Number of bytes in the final representation (i.e: WriteResult().size())
  int Size() const {
    // result_ holds the currently written data
    // data_[cur_byte..num_bytes-1] is the remainder of the data
    return static_cast<int>(result_.size() + (num_bytes_ - cur_byte_));
  }

  /// Reached the end?
  bool Done() const {
    return cur_byte_ >= num_bytes_ || cur_elem_ >= length_;
  }

  /// Returns a string representing the final, edited, data.
  /// Assumes that all bytes of data_ in the range [0,cur_byte_) have been read
  /// and that result_ contains this data.
  /// The rest of the data must still be written.
  /// So, this method ADVANCES THE ITERATOR TO THE END before writing.
  ///
  /// The returned Slice points into this iterator's internal buffer; it is
  /// only usable while the iterator is alive and not modified further.
  /// @throws RedisListException if the iterator is in a corrupt state.
  Slice WriteResult() {
    CheckErrors();

    // The header should currently be filled with dummy data (0's)
    // Correctly update the header.
    // Note, this is safe since result_ is a vector (guaranteed contiguous)
    // and InitializeResult() made it at least header-sized.
    EncodeFixed32(result_.data(), length_);

    // Append the remainder of the data to the result.
    result_.insert(result_.end(), data_ + cur_byte_, data_ + num_bytes_);

    // Seek to end of file
    cur_byte_ = num_bytes_;
    cur_elem_ = length_;
    cur_elem_length_ = 0;

    // Return the result
    return Slice(result_.data(), result_.size());
  }

 public:  // Static public functions
  /// An upper-bound on the amount of bytes needed to store this element.
  /// This is used to hide representation information from the client.
  /// E.G. This can be used to compute the bytes we want to Reserve().
  static uint32_t SizeOf(const Slice& elem) {
    // [Integer Length . Data]
    return static_cast<uint32_t>(sizeof(uint32_t) + elem.size());
  }

 private:  // Private functions
  /// Initializes the result_ string.
  /// It will fill the first few bytes with 0's so that there is
  /// enough space for header information when we need to write later.
  /// Currently, "header information" means: the length (number of elements)
  /// Assumes that result_ is empty to begin with
  void InitializeResult() {
    assert(result_.empty());              // Should always be true.
    result_.resize(sizeof(uint32_t), 0);  // Put a block of 0's as the header
  }

  /// Go to the next element (used in Push() and Skip())
  /// @throws RedisListException if already Done(), or if the next element's
  ///         length prefix does not fit in the remaining data.
  void MoveNext() {
    CheckErrors();

    // Check to make sure we are not already in a finished state
    if (Done()) {
      ThrowError("Attempting to iterate past end of list.");
    }

    // Move forward one element. CheckErrors() has verified that the whole
    // element lies inside the data, so this addition cannot wrap around.
    cur_byte_ += sizeof(cur_elem_length_) + cur_elem_length_;
    ++cur_elem_;

    // If we are at the end, finish
    if (Done()) {
      cur_elem_length_ = 0;
      return;
    }

    // Otherwise, we should be able to read the new element's length
    if (cur_byte_ + sizeof(cur_elem_length_) > num_bytes_) {
      ThrowError("Corrupt element data.");
    }

    // Set the new element's length
    cur_elem_length_ = DecodeFixed32(data_ + cur_byte_);
  }

  /// Append the current element (pointed to by cur_byte_) to result_
  /// Assumes result_ has already been reserved appropriately.
  /// @throws RedisListException if Done() or the data is corrupt.
  void WriteCurrentElement() {
    // First verify that the iterator is still valid.
    CheckErrors();
    if (Done()) {
      ThrowError("Attempting to write invalid element.");
    }

    // Append the cur element: its 4-byte length prefix followed by payload.
    const char* const elem_begin = data_ + cur_byte_;
    result_.insert(result_.end(),
                   elem_begin,
                   elem_begin + sizeof(uint32_t) + cur_elem_length_);
  }

  /// Will ThrowError() if necessary.
  /// Checks for common/ubiquitous errors that can arise after most operations.
  /// This method should be called before any reading operation.
  /// If this function succeeds, then we are guaranteed to be in a valid state.
  /// Other member functions should check for errors and ThrowError() also
  /// if an error occurs that is specific to it even while in a valid state.
  void CheckErrors() {
    const bool at_end = (cur_elem_ == length_);

    // End offset of the current element, computed in 64 bits so that a
    // corrupt (huge) length cannot wrap around and slip past the check.
    // While an element is current, it occupies its 4-byte length prefix plus
    // cur_elem_length_ payload bytes; at the end there is no prefix to count.
    const uint64_t prefix_bytes = at_end ? 0 : sizeof(cur_elem_length_);
    const uint64_t elem_end =
        static_cast<uint64_t>(cur_byte_) + prefix_bytes + cur_elem_length_;

    // Check if any crazy thing has happened recently
    if ((cur_elem_ > length_) ||                    // Bad index
        (cur_byte_ > num_bytes_) ||                 // No more bytes
        (elem_end > num_bytes_) ||                  // Item too large
        (cur_byte_ == num_bytes_ && !at_end) ||     // Too many items
        (at_end && cur_byte_ != num_bytes_)) {      // Too many bytes
      ThrowError("Corrupt data.");
    }
  }

  /// Will throw an exception based on the passed-in message.
  /// This function is guaranteed to STOP THE CONTROL-FLOW.
  /// (i.e.: you do not have to call "return" after calling ThrowError)
  [[noreturn]] void ThrowError(const char* const msg = nullptr) {
    // TODO: For now we ignore the msg parameter. This can be expanded later.
    (void)msg;
    throw RedisListException();
  }

 private:
  const char* const data_;     // A pointer to the data (the first byte)
  const uint32_t num_bytes_;   // The number of bytes in this list
  uint32_t cur_byte_;          // The current byte being read
  uint32_t cur_elem_;          // The current element being read
  uint32_t cur_elem_length_;   // The number of bytes in current element
  uint32_t length_;            // The number of elements in this list
  std::vector<char> result_;   // The output data
};
} // namespace leveldb