dcd36a6aee
Summary: Blob db doesn't currently support column families. Return NotSupported status explicitly. Closes https://github.com/facebook/rocksdb/pull/2825 Differential Revision: D5757438 Pulled By: yiwu-arbug fbshipit-source-id: 44de9408fd032c98e8ae337d4db4ed37169bd9fa
747 lines
22 KiB
C++
747 lines
22 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
|
|
#pragma once
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include <atomic>
|
|
#include <condition_variable>
|
|
#include <ctime>
|
|
#include <limits>
|
|
#include <list>
|
|
#include <memory>
|
|
#include <set>
|
|
#include <string>
|
|
#include <thread>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "rocksdb/compaction_filter.h"
|
|
#include "rocksdb/db.h"
|
|
#include "rocksdb/listener.h"
|
|
#include "rocksdb/options.h"
|
|
#include "rocksdb/wal_filter.h"
|
|
#include "util/file_reader_writer.h"
|
|
#include "util/mpsc.h"
|
|
#include "util/mutexlock.h"
|
|
#include "util/timer_queue.h"
|
|
#include "utilities/blob_db/blob_db.h"
|
|
#include "utilities/blob_db/blob_log_format.h"
|
|
#include "utilities/blob_db/blob_log_reader.h"
|
|
#include "utilities/blob_db/blob_log_writer.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
class DBImpl;
|
|
class ColumnFamilyHandle;
|
|
class ColumnFamilyData;
|
|
class OptimisticTransactionDBImpl;
|
|
struct FlushJobInfo;
|
|
|
|
namespace blob_db {
|
|
|
|
class BlobFile;
|
|
class BlobDBImpl;
|
|
|
|
class BlobDBFlushBeginListener : public EventListener {
|
|
public:
|
|
explicit BlobDBFlushBeginListener() : impl_(nullptr) {}
|
|
|
|
void OnFlushBegin(DB* db, const FlushJobInfo& info) override;
|
|
|
|
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
|
|
|
|
protected:
|
|
BlobDBImpl* impl_;
|
|
};
|
|
|
|
// this implements the callback from the WAL which ensures that the
|
|
// blob record is present in the blob log. If fsync/fdatasync in not
|
|
// happening on every write, there is the probability that keys in the
|
|
// blob log can lag the keys in blobs
|
|
class BlobReconcileWalFilter : public WalFilter {
|
|
public:
|
|
virtual WalFilter::WalProcessingOption LogRecordFound(
|
|
unsigned long long log_number, const std::string& log_file_name,
|
|
const WriteBatch& batch, WriteBatch* new_batch,
|
|
bool* batch_changed) override;
|
|
|
|
virtual const char* Name() const override { return "BlobDBWalReconciler"; }
|
|
|
|
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
|
|
|
|
protected:
|
|
BlobDBImpl* impl_;
|
|
};
|
|
|
|
class EvictAllVersionsCompactionListener : public EventListener {
|
|
public:
|
|
class InternalListener : public CompactionEventListener {
|
|
friend class BlobDBImpl;
|
|
|
|
public:
|
|
virtual void OnCompaction(int level, const Slice& key,
|
|
CompactionListenerValueType value_type,
|
|
const Slice& existing_value,
|
|
const SequenceNumber& sn, bool is_new) override;
|
|
|
|
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
|
|
|
|
private:
|
|
BlobDBImpl* impl_;
|
|
};
|
|
|
|
explicit EvictAllVersionsCompactionListener()
|
|
: internal_listener_(new InternalListener()) {}
|
|
|
|
virtual CompactionEventListener* GetCompactionEventListener() override {
|
|
return internal_listener_.get();
|
|
}
|
|
|
|
void SetImplPtr(BlobDBImpl* p) { internal_listener_->SetImplPtr(p); }
|
|
|
|
private:
|
|
std::unique_ptr<InternalListener> internal_listener_;
|
|
};
|
|
|
|
#if 0
|
|
class EvictAllVersionsFilterFactory : public CompactionFilterFactory {
|
|
private:
|
|
BlobDBImpl* impl_;
|
|
|
|
public:
|
|
EvictAllVersionsFilterFactory() : impl_(nullptr) {}
|
|
|
|
void SetImplPtr(BlobDBImpl* p) { impl_ = p; }
|
|
|
|
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
|
|
const CompactionFilter::Context& context) override;
|
|
|
|
virtual const char* Name() const override {
|
|
return "EvictAllVersionsFilterFactory";
|
|
}
|
|
};
|
|
#endif
|
|
|
|
// Comparator to sort "TTL" aware Blob files based on the lower value of
|
|
// TTL range.
|
|
struct blobf_compare_ttl {
|
|
bool operator()(const std::shared_ptr<BlobFile>& lhs,
|
|
const std::shared_ptr<BlobFile>& rhs) const;
|
|
};
|
|
|
|
struct GCStats {
|
|
uint64_t blob_count = 0;
|
|
uint64_t num_deletes = 0;
|
|
uint64_t deleted_size = 0;
|
|
uint64_t retry_delete = 0;
|
|
uint64_t delete_succeeded = 0;
|
|
uint64_t overwritten_while_delete = 0;
|
|
uint64_t num_relocate = 0;
|
|
uint64_t retry_relocate = 0;
|
|
uint64_t relocate_succeeded = 0;
|
|
uint64_t overwritten_while_relocate = 0;
|
|
std::shared_ptr<BlobFile> newfile = nullptr;
|
|
};
|
|
|
|
/**
|
|
* The implementation class for BlobDB. This manages the value
|
|
* part in TTL aware sequentially written files. These files are
|
|
* Garbage Collected.
|
|
*/
|
|
class BlobDBImpl : public BlobDB {
|
|
friend class BlobDBFlushBeginListener;
|
|
friend class EvictAllVersionsCompactionListener;
|
|
friend class BlobDB;
|
|
friend class BlobFile;
|
|
friend class BlobDBIterator;
|
|
|
|
public:
|
|
// deletions check period
|
|
static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000;
|
|
|
|
// gc percentage each check period
|
|
static constexpr uint32_t kGCFilePercentage = 100;
|
|
|
|
// gc period
|
|
static constexpr uint32_t kGCCheckPeriodMillisecs = 60 * 1000;
|
|
|
|
// sanity check task
|
|
static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000;
|
|
|
|
// how many random access open files can we tolerate
|
|
static constexpr uint32_t kOpenFilesTrigger = 100;
|
|
|
|
// how many periods of stats do we keep.
|
|
static constexpr uint32_t kWriteAmplificationStatsPeriods = 24;
|
|
|
|
// what is the length of any period
|
|
static constexpr uint32_t kWriteAmplificationStatsPeriodMillisecs =
|
|
3600 * 1000;
|
|
|
|
// we will garbage collect blob files in
|
|
// which entire files have expired. However if the
|
|
// ttl_range of files is very large say a day, we
|
|
// would have to wait for the entire day, before we
|
|
// recover most of the space.
|
|
static constexpr uint32_t kPartialExpirationGCRangeSecs = 4 * 3600;
|
|
|
|
// this should be based on allowed Write Amplification
|
|
// if 50% of the space of a blob file has been deleted/expired,
|
|
static constexpr uint32_t kPartialExpirationPercentage = 75;
|
|
|
|
// how often should we schedule a job to fsync open files
|
|
static constexpr uint32_t kFSyncFilesPeriodMillisecs = 10 * 1000;
|
|
|
|
// how often to schedule reclaim open files.
|
|
static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
|
|
|
|
// how often to schedule delete obs files periods
|
|
static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
|
|
|
|
// how often to schedule check seq files period
|
|
static constexpr uint32_t kCheckSeqFilesPeriodMillisecs = 10 * 1000;
|
|
|
|
using BlobDB::Put;
|
|
Status Put(const WriteOptions& options, const Slice& key,
|
|
const Slice& value) override;
|
|
|
|
using BlobDB::Delete;
|
|
Status Delete(const WriteOptions& options, const Slice& key) override;
|
|
|
|
using BlobDB::Get;
|
|
Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
|
|
const Slice& key, PinnableSlice* value) override;
|
|
|
|
using BlobDB::NewIterator;
|
|
virtual Iterator* NewIterator(const ReadOptions& read_options) override;
|
|
|
|
using BlobDB::MultiGet;
|
|
virtual std::vector<Status> MultiGet(
|
|
const ReadOptions& read_options,
|
|
const std::vector<Slice>& keys,
|
|
std::vector<std::string>* values) override;
|
|
|
|
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
|
|
|
|
using BlobDB::PutWithTTL;
|
|
Status PutWithTTL(const WriteOptions& options, const Slice& key,
|
|
const Slice& value, uint64_t ttl) override;
|
|
|
|
using BlobDB::PutUntil;
|
|
Status PutUntil(const WriteOptions& options, const Slice& key,
|
|
const Slice& value_unc, uint64_t expiration) override;
|
|
|
|
Status LinkToBaseDB(DB* db) override;
|
|
|
|
BlobDBOptions GetBlobDBOptions() const override;
|
|
|
|
BlobDBImpl(DB* db, const BlobDBOptions& bdb_options);
|
|
|
|
BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
|
|
const DBOptions& db_options);
|
|
|
|
~BlobDBImpl();
|
|
|
|
#ifndef NDEBUG
|
|
Status TEST_GetSequenceNumber(const Slice& key, SequenceNumber* sequence);
|
|
|
|
std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
|
|
|
|
std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
|
|
|
|
Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
|
|
|
|
Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
|
|
GCStats* gc_stats);
|
|
|
|
void TEST_RunGC();
|
|
|
|
void TEST_ObsoleteFile(std::shared_ptr<BlobFile>& bfile);
|
|
|
|
void TEST_DeleteObsoleteFiles();
|
|
#endif // !NDEBUG
|
|
|
|
private:
|
|
Status OpenPhase1();
|
|
|
|
// Create a snapshot if there isn't one in read options.
|
|
// Return true if a snapshot is created.
|
|
bool SetSnapshotIfNeeded(ReadOptions* read_options);
|
|
|
|
Status CommonGet(const Slice& key, const std::string& index_entry,
|
|
std::string* value, SequenceNumber* sequence = nullptr);
|
|
|
|
Slice GetCompressedSlice(const Slice& raw,
|
|
std::string* compression_output) const;
|
|
|
|
// Just before flush starts acting on memtable files,
|
|
// this handler is called.
|
|
void OnFlushBeginHandler(DB* db, const FlushJobInfo& info);
|
|
|
|
// is this file ready for Garbage collection. if the TTL of the file
|
|
// has expired or if threshold of the file has been evicted
|
|
// tt - current time
|
|
// last_id - the id of the non-TTL file to evict
|
|
bool ShouldGCFile(std::shared_ptr<BlobFile> bfile, uint64_t now,
|
|
bool is_oldest_simple_blob_file, std::string* reason);
|
|
|
|
// collect all the blob log files from the blob directory
|
|
Status GetAllLogFiles(std::set<std::pair<uint64_t, std::string>>* file_nums);
|
|
|
|
// Close a file by appending a footer, and removes file from open files list.
|
|
Status CloseBlobFile(std::shared_ptr<BlobFile> bfile);
|
|
|
|
// Close a file if its size exceeds blob_file_size
|
|
Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
|
|
|
|
uint64_t ExtractExpiration(const Slice& key, const Slice& value,
|
|
Slice* value_slice, std::string* new_value);
|
|
|
|
Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
|
|
const std::string& headerbuf, const Slice& key,
|
|
const Slice& value, std::string* index_entry);
|
|
|
|
Status AppendSN(const std::shared_ptr<BlobFile>& bfile,
|
|
const SequenceNumber& sn);
|
|
|
|
// find an existing blob log file based on the expiration unix epoch
|
|
// if such a file does not exist, return nullptr
|
|
std::shared_ptr<BlobFile> SelectBlobFileTTL(uint64_t expiration);
|
|
|
|
// find an existing blob log file to append the value to
|
|
std::shared_ptr<BlobFile> SelectBlobFile();
|
|
|
|
std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
|
|
|
|
void UpdateWriteOptions(const WriteOptions& options);
|
|
|
|
void Shutdown();
|
|
|
|
// periodic sanity check. Bunch of checks
|
|
std::pair<bool, int64_t> SanityCheck(bool aborted);
|
|
|
|
// delete files which have been garbage collected and marked
|
|
// obsolete. Check whether any snapshots exist which refer to
|
|
// the same
|
|
std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
|
|
|
|
// Major task to garbage collect expired and deleted blobs
|
|
std::pair<bool, int64_t> RunGC(bool aborted);
|
|
|
|
// asynchronous task to fsync/fdatasync the open blob files
|
|
std::pair<bool, int64_t> FsyncFiles(bool aborted);
|
|
|
|
// periodically check if open blob files and their TTL's has expired
|
|
// if expired, close the sequential writer and make the file immutable
|
|
std::pair<bool, int64_t> CheckSeqFiles(bool aborted);
|
|
|
|
// if the number of open files, approaches ULIMIT's this
|
|
// task will close random readers, which are kept around for
|
|
// efficiency
|
|
std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
|
|
|
|
// periodically print write amplification statistics
|
|
std::pair<bool, int64_t> WaStats(bool aborted);
|
|
|
|
// background task to do book-keeping of deleted keys
|
|
std::pair<bool, int64_t> EvictDeletions(bool aborted);
|
|
|
|
std::pair<bool, int64_t> EvictCompacted(bool aborted);
|
|
|
|
bool CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile);
|
|
|
|
std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
|
|
|
|
std::pair<bool, int64_t> CallbackEvicts(TimerQueue* tq,
|
|
std::shared_ptr<BlobFile> bfile,
|
|
bool aborted);
|
|
|
|
// Adds the background tasks to the timer queue
|
|
void StartBackgroundTasks();
|
|
|
|
// add a new Blob File
|
|
std::shared_ptr<BlobFile> NewBlobFile(const std::string& reason);
|
|
|
|
Status OpenAllFiles();
|
|
|
|
// hold write mutex on file and call
|
|
// creates a Random Access reader for GET call
|
|
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
|
|
const std::shared_ptr<BlobFile>& bfile, Env* env,
|
|
const EnvOptions& env_options);
|
|
|
|
// hold write mutex on file and call.
|
|
// Close the above Random Access reader
|
|
void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile);
|
|
|
|
// hold write mutex on file and call
|
|
// creates a sequential (append) writer for this blobfile
|
|
Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
|
|
|
|
// returns a Writer object for the file. If writer is not
|
|
// already present, creates one. Needs Write Mutex to be held
|
|
std::shared_ptr<Writer> CheckOrCreateWriterLocked(
|
|
const std::shared_ptr<BlobFile>& bfile);
|
|
|
|
// Iterate through keys and values on Blob and write into
|
|
// separate file the remaining blobs and delete/update pointers
|
|
// in LSM atomically
|
|
Status GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
|
|
GCStats* gcstats);
|
|
|
|
// checks if there is no snapshot which is referencing the
|
|
// blobs
|
|
bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
|
|
|
|
bool MarkBlobDeleted(const Slice& key, const Slice& lsmValue);
|
|
|
|
bool FindFileAndEvictABlob(uint64_t file_number, uint64_t key_size,
|
|
uint64_t blob_offset, uint64_t blob_size);
|
|
|
|
void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
|
|
|
|
void FilterSubsetOfFiles(
|
|
const std::vector<std::shared_ptr<BlobFile>>& blob_files,
|
|
std::vector<std::shared_ptr<BlobFile>>* to_process, uint64_t epoch,
|
|
size_t files_to_collect);
|
|
|
|
uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
|
|
|
|
// the base DB
|
|
DBImpl* db_impl_;
|
|
Env* env_;
|
|
TTLExtractor* ttl_extractor_;
|
|
|
|
// Optimistic Transaction DB used during Garbage collection
|
|
// for atomicity
|
|
std::unique_ptr<OptimisticTransactionDBImpl> opt_db_;
|
|
|
|
// a boolean to capture whether write_options has been set
|
|
std::atomic<bool> wo_set_;
|
|
WriteOptions write_options_;
|
|
|
|
// the options that govern the behavior of Blob Storage
|
|
BlobDBOptions bdb_options_;
|
|
DBOptions db_options_;
|
|
EnvOptions env_options_;
|
|
|
|
// name of the database directory
|
|
std::string dbname_;
|
|
|
|
// by default this is "blob_dir" under dbname_
|
|
// but can be configured
|
|
std::string blob_dir_;
|
|
|
|
// pointer to directory
|
|
std::unique_ptr<Directory> dir_ent_;
|
|
|
|
std::atomic<bool> dir_change_;
|
|
|
|
// Read Write Mutex, which protects all the data structures
|
|
// HEAVILY TRAFFICKED
|
|
mutable port::RWMutex mutex_;
|
|
|
|
// Writers has to hold write_mutex_ before writing.
|
|
mutable port::Mutex write_mutex_;
|
|
|
|
// counter for blob file number
|
|
std::atomic<uint64_t> next_file_number_;
|
|
|
|
// entire metadata of all the BLOB files memory
|
|
std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
|
|
|
|
// epoch or version of the open files.
|
|
std::atomic<uint64_t> epoch_of_;
|
|
|
|
// All opened non-TTL blob files.
|
|
std::vector<std::shared_ptr<BlobFile>> open_simple_files_;
|
|
|
|
// all the blob files which are currently being appended to based
|
|
// on variety of incoming TTL's
|
|
std::multiset<std::shared_ptr<BlobFile>, blobf_compare_ttl> open_blob_files_;
|
|
|
|
// packet of information to put in lockess delete(s) queue
|
|
struct delete_packet_t {
|
|
ColumnFamilyHandle* cfh_;
|
|
std::string key_;
|
|
SequenceNumber dsn_;
|
|
};
|
|
|
|
struct override_packet_t {
|
|
uint64_t file_number_;
|
|
uint64_t key_size_;
|
|
uint64_t blob_offset_;
|
|
uint64_t blob_size_;
|
|
SequenceNumber dsn_;
|
|
};
|
|
|
|
// LOCKLESS multiple producer single consumer queue to quickly append
|
|
// deletes without taking lock. Can rapidly grow in size!!
|
|
// deletes happen in LSM, but minor book-keeping needs to happen on
|
|
// BLOB side (for triggering eviction)
|
|
mpsc_queue_t<delete_packet_t> delete_keys_q_;
|
|
|
|
// LOCKLESS multiple producer single consumer queue for values
|
|
// that are being compacted
|
|
mpsc_queue_t<override_packet_t> override_vals_q_;
|
|
|
|
// atomic bool to represent shutdown
|
|
std::atomic<bool> shutdown_;
|
|
|
|
// timer based queue to execute tasks
|
|
TimerQueue tqueue_;
|
|
|
|
// timer queues to call eviction callbacks.
|
|
std::vector<std::shared_ptr<TimerQueue>> cb_threads_;
|
|
|
|
// only accessed in GC thread, hence not atomic. The epoch of the
|
|
// GC task. Each execution is one epoch. Helps us in allocating
|
|
// files to one execution
|
|
uint64_t current_epoch_;
|
|
|
|
// number of files opened for random access/GET
|
|
// counter is used to monitor and close excess RA files.
|
|
std::atomic<uint32_t> open_file_count_;
|
|
|
|
// should hold mutex to modify
|
|
// STATISTICS for WA of Blob Files due to GC
|
|
// collect by default 24 hourly periods
|
|
std::list<uint64_t> all_periods_write_;
|
|
std::list<uint64_t> all_periods_ampl_;
|
|
|
|
std::atomic<uint64_t> last_period_write_;
|
|
std::atomic<uint64_t> last_period_ampl_;
|
|
|
|
uint64_t total_periods_write_;
|
|
uint64_t total_periods_ampl_;
|
|
|
|
// total size of all blob files at a given time
|
|
std::atomic<uint64_t> total_blob_space_;
|
|
std::list<std::shared_ptr<BlobFile>> obsolete_files_;
|
|
bool open_p1_done_;
|
|
|
|
uint32_t debug_level_;
|
|
};
|
|
|
|
class BlobFile {
|
|
friend class BlobDBImpl;
|
|
friend struct blobf_compare_ttl;
|
|
|
|
private:
|
|
// access to parent
|
|
const BlobDBImpl* parent_;
|
|
|
|
// path to blob directory
|
|
std::string path_to_dir_;
|
|
|
|
// the id of the file.
|
|
// the above 2 are created during file creation and never changed
|
|
// after that
|
|
uint64_t file_number_;
|
|
|
|
// number of blobs in the file
|
|
std::atomic<uint64_t> blob_count_;
|
|
|
|
// the file will be selected for GC in this future epoch
|
|
std::atomic<int64_t> gc_epoch_;
|
|
|
|
// size of the file
|
|
std::atomic<uint64_t> file_size_;
|
|
|
|
// number of blobs in this particular file which have been evicted
|
|
uint64_t deleted_count_;
|
|
|
|
// size of deleted blobs (used by heuristic to select file for GC)
|
|
uint64_t deleted_size_;
|
|
|
|
BlobLogHeader header_;
|
|
|
|
// closed_ = true implies the file is no more mutable
|
|
// no more blobs will be appended and the footer has been written out
|
|
std::atomic<bool> closed_;
|
|
|
|
// has a pass of garbage collection successfully finished on this file
|
|
// can_be_deleted_ still needs to do iterator/snapshot checks
|
|
std::atomic<bool> can_be_deleted_;
|
|
|
|
// should this file been gc'd once to reconcile lost deletes/compactions
|
|
std::atomic<bool> gc_once_after_open_;
|
|
|
|
// et - lt of the blobs
|
|
ttlrange_t ttl_range_;
|
|
|
|
// et - lt of the timestamp of the KV pairs.
|
|
tsrange_t time_range_;
|
|
|
|
// ESN - LSN of the blobs
|
|
snrange_t sn_range_;
|
|
|
|
// Sequential/Append writer for blobs
|
|
std::shared_ptr<Writer> log_writer_;
|
|
|
|
// random access file reader for GET calls
|
|
std::shared_ptr<RandomAccessFileReader> ra_file_reader_;
|
|
|
|
// This Read-Write mutex is per file specific and protects
|
|
// all the datastructures
|
|
mutable port::RWMutex mutex_;
|
|
|
|
// time when the random access reader was last created.
|
|
std::atomic<std::time_t> last_access_;
|
|
|
|
// last time file was fsync'd/fdatasyncd
|
|
std::atomic<uint64_t> last_fsync_;
|
|
|
|
bool header_valid_;
|
|
|
|
public:
|
|
BlobFile();
|
|
|
|
BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum);
|
|
|
|
~BlobFile();
|
|
|
|
ColumnFamilyHandle* GetColumnFamily(DB* db);
|
|
|
|
// Returns log file's pathname relative to the main db dir
|
|
// Eg. For a live-log-file = blob_dir/000003.blob
|
|
std::string PathName() const;
|
|
|
|
// Primary identifier for blob file.
|
|
// once the file is created, this never changes
|
|
uint64_t BlobFileNumber() const { return file_number_; }
|
|
|
|
// the following functions are atomic, and don't need
|
|
// read lock
|
|
uint64_t BlobCount() const {
|
|
return blob_count_.load(std::memory_order_acquire);
|
|
}
|
|
|
|
std::string DumpState() const;
|
|
|
|
// if the file has gone through GC and blobs have been relocated
|
|
bool Obsolete() const { return can_be_deleted_.load(); }
|
|
|
|
// if the file is not taking any more appends.
|
|
bool Immutable() const { return closed_.load(); }
|
|
|
|
// we will assume this is atomic
|
|
bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const;
|
|
|
|
uint64_t GetFileSize() const {
|
|
return file_size_.load(std::memory_order_acquire);
|
|
}
|
|
|
|
// All Get functions which are not atomic, will need ReadLock on the mutex
|
|
tsrange_t GetTimeRange() const {
|
|
assert(HasTimestamp());
|
|
return time_range_;
|
|
}
|
|
|
|
ttlrange_t GetTTLRange() const { return ttl_range_; }
|
|
|
|
snrange_t GetSNRange() const { return sn_range_; }
|
|
|
|
bool HasTTL() const {
|
|
assert(header_valid_);
|
|
return header_.HasTTL();
|
|
}
|
|
|
|
bool HasTimestamp() const {
|
|
assert(header_valid_);
|
|
return header_.HasTimestamp();
|
|
}
|
|
|
|
std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
|
|
|
|
void Fsync();
|
|
|
|
private:
|
|
std::shared_ptr<Reader> OpenSequentialReader(
|
|
Env* env, const DBOptions& db_options,
|
|
const EnvOptions& env_options) const;
|
|
|
|
Status ReadFooter(BlobLogFooter* footer);
|
|
|
|
Status WriteFooterAndCloseLocked();
|
|
|
|
std::shared_ptr<RandomAccessFileReader> GetOrOpenRandomAccessReader(
|
|
Env* env, const EnvOptions& env_options, bool* fresh_open);
|
|
|
|
void CloseRandomAccessLocked();
|
|
|
|
// this is used, when you are reading only the footer of a
|
|
// previously closed file
|
|
Status SetFromFooterLocked(const BlobLogFooter& footer);
|
|
|
|
void set_time_range(const tsrange_t& tr) { time_range_ = tr; }
|
|
|
|
void set_ttl_range(const ttlrange_t& ttl) { ttl_range_ = ttl; }
|
|
|
|
void SetSNRange(const snrange_t& snr) { sn_range_ = snr; }
|
|
|
|
// The following functions are atomic, and don't need locks
|
|
void SetFileSize(uint64_t fs) { file_size_ = fs; }
|
|
|
|
void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
|
|
|
|
void SetCanBeDeleted() { can_be_deleted_ = true; }
|
|
};
|
|
|
|
class BlobDBIterator : public Iterator {
|
|
public:
|
|
explicit BlobDBIterator(Iterator* iter, BlobDBImpl* impl, bool own_snapshot,
|
|
const Snapshot* snapshot)
|
|
: iter_(iter),
|
|
db_impl_(impl),
|
|
own_snapshot_(own_snapshot),
|
|
snapshot_(snapshot) {
|
|
assert(iter != nullptr);
|
|
assert(snapshot != nullptr);
|
|
}
|
|
|
|
~BlobDBIterator() {
|
|
if (own_snapshot_) {
|
|
db_impl_->ReleaseSnapshot(snapshot_);
|
|
}
|
|
delete iter_;
|
|
}
|
|
|
|
bool Valid() const override { return iter_->Valid(); }
|
|
|
|
void SeekToFirst() override { iter_->SeekToFirst(); }
|
|
|
|
void SeekToLast() override { iter_->SeekToLast(); }
|
|
|
|
void Seek(const Slice& target) override { iter_->Seek(target); }
|
|
|
|
void SeekForPrev(const Slice& target) override { iter_->SeekForPrev(target); }
|
|
|
|
void Next() override { iter_->Next(); }
|
|
|
|
void Prev() override { iter_->Prev(); }
|
|
|
|
Slice key() const override { return iter_->key(); }
|
|
|
|
Slice value() const override;
|
|
|
|
Status status() const override { return iter_->status(); }
|
|
|
|
// Iterator::Refresh() not supported.
|
|
|
|
private:
|
|
Iterator* iter_;
|
|
BlobDBImpl* db_impl_;
|
|
bool own_snapshot_;
|
|
const Snapshot* snapshot_;
|
|
mutable std::string vpart_;
|
|
};
|
|
|
|
} // namespace blob_db
|
|
} // namespace rocksdb
|
|
#endif // ROCKSDB_LITE
|