diff --git a/db/db_impl.cc b/db/db_impl.cc index 61a55de5d..d21a8cc0b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3276,10 +3276,10 @@ void DBImpl::WaitForIngestFile() { } } -Status DBImpl::StartTrace(const TraceOptions& /* options */, +Status DBImpl::StartTrace(const TraceOptions& trace_options, std::unique_ptr&& trace_writer) { InstrumentedMutexLock lock(&trace_mutex_); - tracer_.reset(new Tracer(env_, std::move(trace_writer))); + tracer_.reset(new Tracer(env_, trace_options, std::move(trace_writer))); return Status::OK(); } diff --git a/db/db_test2.cc b/db/db_test2.cc index afde6d153..2d6fadc35 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2688,6 +2688,74 @@ TEST_F(DBTest2, TraceAndReplay) { ASSERT_OK(DestroyDB(dbname2, options)); } +TEST_F(DBTest2, TraceWithLimit) { + Options options = CurrentOptions(); + options.merge_operator = MergeOperators::CreatePutOperator(); + ReadOptions ro; + WriteOptions wo; + TraceOptions trace_opts; + EnvOptions env_opts; + CreateAndReopenWithCF({"pikachu"}, options); + Random rnd(301); + + // test the max trace file size options + trace_opts.max_trace_file_size = 5; + std::string trace_filename = dbname_ + "/rocksdb.trace1"; + std::unique_ptr trace_writer; + ASSERT_OK(NewFileTraceWriter(env_, env_opts, trace_filename, &trace_writer)); + ASSERT_OK(db_->StartTrace(trace_opts, std::move(trace_writer))); + ASSERT_OK(Put(0, "a", "1")); + ASSERT_OK(Put(0, "b", "1")); + ASSERT_OK(Put(0, "c", "1")); + ASSERT_OK(db_->EndTrace()); + + std::string dbname2 = test::TmpDir(env_) + "/db_replay2"; + std::string value; + ASSERT_OK(DestroyDB(dbname2, options)); + + // Using a different name than db2, to pacify infer's use-after-lifetime + // warnings (http://fbinfer.com). + DB* db2_init = nullptr; + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname2, &db2_init)); + ColumnFamilyHandle* cf; + ASSERT_OK( + db2_init->CreateColumnFamily(ColumnFamilyOptions(), "pikachu", &cf)); + delete cf; + delete db2_init; + + DB* db2 = nullptr; + std::vector column_families; + ColumnFamilyOptions cf_options; + cf_options.merge_operator = MergeOperators::CreatePutOperator(); + column_families.push_back(ColumnFamilyDescriptor("default", cf_options)); + column_families.push_back( + ColumnFamilyDescriptor("pikachu", ColumnFamilyOptions())); + std::vector handles; + ASSERT_OK(DB::Open(DBOptions(), dbname2, column_families, &handles, &db2)); + + env_->SleepForMicroseconds(100); + // Verify that the keys don't already exist + ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound()); + + std::unique_ptr trace_reader; + ASSERT_OK(NewFileTraceReader(env_, env_opts, trace_filename, &trace_reader)); + Replayer replayer(db2, handles_, std::move(trace_reader)); + ASSERT_OK(replayer.Replay()); + + ASSERT_TRUE(db2->Get(ro, handles[0], "a", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "b", &value).IsNotFound()); + ASSERT_TRUE(db2->Get(ro, handles[0], "c", &value).IsNotFound()); + + for (auto handle : handles) { + delete handle; + } + delete db2; + ASSERT_OK(DestroyDB(dbname2, options)); +} + #endif // ROCKSDB_LITE TEST_F(DBTest2, PinnableSliceAndMmapReads) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 6d2fc5267..c3ba44839 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1337,6 +1337,11 @@ struct IngestExternalFileOptions { bool write_global_seqno = true; }; -struct TraceOptions {}; +// TraceOptions is used for StartTrace +struct TraceOptions { + // To avoid the trace file size grows large than the storage space, + // user can set the max trace file size in Bytes. Default is 64GB + uint64_t max_trace_file_size = uint64_t{64} * 1024 * 1024 * 1024; +}; } // namespace rocksdb diff --git a/include/rocksdb/trace_reader_writer.h b/include/rocksdb/trace_reader_writer.h index 31226487b..28919a0fa 100644 --- a/include/rocksdb/trace_reader_writer.h +++ b/include/rocksdb/trace_reader_writer.h @@ -24,6 +24,7 @@ class TraceWriter { virtual Status Write(const Slice& data) = 0; virtual Status Close() = 0; + virtual uint64_t GetFileSize() = 0; }; // TraceReader allows reading RocksDB traces from any system, one operation at diff --git a/util/trace_replay.cc b/util/trace_replay.cc index b21f20b7e..5b9bec651 100644 --- a/util/trace_replay.cc +++ b/util/trace_replay.cc @@ -31,14 +31,20 @@ void DecodeCFAndKey(std::string& buffer, uint32_t* cf_id, Slice* key) { } } // namespace -Tracer::Tracer(Env* env, std::unique_ptr&& trace_writer) - : env_(env), trace_writer_(std::move(trace_writer)) { +Tracer::Tracer(Env* env, const TraceOptions& trace_options, + std::unique_ptr&& trace_writer) + : env_(env), + trace_options_(trace_options), + trace_writer_(std::move(trace_writer)) { WriteHeader(); } Tracer::~Tracer() { trace_writer_.reset(); } Status Tracer::Write(WriteBatch* write_batch) { + if (IsTraceFileOverMax()) { + return Status::OK(); + } Trace trace; trace.ts = env_->NowMicros(); trace.type = kTraceWrite; @@ -47,6 +53,9 @@ Status Tracer::Write(WriteBatch* write_batch) { } Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { + if (IsTraceFileOverMax()) { + return Status::OK(); + } Trace trace; trace.ts = env_->NowMicros(); trace.type = kTraceGet; @@ -55,6 +64,9 @@ Status Tracer::Get(ColumnFamilyHandle* column_family, const Slice& key) { } Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { + if (IsTraceFileOverMax()) { + return Status::OK(); + } Trace trace; trace.ts = env_->NowMicros(); trace.type = kTraceIteratorSeek; @@ -63,6 +75,9 @@ Status Tracer::IteratorSeek(const uint32_t& cf_id, const Slice& key) { } Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { + if (IsTraceFileOverMax()) { + return Status::OK(); + } Trace trace; trace.ts = env_->NowMicros(); trace.type = kTraceIteratorSeekForPrev; @@ -70,6 +85,11 @@ Status Tracer::IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key) { return WriteTrace(trace); } +bool Tracer::IsTraceFileOverMax() { + uint64_t trace_file_size = trace_writer_->GetFileSize(); + return (trace_file_size > trace_options_.max_trace_file_size); +} + Status Tracer::WriteHeader() { std::ostringstream s; s << kTraceMagic << "\t" diff --git a/util/trace_replay.h b/util/trace_replay.h index e120063c0..d935f65ce 100644 --- a/util/trace_replay.h +++ b/util/trace_replay.h @@ -10,6 +10,7 @@ #include #include "rocksdb/env.h" +#include "rocksdb/options.h" #include "rocksdb/trace_reader_writer.h" namespace rocksdb { @@ -55,13 +56,15 @@ struct Trace { // Trace RocksDB operations using a TraceWriter. class Tracer { public: - Tracer(Env* env, std::unique_ptr&& trace_writer); + Tracer(Env* env, const TraceOptions& trace_options, + std::unique_ptr&& trace_writer); ~Tracer(); Status Write(WriteBatch* write_batch); Status Get(ColumnFamilyHandle* cfname, const Slice& key); Status IteratorSeek(const uint32_t& cf_id, const Slice& key); Status IteratorSeekForPrev(const uint32_t& cf_id, const Slice& key); + bool IsTraceFileOverMax(); Status Close(); @@ -71,6 +74,7 @@ class Tracer { Status WriteTrace(const Trace& trace); Env* env_; + TraceOptions trace_options_; std::unique_ptr trace_writer_; }; diff --git a/utilities/trace/file_trace_reader_writer.cc b/utilities/trace/file_trace_reader_writer.cc index 38767c092..4a81516a8 100644 --- a/utilities/trace/file_trace_reader_writer.cc +++ b/utilities/trace/file_trace_reader_writer.cc @@ -83,6 +83,8 @@ Status FileTraceWriter::Write(const Slice& data) { return file_writer_->Append(data); } +uint64_t FileTraceWriter::GetFileSize() { return file_writer_->GetFileSize(); } + Status NewFileTraceReader(Env* env, const EnvOptions& env_options, const std::string& trace_filename, std::unique_ptr* trace_reader) { diff --git a/utilities/trace/file_trace_reader_writer.h b/utilities/trace/file_trace_reader_writer.h index c0cc8882d..863f5d9d0 100644 --- a/utilities/trace/file_trace_reader_writer.h +++ b/utilities/trace/file_trace_reader_writer.h @@ -39,6 +39,7 @@ class FileTraceWriter : public TraceWriter { virtual Status Write(const Slice& data) override; virtual Status Close() override; + virtual uint64_t GetFileSize() override; private: std::unique_ptr file_writer_;