rocksdb/utilities/blob_db/blob_compaction_filter.h
mrambacher 3dff28cf9b Use SystemClock* instead of std::shared_ptr<SystemClock> in lower level routines (#8033)
Summary:
For performance purposes, the lower level routines were changed to use a SystemClock* instead of a std::shared_ptr<SystemClock>.  The shared ptr has some performance degradation on certain hardware classes.

For most of the system, there is no risk of the pointer being deleted/invalid because the shared_ptr will be stored elsewhere.  For example, the ImmutableDBOptions stores the Env which has a std::shared_ptr<SystemClock> in it.  The SystemClock* within the ImmutableDBOptions is essentially a "short cut" to gain access to this constant resource.

There were a few classes (PeriodicWorkScheduler?) where the "short cut" property did not hold.  In those cases, the shared pointer was preserved.

Using db_bench readrandom perf_level=3 on my EC2 box, this change performed as well or better than 6.17:

6.17: readrandom   :      28.046 micros/op 854902 ops/sec;   61.3 MB/s (355999 of 355999 found)
6.18: readrandom   :      32.615 micros/op 735306 ops/sec;   52.7 MB/s (290999 of 290999 found)
PR: readrandom   :      27.500 micros/op 871909 ops/sec;   62.5 MB/s (367999 of 367999 found)

(Note that the times for 6.18 are prior to revert of the SystemClock).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8033

Reviewed By: pdillinger

Differential Revision: D27014563

Pulled By: mrambacher

fbshipit-source-id: ad0459eba03182e454391b5926bf5cdd45657b67
2021-03-15 04:34:11 -07:00

205 lines
7.8 KiB
C++

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#ifndef ROCKSDB_LITE
#include <unordered_set>
#include "db/blob/blob_index.h"
#include "monitoring/statistics.h"
#include "rocksdb/compaction_filter.h"
#include "utilities/blob_db/blob_db_gc_stats.h"
#include "utilities/blob_db/blob_db_impl.h"
#include "utilities/compaction_filters/layered_compaction_filter_base.h"
namespace ROCKSDB_NAMESPACE {
class SystemClock;
namespace blob_db {
struct BlobCompactionContext {
BlobDBImpl* blob_db_impl = nullptr;
uint64_t next_file_number = 0;
std::unordered_set<uint64_t> current_blob_files;
SequenceNumber fifo_eviction_seq = 0;
uint64_t evict_expiration_up_to = 0;
};
struct BlobCompactionContextGC {
uint64_t cutoff_file_number = 0;
};
// Compaction filter that deletes expired blob indexes from the base DB.
// Comes into two varieties, one for the non-GC case and one for the GC case.
class BlobIndexCompactionFilterBase : public LayeredCompactionFilterBase {
public:
BlobIndexCompactionFilterBase(
BlobCompactionContext&& _context,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
uint64_t current_time, Statistics* stats)
: LayeredCompactionFilterBase(_user_comp_filter,
std::move(_user_comp_filter_from_factory)),
context_(std::move(_context)),
current_time_(current_time),
statistics_(stats) {}
~BlobIndexCompactionFilterBase() override;
// Filter expired blob indexes regardless of snapshots.
bool IgnoreSnapshots() const override { return true; }
Decision FilterV2(int level, const Slice& key, ValueType value_type,
const Slice& value, std::string* new_value,
std::string* skip_until) const override;
bool IsStackedBlobDbInternalCompactionFilter() const override { return true; }
protected:
bool IsBlobFileOpened() const;
virtual bool OpenNewBlobFileIfNeeded() const;
bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
PinnableSlice* blob, bool need_decompress,
CompressionType* compression_type) const;
bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
uint64_t* new_blob_file_number,
uint64_t* new_blob_offset) const;
bool CloseAndRegisterNewBlobFileIfNeeded() const;
bool CloseAndRegisterNewBlobFile() const;
Statistics* statistics() const { return statistics_; }
const BlobCompactionContext& context() const { return context_; }
private:
Decision HandleValueChange(const Slice& key, std::string* new_value) const;
private:
BlobCompactionContext context_;
const uint64_t current_time_;
Statistics* statistics_;
mutable std::shared_ptr<BlobFile> blob_file_;
mutable std::shared_ptr<BlobLogWriter> writer_;
// It is safe to not using std::atomic since the compaction filter, created
// from a compaction filter factroy, will not be called from multiple threads.
mutable uint64_t expired_count_ = 0;
mutable uint64_t expired_size_ = 0;
mutable uint64_t evicted_count_ = 0;
mutable uint64_t evicted_size_ = 0;
};
class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
public:
BlobIndexCompactionFilter(
BlobCompactionContext&& _context,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
uint64_t current_time, Statistics* stats)
: BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
std::move(_user_comp_filter_from_factory),
current_time, stats) {}
const char* Name() const override { return "BlobIndexCompactionFilter"; }
};
class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
public:
BlobIndexCompactionFilterGC(
BlobCompactionContext&& _context, BlobCompactionContextGC&& context_gc,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory,
uint64_t current_time, Statistics* stats)
: BlobIndexCompactionFilterBase(std::move(_context), _user_comp_filter,
std::move(_user_comp_filter_from_factory),
current_time, stats),
context_gc_(std::move(context_gc)) {}
~BlobIndexCompactionFilterGC() override;
const char* Name() const override { return "BlobIndexCompactionFilterGC"; }
BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value,
std::string* new_value) const override;
private:
bool OpenNewBlobFileIfNeeded() const override;
private:
BlobCompactionContextGC context_gc_;
mutable BlobDBGarbageCollectionStats gc_stats_;
};
// Compaction filter factory; similarly to the filters above, it comes
// in two flavors, one that creates filters that support GC, and one
// that creates non-GC filters.
class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
public:
BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl,
SystemClock* _clock,
const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
: blob_db_impl_(_blob_db_impl),
clock_(_clock),
statistics_(_statistics),
user_comp_filter_(_cf_options.compaction_filter),
user_comp_filter_factory_(_cf_options.compaction_filter_factory) {}
protected:
std::unique_ptr<CompactionFilter> CreateUserCompactionFilterFromFactory(
const CompactionFilter::Context& context) const;
BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
SystemClock* clock() const { return clock_; }
Statistics* statistics() const { return statistics_; }
const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
private:
BlobDBImpl* blob_db_impl_;
SystemClock* clock_;
Statistics* statistics_;
const CompactionFilter* user_comp_filter_;
std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
};
class BlobIndexCompactionFilterFactory
: public BlobIndexCompactionFilterFactoryBase {
public:
BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl,
SystemClock* _clock,
const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
: BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options,
_statistics) {}
const char* Name() const override {
return "BlobIndexCompactionFilterFactory";
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override;
};
class BlobIndexCompactionFilterFactoryGC
: public BlobIndexCompactionFilterFactoryBase {
public:
BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl,
SystemClock* _clock,
const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
: BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options,
_statistics) {}
const char* Name() const override {
return "BlobIndexCompactionFilterFactoryGC";
}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override;
};
} // namespace blob_db
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE