5be2cb6948
Summary: Added compaction filter support for BlobDB non-TTL values. Same as vanilla RocksDB, user compaction filter applies to all k/v pairs of the compaction for non-TTL values. It honors `min_blob_size`, which potentially results value transitions between inlined data and stored-in-blob data when size of value is changed. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6850 Reviewed By: siying Differential Revision: D22263487 Pulled By: ltamasi fbshipit-source-id: 8fc03f8cde2a5c831e63b436b3dbf1b7f90939e8
254 lines
7.5 KiB
C++
254 lines
7.5 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
#pragma once
|
|
#ifndef ROCKSDB_LITE
|
|
|
|
#include <atomic>
|
|
#include <limits>
|
|
#include <memory>
|
|
#include <unordered_set>
|
|
|
|
#include "db/blob/blob_log_format.h"
|
|
#include "db/blob/blob_log_reader.h"
|
|
#include "db/blob/blob_log_writer.h"
|
|
#include "file/random_access_file_reader.h"
|
|
#include "port/port.h"
|
|
#include "rocksdb/env.h"
|
|
#include "rocksdb/options.h"
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
namespace blob_db {
|
|
|
|
class BlobDBImpl;
|
|
|
|
class BlobFile {
|
|
friend class BlobDBImpl;
|
|
friend struct BlobFileComparator;
|
|
friend struct BlobFileComparatorTTL;
|
|
friend class BlobIndexCompactionFilterBase;
|
|
friend class BlobIndexCompactionFilterGC;
|
|
|
|
private:
|
|
// access to parent
|
|
const BlobDBImpl* parent_{nullptr};
|
|
|
|
// path to blob directory
|
|
std::string path_to_dir_;
|
|
|
|
// the id of the file.
|
|
// the above 2 are created during file creation and never changed
|
|
// after that
|
|
uint64_t file_number_{0};
|
|
|
|
// The file numbers of the SST files whose oldest blob file reference
|
|
// points to this blob file.
|
|
std::unordered_set<uint64_t> linked_sst_files_;
|
|
|
|
// Info log.
|
|
Logger* info_log_{nullptr};
|
|
|
|
// Column family id.
|
|
uint32_t column_family_id_{std::numeric_limits<uint32_t>::max()};
|
|
|
|
// Compression type of blobs in the file
|
|
CompressionType compression_{kNoCompression};
|
|
|
|
// If true, the keys in this file all has TTL. Otherwise all keys don't
|
|
// have TTL.
|
|
bool has_ttl_{false};
|
|
|
|
// TTL range of blobs in the file.
|
|
ExpirationRange expiration_range_;
|
|
|
|
// number of blobs in the file
|
|
std::atomic<uint64_t> blob_count_{0};
|
|
|
|
// size of the file
|
|
std::atomic<uint64_t> file_size_{0};
|
|
|
|
BlobLogHeader header_;
|
|
|
|
// closed_ = true implies the file is no more mutable
|
|
// no more blobs will be appended and the footer has been written out
|
|
std::atomic<bool> closed_{false};
|
|
|
|
// The latest sequence number when the file was closed/made immutable.
|
|
SequenceNumber immutable_sequence_{0};
|
|
|
|
// Whether the file was marked obsolete (due to either TTL or GC).
|
|
// obsolete_ still needs to do iterator/snapshot checks
|
|
std::atomic<bool> obsolete_{false};
|
|
|
|
// The last sequence number by the time the file marked as obsolete.
|
|
// Data in this file is visible to a snapshot taken before the sequence.
|
|
SequenceNumber obsolete_sequence_{0};
|
|
|
|
// Sequential/Append writer for blobs
|
|
std::shared_ptr<Writer> log_writer_;
|
|
|
|
// random access file reader for GET calls
|
|
std::shared_ptr<RandomAccessFileReader> ra_file_reader_;
|
|
|
|
// This Read-Write mutex is per file specific and protects
|
|
// all the datastructures
|
|
mutable port::RWMutex mutex_;
|
|
|
|
// time when the random access reader was last created.
|
|
std::atomic<std::int64_t> last_access_{-1};
|
|
|
|
// last time file was fsync'd/fdatasyncd
|
|
std::atomic<uint64_t> last_fsync_{0};
|
|
|
|
bool header_valid_{false};
|
|
|
|
bool footer_valid_{false};
|
|
|
|
public:
|
|
BlobFile() = default;
|
|
|
|
BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
|
|
Logger* info_log);
|
|
|
|
BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
|
|
Logger* info_log, uint32_t column_family_id,
|
|
CompressionType compression, bool has_ttl,
|
|
const ExpirationRange& expiration_range);
|
|
|
|
~BlobFile();
|
|
|
|
uint32_t GetColumnFamilyId() const;
|
|
|
|
// Returns log file's absolute pathname.
|
|
std::string PathName() const;
|
|
|
|
// Primary identifier for blob file.
|
|
// once the file is created, this never changes
|
|
uint64_t BlobFileNumber() const { return file_number_; }
|
|
|
|
// Get the set of SST files whose oldest blob file reference points to
|
|
// this file.
|
|
const std::unordered_set<uint64_t>& GetLinkedSstFiles() const {
|
|
return linked_sst_files_;
|
|
}
|
|
|
|
// Link an SST file whose oldest blob file reference points to this file.
|
|
void LinkSstFile(uint64_t sst_file_number) {
|
|
assert(linked_sst_files_.find(sst_file_number) == linked_sst_files_.end());
|
|
linked_sst_files_.insert(sst_file_number);
|
|
}
|
|
|
|
// Unlink an SST file whose oldest blob file reference points to this file.
|
|
void UnlinkSstFile(uint64_t sst_file_number) {
|
|
auto it = linked_sst_files_.find(sst_file_number);
|
|
assert(it != linked_sst_files_.end());
|
|
linked_sst_files_.erase(it);
|
|
}
|
|
|
|
// the following functions are atomic, and don't need
|
|
// read lock
|
|
uint64_t BlobCount() const {
|
|
return blob_count_.load(std::memory_order_acquire);
|
|
}
|
|
|
|
std::string DumpState() const;
|
|
|
|
// if the file is not taking any more appends.
|
|
bool Immutable() const { return closed_.load(); }
|
|
|
|
// Mark the file as immutable.
|
|
// REQUIRES: write lock held, or access from single thread (on DB open).
|
|
void MarkImmutable(SequenceNumber sequence) {
|
|
closed_ = true;
|
|
immutable_sequence_ = sequence;
|
|
}
|
|
|
|
SequenceNumber GetImmutableSequence() const {
|
|
assert(Immutable());
|
|
return immutable_sequence_;
|
|
}
|
|
|
|
// Whether the file was marked obsolete (due to either TTL or GC).
|
|
bool Obsolete() const {
|
|
assert(Immutable() || !obsolete_.load());
|
|
return obsolete_.load();
|
|
}
|
|
|
|
// Mark file as obsolete (due to either TTL or GC). The file is not visible to
|
|
// snapshots with sequence greater or equal to the given sequence.
|
|
void MarkObsolete(SequenceNumber sequence);
|
|
|
|
SequenceNumber GetObsoleteSequence() const {
|
|
assert(Obsolete());
|
|
return obsolete_sequence_;
|
|
}
|
|
|
|
// we will assume this is atomic
|
|
bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const;
|
|
|
|
Status Fsync();
|
|
|
|
uint64_t GetFileSize() const {
|
|
return file_size_.load(std::memory_order_acquire);
|
|
}
|
|
|
|
// All Get functions which are not atomic, will need ReadLock on the mutex
|
|
|
|
ExpirationRange GetExpirationRange() const { return expiration_range_; }
|
|
|
|
void ExtendExpirationRange(uint64_t expiration) {
|
|
expiration_range_.first = std::min(expiration_range_.first, expiration);
|
|
expiration_range_.second = std::max(expiration_range_.second, expiration);
|
|
}
|
|
|
|
bool HasTTL() const { return has_ttl_; }
|
|
|
|
void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }
|
|
|
|
CompressionType GetCompressionType() const { return compression_; }
|
|
|
|
std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
|
|
|
|
// Read blob file header and footer. Return corruption if file header is
|
|
// malform or incomplete. If footer is malform or incomplete, set
|
|
// footer_valid_ to false and return Status::OK.
|
|
Status ReadMetadata(Env* env, const EnvOptions& env_options);
|
|
|
|
Status GetReader(Env* env, const EnvOptions& env_options,
|
|
std::shared_ptr<RandomAccessFileReader>* reader,
|
|
bool* fresh_open);
|
|
|
|
private:
|
|
std::shared_ptr<Reader> OpenRandomAccessReader(
|
|
Env* env, const DBOptions& db_options,
|
|
const EnvOptions& env_options) const;
|
|
|
|
Status ReadFooter(BlobLogFooter* footer);
|
|
|
|
Status WriteFooterAndCloseLocked(SequenceNumber sequence);
|
|
|
|
void CloseRandomAccessLocked();
|
|
|
|
// this is used, when you are reading only the footer of a
|
|
// previously closed file
|
|
Status SetFromFooterLocked(const BlobLogFooter& footer);
|
|
|
|
void set_expiration_range(const ExpirationRange& expiration_range) {
|
|
expiration_range_ = expiration_range;
|
|
}
|
|
|
|
// The following functions are atomic, and don't need locks
|
|
void SetFileSize(uint64_t fs) { file_size_ = fs; }
|
|
|
|
void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
|
|
|
|
void BlobRecordAdded(uint64_t record_size) {
|
|
++blob_count_;
|
|
file_size_ += record_size;
|
|
}
|
|
};
|
|
} // namespace blob_db
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
#endif // ROCKSDB_LITE
|