rocksdb/utilities/date_tiered/date_tiered_db_impl.cc
Siying Dong e67b35c076 Add Iterator::Refresh()
Summary:
Add and implement Iterator::Refresh(). When this function is called, if the super version doesn't change, update the sequence number of the iterator to the latest one and invalidate the iterator. If the super version changed, recreated the whole iterator. This can help users reuse the iterator more easily.
Closes https://github.com/facebook/rocksdb/pull/2621

Differential Revision: D5464500

Pulled By: siying

fbshipit-source-id: f548bd35e85c1efca2ea69273802f6704eba6ba9
2017-07-24 10:54:37 -07:00

397 lines
12 KiB
C++

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#include "utilities/date_tiered/date_tiered_db_impl.h"
#include <limits>
#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/write_batch_internal.h"
#include "monitoring/instrumented_mutex.h"
#include "options/options_helper.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/utilities/date_tiered_db.h"
#include "table/merging_iterator.h"
#include "util/coding.h"
#include "util/filename.h"
#include "util/string_util.h"
namespace rocksdb {
// Open the db inside DateTieredDBImpl because options needs pointer to its ttl
DateTieredDBImpl::DateTieredDBImpl(
DB* db, Options options,
const std::vector<ColumnFamilyDescriptor>& descriptors,
const std::vector<ColumnFamilyHandle*>& handles, int64_t ttl,
int64_t column_family_interval)
: db_(db),
cf_options_(ColumnFamilyOptions(options)),
ioptions_(ImmutableCFOptions(options)),
ttl_(ttl),
column_family_interval_(column_family_interval),
mutex_(options.statistics.get(), db->GetEnv(), DB_MUTEX_WAIT_MICROS,
options.use_adaptive_mutex) {
latest_timebound_ = std::numeric_limits<int64_t>::min();
for (size_t i = 0; i < handles.size(); ++i) {
const auto& name = descriptors[i].name;
int64_t timestamp = 0;
try {
timestamp = ParseUint64(name);
} catch (const std::invalid_argument&) {
// Bypass unrelated column family, e.g. default
db_->DestroyColumnFamilyHandle(handles[i]);
continue;
}
if (timestamp > latest_timebound_) {
latest_timebound_ = timestamp;
}
handle_map_.insert(std::make_pair(timestamp, handles[i]));
}
}
DateTieredDBImpl::~DateTieredDBImpl() {
for (auto handle : handle_map_) {
db_->DestroyColumnFamilyHandle(handle.second);
}
delete db_;
db_ = nullptr;
}
Status DateTieredDB::Open(const Options& options, const std::string& dbname,
DateTieredDB** dbptr, int64_t ttl,
int64_t column_family_interval, bool read_only) {
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> descriptors;
std::vector<ColumnFamilyHandle*> handles;
DB* db;
Status s;
// Get column families
std::vector<std::string> column_family_names;
s = DB::ListColumnFamilies(db_options, dbname, &column_family_names);
if (!s.ok()) {
// No column family found. Use default
s = DB::Open(options, dbname, &db);
if (!s.ok()) {
return s;
}
} else {
for (auto name : column_family_names) {
descriptors.emplace_back(ColumnFamilyDescriptor(name, cf_options));
}
// Open database
if (read_only) {
s = DB::OpenForReadOnly(db_options, dbname, descriptors, &handles, &db);
} else {
s = DB::Open(db_options, dbname, descriptors, &handles, &db);
}
}
if (s.ok()) {
*dbptr = new DateTieredDBImpl(db, options, descriptors, handles, ttl,
column_family_interval);
}
return s;
}
// Checks if the string is stale or not according to TTl provided
bool DateTieredDBImpl::IsStale(int64_t keytime, int64_t ttl, Env* env) {
if (ttl <= 0) {
// Data is fresh if TTL is non-positive
return false;
}
int64_t curtime;
if (!env->GetCurrentTime(&curtime).ok()) {
// Treat the data as fresh if could not get current time
return false;
}
return curtime >= keytime + ttl;
}
// Drop column family when all data in that column family is expired
// TODO(jhli): Can be made a background job
Status DateTieredDBImpl::DropObsoleteColumnFamilies() {
int64_t curtime;
Status s;
s = db_->GetEnv()->GetCurrentTime(&curtime);
if (!s.ok()) {
return s;
}
{
InstrumentedMutexLock l(&mutex_);
auto iter = handle_map_.begin();
while (iter != handle_map_.end()) {
if (iter->first <= curtime - ttl_) {
s = db_->DropColumnFamily(iter->second);
if (!s.ok()) {
return s;
}
delete iter->second;
iter = handle_map_.erase(iter);
} else {
break;
}
}
}
return Status::OK();
}
// Get timestamp from user key
Status DateTieredDBImpl::GetTimestamp(const Slice& key, int64_t* result) {
if (key.size() < kTSLength) {
return Status::Corruption("Bad timestamp in key");
}
const char* pos = key.data() + key.size() - 8;
int64_t timestamp = 0;
if (port::kLittleEndian) {
int bytes_to_fill = 8;
for (int i = 0; i < bytes_to_fill; ++i) {
timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i]))
<< ((bytes_to_fill - i - 1) << 3));
}
} else {
memcpy(&timestamp, pos, sizeof(timestamp));
}
*result = timestamp;
return Status::OK();
}
Status DateTieredDBImpl::CreateColumnFamily(
ColumnFamilyHandle** column_family) {
int64_t curtime;
Status s;
mutex_.AssertHeld();
s = db_->GetEnv()->GetCurrentTime(&curtime);
if (!s.ok()) {
return s;
}
int64_t new_timebound;
if (handle_map_.empty()) {
new_timebound = curtime + column_family_interval_;
} else {
new_timebound =
latest_timebound_ +
((curtime - latest_timebound_) / column_family_interval_ + 1) *
column_family_interval_;
}
std::string cf_name = ToString(new_timebound);
latest_timebound_ = new_timebound;
s = db_->CreateColumnFamily(cf_options_, cf_name, column_family);
if (s.ok()) {
handle_map_.insert(std::make_pair(new_timebound, *column_family));
}
return s;
}
Status DateTieredDBImpl::FindColumnFamily(int64_t keytime,
ColumnFamilyHandle** column_family,
bool create_if_missing) {
*column_family = nullptr;
{
InstrumentedMutexLock l(&mutex_);
auto iter = handle_map_.upper_bound(keytime);
if (iter == handle_map_.end()) {
if (!create_if_missing) {
return Status::NotFound();
} else {
return CreateColumnFamily(column_family);
}
}
// Move to previous element to get the appropriate time window
*column_family = iter->second;
}
return Status::OK();
}
Status DateTieredDBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& val) {
int64_t timestamp = 0;
Status s;
s = GetTimestamp(key, &timestamp);
if (!s.ok()) {
return s;
}
DropObsoleteColumnFamilies();
// Prune request to obsolete data
if (IsStale(timestamp, ttl_, db_->GetEnv())) {
return Status::InvalidArgument();
}
// Decide column family (i.e. the time window) to put into
ColumnFamilyHandle* column_family;
s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/);
if (!s.ok()) {
return s;
}
// Efficiently put with WriteBatch
WriteBatch batch;
batch.Put(column_family, key, val);
return Write(options, &batch);
}
Status DateTieredDBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
int64_t timestamp = 0;
Status s;
s = GetTimestamp(key, &timestamp);
if (!s.ok()) {
return s;
}
// Prune request to obsolete data
if (IsStale(timestamp, ttl_, db_->GetEnv())) {
return Status::NotFound();
}
// Decide column family to get from
ColumnFamilyHandle* column_family;
s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
if (!s.ok()) {
return s;
}
if (column_family == nullptr) {
// Cannot find column family
return Status::NotFound();
}
// Get value with key
return db_->Get(options, column_family, key, value);
}
bool DateTieredDBImpl::KeyMayExist(const ReadOptions& options, const Slice& key,
std::string* value, bool* value_found) {
int64_t timestamp = 0;
Status s;
s = GetTimestamp(key, &timestamp);
if (!s.ok()) {
// Cannot get current time
return false;
}
// Decide column family to get from
ColumnFamilyHandle* column_family;
s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
if (!s.ok() || column_family == nullptr) {
// Cannot find column family
return false;
}
if (IsStale(timestamp, ttl_, db_->GetEnv())) {
return false;
}
return db_->KeyMayExist(options, column_family, key, value, value_found);
}
Status DateTieredDBImpl::Delete(const WriteOptions& options, const Slice& key) {
int64_t timestamp = 0;
Status s;
s = GetTimestamp(key, &timestamp);
if (!s.ok()) {
return s;
}
DropObsoleteColumnFamilies();
// Prune request to obsolete data
if (IsStale(timestamp, ttl_, db_->GetEnv())) {
return Status::NotFound();
}
// Decide column family to get from
ColumnFamilyHandle* column_family;
s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
if (!s.ok()) {
return s;
}
if (column_family == nullptr) {
// Cannot find column family
return Status::NotFound();
}
// Get value with key
return db_->Delete(options, column_family, key);
}
Status DateTieredDBImpl::Merge(const WriteOptions& options, const Slice& key,
const Slice& value) {
// Decide column family to get from
int64_t timestamp = 0;
Status s;
s = GetTimestamp(key, &timestamp);
if (!s.ok()) {
// Cannot get current time
return s;
}
ColumnFamilyHandle* column_family;
s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/);
if (!s.ok()) {
return s;
}
WriteBatch batch;
batch.Merge(column_family, key, value);
return Write(options, &batch);
}
Status DateTieredDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
class Handler : public WriteBatch::Handler {
public:
explicit Handler() {}
WriteBatch updates_ttl;
Status batch_rewrite_status;
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
WriteBatchInternal::Put(&updates_ttl, column_family_id, key, value);
return Status::OK();
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
WriteBatchInternal::Merge(&updates_ttl, column_family_id, key, value);
return Status::OK();
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
return Status::OK();
}
virtual void LogData(const Slice& blob) override {
updates_ttl.PutLogData(blob);
}
};
Handler handler;
updates->Iterate(&handler);
if (!handler.batch_rewrite_status.ok()) {
return handler.batch_rewrite_status;
} else {
return db_->Write(opts, &(handler.updates_ttl));
}
}
Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) {
if (handle_map_.empty()) {
return NewEmptyIterator();
}
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_);
auto db_iter = NewArenaWrappedDbIterator(
db_impl->GetEnv(), opts, ioptions_, kMaxSequenceNumber,
cf_options_.max_sequential_skip_in_iterations, 0);
auto arena = db_iter->GetArena();
MergeIteratorBuilder builder(cf_options_.comparator, arena);
for (auto& item : handle_map_) {
auto handle = item.second;
builder.AddIterator(db_impl->NewInternalIterator(
arena, db_iter->GetRangeDelAggregator(), handle));
}
auto internal_iter = builder.Finish();
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE