Add full_history_ts_low_ to FlushJob (#7655)
Summary: https://github.com/facebook/rocksdb/issues/7556 enables `CompactionIterator` to perform garbage collection during compaction according to a lower bound (user-defined) timestamp `full_history_ts_low_`. This PR adds a data member `full_history_ts_low_` of type `std::string` to `FlushJob`, and `full_history_ts_low_` does not change during flush. `FlushJob` will pass a pointer to this data member to the `CompactionIterator` used during flush. Also refactored flush_job_test.cc to re-use some existing code, which is actually the majority of this PR. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7655 Test Plan: make check Reviewed By: ltamasi Differential Revision: D24933340 Pulled By: riversand963 fbshipit-source-id: 2e584bfd0cf6e5c295ab1af264e68e9d6a12fca3
This commit is contained in:
parent
bb69b4ce7f
commit
76ef894f9f
@ -69,8 +69,8 @@ TableBuilder* NewTableBuilder(
|
||||
}
|
||||
|
||||
Status BuildTable(
|
||||
const std::string& dbname, VersionSet* versions, Env* env, FileSystem* fs,
|
||||
const ImmutableCFOptions& ioptions,
|
||||
const std::string& dbname, VersionSet* versions,
|
||||
const ImmutableDBOptions& db_options, const ImmutableCFOptions& ioptions,
|
||||
const MutableCFOptions& mutable_cf_options, const FileOptions& file_options,
|
||||
TableCache* table_cache, InternalIterator* iter,
|
||||
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
||||
@ -91,7 +91,7 @@ Status BuildTable(
|
||||
TableProperties* table_properties, int level, const uint64_t creation_time,
|
||||
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint,
|
||||
const uint64_t file_creation_time, const std::string& db_id,
|
||||
const std::string& db_session_id) {
|
||||
const std::string& db_session_id, const std::string* full_history_ts_low) {
|
||||
assert((column_family_id ==
|
||||
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
|
||||
column_family_name.empty());
|
||||
@ -120,6 +120,10 @@ Status BuildTable(
|
||||
EventHelpers::NotifyTableFileCreationStarted(
|
||||
ioptions.listeners, dbname, column_family_name, fname, job_id, reason);
|
||||
#endif // !ROCKSDB_LITE
|
||||
Env* env = db_options.env;
|
||||
assert(env);
|
||||
FileSystem* fs = db_options.fs.get();
|
||||
assert(fs);
|
||||
TableProperties tp;
|
||||
if (iter->Valid() || !range_del_agg->IsEmpty()) {
|
||||
TableBuilder* builder;
|
||||
@ -180,7 +184,11 @@ Status BuildTable(
|
||||
&snapshots, earliest_write_conflict_snapshot, snapshot_checker, env,
|
||||
ShouldReportDetailedTime(env, ioptions.statistics),
|
||||
true /* internal key corruption is not ok */, range_del_agg.get(),
|
||||
blob_file_builder.get(), ioptions.allow_data_in_errors);
|
||||
blob_file_builder.get(), ioptions.allow_data_in_errors,
|
||||
/*compaction=*/nullptr,
|
||||
/*compaction_filter=*/nullptr, /*shutting_down=*/nullptr,
|
||||
/*preserve_deletes_seqnum=*/0, /*manual_compaction_paused=*/nullptr,
|
||||
db_options.info_log, full_history_ts_low);
|
||||
|
||||
c_iter.SeekToFirst();
|
||||
for (; c_iter.Valid(); c_iter.Next()) {
|
||||
|
@ -65,8 +65,8 @@ TableBuilder* NewTableBuilder(
|
||||
// @param column_family_name Name of the column family that is also identified
|
||||
// by column_family_id, or empty string if unknown.
|
||||
extern Status BuildTable(
|
||||
const std::string& dbname, VersionSet* versions, Env* env, FileSystem* fs,
|
||||
const ImmutableCFOptions& options,
|
||||
const std::string& dbname, VersionSet* versions,
|
||||
const ImmutableDBOptions& db_options, const ImmutableCFOptions& options,
|
||||
const MutableCFOptions& mutable_cf_options, const FileOptions& file_options,
|
||||
TableCache* table_cache, InternalIterator* iter,
|
||||
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
|
||||
@ -89,6 +89,7 @@ extern Status BuildTable(
|
||||
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0,
|
||||
Env::WriteLifeTimeHint write_hint = Env::WLTH_NOT_SET,
|
||||
const uint64_t file_creation_time = 0, const std::string& db_id = "",
|
||||
const std::string& db_session_id = "");
|
||||
const std::string& db_session_id = "",
|
||||
const std::string* full_history_ts_low = nullptr);
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -1347,7 +1347,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
|
||||
|
||||
IOStatus io_s;
|
||||
s = BuildTable(
|
||||
dbname_, versions_.get(), env_, fs_.get(), *cfd->ioptions(),
|
||||
dbname_, versions_.get(), immutable_db_options_, *cfd->ioptions(),
|
||||
mutable_cf_options, file_options_for_compaction_, cfd->table_cache(),
|
||||
iter.get(), std::move(range_del_iters), &meta, &blob_file_additions,
|
||||
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
|
||||
|
@ -94,7 +94,8 @@ FlushJob::FlushJob(
|
||||
Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
|
||||
const bool sync_output_directory, const bool write_manifest,
|
||||
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const std::string& db_id, const std::string& db_session_id)
|
||||
const std::string& db_id, const std::string& db_session_id,
|
||||
std::string full_history_ts_low)
|
||||
: dbname_(dbname),
|
||||
db_id_(db_id),
|
||||
db_session_id_(db_session_id),
|
||||
@ -123,7 +124,8 @@ FlushJob::FlushJob(
|
||||
base_(nullptr),
|
||||
pick_memtable_called(false),
|
||||
thread_pri_(thread_pri),
|
||||
io_tracer_(io_tracer) {
|
||||
io_tracer_(io_tracer),
|
||||
full_history_ts_low_(std::move(full_history_ts_low)) {
|
||||
// Update the thread status to indicate flush.
|
||||
ReportStartedFlush();
|
||||
TEST_SYNC_POINT("FlushJob::FlushJob()");
|
||||
@ -398,13 +400,14 @@ Status FlushJob::WriteLevel0Table() {
|
||||
: meta_.oldest_ancester_time;
|
||||
|
||||
IOStatus io_s;
|
||||
const std::string* const full_history_ts_low =
|
||||
(full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_;
|
||||
s = BuildTable(
|
||||
dbname_, versions_, db_options_.env, db_options_.fs.get(),
|
||||
*cfd_->ioptions(), mutable_cf_options_, file_options_,
|
||||
cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
|
||||
&blob_file_additions, cfd_->internal_comparator(),
|
||||
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
|
||||
cfd_->GetName(), existing_snapshots_,
|
||||
dbname_, versions_, db_options_, *cfd_->ioptions(),
|
||||
mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(),
|
||||
std::move(range_del_iters), &meta_, &blob_file_additions,
|
||||
cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(),
|
||||
cfd_->GetID(), cfd_->GetName(), existing_snapshots_,
|
||||
earliest_write_conflict_snapshot_, snapshot_checker_,
|
||||
output_compression_, mutable_cf_options_.sample_for_compression,
|
||||
mutable_cf_options_.compression_opts,
|
||||
@ -412,7 +415,7 @@ Status FlushJob::WriteLevel0Table() {
|
||||
TableFileCreationReason::kFlush, &io_s, io_tracer_, event_logger_,
|
||||
job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */,
|
||||
creation_time, oldest_key_time, write_hint, current_time, db_id_,
|
||||
db_session_id_);
|
||||
db_session_id_, full_history_ts_low);
|
||||
if (!io_s.ok()) {
|
||||
io_status_ = io_s;
|
||||
}
|
||||
|
@ -73,8 +73,8 @@ class FlushJob {
|
||||
EventLogger* event_logger, bool measure_io_stats,
|
||||
const bool sync_output_directory, const bool write_manifest,
|
||||
Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
|
||||
const std::string& db_id = "",
|
||||
const std::string& db_session_id = "");
|
||||
const std::string& db_id = "", const std::string& db_session_id = "",
|
||||
std::string full_history_ts_low = "");
|
||||
|
||||
~FlushJob();
|
||||
|
||||
@ -164,6 +164,8 @@ class FlushJob {
|
||||
IOStatus io_status_;
|
||||
|
||||
const std::shared_ptr<IOTracer> io_tracer_;
|
||||
|
||||
const std::string full_history_ts_low_;
|
||||
};
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
@ -28,49 +28,33 @@ namespace ROCKSDB_NAMESPACE {
|
||||
// TODO(icanadi) Mock out everything else:
|
||||
// 1. VersionSet
|
||||
// 2. Memtable
|
||||
class FlushJobTest : public testing::Test {
|
||||
public:
|
||||
FlushJobTest()
|
||||
class FlushJobTestBase : public testing::Test {
|
||||
protected:
|
||||
FlushJobTestBase(std::string dbname, const Comparator* ucmp)
|
||||
: env_(Env::Default()),
|
||||
fs_(std::make_shared<LegacyFileSystemWrapper>(env_)),
|
||||
dbname_(test::PerThreadDBPath("flush_job_test")),
|
||||
dbname_(std::move(dbname)),
|
||||
ucmp_(ucmp),
|
||||
options_(),
|
||||
db_options_(options_),
|
||||
column_family_names_({kDefaultColumnFamilyName, "foo", "bar"}),
|
||||
table_cache_(NewLRUCache(50000, 16)),
|
||||
write_buffer_manager_(db_options_.db_write_buffer_size),
|
||||
shutting_down_(false),
|
||||
mock_table_factory_(new mock::MockTableFactory()) {
|
||||
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
|
||||
db_options_.db_paths.emplace_back(dbname_,
|
||||
std::numeric_limits<uint64_t>::max());
|
||||
db_options_.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
|
||||
// TODO(icanadi) Remove this once we mock out VersionSet
|
||||
NewDB();
|
||||
std::vector<ColumnFamilyDescriptor> column_families;
|
||||
cf_options_.table_factory = mock_table_factory_;
|
||||
for (const auto& cf_name : column_family_names_) {
|
||||
column_families.emplace_back(cf_name, cf_options_);
|
||||
}
|
||||
mock_table_factory_(new mock::MockTableFactory()) {}
|
||||
|
||||
db_options_.env = env_;
|
||||
db_options_.fs = fs_;
|
||||
versions_.reset(
|
||||
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
|
||||
&write_buffer_manager_, &write_controller_,
|
||||
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
|
||||
EXPECT_OK(versions_->Recover(column_families, false));
|
||||
virtual ~FlushJobTestBase() {
|
||||
if (getenv("KEEP_DB")) {
|
||||
fprintf(stdout, "db is still in %s\n", dbname_.c_str());
|
||||
} else {
|
||||
EXPECT_OK(DestroyDir(env_, dbname_));
|
||||
}
|
||||
}
|
||||
|
||||
void NewDB() {
|
||||
SetIdentityFile(env_, dbname_);
|
||||
VersionEdit new_db;
|
||||
if (db_options_.write_dbid_to_manifest) {
|
||||
DBImpl* impl = new DBImpl(DBOptions(), dbname_);
|
||||
std::string db_id;
|
||||
impl->GetDbIdentityFromIdentityFile(&db_id);
|
||||
new_db.SetDBId(db_id);
|
||||
}
|
||||
|
||||
new_db.SetLogNumber(0);
|
||||
new_db.SetNextFile(2);
|
||||
new_db.SetLastSequence(0);
|
||||
@ -82,6 +66,7 @@ class FlushJobTest : public testing::Test {
|
||||
VersionEdit new_cf;
|
||||
new_cf.AddColumnFamily(column_family_names_[i]);
|
||||
new_cf.SetColumnFamily(cf_id++);
|
||||
new_cf.SetComparatorName(ucmp_->Name());
|
||||
new_cf.SetLogNumber(0);
|
||||
new_cf.SetNextFile(2);
|
||||
new_cf.SetLastSequence(last_seq++);
|
||||
@ -114,9 +99,37 @@ class FlushJobTest : public testing::Test {
|
||||
ASSERT_OK(s);
|
||||
}
|
||||
|
||||
void SetUp() override {
|
||||
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
|
||||
|
||||
// TODO(icanadi) Remove this once we mock out VersionSet
|
||||
NewDB();
|
||||
|
||||
db_options_.env = env_;
|
||||
db_options_.fs = fs_;
|
||||
db_options_.db_paths.emplace_back(dbname_,
|
||||
std::numeric_limits<uint64_t>::max());
|
||||
db_options_.statistics = CreateDBStatistics();
|
||||
|
||||
cf_options_.comparator = ucmp_;
|
||||
|
||||
std::vector<ColumnFamilyDescriptor> column_families;
|
||||
cf_options_.table_factory = mock_table_factory_;
|
||||
for (const auto& cf_name : column_family_names_) {
|
||||
column_families.emplace_back(cf_name, cf_options_);
|
||||
}
|
||||
|
||||
versions_.reset(
|
||||
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
|
||||
&write_buffer_manager_, &write_controller_,
|
||||
/*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr));
|
||||
EXPECT_OK(versions_->Recover(column_families, false));
|
||||
}
|
||||
|
||||
Env* env_;
|
||||
std::shared_ptr<FileSystem> fs_;
|
||||
std::string dbname_;
|
||||
const Comparator* const ucmp_;
|
||||
EnvOptions env_options_;
|
||||
Options options_;
|
||||
ImmutableDBOptions db_options_;
|
||||
@ -131,6 +144,13 @@ class FlushJobTest : public testing::Test {
|
||||
std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
|
||||
};
|
||||
|
||||
class FlushJobTest : public FlushJobTestBase {
|
||||
public:
|
||||
FlushJobTest()
|
||||
: FlushJobTestBase(test::PerThreadDBPath("flush_job_test"),
|
||||
BytewiseComparator()) {}
|
||||
};
|
||||
|
||||
TEST_F(FlushJobTest, Empty) {
|
||||
JobContext job_context(0);
|
||||
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
|
||||
@ -487,6 +507,135 @@ TEST_F(FlushJobTest, Snapshots) {
|
||||
job_context.Clean();
|
||||
}
|
||||
|
||||
class FlushJobTimestampTest : public FlushJobTestBase {
|
||||
public:
|
||||
FlushJobTimestampTest()
|
||||
: FlushJobTestBase(test::PerThreadDBPath("flush_job_ts_gc_test"),
|
||||
test::ComparatorWithU64Ts()) {}
|
||||
|
||||
void AddKeyValueToMemtable(MemTable* memtable, std::string key, uint64_t ts,
|
||||
SequenceNumber seq, ValueType value_type,
|
||||
Slice value) {
|
||||
std::string key_str(std::move(key));
|
||||
PutFixed64(&key_str, ts);
|
||||
memtable->Add(seq, value_type, key_str, value);
|
||||
}
|
||||
|
||||
protected:
|
||||
static constexpr uint64_t kStartTs = 10;
|
||||
static constexpr SequenceNumber kStartSeq = 0;
|
||||
SequenceNumber curr_seq_{kStartSeq};
|
||||
std::atomic<uint64_t> curr_ts_{kStartTs};
|
||||
};
|
||||
|
||||
TEST_F(FlushJobTimestampTest, AllKeysExpired) {
|
||||
ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
|
||||
autovector<MemTable*> to_delete;
|
||||
|
||||
{
|
||||
MemTable* new_mem = cfd->ConstructNewMemtable(
|
||||
*cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
|
||||
new_mem->Ref();
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
uint64_t ts = curr_ts_.fetch_add(1);
|
||||
SequenceNumber seq = (curr_seq_++);
|
||||
AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
|
||||
ValueType::kTypeValue, "0_value");
|
||||
}
|
||||
uint64_t ts = curr_ts_.fetch_add(1);
|
||||
SequenceNumber seq = (curr_seq_++);
|
||||
AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
|
||||
ValueType::kTypeDeletionWithTimestamp, "");
|
||||
cfd->imm()->Add(new_mem, &to_delete);
|
||||
}
|
||||
|
||||
std::vector<SequenceNumber> snapshots;
|
||||
constexpr SnapshotChecker* const snapshot_checker = nullptr;
|
||||
JobContext job_context(0);
|
||||
EventLogger event_logger(db_options_.info_log.get());
|
||||
std::string full_history_ts_low;
|
||||
PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
|
||||
FlushJob flush_job(
|
||||
dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
|
||||
nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_,
|
||||
&shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker,
|
||||
&job_context, nullptr, nullptr, nullptr, kNoCompression,
|
||||
db_options_.statistics.get(), &event_logger, true,
|
||||
true /* sync_output_directory */, true /* write_manifest */,
|
||||
Env::Priority::USER, nullptr /*IOTracer*/, /*db_id=*/"",
|
||||
/*db_session_id=*/"", full_history_ts_low);
|
||||
|
||||
FileMetaData fmeta;
|
||||
mutex_.Lock();
|
||||
flush_job.PickMemTable();
|
||||
ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta));
|
||||
mutex_.Unlock();
|
||||
|
||||
{
|
||||
std::string key = test::EncodeInt(0);
|
||||
key.append(test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1));
|
||||
InternalKey ikey(key, curr_seq_ - 1, ValueType::kTypeDeletionWithTimestamp);
|
||||
ASSERT_EQ(ikey.Encode(), fmeta.smallest.Encode());
|
||||
ASSERT_EQ(ikey.Encode(), fmeta.largest.Encode());
|
||||
}
|
||||
|
||||
job_context.Clean();
|
||||
ASSERT_TRUE(to_delete.empty());
|
||||
}
|
||||
|
||||
TEST_F(FlushJobTimestampTest, NoKeyExpired) {
|
||||
ColumnFamilyData* cfd = versions_->GetColumnFamilySet()->GetDefault();
|
||||
autovector<MemTable*> to_delete;
|
||||
|
||||
{
|
||||
MemTable* new_mem = cfd->ConstructNewMemtable(
|
||||
*cfd->GetLatestMutableCFOptions(), kMaxSequenceNumber);
|
||||
new_mem->Ref();
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
uint64_t ts = curr_ts_.fetch_add(1);
|
||||
SequenceNumber seq = (curr_seq_++);
|
||||
AddKeyValueToMemtable(new_mem, test::EncodeInt(0), ts, seq,
|
||||
ValueType::kTypeValue, "0_value");
|
||||
}
|
||||
cfd->imm()->Add(new_mem, &to_delete);
|
||||
}
|
||||
|
||||
std::vector<SequenceNumber> snapshots;
|
||||
SnapshotChecker* const snapshot_checker = nullptr;
|
||||
JobContext job_context(0);
|
||||
EventLogger event_logger(db_options_.info_log.get());
|
||||
std::string full_history_ts_low;
|
||||
PutFixed64(&full_history_ts_low, 0);
|
||||
FlushJob flush_job(
|
||||
dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(),
|
||||
nullptr /* memtable_id */, env_options_, versions_.get(), &mutex_,
|
||||
&shutting_down_, snapshots, kMaxSequenceNumber, snapshot_checker,
|
||||
&job_context, nullptr, nullptr, nullptr, kNoCompression,
|
||||
db_options_.statistics.get(), &event_logger, true,
|
||||
true /* sync_output_directory */, true /* write_manifest */,
|
||||
Env::Priority::USER, nullptr /*IOTracer*/, /*db_id=*/"",
|
||||
/*db_session_id=*/"", full_history_ts_low);
|
||||
|
||||
FileMetaData fmeta;
|
||||
mutex_.Lock();
|
||||
flush_job.PickMemTable();
|
||||
ASSERT_OK(flush_job.Run(/*prep_tracker=*/nullptr, &fmeta));
|
||||
mutex_.Unlock();
|
||||
|
||||
{
|
||||
std::string ukey = test::EncodeInt(0);
|
||||
std::string smallest_key =
|
||||
ukey + test::EncodeInt(curr_ts_.load(std::memory_order_relaxed) - 1);
|
||||
std::string largest_key = ukey + test::EncodeInt(kStartTs);
|
||||
InternalKey smallest(smallest_key, curr_seq_ - 1, ValueType::kTypeValue);
|
||||
InternalKey largest(largest_key, kStartSeq, ValueType::kTypeValue);
|
||||
ASSERT_EQ(smallest.Encode(), fmeta.smallest.Encode());
|
||||
ASSERT_EQ(largest.Encode(), fmeta.largest.Encode());
|
||||
}
|
||||
job_context.Clean();
|
||||
ASSERT_TRUE(to_delete.empty());
|
||||
}
|
||||
|
||||
} // namespace ROCKSDB_NAMESPACE
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
@ -442,9 +442,9 @@ class Repairer {
|
||||
LegacyFileSystemWrapper fs(env_);
|
||||
IOStatus io_s;
|
||||
status = BuildTable(
|
||||
dbname_, /* versions */ nullptr, env_, &fs, *cfd->ioptions(),
|
||||
*cfd->GetLatestMutableCFOptions(), env_options_, table_cache_.get(),
|
||||
iter.get(), std::move(range_del_iters), &meta,
|
||||
dbname_, /* versions */ nullptr, immutable_db_options_,
|
||||
*cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), env_options_,
|
||||
table_cache_.get(), iter.get(), std::move(range_del_iters), &meta,
|
||||
nullptr /* blob_file_additions */, cfd->internal_comparator(),
|
||||
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
|
||||
{}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
|
||||
|
Loading…
x
Reference in New Issue
Block a user