Implement trace sampling (#4963)
Summary: Implement trace sampling to let the user specify a sampling frequency, i.e. save one trace record per that many requests, so that a user who is interested in only a sampled set does not need to log every request.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4963
Differential Revision: D14011190
Pulled By: tang-jianfeng
fbshipit-source-id: 078b631d9319b67cb089dd2c30e21d0df8dc406a
This commit is contained in:
parent 10d14693ac
commit 08809f5e6c
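Before the diff, a minimal usage sketch of the new option, pieced together from the TraceOptions, NewFileTraceWriter, StartTrace, and EndTrace calls exercised in the test below. The wrapper function name, the trace path argument, and the frequency value of 2 are illustrative assumptions, not part of this change.

#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/trace_reader_writer.h"

// Sketch only: start a sampled trace on an already-open DB. With
// sampling_frequency = 2, roughly one out of every two requests is recorded.
rocksdb::Status TraceWithSamplingExample(rocksdb::DB* db, rocksdb::Env* env,
                                         const std::string& trace_path) {
  rocksdb::TraceOptions trace_opts;
  trace_opts.sampling_frequency = 2;  // capture one per two requests

  std::unique_ptr<rocksdb::TraceWriter> trace_writer;
  rocksdb::Status s = rocksdb::NewFileTraceWriter(env, rocksdb::EnvOptions(),
                                                  trace_path, &trace_writer);
  if (!s.ok()) {
    return s;
  }
  s = db->StartTrace(trace_opts, std::move(trace_writer));
  if (!s.ok()) {
    return s;
  }
  // ... issue reads and writes; only the sampled subset is traced ...
  return db->EndTrace();
}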
@@ -8,6 +8,7 @@
 * For all users of dictionary compression who set `cache_index_and_filter_blocks == true`, we now store dictionary data used for decompression in the block cache for better control over memory usage. For users of ZSTD v1.1.4+ who compile with -DZSTD_STATIC_LINKING_ONLY, this includes a digested dictionary, which is used to increase decompression speed.
 * Add support for block checksum verification for external SST files before ingestion.
 * Add a placeholder in the manifest which indicates a record from the future that can be safely ignored.
+* Add support for trace sampling.

 ### Public API Change
 * Disallow CompactionFilter::IgnoreSnapshots() = false, because it is not very useful and the behavior is confusing. The filter will filter everything if there is no snapshot declared by the time the compaction starts. However, users can define a snapshot after the compaction starts and before it finishes and this new snapshot won't be repeatable, because after the compaction finishes, some keys may be dropped.
@@ -2882,6 +2882,77 @@ TEST_F(DBTest2, TraceWithLimit) {
   ASSERT_OK(DestroyDB(dbname2, options));
 }

+TEST_F(DBTest2, TraceWithSampling) {
+  Options options = CurrentOptions();
+  ReadOptions ro;
+  WriteOptions wo;
+  TraceOptions trace_opts;
+  EnvOptions env_opts;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  Random rnd(301);
+
+  // test the trace file sampling options
+  trace_opts.sampling_frequency = 2;
+  std::string trace_filename = dbname_ + "/rocksdb.trace_sampling";
+  std::unique_ptr<TraceWriter> trace_writer;
+  ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer));
+  ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer)));
+  ASSERT_OK(Put(0, "a", "1"));
+  ASSERT_OK(Put(0, "b", "2"));
+  ASSERT_OK(Put(0, "c", "3"));
+  ASSERT_OK(Put(0, "d", "4"));
+  ASSERT_OK(Put(0, "e", "5"));
+  ASSERT_OK(db_->EndTrace());
+
+  std::string dbname2 = test::TmpDir(env_) + "/db_replay_sampling";
+  std::string value;
+  ASSERT_OK(DestroyDB(dbname2, options));
+
+  // Using a different name than db2, to pacify infer's use-after-lifetime
+  // warnings (http://fbinfer.com).
+  DB* db2_init = nullptr;
+  options.create_if_missing = true;
+  ASSERT_OK(DB::Open(options, dbname2, &db2_init));
+  ColumnFamilyHandle* cf;
+  ASSERT_OK(
+      db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf));
+  delete cf;
+  delete db2_init;
+
+  DB* db2 = nullptr;
+  std::vector<ColumnFamilyDescriptor> column_families;
+  ColumnFamilyOptions cf_options;
+  column_families.push_back(ColumnFamilyDescriptor("default", cf_options));
+  column_families.push_back(
+      ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions()));
+  std::vector<ColumnFamilyHandle*> handles;
+  ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2));
+
+  env_->SleepForMicroseconds(100);
+  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "d", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "e", &value).IsNotFound());
+
+  std::unique_ptr<TraceReader> trace_reader;
+  ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader));
+  Replayer replayer(db2, handles_, std::move(trace_reader));
+  ASSERT_OK(replayer.Replay());
+
+  ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound());
+  ASSERT_FALSE(db2->Get(ro, handles[0], "b", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound());
+  ASSERT_FALSE(db2->Get(ro, handles[0], "d", &value).IsNotFound());
+  ASSERT_TRUE(db2->Get(ro, handles[0], "e", &value).IsNotFound());
+
+  for (auto handle : handles) {
+    delete handle;
+  }
+  delete db2;
+  ASSERT_OK(DestroyDB(dbname2, options));
+}
+
 #endif // ROCKSDB_LITE

 TEST_F(DBTest2, PinnableSliceAndMmapReads) {
@@ -1356,6 +1356,9 @@ struct TraceOptions {
   // To avoid the trace file growing larger than the available storage space,
   // the user can set the max trace file size in bytes. Default is 64GB.
   uint64_t max_trace_file_size = uint64_t{64} * 1024 * 1024 * 1024;
+  // Specify the trace sampling option, i.e. capture one per how many requests.
+  // Defaults to 1 (capture every request).
+  uint64_t sampling_frequency = 1;
 };

 }  // namespace rocksdb
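A practical note on choosing sampling_frequency: since one request out of every sampling_frequency is kept, a value of N reduces trace volume by roughly a factor of N. The sketch below is only an illustration under stated assumptions; the helper name, the 1GB cap, and the example request rates are not part of this change or of the RocksDB API.

#include <algorithm>
#include <cstdint>

#include "rocksdb/options.h"

// Hypothetical helper: derive a sampling frequency from a target trace rate.
// For example, ~50,000 requests/sec with a budget of ~500 traced requests/sec
// gives a frequency of 100, i.e. keep roughly one out of every 100 requests.
uint64_t PickSamplingFrequency(uint64_t requests_per_sec,
                               uint64_t target_traced_per_sec) {
  if (target_traced_per_sec == 0) {
    return 1;  // degenerate input; fall back to tracing every request
  }
  return std::max<uint64_t>(1, requests_per_sec / target_traced_per_sec);
}

rocksdb::TraceOptions MakeSampledTraceOptions() {
  rocksdb::TraceOptions trace_opts;
  trace_opts.max_trace_file_size = uint64_t{1} * 1024 * 1024 * 1024;  // 1GB cap
  trace_opts.sampling_frequency = PickSamplingFrequency(50000, 500);   // 100
  return trace_opts;
}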
@@ -35,14 +35,15 @@ Tracer::Tracer(Env* env, const TraceOptions& trace_options,
                std::unique_ptr<TraceWriter>&& trace_writer)
     : env_(env),
       trace_options_(trace_options),
-      trace_writer_(std::move(trace_writer)) {
+      trace_writer_(std::move(trace_writer)),
+      trace_request_count_ (0) {
   WriteHeader();
 }

 Tracer::~Tracer() { trace_writer_.reset(); }

 Status Tracer::Write(WriteBatch* write_batch) {
-  if (IsTraceFileOverMax()) {
+  if (ShouldSkipTrace()) {
     return Status::OK();
   }
   Trace trace;
@@ -53,7 +54,7 @@ Status Tracer::Write(WriteBatch* write_batch) {
 }

 Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
-  if (IsTraceFileOverMax()) {
+  if (ShouldSkipTrace()) {
     return Status::OK();
   }
   Trace trace;
@@ -64,7 +65,7 @@ Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) {
 }

 Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
-  if (IsTraceFileOverMax()) {
+  if (ShouldSkipTrace()) {
     return Status::OK();
   }
   Trace trace;
@@ -75,7 +76,7 @@ Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) {
 }

 Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
-  if (IsTraceFileOverMax()) {
+  if (ShouldSkipTrace()) {
     return Status::OK();
   }
   Trace trace;
@@ -85,6 +86,18 @@ Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) {
   return WriteTrace(trace);
 }

+bool Tracer::ShouldSkipTrace() {
+  if (IsTraceFileOverMax()) {
+    return true;
+  }
+  ++trace_request_count_;
+  if (trace_request_count_ < trace_options_.sampling_frequency) {
+    return true;
+  }
+  trace_request_count_ = 0;
+  return false;
+}
+
 bool Tracer::IsTraceFileOverMax() {
   uint64_t trace_file_size = trace_writer_->GetFileSize();
   return (trace_file_size > trace_options_.max_trace_file_size);
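Note the sampling semantics implied by the counter above: for a sampling_frequency of N, the first N-1 requests in each window are skipped and the N-th is captured, so with a frequency of 2 requests 2, 4, 6, ... are traced. That matches the test above, where only "b" and "d" (the 2nd and 4th Put) are present after replay. Below is a minimal standalone model of that counting logic; the SamplingCounter name is made up for illustration and the max-file-size check is intentionally omitted.

#include <cstdint>
#include <iostream>

// Standalone model of the counter logic in Tracer::ShouldSkipTrace(); not part
// of the RocksDB API.
class SamplingCounter {
 public:
  explicit SamplingCounter(uint64_t sampling_frequency)
      : sampling_frequency_(sampling_frequency), count_(0) {}

  // Returns true if the request should be skipped (not traced).
  bool ShouldSkip() {
    ++count_;
    if (count_ < sampling_frequency_) {
      return true;
    }
    count_ = 0;  // capture this request and start a new window
    return false;
  }

 private:
  uint64_t sampling_frequency_;
  uint64_t count_;
};

int main() {
  SamplingCounter counter(2);
  // Prints that requests 2 and 4 are traced, mirroring keys "b" and "d" in the
  // TraceWithSampling test.
  for (int i = 1; i <= 5; ++i) {
    std::cout << "request " << i
              << (counter.ShouldSkip() ? " skipped" : " traced") << "\n";
  }
  return 0;
}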
@@ -72,10 +72,12 @@ class Tracer {
   Status WriteHeader();
   Status WriteFooter();
   Status WriteTrace(const Trace& trace);
+  bool ShouldSkipTrace();

   Env* env_;
   TraceOptions trace_options_;
   std::unique_ptr<TraceWriter> trace_writer_;
+  uint64_t trace_request_count_;
 };

 // Replay RocksDB operations from a trace.