64324e329e
Summary: Almost all compilers support the "pragma once" directive as an alternative to include guards. To keep the header files consistent, all header files were edited to use it. In addition, this change tries to fix some warnings about loss of data. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4339 Differential Revision: D9654990 Pulled By: ajkr fbshipit-source-id: c2cf3d2d03a599847684bed81378c401920ca848
920 lines
27 KiB
C++
920 lines
27 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include "rocksdb/utilities/spatial_db.h"
|
|
|
|
#ifndef __STDC_FORMAT_MACROS
|
|
#define __STDC_FORMAT_MACROS
|
|
#endif
|
|
|
|
#include <algorithm>
|
|
#include <condition_variable>
|
|
#include <inttypes.h>
|
|
#include <string>
|
|
#include <vector>
|
|
#include <mutex>
|
|
#include <thread>
|
|
#include <set>
|
|
#include <unordered_set>
|
|
|
|
#include "rocksdb/cache.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/memtablerep.h"
|
|
#include "rocksdb/slice_transform.h"
|
|
#include "rocksdb/statistics.h"
|
|
#include "rocksdb/table.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/utilities/stackable_db.h"
|
|
#include "util/coding.h"
|
|
#include "utilities/spatialdb/utils.h"
|
|
#include "port/port.h"
|
|
|
|
namespace rocksdb {
|
|
namespace spatial {
|
|
|
|
// Column families are used to store element's data and spatial indexes. We use
|
|
// [default] column family to store the element data. This is the format of
|
|
// [default] column family:
|
|
// * id (fixed 64 big endian) -> blob (length prefixed slice) feature_set
|
|
// (serialized)
|
|
// We have one additional column family for each spatial index. The name of the
|
|
// column family is [spatial$<spatial_index_name>]. The format is:
|
|
// * quad_key (fixed 64 bit big endian) id (fixed 64 bit big endian) -> ""
|
|
// We store information about indexes in [metadata] column family. Format is:
|
|
// * spatial$<spatial_index_name> -> bbox (4 double encodings) tile_bits
|
|
// (varint32)
|
|
|
|
namespace {
|
|
// Name of the column family that stores the per-index metadata records.
const std::string kMetadataColumnFamilyName = "metadata";
|
|
// Returns the name of the column family that backs the spatial index
// `spatial_index_name`, i.e. the index name with "spatial$" prepended.
inline std::string GetSpatialIndexColumnFamilyName(
    const std::string& spatial_index_name) {
  std::string column_family_name("spatial$");
  column_family_name.append(spatial_index_name);
  return column_family_name;
}
|
|
inline bool GetSpatialIndexName(const std::string& column_family_name,
|
|
Slice* dst) {
|
|
*dst = Slice(column_family_name);
|
|
if (dst->starts_with("spatial$")) {
|
|
dst->remove_prefix(8); // strlen("spatial$")
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
} // namespace
|
|
|
|
void Variant::Init(const Variant& v, Data& d) {
|
|
switch (v.type_) {
|
|
case kNull:
|
|
break;
|
|
case kBool:
|
|
d.b = v.data_.b;
|
|
break;
|
|
case kInt:
|
|
d.i = v.data_.i;
|
|
break;
|
|
case kDouble:
|
|
d.d = v.data_.d;
|
|
break;
|
|
case kString:
|
|
new (d.s) std::string(*GetStringPtr(v.data_));
|
|
break;
|
|
default:
|
|
assert(false);
|
|
}
|
|
}
|
|
|
|
// Copy assignment.
// The copy is staged in a scratch Data first, so an exception thrown by the
// string copy constructor leaves *this unchanged.
Variant& Variant::operator=(const Variant& v) {
  Data staged;
  Init(v, staged);

  // Remember what the old payload was before the type tag is overwritten.
  const Type previous_type = type_;

  // Swapping Data boils down to copying bits, so it is safe.
  std::swap(data_, staged);
  type_ = v.type_;

  // `staged` now carries the old payload; release it using the old type.
  Destroy(previous_type, staged);
  return *this;
}
|
|
|
|
// Move assignment.
// Releases the current payload, takes over the payload of `rhs` and leaves
// `rhs` as kNull.
Variant& Variant::operator=(Variant&& rhs) {
  // Self-move must be a no-op: otherwise the payload would be destroyed below
  // and then move-constructed from its own destroyed storage.
  if (this == &rhs) {
    return *this;
  }

  Destroy(type_, data_);
  if (rhs.type_ == kString) {
    new (data_.s) std::string(std::move(*GetStringPtr(rhs.data_)));
    // rhs is tagged kNull below, so nothing would ever destruct its
    // (moved-from) string object; end its lifetime here.
    Destroy(kString, rhs.data_);
  } else {
    data_ = rhs.data_;
  }
  type_ = rhs.type_;
  rhs.type_ = kNull;
  return *this;
}
|
|
|
|
// Two variants are equal when they hold the same type and, for non-null
// types, payloads that compare equal with ==. Doubles are compared exactly.
bool Variant::operator==(const Variant& rhs) const {
  const Type active = type_;
  if (active != rhs.type_) {
    return false;
  }

  if (active == kNull) {
    return true;
  }
  if (active == kBool) {
    return data_.b == rhs.data_.b;
  }
  if (active == kInt) {
    return data_.i == rhs.data_.i;
  }
  if (active == kDouble) {
    return data_.d == rhs.data_.d;
  }
  if (active == kString) {
    return *GetStringPtr(data_) == *GetStringPtr(rhs.data_);
  }

  // Unknown type tag: should be unreachable. The return keeps the compiler
  // quiet when asserts are compiled out.
  assert(false);
  return false;
}
|
|
|
|
// Adds the pair (key, value) to the set and returns `this` so calls can be
// chained.
// NOTE(review): this uses insert semantics, so a key that is already present
// keeps its old value -- confirm that is the intended contract for "Set".
FeatureSet* FeatureSet::Set(const std::string& key, const Variant& value) {
  map_.emplace(key, value);
  return this;
}
|
|
|
|
bool FeatureSet::Contains(const std::string& key) const {
|
|
return map_.find(key) != map_.end();
|
|
}
|
|
|
|
// Returns the value stored under `key`.
// The key must be present: this is only checked by an assert, so calling it
// with a missing key in a release build dereferences end().
const Variant& FeatureSet::Get(const std::string& key) const {
  const auto position = map_.find(key);
  assert(position != map_.end());
  const Variant& stored = position->second;
  return stored;
}
|
|
|
|
// Returns a FeatureSet::iterator wrapping the result of looking `key` up in
// the underlying map (the map's end position when the key is absent).
FeatureSet::iterator FeatureSet::Find(const std::string& key) const {
  const auto position = map_.find(key);
  return iterator(position);
}
|
|
|
|
// Removes every (key, value) pair from the set.
void FeatureSet::Clear() {
  map_.clear();
}
|
|
|
|
void FeatureSet::Serialize(std::string* output) const {
|
|
for (const auto& iter : map_) {
|
|
PutLengthPrefixedSlice(output, iter.first);
|
|
output->push_back(static_cast<char>(iter.second.type()));
|
|
switch (iter.second.type()) {
|
|
case Variant::kNull:
|
|
break;
|
|
case Variant::kBool:
|
|
output->push_back(static_cast<char>(iter.second.get_bool()));
|
|
break;
|
|
case Variant::kInt:
|
|
PutVarint64(output, iter.second.get_int());
|
|
break;
|
|
case Variant::kDouble: {
|
|
PutDouble(output, iter.second.get_double());
|
|
break;
|
|
}
|
|
case Variant::kString:
|
|
PutLengthPrefixedSlice(output, iter.second.get_string());
|
|
break;
|
|
default:
|
|
assert(false);
|
|
}
|
|
}
|
|
}
|
|
|
|
bool FeatureSet::Deserialize(const Slice& input) {
|
|
assert(map_.empty());
|
|
Slice s(input);
|
|
while (s.size()) {
|
|
Slice key;
|
|
if (!GetLengthPrefixedSlice(&s, &key) || s.size() == 0) {
|
|
return false;
|
|
}
|
|
char type = s[0];
|
|
s.remove_prefix(1);
|
|
switch (type) {
|
|
case Variant::kNull: {
|
|
map_.insert({key.ToString(), Variant()});
|
|
break;
|
|
}
|
|
case Variant::kBool: {
|
|
if (s.size() == 0) {
|
|
return false;
|
|
}
|
|
map_.insert({key.ToString(), Variant(static_cast<bool>(s[0]))});
|
|
s.remove_prefix(1);
|
|
break;
|
|
}
|
|
case Variant::kInt: {
|
|
uint64_t v;
|
|
if (!GetVarint64(&s, &v)) {
|
|
return false;
|
|
}
|
|
map_.insert({key.ToString(), Variant(v)});
|
|
break;
|
|
}
|
|
case Variant::kDouble: {
|
|
double d;
|
|
if (!GetDouble(&s, &d)) {
|
|
return false;
|
|
}
|
|
map_.insert({key.ToString(), Variant(d)});
|
|
break;
|
|
}
|
|
case Variant::kString: {
|
|
Slice str;
|
|
if (!GetLengthPrefixedSlice(&s, &str)) {
|
|
return false;
|
|
}
|
|
map_.insert({key.ToString(), str.ToString()});
|
|
break;
|
|
}
|
|
default:
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
std::string FeatureSet::DebugString() const {
|
|
std::string out = "{";
|
|
bool comma = false;
|
|
for (const auto& iter : map_) {
|
|
if (comma) {
|
|
out.append(", ");
|
|
} else {
|
|
comma = true;
|
|
}
|
|
out.append("\"" + iter.first + "\": ");
|
|
switch (iter.second.type()) {
|
|
case Variant::kNull:
|
|
out.append("null");
|
|
break;
|
|
case Variant::kBool:
|
|
if (iter.second.get_bool()) {
|
|
out.append("true");
|
|
} else {
|
|
out.append("false");
|
|
}
|
|
break;
|
|
case Variant::kInt: {
|
|
char buf[32];
|
|
snprintf(buf, sizeof(buf), "%" PRIu64, iter.second.get_int());
|
|
out.append(buf);
|
|
break;
|
|
}
|
|
case Variant::kDouble: {
|
|
char buf[32];
|
|
snprintf(buf, sizeof(buf), "%lf", iter.second.get_double());
|
|
out.append(buf);
|
|
break;
|
|
}
|
|
case Variant::kString:
|
|
out.append("\"" + iter.second.get_string() + "\"");
|
|
break;
|
|
default:
|
|
assert(false);
|
|
}
|
|
}
|
|
return out + "}";
|
|
}
|
|
|
|
class ValueGetter {
|
|
public:
|
|
ValueGetter() {}
|
|
virtual ~ValueGetter() {}
|
|
|
|
virtual bool Get(uint64_t id) = 0;
|
|
virtual const Slice value() const = 0;
|
|
|
|
virtual Status status() const = 0;
|
|
};
|
|
|
|
class ValueGetterFromDB : public ValueGetter {
|
|
public:
|
|
ValueGetterFromDB(DB* db, ColumnFamilyHandle* cf) : db_(db), cf_(cf) {}
|
|
|
|
virtual bool Get(uint64_t id) override {
|
|
std::string encoded_id;
|
|
PutFixed64BigEndian(&encoded_id, id);
|
|
status_ = db_->Get(ReadOptions(), cf_, encoded_id, &value_);
|
|
if (status_.IsNotFound()) {
|
|
status_ = Status::Corruption("Index inconsistency");
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
virtual const Slice value() const override { return value_; }
|
|
|
|
virtual Status status() const override { return status_; }
|
|
|
|
private:
|
|
std::string value_;
|
|
DB* db_;
|
|
ColumnFamilyHandle* cf_;
|
|
Status status_;
|
|
};
|
|
|
|
class ValueGetterFromIterator : public ValueGetter {
|
|
public:
|
|
explicit ValueGetterFromIterator(Iterator* iterator) : iterator_(iterator) {}
|
|
|
|
virtual bool Get(uint64_t id) override {
|
|
std::string encoded_id;
|
|
PutFixed64BigEndian(&encoded_id, id);
|
|
iterator_->Seek(encoded_id);
|
|
|
|
if (!iterator_->Valid() || iterator_->key() != Slice(encoded_id)) {
|
|
status_ = Status::Corruption("Index inconsistency");
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
virtual const Slice value() const override { return iterator_->value(); }
|
|
|
|
virtual Status status() const override { return status_; }
|
|
|
|
private:
|
|
std::unique_ptr<Iterator> iterator_;
|
|
Status status_;
|
|
};
|
|
|
|
class SpatialIndexCursor : public Cursor {
|
|
public:
|
|
// tile_box is inclusive
|
|
SpatialIndexCursor(Iterator* spatial_iterator, ValueGetter* value_getter,
|
|
const BoundingBox<uint64_t>& tile_bbox, uint32_t tile_bits)
|
|
: value_getter_(value_getter), valid_(true) {
|
|
// calculate quad keys we'll need to query
|
|
std::vector<uint64_t> quad_keys;
|
|
quad_keys.reserve(static_cast<size_t>((tile_bbox.max_x - tile_bbox.min_x + 1) *
|
|
(tile_bbox.max_y - tile_bbox.min_y + 1)));
|
|
for (uint64_t x = tile_bbox.min_x; x <= tile_bbox.max_x; ++x) {
|
|
for (uint64_t y = tile_bbox.min_y; y <= tile_bbox.max_y; ++y) {
|
|
quad_keys.push_back(GetQuadKeyFromTile(x, y, tile_bits));
|
|
}
|
|
}
|
|
std::sort(quad_keys.begin(), quad_keys.end());
|
|
|
|
// load primary key ids for all quad keys
|
|
for (auto quad_key : quad_keys) {
|
|
std::string encoded_quad_key;
|
|
PutFixed64BigEndian(&encoded_quad_key, quad_key);
|
|
Slice slice_quad_key(encoded_quad_key);
|
|
|
|
// If CheckQuadKey is true, there is no need to reseek, since
|
|
// spatial_iterator is already pointing at the correct quad key. This is
|
|
// an optimization.
|
|
if (!CheckQuadKey(spatial_iterator, slice_quad_key)) {
|
|
spatial_iterator->Seek(slice_quad_key);
|
|
}
|
|
|
|
while (CheckQuadKey(spatial_iterator, slice_quad_key)) {
|
|
// extract ID from spatial_iterator
|
|
uint64_t id;
|
|
bool ok = GetFixed64BigEndian(
|
|
Slice(spatial_iterator->key().data() + sizeof(uint64_t),
|
|
sizeof(uint64_t)),
|
|
&id);
|
|
if (!ok) {
|
|
valid_ = false;
|
|
status_ = Status::Corruption("Spatial index corruption");
|
|
break;
|
|
}
|
|
primary_key_ids_.insert(id);
|
|
spatial_iterator->Next();
|
|
}
|
|
}
|
|
|
|
if (!spatial_iterator->status().ok()) {
|
|
status_ = spatial_iterator->status();
|
|
valid_ = false;
|
|
}
|
|
delete spatial_iterator;
|
|
|
|
valid_ = valid_ && !primary_key_ids_.empty();
|
|
|
|
if (valid_) {
|
|
primary_keys_iterator_ = primary_key_ids_.begin();
|
|
ExtractData();
|
|
}
|
|
}
|
|
|
|
virtual bool Valid() const override { return valid_; }
|
|
|
|
virtual void Next() override {
|
|
assert(valid_);
|
|
|
|
++primary_keys_iterator_;
|
|
if (primary_keys_iterator_ == primary_key_ids_.end()) {
|
|
valid_ = false;
|
|
return;
|
|
}
|
|
|
|
ExtractData();
|
|
}
|
|
|
|
virtual const Slice blob() override { return current_blob_; }
|
|
virtual const FeatureSet& feature_set() override {
|
|
return current_feature_set_;
|
|
}
|
|
|
|
virtual Status status() const override {
|
|
if (!status_.ok()) {
|
|
return status_;
|
|
}
|
|
return value_getter_->status();
|
|
}
|
|
|
|
private:
|
|
// * returns true if spatial iterator is on the current quad key and all is
|
|
// well
|
|
// * returns false if spatial iterator is not on current, or iterator is
|
|
// invalid or corruption
|
|
bool CheckQuadKey(Iterator* spatial_iterator, const Slice& quad_key) {
|
|
if (!spatial_iterator->Valid()) {
|
|
return false;
|
|
}
|
|
if (spatial_iterator->key().size() != 2 * sizeof(uint64_t)) {
|
|
status_ = Status::Corruption("Invalid spatial index key");
|
|
valid_ = false;
|
|
return false;
|
|
}
|
|
Slice spatial_iterator_quad_key(spatial_iterator->key().data(),
|
|
sizeof(uint64_t));
|
|
if (spatial_iterator_quad_key != quad_key) {
|
|
// caller needs to reseek
|
|
return false;
|
|
}
|
|
// if we come to here, we have found the quad key
|
|
return true;
|
|
}
|
|
|
|
void ExtractData() {
|
|
assert(valid_);
|
|
valid_ = value_getter_->Get(*primary_keys_iterator_);
|
|
|
|
if (valid_) {
|
|
Slice data = value_getter_->value();
|
|
current_feature_set_.Clear();
|
|
if (!GetLengthPrefixedSlice(&data, ¤t_blob_) ||
|
|
!current_feature_set_.Deserialize(data)) {
|
|
status_ = Status::Corruption("Primary key column family corruption");
|
|
valid_ = false;
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
unique_ptr<ValueGetter> value_getter_;
|
|
bool valid_;
|
|
Status status_;
|
|
|
|
FeatureSet current_feature_set_;
|
|
Slice current_blob_;
|
|
|
|
// This is loaded from spatial iterator.
|
|
std::unordered_set<uint64_t> primary_key_ids_;
|
|
std::unordered_set<uint64_t>::iterator primary_keys_iterator_;
|
|
};
|
|
|
|
class ErrorCursor : public Cursor {
|
|
public:
|
|
explicit ErrorCursor(Status s) : s_(s) { assert(!s.ok()); }
|
|
virtual Status status() const override { return s_; }
|
|
virtual bool Valid() const override { return false; }
|
|
virtual void Next() override { assert(false); }
|
|
|
|
virtual const Slice blob() override {
|
|
assert(false);
|
|
return Slice();
|
|
}
|
|
virtual const FeatureSet& feature_set() override {
|
|
assert(false);
|
|
// compiler complains otherwise
|
|
return trash_;
|
|
}
|
|
|
|
private:
|
|
Status s_;
|
|
FeatureSet trash_;
|
|
};
|
|
|
|
class SpatialDBImpl : public SpatialDB {
|
|
public:
|
|
// * db -- base DB that needs to be forwarded to StackableDB
|
|
// * data_column_family -- column family used to store the data
|
|
// * spatial_indexes -- a list of spatial indexes together with column
|
|
// families that correspond to those spatial indexes
|
|
// * next_id -- next ID in auto-incrementing ID. This is usually
|
|
// `max_id_currenty_in_db + 1`
|
|
SpatialDBImpl(
|
|
DB* db, ColumnFamilyHandle* data_column_family,
|
|
const std::vector<std::pair<SpatialIndexOptions, ColumnFamilyHandle*>>&
|
|
spatial_indexes,
|
|
uint64_t next_id, bool read_only)
|
|
: SpatialDB(db),
|
|
data_column_family_(data_column_family),
|
|
next_id_(next_id),
|
|
read_only_(read_only) {
|
|
for (const auto& index : spatial_indexes) {
|
|
name_to_index_.insert(
|
|
{index.first.name, IndexColumnFamily(index.first, index.second)});
|
|
}
|
|
}
|
|
|
|
~SpatialDBImpl() {
|
|
for (auto& iter : name_to_index_) {
|
|
delete iter.second.column_family;
|
|
}
|
|
delete data_column_family_;
|
|
}
|
|
|
|
virtual Status Insert(
|
|
const WriteOptions& write_options, const BoundingBox<double>& bbox,
|
|
const Slice& blob, const FeatureSet& feature_set,
|
|
const std::vector<std::string>& spatial_indexes) override {
|
|
WriteBatch batch;
|
|
|
|
if (spatial_indexes.size() == 0) {
|
|
return Status::InvalidArgument("Spatial indexes can't be empty");
|
|
}
|
|
|
|
const size_t kWriteOutEveryBytes = 1024 * 1024; // 1MB
|
|
uint64_t id = next_id_.fetch_add(1);
|
|
|
|
for (const auto& si : spatial_indexes) {
|
|
auto itr = name_to_index_.find(si);
|
|
if (itr == name_to_index_.end()) {
|
|
return Status::InvalidArgument("Can't find index " + si);
|
|
}
|
|
const auto& spatial_index = itr->second.index;
|
|
if (!spatial_index.bbox.Intersects(bbox)) {
|
|
continue;
|
|
}
|
|
BoundingBox<uint64_t> tile_bbox = GetTileBoundingBox(spatial_index, bbox);
|
|
|
|
for (uint64_t x = tile_bbox.min_x; x <= tile_bbox.max_x; ++x) {
|
|
for (uint64_t y = tile_bbox.min_y; y <= tile_bbox.max_y; ++y) {
|
|
// see above for format
|
|
std::string key;
|
|
PutFixed64BigEndian(
|
|
&key, GetQuadKeyFromTile(x, y, spatial_index.tile_bits));
|
|
PutFixed64BigEndian(&key, id);
|
|
batch.Put(itr->second.column_family, key, Slice());
|
|
if (batch.GetDataSize() >= kWriteOutEveryBytes) {
|
|
Status s = Write(write_options, &batch);
|
|
batch.Clear();
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// see above for format
|
|
std::string data_key;
|
|
PutFixed64BigEndian(&data_key, id);
|
|
std::string data_value;
|
|
PutLengthPrefixedSlice(&data_value, blob);
|
|
feature_set.Serialize(&data_value);
|
|
batch.Put(data_column_family_, data_key, data_value);
|
|
|
|
return Write(write_options, &batch);
|
|
}
|
|
|
|
virtual Status Compact(int num_threads) override {
|
|
std::vector<ColumnFamilyHandle*> column_families;
|
|
column_families.push_back(data_column_family_);
|
|
|
|
for (auto& iter : name_to_index_) {
|
|
column_families.push_back(iter.second.column_family);
|
|
}
|
|
|
|
std::mutex state_mutex;
|
|
std::condition_variable cv;
|
|
Status s;
|
|
int threads_running = 0;
|
|
|
|
std::vector<port::Thread> threads;
|
|
|
|
for (auto cfh : column_families) {
|
|
threads.emplace_back([&, cfh] {
|
|
{
|
|
std::unique_lock<std::mutex> lk(state_mutex);
|
|
cv.wait(lk, [&] { return threads_running < num_threads; });
|
|
threads_running++;
|
|
}
|
|
|
|
Status t = Flush(FlushOptions(), cfh);
|
|
if (t.ok()) {
|
|
t = CompactRange(CompactRangeOptions(), cfh, nullptr, nullptr);
|
|
}
|
|
|
|
{
|
|
std::unique_lock<std::mutex> lk(state_mutex);
|
|
threads_running--;
|
|
if (s.ok() && !t.ok()) {
|
|
s = t;
|
|
}
|
|
cv.notify_one();
|
|
}
|
|
});
|
|
}
|
|
|
|
for (auto& t : threads) {
|
|
t.join();
|
|
}
|
|
|
|
return s;
|
|
}
|
|
|
|
virtual Cursor* Query(const ReadOptions& read_options,
|
|
const BoundingBox<double>& bbox,
|
|
const std::string& spatial_index) override {
|
|
auto itr = name_to_index_.find(spatial_index);
|
|
if (itr == name_to_index_.end()) {
|
|
return new ErrorCursor(Status::InvalidArgument(
|
|
"Spatial index " + spatial_index + " not found"));
|
|
}
|
|
const auto& si = itr->second.index;
|
|
Iterator* spatial_iterator;
|
|
ValueGetter* value_getter;
|
|
|
|
if (read_only_) {
|
|
spatial_iterator = NewIterator(read_options, itr->second.column_family);
|
|
value_getter = new ValueGetterFromDB(this, data_column_family_);
|
|
} else {
|
|
std::vector<Iterator*> iterators;
|
|
Status s = NewIterators(read_options,
|
|
{data_column_family_, itr->second.column_family},
|
|
&iterators);
|
|
if (!s.ok()) {
|
|
return new ErrorCursor(s);
|
|
}
|
|
|
|
spatial_iterator = iterators[1];
|
|
value_getter = new ValueGetterFromIterator(iterators[0]);
|
|
}
|
|
return new SpatialIndexCursor(spatial_iterator, value_getter,
|
|
GetTileBoundingBox(si, bbox), si.tile_bits);
|
|
}
|
|
|
|
private:
|
|
ColumnFamilyHandle* data_column_family_;
|
|
struct IndexColumnFamily {
|
|
SpatialIndexOptions index;
|
|
ColumnFamilyHandle* column_family;
|
|
IndexColumnFamily(const SpatialIndexOptions& _index,
|
|
ColumnFamilyHandle* _cf)
|
|
: index(_index), column_family(_cf) {}
|
|
};
|
|
// constant after construction!
|
|
std::unordered_map<std::string, IndexColumnFamily> name_to_index_;
|
|
|
|
std::atomic<uint64_t> next_id_;
|
|
bool read_only_;
|
|
};
|
|
|
|
namespace {
|
|
// Derives the DBOptions used for a spatial DB from `options`.
// options.num_threads is split between background compactions (three
// quarters, rounded down) and background flushes (the rest); the same counts
// are applied to the environment's LOW and HIGH priority pools. Statistics
// are enabled and dumped every 10 minutes in bulk-load mode, every 30 minutes
// otherwise.
DBOptions GetDBOptionsFromSpatialDBOptions(const SpatialDBOptions& options) {
  DBOptions result;
  result.max_open_files = 50000;

  // thread budget
  result.max_background_compactions = 3 * options.num_threads / 4;
  result.max_background_flushes =
      options.num_threads - result.max_background_compactions;
  result.env->SetBackgroundThreads(result.max_background_compactions,
                                   Env::LOW);
  result.env->SetBackgroundThreads(result.max_background_flushes, Env::HIGH);

  // statistics
  result.statistics = CreateDBStatistics();
  result.stats_dump_period_sec = options.bulk_load ? 600 : 1800;  // 10/30min

  return result;
}
|
|
|
|
// Builds the column family options shared by all column families of a
// spatial DB: fixed buffer/level sizing, no compression on levels 0 and 1,
// LZ4 on every deeper level, and a block-based table using `block_cache`.
// The SpatialDBOptions argument is currently unused.
ColumnFamilyOptions GetColumnFamilyOptions(const SpatialDBOptions& /*options*/,
                                           std::shared_ptr<Cache> block_cache) {
  ColumnFamilyOptions cf_options;

  // memtable / file sizing
  cf_options.write_buffer_size = 128 * 1024 * 1024;         // 128MB
  cf_options.max_write_buffer_number = 4;
  cf_options.max_bytes_for_level_base = 256 * 1024 * 1024;  // 256MB
  cf_options.target_file_size_base = 64 * 1024 * 1024;      // 64MB

  // level-0 triggers
  cf_options.level0_file_num_compaction_trigger = 2;
  cf_options.level0_slowdown_writes_trigger = 16;
  cf_options.level0_stop_writes_trigger = 32;

  // only compress levels >= 2
  cf_options.compression_per_level.resize(cf_options.num_levels);
  for (int level = 0; level < cf_options.num_levels; ++level) {
    cf_options.compression_per_level[level] =
        (level < 2) ? kNoCompression : kLZ4Compression;
  }

  BlockBasedTableOptions block_based_options;
  block_based_options.block_cache = block_cache;
  cf_options.table_factory.reset(
      NewBlockBasedTableFactory(block_based_options));

  return cf_options;
}
|
|
|
|
// Returns a copy of `options` adjusted for the data column family: a no-op
// prefix extractor plus a block-based table that uses the hash-search index
// type and `block_cache`.
ColumnFamilyOptions OptimizeOptionsForDataColumnFamily(
    ColumnFamilyOptions options, std::shared_ptr<Cache> block_cache) {
  BlockBasedTableOptions table_options;
  table_options.index_type = BlockBasedTableOptions::kHashSearch;
  table_options.block_cache = block_cache;

  options.prefix_extractor.reset(NewNoopTransform());
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
  return options;
}
|
|
|
|
} // namespace
|
|
|
|
class MetadataStorage {
|
|
public:
|
|
MetadataStorage(DB* db, ColumnFamilyHandle* cf) : db_(db), cf_(cf) {}
|
|
~MetadataStorage() {}
|
|
|
|
// format: <min_x double> <min_y double> <max_x double> <max_y double>
|
|
// <tile_bits varint32>
|
|
Status AddIndex(const SpatialIndexOptions& index) {
|
|
std::string encoded_index;
|
|
PutDouble(&encoded_index, index.bbox.min_x);
|
|
PutDouble(&encoded_index, index.bbox.min_y);
|
|
PutDouble(&encoded_index, index.bbox.max_x);
|
|
PutDouble(&encoded_index, index.bbox.max_y);
|
|
PutVarint32(&encoded_index, index.tile_bits);
|
|
return db_->Put(WriteOptions(), cf_,
|
|
GetSpatialIndexColumnFamilyName(index.name), encoded_index);
|
|
}
|
|
|
|
Status GetIndex(const std::string& name, SpatialIndexOptions* dst) {
|
|
std::string value;
|
|
Status s = db_->Get(ReadOptions(), cf_,
|
|
GetSpatialIndexColumnFamilyName(name), &value);
|
|
if (!s.ok()) {
|
|
return s;
|
|
}
|
|
dst->name = name;
|
|
Slice encoded_index(value);
|
|
bool ok = GetDouble(&encoded_index, &(dst->bbox.min_x));
|
|
ok = ok && GetDouble(&encoded_index, &(dst->bbox.min_y));
|
|
ok = ok && GetDouble(&encoded_index, &(dst->bbox.max_x));
|
|
ok = ok && GetDouble(&encoded_index, &(dst->bbox.max_y));
|
|
ok = ok && GetVarint32(&encoded_index, &(dst->tile_bits));
|
|
return ok ? Status::OK() : Status::Corruption("Index encoding corrupted");
|
|
}
|
|
|
|
private:
|
|
DB* db_;
|
|
ColumnFamilyHandle* cf_;
|
|
};
|
|
|
|
// Creates a new spatial DB at `name` with one column family per entry of
// `spatial_indexes` and records each index description in the metadata
// column family. Fails if a DB already exists at `name`. The DB is closed
// again before returning; use Open() to work with it.
Status SpatialDB::Create(
    const SpatialDBOptions& options, const std::string& name,
    const std::vector<SpatialIndexOptions>& spatial_indexes) {
  DBOptions db_options = GetDBOptionsFromSpatialDBOptions(options);
  db_options.create_if_missing = true;
  db_options.create_missing_column_families = true;
  db_options.error_if_exists = true;

  auto block_cache = NewLRUCache(static_cast<size_t>(options.cache_size));
  ColumnFamilyOptions shared_cf_options =
      GetColumnFamilyOptions(options, block_cache);

  // Order matters: [0] data, [1] metadata, [2...] one per spatial index.
  std::vector<ColumnFamilyDescriptor> descriptors;
  descriptors.emplace_back(
      kDefaultColumnFamilyName,
      OptimizeOptionsForDataColumnFamily(shared_cf_options, block_cache));
  descriptors.emplace_back(kMetadataColumnFamilyName, shared_cf_options);
  for (const auto& index : spatial_indexes) {
    descriptors.emplace_back(GetSpatialIndexColumnFamilyName(index.name),
                             shared_cf_options);
  }

  std::vector<ColumnFamilyHandle*> handles;
  DB* base_db;
  Status s = DB::Open(db_options, name, descriptors, &handles, &base_db);
  if (!s.ok()) {
    return s;
  }

  // Record every index description; stop at the first failure.
  MetadataStorage metadata(base_db, handles[1]);
  for (auto index = spatial_indexes.begin();
       index != spatial_indexes.end() && s.ok(); ++index) {
    s = metadata.AddIndex(*index);
  }

  // Handles have to be released before the DB itself.
  for (ColumnFamilyHandle* handle : handles) {
    delete handle;
  }
  delete base_db;

  return s;
}
|
|
|
|
// Opens the spatial DB stored at `name` and, on success, stores a new
// SpatialDBImpl (owned by the caller) in `*db`.
// The spatial indexes are discovered from the existing column families named
// "spatial$<index>", their descriptions are loaded from the metadata column
// family, and the next element id is derived from the last key of the data
// column family. On failure `*db` is not assigned.
Status SpatialDB::Open(const SpatialDBOptions& options, const std::string& name,
                       SpatialDB** db, bool read_only) {
  DBOptions db_options = GetDBOptionsFromSpatialDBOptions(options);
  auto block_cache = NewLRUCache(static_cast<size_t>(options.cache_size));
  ColumnFamilyOptions column_family_options =
      GetColumnFamilyOptions(options, block_cache);

  Status s;
  std::vector<std::string> existing_column_families;
  std::vector<std::string> spatial_indexes;
  s = DB::ListColumnFamilies(db_options, name, &existing_column_families);
  if (!s.ok()) {
    return s;
  }
  for (const auto& cf_name : existing_column_families) {
    Slice spatial_index;
    if (GetSpatialIndexName(cf_name, &spatial_index)) {
      spatial_indexes.emplace_back(spatial_index.data(), spatial_index.size());
    }
  }

  // Order matters: [0] data, [1] metadata, [2...] one per spatial index.
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.push_back(ColumnFamilyDescriptor(
      kDefaultColumnFamilyName,
      OptimizeOptionsForDataColumnFamily(column_family_options, block_cache)));
  column_families.push_back(
      ColumnFamilyDescriptor(kMetadataColumnFamilyName, column_family_options));

  for (const auto& index : spatial_indexes) {
    column_families.emplace_back(GetSpatialIndexColumnFamilyName(index),
                                 column_family_options);
  }
  std::vector<ColumnFamilyHandle*> handles;
  DB* base_db;
  if (read_only) {
    s = DB::OpenForReadOnly(db_options, name, column_families, &handles,
                            &base_db);
  } else {
    s = DB::Open(db_options, name, column_families, &handles, &base_db);
  }
  if (!s.ok()) {
    return s;
  }

  MetadataStorage metadata(base_db, handles[1]);

  std::vector<std::pair<SpatialIndexOptions, ColumnFamilyHandle*>> index_cf;
  assert(handles.size() == spatial_indexes.size() + 2);
  for (size_t i = 0; i < spatial_indexes.size(); ++i) {
    SpatialIndexOptions index_options;
    s = metadata.GetIndex(spatial_indexes[i], &index_options);
    if (!s.ok()) {
      break;
    }
    index_cf.emplace_back(index_options, handles[i + 2]);
  }
  uint64_t next_id = 1;
  if (s.ok()) {
    // find next_id: one past the largest id stored in the data column family.
    // The iterator is scoped so it is released before base_db can be deleted.
    std::unique_ptr<Iterator> iter(
        base_db->NewIterator(ReadOptions(), handles[0]));
    iter->SeekToLast();
    if (iter->Valid()) {
      uint64_t last_id = 0;
      if (!GetFixed64BigEndian(iter->key(), &last_id)) {
        s = Status::Corruption("Invalid key in data column family");
      } else {
        next_id = last_id + 1;
      }
    } else {
      // An invalid iterator means either "no data yet" (status OK, ids start
      // at 1) or a read error. Treating an error as an empty DB would restart
      // ids at 1 and overwrite existing elements, so surface it instead.
      s = iter->status();
    }
  }
  if (!s.ok()) {
    for (auto h : handles) {
      delete h;
    }
    delete base_db;
    return s;
  }

  // I don't need metadata column family any more, so delete it
  delete handles[1];
  *db = new SpatialDBImpl(base_db, handles[0], index_cf, next_id, read_only);
  return Status::OK();
}
|
|
|
|
} // namespace spatial
|
|
} // namespace rocksdb
|
|
#endif // ROCKSDB_LITE
|