// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). #pragma once #include <memory> #include <unordered_map> #include <utility> #include "rocksdb/env.h" #include "rocksdb/options.h" #include "rocksdb/trace_reader_writer.h" namespace ROCKSDB_NAMESPACE { // This file contains Tracer and Replayer classes that enable capturing and // replaying RocksDB traces. class ColumnFamilyHandle; class ColumnFamilyData; class DB; class DBImpl; class Slice; class WriteBatch; extern const std::string kTraceMagic; const unsigned int kTraceTimestampSize = 8; const unsigned int kTraceTypeSize = 1; const unsigned int kTracePayloadLengthSize = 4; const unsigned int kTraceMetadataSize = kTraceTimestampSize + kTraceTypeSize + kTracePayloadLengthSize; // Supported Trace types. enum TraceType : char { kTraceBegin = 1, kTraceEnd = 2, kTraceWrite = 3, kTraceGet = 4, kTraceIteratorSeek = 5, kTraceIteratorSeekForPrev = 6, // Block cache related types. kBlockTraceIndexBlock = 7, kBlockTraceFilterBlock = 8, kBlockTraceDataBlock = 9, kBlockTraceUncompressionDictBlock = 10, kBlockTraceRangeDeletionBlock = 11, // IO Trace related types based on options that will be added in trace file. kIOGeneral = 12, kIOFileName = 13, kIOFileNameAndFileSize = 14, kIOLen = 15, kIOLenAndOffset = 16, // All trace types should be added before kTraceMax kTraceMax, }; // TODO: This should also be made part of public interface to help users build // custom TracerReaders and TraceWriters. // // The data structure that defines a single trace. struct Trace { uint64_t ts; // timestamp TraceType type; std::string payload; void reset() { ts = 0; type = kTraceMax; payload.clear(); } }; class TracerHelper { public: // Encode a trace object into the given string. static void EncodeTrace(const Trace& trace, std::string* encoded_trace); // Decode a string into the given trace object. static Status DecodeTrace(const std::string& encoded_trace, Trace* trace); }; // Tracer captures all RocksDB operations using a user-provided TraceWriter. // Every RocksDB operation is written as a single trace. Each trace will have a // timestamp and type, followed by the trace payload. class Tracer { public: Tracer(Env* env, const TraceOptions& trace_options, std::unique_ptr<TraceWriter>&& trace_writer); ~Tracer(); // Trace all write operations -- Put, Merge, Delete, SingleDelete, Write Status Write(WriteBatch* write_batch); // Trace Get operations. Status Get(ColumnFamilyHandle* cfname, const Slice& key); // Trace Iterators. Status IteratorSeek(const uint32_t& cf_id, const Slice& key); Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key); // Returns true if the trace is over the configured max trace file limit. // False otherwise. bool IsTraceFileOverMax(); // Writes a trace footer at the end of the tracing Status Close(); private: // Write a trace header at the beginning, typically on initiating a trace, // with some metadata like a magic number, trace version, RocksDB version, and // trace format. Status WriteHeader(); // Write a trace footer, typically on ending a trace, with some metadata. Status WriteFooter(); // Write a single trace using the provided TraceWriter to the underlying // system, say, a filesystem or a streaming service. Status WriteTrace(const Trace& trace); // Helps in filtering and sampling of traces. // Returns true if a trace should be skipped, false otherwise. bool ShouldSkipTrace(const TraceType& type); Env* env_; TraceOptions trace_options_; std::unique_ptr<TraceWriter> trace_writer_; uint64_t trace_request_count_; }; // Replayer helps to replay the captured RocksDB operations, using a user // provided TraceReader. // The Replayer is instantiated via db_bench today, on using "replay" benchmark. class Replayer { public: Replayer(DB* db, const std::vector<ColumnFamilyHandle*>& handles, std::unique_ptr<TraceReader>&& reader); ~Replayer(); // Replay all the traces from the provided trace stream, taking the delay // between the traces into consideration. Status Replay(); // Replay the provide trace stream, which is the same as Replay(), with // multi-threads. Queries are scheduled in the thread pool job queue. // User can set the number of threads in the thread pool. Status MultiThreadReplay(uint32_t threads_num); // Enables fast forwarding a replay by reducing the delay between the ingested // traces. // fast_forward : Rate of replay speedup. // If 1, replay the operations at the same rate as in the trace stream. // If > 1, speed up the replay by this amount. Status SetFastForward(uint32_t fast_forward); private: Status ReadHeader(Trace* header); Status ReadFooter(Trace* footer); Status ReadTrace(Trace* trace); // The background function for MultiThreadReplay to execute Get query // based on the trace records. static void BGWorkGet(void* arg); // The background function for MultiThreadReplay to execute WriteBatch // (Put, Delete, SingleDelete, DeleteRange) based on the trace records. static void BGWorkWriteBatch(void* arg); // The background function for MultiThreadReplay to execute Iterator (Seek) // based on the trace records. static void BGWorkIterSeek(void* arg); // The background function for MultiThreadReplay to execute Iterator // (SeekForPrev) based on the trace records. static void BGWorkIterSeekForPrev(void* arg); DBImpl* db_; Env* env_; std::unique_ptr<TraceReader> trace_reader_; std::unordered_map<uint32_t, ColumnFamilyHandle*> cf_map_; uint32_t fast_forward_; }; // The passin arg of MultiThreadRepkay for each trace record. struct ReplayerWorkerArg { DB* db; Trace trace_entry; std::unordered_map<uint32_t, ColumnFamilyHandle*>* cf_map; WriteOptions woptions; ReadOptions roptions; }; } // namespace ROCKSDB_NAMESPACE