// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #ifndef ROCKSDB_LITE #include "utilities/date_tiered/date_tiered_db_impl.h" #include <limits> #include "db/db_impl.h" #include "db/db_iter.h" #include "db/write_batch_internal.h" #include "monitoring/instrumented_mutex.h" #include "options/options_helper.h" #include "rocksdb/convenience.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/utilities/date_tiered_db.h" #include "table/merging_iterator.h" #include "util/coding.h" #include "util/filename.h" #include "util/string_util.h" namespace rocksdb { // Open the db inside DateTieredDBImpl because options needs pointer to its ttl DateTieredDBImpl::DateTieredDBImpl( DB* db, Options options, const std::vector<ColumnFamilyDescriptor>& descriptors, const std::vector<ColumnFamilyHandle*>& handles, int64_t ttl, int64_t column_family_interval) : db_(db), cf_options_(ColumnFamilyOptions(options)), ioptions_(ImmutableCFOptions(options)), moptions_(MutableCFOptions(options)), icomp_(cf_options_.comparator), ttl_(ttl), column_family_interval_(column_family_interval), mutex_(options.statistics.get(), db->GetEnv(), DB_MUTEX_WAIT_MICROS, options.use_adaptive_mutex) { latest_timebound_ = std::numeric_limits<int64_t>::min(); for (size_t i = 0; i < handles.size(); ++i) { const auto& name = descriptors[i].name; int64_t timestamp = 0; try { timestamp = ParseUint64(name); } catch (const std::invalid_argument&) { // Bypass unrelated column family, e.g. default db_->DestroyColumnFamilyHandle(handles[i]); continue; } if (timestamp > latest_timebound_) { latest_timebound_ = timestamp; } handle_map_.insert(std::make_pair(timestamp, handles[i])); } } DateTieredDBImpl::~DateTieredDBImpl() { for (auto handle : handle_map_) { db_->DestroyColumnFamilyHandle(handle.second); } delete db_; db_ = nullptr; } Status DateTieredDB::Open(const Options& options, const std::string& dbname, DateTieredDB** dbptr, int64_t ttl, int64_t column_family_interval, bool read_only) { DBOptions db_options(options); ColumnFamilyOptions cf_options(options); std::vector<ColumnFamilyDescriptor> descriptors; std::vector<ColumnFamilyHandle*> handles; DB* db; Status s; // Get column families std::vector<std::string> column_family_names; s = DB::ListColumnFamilies(db_options, dbname, &column_family_names); if (!s.ok()) { // No column family found. Use default s = DB::Open(options, dbname, &db); if (!s.ok()) { return s; } } else { for (auto name : column_family_names) { descriptors.emplace_back(ColumnFamilyDescriptor(name, cf_options)); } // Open database if (read_only) { s = DB::OpenForReadOnly(db_options, dbname, descriptors, &handles, &db); } else { s = DB::Open(db_options, dbname, descriptors, &handles, &db); } } if (s.ok()) { *dbptr = new DateTieredDBImpl(db, options, descriptors, handles, ttl, column_family_interval); } return s; } // Checks if the string is stale or not according to TTl provided bool DateTieredDBImpl::IsStale(int64_t keytime, int64_t ttl, Env* env) { if (ttl <= 0) { // Data is fresh if TTL is non-positive return false; } int64_t curtime; if (!env->GetCurrentTime(&curtime).ok()) { // Treat the data as fresh if could not get current time return false; } return curtime >= keytime + ttl; } // Drop column family when all data in that column family is expired // TODO(jhli): Can be made a background job Status DateTieredDBImpl::DropObsoleteColumnFamilies() { int64_t curtime; Status s; s = db_->GetEnv()->GetCurrentTime(&curtime); if (!s.ok()) { return s; } { InstrumentedMutexLock l(&mutex_); auto iter = handle_map_.begin(); while (iter != handle_map_.end()) { if (iter->first <= curtime - ttl_) { s = db_->DropColumnFamily(iter->second); if (!s.ok()) { return s; } delete iter->second; iter = handle_map_.erase(iter); } else { break; } } } return Status::OK(); } // Get timestamp from user key Status DateTieredDBImpl::GetTimestamp(const Slice& key, int64_t* result) { if (key.size() < kTSLength) { return Status::Corruption("Bad timestamp in key"); } const char* pos = key.data() + key.size() - 8; int64_t timestamp = 0; if (port::kLittleEndian) { int bytes_to_fill = 8; for (int i = 0; i < bytes_to_fill; ++i) { timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i])) << ((bytes_to_fill - i - 1) << 3)); } } else { memcpy(×tamp, pos, sizeof(timestamp)); } *result = timestamp; return Status::OK(); } Status DateTieredDBImpl::CreateColumnFamily( ColumnFamilyHandle** column_family) { int64_t curtime; Status s; mutex_.AssertHeld(); s = db_->GetEnv()->GetCurrentTime(&curtime); if (!s.ok()) { return s; } int64_t new_timebound; if (handle_map_.empty()) { new_timebound = curtime + column_family_interval_; } else { new_timebound = latest_timebound_ + ((curtime - latest_timebound_) / column_family_interval_ + 1) * column_family_interval_; } std::string cf_name = ToString(new_timebound); latest_timebound_ = new_timebound; s = db_->CreateColumnFamily(cf_options_, cf_name, column_family); if (s.ok()) { handle_map_.insert(std::make_pair(new_timebound, *column_family)); } return s; } Status DateTieredDBImpl::FindColumnFamily(int64_t keytime, ColumnFamilyHandle** column_family, bool create_if_missing) { *column_family = nullptr; { InstrumentedMutexLock l(&mutex_); auto iter = handle_map_.upper_bound(keytime); if (iter == handle_map_.end()) { if (!create_if_missing) { return Status::NotFound(); } else { return CreateColumnFamily(column_family); } } // Move to previous element to get the appropriate time window *column_family = iter->second; } return Status::OK(); } Status DateTieredDBImpl::Put(const WriteOptions& options, const Slice& key, const Slice& val) { int64_t timestamp = 0; Status s; s = GetTimestamp(key, ×tamp); if (!s.ok()) { return s; } DropObsoleteColumnFamilies(); // Prune request to obsolete data if (IsStale(timestamp, ttl_, db_->GetEnv())) { return Status::InvalidArgument(); } // Decide column family (i.e. the time window) to put into ColumnFamilyHandle* column_family; s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/); if (!s.ok()) { return s; } // Efficiently put with WriteBatch WriteBatch batch; batch.Put(column_family, key, val); return Write(options, &batch); } Status DateTieredDBImpl::Get(const ReadOptions& options, const Slice& key, std::string* value) { int64_t timestamp = 0; Status s; s = GetTimestamp(key, ×tamp); if (!s.ok()) { return s; } // Prune request to obsolete data if (IsStale(timestamp, ttl_, db_->GetEnv())) { return Status::NotFound(); } // Decide column family to get from ColumnFamilyHandle* column_family; s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/); if (!s.ok()) { return s; } if (column_family == nullptr) { // Cannot find column family return Status::NotFound(); } // Get value with key return db_->Get(options, column_family, key, value); } bool DateTieredDBImpl::KeyMayExist(const ReadOptions& options, const Slice& key, std::string* value, bool* value_found) { int64_t timestamp = 0; Status s; s = GetTimestamp(key, ×tamp); if (!s.ok()) { // Cannot get current time return false; } // Decide column family to get from ColumnFamilyHandle* column_family; s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/); if (!s.ok() || column_family == nullptr) { // Cannot find column family return false; } if (IsStale(timestamp, ttl_, db_->GetEnv())) { return false; } return db_->KeyMayExist(options, column_family, key, value, value_found); } Status DateTieredDBImpl::Delete(const WriteOptions& options, const Slice& key) { int64_t timestamp = 0; Status s; s = GetTimestamp(key, ×tamp); if (!s.ok()) { return s; } DropObsoleteColumnFamilies(); // Prune request to obsolete data if (IsStale(timestamp, ttl_, db_->GetEnv())) { return Status::NotFound(); } // Decide column family to get from ColumnFamilyHandle* column_family; s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/); if (!s.ok()) { return s; } if (column_family == nullptr) { // Cannot find column family return Status::NotFound(); } // Get value with key return db_->Delete(options, column_family, key); } Status DateTieredDBImpl::Merge(const WriteOptions& options, const Slice& key, const Slice& value) { // Decide column family to get from int64_t timestamp = 0; Status s; s = GetTimestamp(key, ×tamp); if (!s.ok()) { // Cannot get current time return s; } ColumnFamilyHandle* column_family; s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/); if (!s.ok()) { return s; } WriteBatch batch; batch.Merge(column_family, key, value); return Write(options, &batch); } Status DateTieredDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { class Handler : public WriteBatch::Handler { public: explicit Handler() {} WriteBatch updates_ttl; Status batch_rewrite_status; virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { WriteBatchInternal::Put(&updates_ttl, column_family_id, key, value); return Status::OK(); } virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) override { WriteBatchInternal::Merge(&updates_ttl, column_family_id, key, value); return Status::OK(); } virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) override { WriteBatchInternal::Delete(&updates_ttl, column_family_id, key); return Status::OK(); } virtual void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); } }; Handler handler; updates->Iterate(&handler); if (!handler.batch_rewrite_status.ok()) { return handler.batch_rewrite_status; } else { return db_->Write(opts, &(handler.updates_ttl)); } } Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) { if (handle_map_.empty()) { return NewEmptyIterator(); } DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_); auto db_iter = NewArenaWrappedDbIterator( db_impl->GetEnv(), opts, ioptions_, moptions_, kMaxSequenceNumber, cf_options_.max_sequential_skip_in_iterations, 0, nullptr /*read_callback*/); auto arena = db_iter->GetArena(); MergeIteratorBuilder builder(&icomp_, arena); for (auto& item : handle_map_) { auto handle = item.second; builder.AddIterator(db_impl->NewInternalIterator( arena, db_iter->GetRangeDelAggregator(), handle)); } auto internal_iter = builder.Finish(); db_iter->SetIterUnderDBIter(internal_iter); return db_iter; } } // namespace rocksdb #endif // ROCKSDB_LITE