6a17b07ca8
Summary: Update SstFileWriter to use user TablePropertiesCollectors that are passed in Options Test Plan: unittests Reviewers: sdong Reviewed By: sdong Subscribers: jkedgar, andrewkr, hermanlee4, dhruba, yoshinorim Differential Revision: https://reviews.facebook.net/D62253
1884 lines
65 KiB
C++
1884 lines
65 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under the BSD-style license found in the
|
|
// LICENSE file in the root directory of this source tree. An additional grant
|
|
// of patent rights can be found in the PATENTS file in the same directory.
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
|
|
#include "db/db_test_util.h"
|
|
#include "port/stack_trace.h"
|
|
#include "rocksdb/sst_file_manager.h"
|
|
#include "rocksdb/sst_file_writer.h"
|
|
#include "util/sst_file_manager_impl.h"
|
|
#include "port/port.h"
|
|
|
|
namespace rocksdb {
|
|
|
|
class DBSSTTest : public DBTestBase {
|
|
public:
|
|
DBSSTTest() : DBTestBase("/db_sst_test") {}
|
|
};
|
|
|
|
// Checks that a forced FindObsoleteFiles()/PurgeObsoleteFiles() pass, run on
// every table-file write, never removes a file that a flush or compaction is
// still producing.
TEST_F(DBSSTTest, DontDeletePendingOutputs) {
  Options options;
  options.env = env_;
  options.create_if_missing = true;
  DestroyAndReopen(options);

  // Every time we write to a table file, call FOF/POF with full DB scan. This
  // will make sure our pending_outputs_ protection works correctly.
  std::function<void()> purge_obsolete_files_function = [&]() {
    JobContext job_context(0);
    dbfull()->TEST_LockMutex();
    dbfull()->FindObsoleteFiles(&job_context, true /*force*/);
    dbfull()->TEST_UnlockMutex();
    dbfull()->PurgeObsoleteFiles(job_context);
    job_context.Clean();
  };

  env_->table_write_callback_ = &purge_obsolete_files_function;

  for (int i = 0; i < 2; ++i) {
    ASSERT_OK(Put("a", "begin"));
    ASSERT_OK(Put("z", "end"));
    ASSERT_OK(Flush());
  }

  // If pending output guard does not work correctly, PurgeObsoleteFiles() will
  // delete the file that Compaction is trying to create, causing this: error
  // db/db_test.cc:975: IO error:
  // /tmp/rocksdbtest-1552237650/db_test/000009.sst: No such file or directory
  Compact("a", "b");

  // The callback is a stack object of this test body; unhook it so env_ does
  // not keep a dangling pointer once the body returns and the fixture tears
  // the DB down.
  env_->table_write_callback_ = nullptr;
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Triggers a move compaction and then reopens the DB to confirm that the
// moved file was not deleted along the way.
TEST_F(DBSSTTest, DontDeleteMovedFile) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.max_bytes_for_level_base = 1024 * 1024;  // 1 MB
  // Two L0 files are enough to kick off a compaction.
  options.level0_file_num_compaction_trigger = 2;
  DestroyAndReopen(options);

  Random rnd(301);
  // Two flushes, each holding 100 values of 10 KB (about 1 MB per sst file).
  for (int file_idx = 0; file_idx < 2; ++file_idx) {
    for (int key_idx = 0; key_idx < 100; ++key_idx) {
      ASSERT_OK(Put(Key(file_idx * 50 + key_idx),
                    RandomString(&rnd, 10 * 1024)));
    }
    ASSERT_OK(Flush());
  }

  // This should execute both L0->L1 and L1->(move)->L2 compactions.
  dbfull()->TEST_WaitForCompact();
  ASSERT_EQ("0,0,1", FilesPerLevel(0));

  // If the moved file is actually deleted (the move-safeguard in
  // ~Version::Version() is not there), we get this failure:
  // Corruption: Can't access /000009.sst
  Reopen(options);
}
|
|
|
|
// This reproduces a bug where we don't delete a file because when it was
|
|
// supposed to be deleted, it was blocked by pending_outputs
|
|
// Consider:
|
|
// 1. current file_number is 13
|
|
// 2. compaction (1) starts, blocks deletion of all files starting with 13
|
|
// (pending outputs)
|
|
// 3. file 13 is created by compaction (2)
|
|
// 4. file 13 is consumed by compaction (3) and file 15 was created. Since file
|
|
// 13 has no references, it is put into VersionSet::obsolete_files_
|
|
// 5. FindObsoleteFiles() gets file 13 from VersionSet::obsolete_files_. File 13
|
|
// is deleted from obsolete_files_ set.
|
|
// 6. PurgeObsoleteFiles() tries to delete file 13, but this file is blocked by
|
|
// pending outputs since compaction (1) is still running. It is not deleted and
|
|
// it is not present in obsolete_files_ anymore. Therefore, we never delete it.
|
|
TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) {
|
|
Options options = CurrentOptions();
|
|
options.env = env_;
|
|
options.write_buffer_size = 2 * 1024 * 1024; // 2 MB
|
|
options.max_bytes_for_level_base = 1024 * 1024; // 1 MB
|
|
options.level0_file_num_compaction_trigger =
|
|
2; // trigger compaction when we have 2 files
|
|
options.max_background_flushes = 2;
|
|
options.max_background_compactions = 2;
|
|
|
|
OnFileDeletionListener* listener = new OnFileDeletionListener();
|
|
options.listeners.emplace_back(listener);
|
|
|
|
Reopen(options);
|
|
|
|
Random rnd(301);
|
|
// Create two 1MB sst files
|
|
for (int i = 0; i < 2; ++i) {
|
|
// Create 1MB sst file
|
|
for (int j = 0; j < 100; ++j) {
|
|
ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
|
|
}
|
|
ASSERT_OK(Flush());
|
|
}
|
|
// this should execute both L0->L1 and L1->(move)->L2 compactions
|
|
dbfull()->TEST_WaitForCompact();
|
|
ASSERT_EQ("0,0,1", FilesPerLevel(0));
|
|
|
|
test::SleepingBackgroundTask blocking_thread;
|
|
port::Mutex mutex_;
|
|
bool already_blocked(false);
|
|
|
|
// block the flush
|
|
std::function<void()> block_first_time = [&]() {
|
|
bool blocking = false;
|
|
{
|
|
MutexLock l(&mutex_);
|
|
if (!already_blocked) {
|
|
blocking = true;
|
|
already_blocked = true;
|
|
}
|
|
}
|
|
if (blocking) {
|
|
blocking_thread.DoSleep();
|
|
}
|
|
};
|
|
env_->table_write_callback_ = &block_first_time;
|
|
// Insert 2.5MB data, which should trigger a flush because we exceed
|
|
// write_buffer_size. The flush will be blocked with block_first_time
|
|
// pending_file is protecting all the files created after
|
|
for (int j = 0; j < 256; ++j) {
|
|
ASSERT_OK(Put(Key(j), RandomString(&rnd, 10 * 1024)));
|
|
}
|
|
blocking_thread.WaitUntilSleeping();
|
|
|
|
ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr));
|
|
|
|
ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
|
|
std::vector<LiveFileMetaData> metadata;
|
|
db_->GetLiveFilesMetaData(&metadata);
|
|
ASSERT_EQ(metadata.size(), 1U);
|
|
auto file_on_L2 = metadata[0].name;
|
|
listener->SetExpectedFileName(dbname_ + file_on_L2);
|
|
|
|
ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr,
|
|
true /* disallow trivial move */));
|
|
ASSERT_EQ("0,0,0,0,1", FilesPerLevel(0));
|
|
|
|
// finish the flush!
|
|
blocking_thread.WakeUp();
|
|
blocking_thread.WaitUntilDone();
|
|
dbfull()->TEST_WaitForFlushMemTable();
|
|
ASSERT_EQ("1,0,0,0,1", FilesPerLevel(0));
|
|
|
|
metadata.clear();
|
|
db_->GetLiveFilesMetaData(&metadata);
|
|
ASSERT_EQ(metadata.size(), 2U);
|
|
|
|
// This file should have been deleted during last compaction
|
|
ASSERT_EQ(Status::NotFound(), env_->FileExists(dbname_ + file_on_L2));
|
|
listener->VerifyMatchedCount(1);
|
|
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// Runs a DB with an SstFileManager attached and checks that the manager's
// tracked-file map and total size always match the sst files on disk, both
// while the DB is running and after it is reopened.
TEST_F(DBSSTTest, DBWithSstFileManager) {
  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

  // Count the notifications the manager receives through its sync points.
  int files_added = 0;
  int files_deleted = 0;
  int files_moved = 0;
  auto sync_point = rocksdb::SyncPoint::GetInstance();
  sync_point->SetCallBack("SstFileManagerImpl::OnAddFile",
                          [&](void* /*arg*/) { files_added++; });
  sync_point->SetCallBack("SstFileManagerImpl::OnDeleteFile",
                          [&](void* /*arg*/) { files_deleted++; });
  sync_point->SetCallBack("SstFileManagerImpl::OnMoveFile",
                          [&](void* /*arg*/) { files_moved++; });
  sync_point->EnableProcessing();

  Options options = CurrentOptions();
  options.sst_file_manager = sst_file_manager;
  DestroyAndReopen(options);

  Random rnd(301);
  for (int round = 0; round < 25; round++) {
    GenerateNewRandomFile(&rnd);
    ASSERT_OK(Flush());
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
    // Verify that we are tracking all sst files in dbname_
    ASSERT_EQ(sfm->GetTrackedFiles(), GetAllSSTFiles());
  }
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  auto files_in_db = GetAllSSTFiles();
  // Verify that we are tracking all sst files in dbname_
  ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);

  // Verify the total files size
  uint64_t total_files_size = 0;
  for (const auto& name_and_size : files_in_db) {
    total_files_size += name_and_size.second;
  }
  ASSERT_EQ(sfm->GetTotalSize(), total_files_size);

  // We flushed at least 25 files
  ASSERT_GE(files_added, 25);
  // Compaction must have deleted some files
  ASSERT_GT(files_deleted, 0);
  // No files were moved
  ASSERT_EQ(files_moved, 0);

  // Same manager, fresh DB instance: the bookkeeping must be unchanged.
  Close();
  Reopen(options);
  ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  ASSERT_EQ(sfm->GetTotalSize(), total_files_size);

  // Verify that we track all the files again after the DB is closed and
  // opened with a brand new manager.
  Close();
  sst_file_manager.reset(NewSstFileManager(env_));
  options.sst_file_manager = sst_file_manager;
  sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

  Reopen(options);
  ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
  ASSERT_EQ(sfm->GetTotalSize(), total_files_size);

  sync_point->DisableProcessing();
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Compacts four L0 files away under a 10 KB/sec delete rate limit and checks
// that the penalty reported for each trashed file matches the expected
// cumulative value, and that emptying the trash took about that long.
TEST_F(DBSSTTest, RateLimitedDelete) {
  Destroy(last_options_);
  auto sync_point = rocksdb::SyncPoint::GetInstance();
  sync_point->LoadDependency({
      {"DBSSTTest::RateLimitedDelete:1",
       "DeleteScheduler::BackgroundEmptyTrash"},
  });

  // Collects the value handed to the ":Wait" sync point on each invocation.
  // NOTE(review): the argument is read through an int* but stored as
  // uint64_t -- confirm the type DeleteScheduler actually passes.
  std::vector<uint64_t> penalties;
  sync_point->SetCallBack(
      "DeleteScheduler::BackgroundEmptyTrash:Wait",
      [&](void* arg) { penalties.push_back(*(static_cast<int*>(arg))); });
  sync_point->EnableProcessing();

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.env = env_;

  std::string trash_dir = test::TmpDir(env_) + "/trash";
  int64_t rate_bytes_per_sec = 1024 * 10;  // 10 KB / sec
  Status s;
  options.sst_file_manager.reset(NewSstFileManager(
      env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
  ASSERT_OK(s);
  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());

  ASSERT_OK(TryReopen(options));

  // Create 4 files in L0, one per fill character.
  for (char fill = 'a'; fill <= 'd'; fill++) {
    ASSERT_OK(Put("Key2", DummyString(1024, fill)));
    ASSERT_OK(Put("Key3", DummyString(1024, fill)));
    ASSERT_OK(Put("Key4", DummyString(1024, fill)));
    ASSERT_OK(Put("Key1", DummyString(1024, fill)));
    ASSERT_OK(Put("Key4", DummyString(1024, fill)));
    ASSERT_OK(Flush());
  }
  // We created 4 sst files in L0
  ASSERT_EQ("4", FilesPerLevel(0));

  // Snapshot the file sizes before the compaction trashes the files.
  std::vector<LiveFileMetaData> metadata;
  db_->GetLiveFilesMetaData(&metadata);

  // Compaction will move the 4 files in L0 to trash and create 1 L1 file
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,1", FilesPerLevel(0));

  uint64_t delete_start_time = env_->NowMicros();
  // Hold BackgroundEmptyTrash
  TEST_SYNC_POINT("DBSSTTest::RateLimitedDelete:1");
  sfm->WaitForEmptyTrash();
  uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;

  // The i-th penalty is computed from the cumulative size of files 0..i.
  uint64_t cumulative_size = 0;
  uint64_t expected_penalty = 0;
  ASSERT_EQ(penalties.size(), metadata.size());
  for (size_t idx = 0; idx < metadata.size(); idx++) {
    cumulative_size += metadata[idx].size;
    expected_penalty = ((cumulative_size * 1000000) / rate_bytes_per_sec);
    ASSERT_EQ(expected_penalty, penalties[idx]);
  }
  ASSERT_GT(time_spent_deleting, expected_penalty * 0.9);

  sync_point->DisableProcessing();
}
|
|
|
|
// Create a DB with 2 db_paths, and generate multiple files in the 2
|
|
// db_paths using CompactRangeOptions, make sure that files that were
|
|
// deleted from first db_path were deleted using DeleteScheduler and
|
|
// files in the second path were not.
|
|
TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
|
|
int bg_delete_file = 0;
|
|
rocksdb::SyncPoint::GetInstance()->SetCallBack(
|
|
"DeleteScheduler::DeleteTrashFile:DeleteFile",
|
|
[&](void* arg) { bg_delete_file++; });
|
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
Options options = CurrentOptions();
|
|
options.disable_auto_compactions = true;
|
|
options.db_paths.emplace_back(dbname_, 1024 * 100);
|
|
options.db_paths.emplace_back(dbname_ + "_2", 1024 * 100);
|
|
options.env = env_;
|
|
|
|
std::string trash_dir = test::TmpDir(env_) + "/trash";
|
|
int64_t rate_bytes_per_sec = 1024 * 1024; // 1 Mb / Sec
|
|
Status s;
|
|
options.sst_file_manager.reset(NewSstFileManager(
|
|
env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
|
|
ASSERT_OK(s);
|
|
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
|
|
|
|
DestroyAndReopen(options);
|
|
|
|
// Create 4 files in L0
|
|
for (int i = 0; i < 4; i++) {
|
|
ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A')));
|
|
ASSERT_OK(Flush());
|
|
}
|
|
// We created 4 sst files in L0
|
|
ASSERT_EQ("4", FilesPerLevel(0));
|
|
// Compaction will delete files from L0 in first db path and generate a new
|
|
// file in L1 in second db path
|
|
CompactRangeOptions compact_options;
|
|
compact_options.target_path_id = 1;
|
|
Slice begin("Key0");
|
|
Slice end("Key3");
|
|
ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
|
|
ASSERT_EQ("0,1", FilesPerLevel(0));
|
|
|
|
// Create 4 files in L0
|
|
for (int i = 4; i < 8; i++) {
|
|
ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B')));
|
|
ASSERT_OK(Flush());
|
|
}
|
|
ASSERT_EQ("4,1", FilesPerLevel(0));
|
|
|
|
// Compaction will delete files from L0 in first db path and generate a new
|
|
// file in L1 in second db path
|
|
begin = "Key4";
|
|
end = "Key7";
|
|
ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
|
|
ASSERT_EQ("0,2", FilesPerLevel(0));
|
|
|
|
sfm->WaitForEmptyTrash();
|
|
ASSERT_EQ(bg_delete_file, 8);
|
|
|
|
compact_options.bottommost_level_compaction =
|
|
BottommostLevelCompaction::kForce;
|
|
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
|
|
ASSERT_EQ("0,1", FilesPerLevel(0));
|
|
|
|
sfm->WaitForEmptyTrash();
|
|
ASSERT_EQ(bg_delete_file, 8);
|
|
|
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
// Destroys a DB through an Options object that carries a rate-limited
// SstFileManager and checks that all four sst files were deleted via the
// DeleteScheduler.
TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) {
  // Bumped each time the DeleteScheduler's DeleteFile sync point fires.
  int bg_delete_file = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DeleteScheduler::DeleteTrashFile:DeleteFile",
      [&](void* /*arg*/) { bg_delete_file++; });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.env = env_;
  DestroyAndReopen(options);

  // Create 4 files in L0
  for (int i = 0; i < 4; i++) {
    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A')));
    ASSERT_OK(Flush());
  }
  // We created 4 sst files in L0
  ASSERT_EQ("4", FilesPerLevel(0));

  // Close DB and destroy it using DeleteScheduler
  Close();
  std::string trash_dir = test::TmpDir(env_) + "/trash";
  int64_t rate_bytes_per_sec = 1024 * 1024;  // 1 MB / sec
  Status s;
  options.sst_file_manager.reset(NewSstFileManager(
      env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
  ASSERT_OK(s);
  ASSERT_OK(DestroyDB(dbname_, options));

  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
  sfm->WaitForEmptyTrash();
  // We have deleted the 4 sst files in the delete_scheduler
  ASSERT_EQ(bg_delete_file, 4);

  // Stop sync point processing before the captured counter goes out of
  // scope, as the other sync-point tests in this file do.
  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// Caps the SstFileManager's allowed space just above the size of the first
// sst file and expects the next flush to fail.
TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

  Options options = CurrentOptions();
  options.sst_file_manager = sst_file_manager;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  Random rnd(301);

  // Generate a file containing 100 keys.
  for (int key_idx = 0; key_idx < 100; key_idx++) {
    ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 50)));
  }
  ASSERT_OK(Flush());

  // Only the size out-parameter is needed here; the returned file map is
  // not used.
  uint64_t first_file_size = 0;
  GetAllSSTFiles(&first_file_size);
  ASSERT_EQ(sfm->GetTotalSize(), first_file_size);

  // Set the maximum allowed space usage to one byte more than the current
  // total size.
  sfm->SetMaxAllowedSpaceUsage(first_file_size + 1);

  ASSERT_OK(Put("key1", "val1"));
  // This flush will cause bg_error_ and will fail
  ASSERT_NOK(Flush());
}
|
|
|
|
// This test will set a maximum allowed space for the DB, then it will
// keep filling the DB until the limit is reached and bg_error_ is set.
// When bg_error_ is set we will verify that the DB size is greater
// than the limit.
TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
  std::vector<int> max_space_limits_mbs = {1, 2, 4, 8, 10};

  // Written by the sync point callbacks below, reset for every limit.
  bool bg_error_set = false;
  uint64_t total_sst_files_size = 0;

  // Accumulated across all limits; both paths must be hit at least once.
  int reached_max_space_on_flush = 0;
  int reached_max_space_on_compaction = 0;

  auto sync_point = rocksdb::SyncPoint::GetInstance();
  sync_point->SetCallBack(
      "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
      [&](void* /*arg*/) {
        bg_error_set = true;
        GetAllSSTFiles(&total_sst_files_size);
        reached_max_space_on_flush++;
      });
  sync_point->SetCallBack(
      "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached",
      [&](void* /*arg*/) {
        bg_error_set = true;
        GetAllSSTFiles(&total_sst_files_size);
        reached_max_space_on_compaction++;
      });

  for (auto limit_mb : max_space_limits_mbs) {
    bg_error_set = false;
    total_sst_files_size = 0;
    sync_point->ClearTrace();
    sync_point->EnableProcessing();

    std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
    auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());

    Options options = CurrentOptions();
    options.sst_file_manager = sst_file_manager;
    options.write_buffer_size = 1024 * 512;  // 512 KB
    DestroyAndReopen(options);
    Random rnd(301);

    sfm->SetMaxAllowedSpaceUsage(limit_mb * 1024 * 1024);

    int keys_written = 0;
    uint64_t estimated_db_size = 0;
    // Keep writing until a Put is rejected.
    for (;;) {
      auto put_status = Put(RandomString(&rnd, 10), RandomString(&rnd, 50));
      if (!put_status.ok()) {
        break;
      }
      keys_written++;
      // Check the estimated db size vs the db limit just to make sure we
      // don't run into an infinite loop
      estimated_db_size = keys_written * 60;  // ~60 bytes per key
      ASSERT_LT(estimated_db_size, limit_mb * 1024 * 1024 * 2);
    }
    ASSERT_TRUE(bg_error_set);
    ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024);
    sync_point->DisableProcessing();
  }

  ASSERT_GT(reached_max_space_on_flush, 0);
  ASSERT_GT(reached_max_space_on_compaction, 0);
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Opens a DB with max_open_files = -1 and checks that, after a reopen, every
// file's metadata carries a non-null table reader handle and all keys are
// readable.
TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) {
  // Open DB with infinite max open files
  //  - First iteration use 1 thread to open files
  //  - Second iteration use 5 threads to open files
  for (int iter = 0; iter < 2; iter++) {
    Options options;
    options.create_if_missing = true;
    options.write_buffer_size = 100000;
    options.disable_auto_compactions = true;
    options.max_open_files = -1;
    if (iter == 0) {
      options.max_file_opening_threads = 1;
    } else {
      options.max_file_opening_threads = 5;
    }
    options = CurrentOptions(options);
    DestroyAndReopen(options);

    // Create 12 Files in L0 (then move them to L2)
    for (int i = 0; i < 12; i++) {
      std::string k = "L2_" + Key(i);
      ASSERT_OK(Put(k, k + std::string(1000, 'a')));
      ASSERT_OK(Flush());
    }
    CompactRangeOptions compact_options;
    compact_options.change_level = true;
    compact_options.target_level = 2;
    // Check the status: if the move to L2 fails, report it here rather than
    // through the less obvious FilesPerLevel mismatch further down.
    ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));

    // Create 12 Files in L0
    for (int i = 0; i < 12; i++) {
      std::string k = "L0_" + Key(i);
      ASSERT_OK(Put(k, k + std::string(1000, 'a')));
      ASSERT_OK(Flush());
    }
    Close();

    // Reopening the DB will load all existing files
    Reopen(options);
    ASSERT_EQ("12,0,12", FilesPerLevel(0));
    std::vector<std::vector<FileMetaData>> files;
    dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);

    // Every file on every level must already have a table reader handle.
    for (const auto& level : files) {
      for (const auto& file : level) {
        ASSERT_TRUE(file.table_reader_handle != nullptr);
      }
    }

    for (int i = 0; i < 12; i++) {
      ASSERT_EQ(Get("L0_" + Key(i)), "L0_" + Key(i) + std::string(1000, 'a'));
      ASSERT_EQ(Get("L2_" + Key(i)), "L2_" + Key(i) + std::string(1000, 'a'));
    }
  }
}
|
|
|
|
// Checks "rocksdb.total-sst-files-size" while iterators pin older versions:
// the property must keep counting sst files that are no longer live until
// the iterator holding them is released.
TEST_F(DBSSTTest, GetTotalSstFilesSize) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.compression = kNoCompression;
  DestroyAndReopen(options);
  // Generate 5 files in L0. Every file holds the same 10 keys, and the
  // values differ only in a one-digit suffix, which is why the assertions
  // below treat all files as having the same size.
  for (int i = 0; i < 5; i++) {
    for (int j = 0; j < 10; j++) {
      std::string val = "val_file_" + ToString(i);
      ASSERT_OK(Put(Key(j), val));
    }
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("5", FilesPerLevel(0));

  std::vector<LiveFileMetaData> live_files_meta;
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 5U);
  uint64_t single_file_size = live_files_meta[0].size;

  uint64_t live_sst_files_size = 0;
  uint64_t total_sst_files_size = 0;
  for (const auto& file_meta : live_files_meta) {
    live_sst_files_size += file_meta.size;
  }

  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 5
  // Total SST files = 5
  ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
  ASSERT_EQ(total_sst_files_size, 5 * single_file_size);

  // hold current version
  std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));

  // Compact 5 files into 1 file in L1
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,1", FilesPerLevel(0));

  live_files_meta.clear();
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 1U);

  live_sst_files_size = 0;
  total_sst_files_size = 0;
  for (const auto& file_meta : live_files_meta) {
    live_sst_files_size += file_meta.size;
  }
  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 1 (compacted file)
  // Total SST files = 6 (5 original files + compacted file)
  ASSERT_EQ(live_sst_files_size, 1 * single_file_size);
  ASSERT_EQ(total_sst_files_size, 6 * single_file_size);

  // hold current version
  std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));

  // Delete all keys and compact, this will delete all live files
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(Delete(Key(i)));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("", FilesPerLevel(0));

  live_files_meta.clear();
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 0U);

  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 0
  // Total SST files = 6 (5 original files + compacted file)
  ASSERT_EQ(total_sst_files_size, 6 * single_file_size);

  // Releasing iter1 drops the version that pinned the 5 original files.
  iter1.reset();
  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 0
  // Total SST files = 1 (compacted file)
  ASSERT_EQ(total_sst_files_size, 1 * single_file_size);

  // Releasing iter2 drops the version that pinned the compacted file.
  iter2.reset();
  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 0
  // Total SST files = 0
  ASSERT_EQ(total_sst_files_size, 0U);
}
|
|
|
|
// Checks that "rocksdb.total-sst-files-size" counts an sst file only once
// when the same file is referenced by more than one version (here: before
// and after a trivial move from L0 to L1).
TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.compression = kNoCompression;
  DestroyAndReopen(options);
  // Generate 5 files in L0, one key each.
  for (int i = 0; i < 5; i++) {
    ASSERT_OK(Put(Key(i), "val"));
    ASSERT_OK(Flush());
  }
  ASSERT_EQ("5", FilesPerLevel(0));

  std::vector<LiveFileMetaData> live_files_meta;
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 5U);
  uint64_t single_file_size = live_files_meta[0].size;

  uint64_t live_sst_files_size = 0;
  uint64_t total_sst_files_size = 0;
  for (const auto& file_meta : live_files_meta) {
    live_sst_files_size += file_meta.size;
  }

  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));

  // Live SST files = 5
  // Total SST files = 5
  ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
  ASSERT_EQ(total_sst_files_size, 5 * single_file_size);

  // hold current version
  std::unique_ptr<Iterator> iter1(dbfull()->NewIterator(ReadOptions()));

  // Compaction will do trivial move from L0 to L1
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("0,5", FilesPerLevel(0));

  live_files_meta.clear();
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 5U);

  live_sst_files_size = 0;
  total_sst_files_size = 0;
  for (const auto& file_meta : live_files_meta) {
    live_sst_files_size += file_meta.size;
  }
  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 5
  // Total SST files = 5 (used in 2 version)
  ASSERT_EQ(live_sst_files_size, 5 * single_file_size);
  ASSERT_EQ(total_sst_files_size, 5 * single_file_size);

  // hold current version
  std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));

  // Delete all keys and compact, this will delete all live files
  for (int i = 0; i < 5; i++) {
    ASSERT_OK(Delete(Key(i)));
  }
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ("", FilesPerLevel(0));

  live_files_meta.clear();
  dbfull()->GetLiveFilesMetaData(&live_files_meta);
  ASSERT_EQ(live_files_meta.size(), 0U);

  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 0
  // Total SST files = 5 (used in 2 version)
  ASSERT_EQ(total_sst_files_size, 5 * single_file_size);

  // Release both pinned versions; nothing references the files any more.
  iter1.reset();
  iter2.reset();

  ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
                                       &total_sst_files_size));
  // Live SST files = 0
  // Total SST files = 0
  ASSERT_EQ(total_sst_files_size, 0U);
}
|
|
|
|
// Builds several external sst files with SstFileWriter and exercises
// DB::AddFile(): by path and by ExternalSstFileInfo, with and without key
// overlap, while a snapshot is held, and over a range that still contains
// deletion tombstones. The whole scenario is repeated for each option
// configuration returned by ChangeOptions().
TEST_F(DBSSTTest, AddExternalSstFile) {
  do {
    std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
    // The folder is still there on every pass after the first, so create it
    // only if missing, and check the status instead of dropping it.
    ASSERT_OK(env_->CreateDirIfMissing(sst_files_folder));
    Options options = CurrentOptions();
    options.env = env_;

    SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);

    // file1.sst (0 => 99)
    std::string file1 = sst_files_folder + "file1.sst";
    ASSERT_OK(sst_file_writer.Open(file1));
    for (int k = 0; k < 100; k++) {
      ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
    }
    ExternalSstFileInfo file1_info;
    Status s = sst_file_writer.Finish(&file1_info);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(file1_info.file_path, file1);
    ASSERT_EQ(file1_info.num_entries, 100);
    ASSERT_EQ(file1_info.smallest_key, Key(0));
    ASSERT_EQ(file1_info.largest_key, Key(99));
    // sst_file_writer already finished, cannot add this value
    s = sst_file_writer.Add(Key(100), "bad_val");
    ASSERT_FALSE(s.ok()) << s.ToString();

    // file2.sst (100 => 199)
    std::string file2 = sst_files_folder + "file2.sst";
    ASSERT_OK(sst_file_writer.Open(file2));
    for (int k = 100; k < 200; k++) {
      ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
    }
    // Cannot add this key because it's not after last added key
    s = sst_file_writer.Add(Key(99), "bad_val");
    ASSERT_FALSE(s.ok()) << s.ToString();
    ExternalSstFileInfo file2_info;
    s = sst_file_writer.Finish(&file2_info);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(file2_info.file_path, file2);
    ASSERT_EQ(file2_info.num_entries, 100);
    ASSERT_EQ(file2_info.smallest_key, Key(100));
    ASSERT_EQ(file2_info.largest_key, Key(199));

    // file3.sst (195 => 299)
    // This file's key range overlaps with file2's
    std::string file3 = sst_files_folder + "file3.sst";
    ASSERT_OK(sst_file_writer.Open(file3));
    for (int k = 195; k < 300; k++) {
      ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
    }
    ExternalSstFileInfo file3_info;
    s = sst_file_writer.Finish(&file3_info);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(file3_info.file_path, file3);
    ASSERT_EQ(file3_info.num_entries, 105);
    ASSERT_EQ(file3_info.smallest_key, Key(195));
    ASSERT_EQ(file3_info.largest_key, Key(299));

    // file4.sst (30 => 39)
    // This file's key range overlaps with file1's
    std::string file4 = sst_files_folder + "file4.sst";
    ASSERT_OK(sst_file_writer.Open(file4));
    for (int k = 30; k < 40; k++) {
      ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
    }
    ExternalSstFileInfo file4_info;
    s = sst_file_writer.Finish(&file4_info);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(file4_info.file_path, file4);
    ASSERT_EQ(file4_info.num_entries, 10);
    ASSERT_EQ(file4_info.smallest_key, Key(30));
    ASSERT_EQ(file4_info.largest_key, Key(39));

    // file5.sst (400 => 499)
    std::string file5 = sst_files_folder + "file5.sst";
    ASSERT_OK(sst_file_writer.Open(file5));
    for (int k = 400; k < 500; k++) {
      ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
    }
    ExternalSstFileInfo file5_info;
    s = sst_file_writer.Finish(&file5_info);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(file5_info.file_path, file5);
    ASSERT_EQ(file5_info.num_entries, 100);
    ASSERT_EQ(file5_info.smallest_key, Key(400));
    ASSERT_EQ(file5_info.largest_key, Key(499));

    // Cannot create an empty sst file
    // NOTE(review): file_empty is never passed to Open(), so this Finish()
    // runs on the writer that already finished file5 -- confirm that is the
    // scenario this check is meant to cover.
    std::string file_empty = sst_files_folder + "file_empty.sst";
    ExternalSstFileInfo file_empty_info;
    s = sst_file_writer.Finish(&file_empty_info);
    ASSERT_NOK(s);

    DestroyAndReopen(options);
    // Add file using file path
    s = db_->AddFile(std::vector<std::string>(1, file1));
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
    for (int k = 0; k < 100; k++) {
      ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
    }

    // Add file while holding a snapshot will fail
    const Snapshot* s1 = db_->GetSnapshot();
    if (s1 != nullptr) {
      ASSERT_NOK(db_->AddFile(std::vector<ExternalSstFileInfo>(1, file2_info)));
      db_->ReleaseSnapshot(s1);
    }
    // We can add the file after releasing the snapshot
    ASSERT_OK(db_->AddFile(std::vector<ExternalSstFileInfo>(1, file2_info)));

    ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
    for (int k = 0; k < 200; k++) {
      ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
    }

    // This file's key range overlaps with the existing data
    s = db_->AddFile(std::vector<std::string>(1, file3));
    ASSERT_FALSE(s.ok()) << s.ToString();

    // This file's key range overlaps with the existing data
    s = db_->AddFile(std::vector<ExternalSstFileInfo>(1, file4_info));
    ASSERT_FALSE(s.ok()) << s.ToString();

    // Overwrite values of keys divisible by 5
    for (int k = 0; k < 200; k += 5) {
      ASSERT_OK(Put(Key(k), Key(k) + "_val_new"));
    }
    ASSERT_NE(db_->GetLatestSequenceNumber(), 0U);

    // Key range of file5 (400 => 499) doesn't overlap with any keys in DB
    ASSERT_OK(db_->AddFile(std::vector<std::string>(1, file5)));

    // Make sure values are correct before and after flush/compaction
    for (int i = 0; i < 2; i++) {
      for (int k = 0; k < 200; k++) {
        std::string value = Key(k) + "_val";
        if (k % 5 == 0) {
          value += "_new";
        }
        ASSERT_EQ(Get(Key(k)), value);
      }
      for (int k = 400; k < 500; k++) {
        std::string value = Key(k) + "_val";
        ASSERT_EQ(Get(Key(k)), value);
      }
      ASSERT_OK(Flush());
      ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
    }

    Close();
    options.disable_auto_compactions = true;
    Reopen(options);

    // Delete keys in range (400 => 499)
    for (int k = 400; k < 500; k++) {
      ASSERT_OK(Delete(Key(k)));
    }
    // We deleted every key in (400 => 499) but cannot add file5 yet because
    // the per-key deletion tombstones still cover that range
    ASSERT_NOK(db_->AddFile(std::vector<std::string>(1, file5)));

    // Compacting the DB will remove the tombstones
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

    // Now we can add the file
    ASSERT_OK(db_->AddFile(std::vector<std::string>(1, file5)));

    // Verify values of file5 in DB
    for (int k = 400; k < 500; k++) {
      std::string value = Key(k) + "_val";
      ASSERT_EQ(Get(Key(k)), value);
    }
  } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
                         kSkipFIFOCompaction));
}
|
|
class SstFileWriterCollector : public TablePropertiesCollector {
|
|
public:
|
|
explicit SstFileWriterCollector(const std::string prefix) : prefix_(prefix) {
|
|
name_ = prefix_ + "_SstFileWriterCollector";
|
|
}
|
|
|
|
const char* Name() const override { return name_.c_str(); }
|
|
|
|
Status Finish(UserCollectedProperties* properties) override {
|
|
*properties = UserCollectedProperties{
|
|
{prefix_ + "_SstFileWriterCollector", "YES"},
|
|
{prefix_ + "_Count", std::to_string(count_)},
|
|
};
|
|
return Status::OK();
|
|
}
|
|
|
|
Status AddUserKey(const Slice& user_key, const Slice& value, EntryType type,
|
|
SequenceNumber seq, uint64_t file_size) override {
|
|
++count_;
|
|
return Status::OK();
|
|
}
|
|
|
|
virtual UserCollectedProperties GetReadableProperties() const override {
|
|
return UserCollectedProperties{};
|
|
}
|
|
|
|
private:
|
|
uint32_t count_ = 0;
|
|
std::string prefix_;
|
|
std::string name_;
|
|
};
|
|
|
|
class SstFileWriterCollectorFactory : public TablePropertiesCollectorFactory {
|
|
public:
|
|
explicit SstFileWriterCollectorFactory(std::string prefix)
|
|
: prefix_(prefix), num_created_(0) {}
|
|
virtual TablePropertiesCollector* CreateTablePropertiesCollector(
|
|
TablePropertiesCollectorFactory::Context context) override {
|
|
num_created_++;
|
|
return new SstFileWriterCollector(prefix_);
|
|
}
|
|
const char* Name() const override { return "SstFileWriterCollectorFactory"; }
|
|
|
|
std::string prefix_;
|
|
uint32_t num_created_;
|
|
};
|
|
|
|
// Ingests external SST files through the list overloads of DB::AddFile()
// (both by path and by ExternalSstFileInfo) and checks:
//   * lists whose files overlap each other, or the DB contents, are rejected
//   * AddFile() fails while a snapshot is held
//   * user TablePropertiesCollectors configured in Options ran for each file
//   * data stays correct across flush and compaction
TEST_F(DBSSTTest, AddExternalSstFileList) {
  do {
    const std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
    env_->CreateDir(sst_files_folder);
    Options options = CurrentOptions();
    options.env = env_;

    // Two user collectors; each generated file must carry properties from
    // both of them.
    auto abc_collector = std::make_shared<SstFileWriterCollectorFactory>("abc");
    auto xyz_collector = std::make_shared<SstFileWriterCollectorFactory>("xyz");

    options.table_properties_collector_factories.emplace_back(abc_collector);
    options.table_properties_collector_factories.emplace_back(xyz_collector);

    SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);

    // file1.sst (0 => 99)
    const std::string file1 = sst_files_folder + "file1.sst";
    ASSERT_OK(sst_file_writer.Open(file1));
    for (int key = 0; key < 100; ++key) {
      ASSERT_OK(sst_file_writer.Add(Key(key), Key(key) + "_val"));
    }
    ExternalSstFileInfo file1_info;
    Status s = sst_file_writer.Finish(&file1_info);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(file1_info.file_path, file1);
    ASSERT_EQ(file1_info.num_entries, 100);
    ASSERT_EQ(file1_info.smallest_key, Key(0));
    ASSERT_EQ(file1_info.largest_key, Key(99));
    // The writer is already finished, so this value must be rejected.
    s = sst_file_writer.Add(Key(100), "bad_val");
    ASSERT_FALSE(s.ok()) << s.ToString();

    // file2.sst (100 => 199)
    const std::string file2 = sst_files_folder + "file2.sst";
    ASSERT_OK(sst_file_writer.Open(file2));
    for (int key = 100; key < 200; ++key) {
      ASSERT_OK(sst_file_writer.Add(Key(key), Key(key) + "_val"));
    }
    // This key is not greater than the last added key, so it is rejected.
    s = sst_file_writer.Add(Key(99), "bad_val");
    ASSERT_FALSE(s.ok()) << s.ToString();
    ExternalSstFileInfo file2_info;
    s = sst_file_writer.Finish(&file2_info);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(file2_info.file_path, file2);
    ASSERT_EQ(file2_info.num_entries, 100);
    ASSERT_EQ(file2_info.smallest_key, Key(100));
    ASSERT_EQ(file2_info.largest_key, Key(199));

    // file3.sst (195 => 199)
    // The keys of this file overlap with those of file2.
    const std::string file3 = sst_files_folder + "file3.sst";
    ASSERT_OK(sst_file_writer.Open(file3));
    for (int key = 195; key < 200; ++key) {
      ASSERT_OK(sst_file_writer.Add(Key(key), Key(key) + "_val_overlap"));
    }
    ExternalSstFileInfo file3_info;
    s = sst_file_writer.Finish(&file3_info);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(file3_info.file_path, file3);
    ASSERT_EQ(file3_info.num_entries, 5);
    ASSERT_EQ(file3_info.smallest_key, Key(195));
    ASSERT_EQ(file3_info.largest_key, Key(199));

    // file4.sst (30 => 39)
    // The keys of this file overlap with those of file1.
    const std::string file4 = sst_files_folder + "file4.sst";
    ASSERT_OK(sst_file_writer.Open(file4));
    for (int key = 30; key < 40; ++key) {
      ASSERT_OK(sst_file_writer.Add(Key(key), Key(key) + "_val_overlap"));
    }
    ExternalSstFileInfo file4_info;
    s = sst_file_writer.Finish(&file4_info);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(file4_info.file_path, file4);
    ASSERT_EQ(file4_info.num_entries, 10);
    ASSERT_EQ(file4_info.smallest_key, Key(30));
    ASSERT_EQ(file4_info.largest_key, Key(39));

    // file5.sst (200 => 299)
    const std::string file5 = sst_files_folder + "file5.sst";
    ASSERT_OK(sst_file_writer.Open(file5));
    for (int key = 200; key < 300; ++key) {
      ASSERT_OK(sst_file_writer.Add(Key(key), Key(key) + "_val"));
    }
    ExternalSstFileInfo file5_info;
    s = sst_file_writer.Finish(&file5_info);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(file5_info.file_path, file5);
    ASSERT_EQ(file5_info.num_entries, 100);
    ASSERT_EQ(file5_info.smallest_key, Key(200));
    ASSERT_EQ(file5_info.largest_key, Key(299));

    // List 1 has an internal key range conflict (file3 overlaps file2).
    std::vector<std::string> file_list0({file1, file2});
    std::vector<std::string> file_list1({file3, file2, file1});
    std::vector<std::string> file_list2({file5});
    std::vector<std::string> file_list3({file3, file4});

    std::vector<ExternalSstFileInfo> info_list0({file1_info, file2_info});
    std::vector<ExternalSstFileInfo> info_list1(
        {file3_info, file2_info, file1_info});
    std::vector<ExternalSstFileInfo> info_list2({file5_info});
    std::vector<ExternalSstFileInfo> info_list3({file3_info, file4_info});

    DestroyAndReopen(options);

    // The files in this list have key ranges that overlap each other.
    s = db_->AddFile(file_list1);
    ASSERT_FALSE(s.ok()) << s.ToString();
    s = db_->AddFile(info_list1);
    ASSERT_FALSE(s.ok()) << s.ToString();

    // Add files using a list of file paths.
    s = db_->AddFile(file_list0);
    ASSERT_TRUE(s.ok()) << s.ToString();
    ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
    for (int key = 0; key < 200; ++key) {
      ASSERT_EQ(Get(Key(key)), Key(key) + "_val");
    }

    // Both ingested files must carry the properties of both collectors.
    TablePropertiesCollection props;
    ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
    ASSERT_EQ(props.size(), 2);
    for (const auto& file_props : props) {
      // Copy the map: operator[] below needs a mutable map.
      auto user_props = file_props.second->user_collected_properties;
      ASSERT_EQ(user_props["abc_SstFileWriterCollector"], "YES");
      ASSERT_EQ(user_props["xyz_SstFileWriterCollector"], "YES");
      ASSERT_EQ(user_props["abc_Count"], "100");
      ASSERT_EQ(user_props["xyz_Count"], "100");
    }

    // Adding a file while holding a snapshot fails.
    const Snapshot* s1 = db_->GetSnapshot();
    if (s1 != nullptr) {
      ASSERT_NOK(db_->AddFile(info_list2));
      db_->ReleaseSnapshot(s1);
    }
    // The file can be added after releasing the snapshot.
    ASSERT_OK(db_->AddFile(info_list2));
    ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
    for (int key = 0; key < 300; ++key) {
      ASSERT_EQ(Get(Key(key)), Key(key) + "_val");
    }

    ASSERT_OK(db_->GetPropertiesOfAllTables(&props));
    ASSERT_EQ(props.size(), 3);
    for (const auto& file_props : props) {
      auto user_props = file_props.second->user_collected_properties;
      ASSERT_EQ(user_props["abc_SstFileWriterCollector"], "YES");
      ASSERT_EQ(user_props["xyz_SstFileWriterCollector"], "YES");
      ASSERT_EQ(user_props["abc_Count"], "100");
      ASSERT_EQ(user_props["xyz_Count"], "100");
    }

    // This file list overlaps with the data already in the DB.
    s = db_->AddFile(file_list3);
    ASSERT_FALSE(s.ok()) << s.ToString();
    s = db_->AddFile(info_list3);
    ASSERT_FALSE(s.ok()) << s.ToString();

    // Overwrite the values of keys divisible by 5.
    for (int key = 0; key < 200; key += 5) {
      ASSERT_OK(Put(Key(key), Key(key) + "_val_new"));
    }
    ASSERT_NE(db_->GetLatestSequenceNumber(), 0U);

    // Values must be correct both before and after flush/compaction.
    for (int round = 0; round < 2; ++round) {
      for (int key = 0; key < 200; ++key) {
        std::string expected = Key(key) + "_val";
        if (key % 5 == 0) {
          expected += "_new";
        }
        ASSERT_EQ(Get(Key(key)), expected);
      }
      for (int key = 200; key < 300; ++key) {
        const std::string expected = Key(key) + "_val";
        ASSERT_EQ(Get(Key(key)), expected);
      }
      ASSERT_OK(Flush());
      ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
    }

    // Delete keys in range (200 => 299).
    for (int key = 200; key < 300; ++key) {
      ASSERT_OK(Delete(Key(key)));
    }
    // Range (200 => 299) was deleted, but file5 still cannot be added
    // because of the tombstones.
    ASSERT_NOK(db_->AddFile(file_list2));

    // Compacting the DB removes the tombstones.
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

    // Now the file can be added.
    ASSERT_OK(db_->AddFile(file_list2));

    // Verify the values of file5 in the DB.
    for (int key = 200; key < 300; ++key) {
      const std::string expected = Key(key) + "_val";
      ASSERT_EQ(Get(Key(key)), expected);
    }
  } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
                         kSkipFIFOCompaction));
}
|
|
|
|
// AddFile() on a list must be all-or-nothing: if one path in the list cannot
// be ingested (here: a file that was never written), none of the other
// files' keys may become visible.
TEST_F(DBSSTTest, AddExternalSstFileListAtomicity) {
  do {
    const std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
    env_->CreateDir(sst_files_folder);
    Options options = CurrentOptions();
    options.env = env_;

    SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);

    // files[0].sst (0 => 99)
    // files[1].sst (100 => 199)
    // ...
    // files[8].sst (800 => 899)
    int n = 9;
    std::vector<std::string> files(n);
    std::vector<ExternalSstFileInfo> files_info(n);
    for (int idx = 0; idx < n; ++idx) {
      const int first_key = idx * 100;
      const int end_key = (idx + 1) * 100;  // exclusive

      files[idx] = sst_files_folder + "file" + std::to_string(idx) + ".sst";
      ASSERT_OK(sst_file_writer.Open(files[idx]));
      for (int key = first_key; key < end_key; ++key) {
        ASSERT_OK(sst_file_writer.Add(Key(key), Key(key) + "_val"));
      }
      Status finish_status = sst_file_writer.Finish(&files_info[idx]);
      ASSERT_TRUE(finish_status.ok()) << finish_status.ToString();
      ASSERT_EQ(files_info[idx].file_path, files[idx]);
      ASSERT_EQ(files_info[idx].num_entries, 100);
      ASSERT_EQ(files_info[idx].smallest_key, Key(first_key));
      ASSERT_EQ(files_info[idx].largest_key, Key(end_key - 1));
    }

    // Append the path of a file that was never generated; the whole list
    // must be rejected and no key may have been ingested.
    files.push_back(sst_files_folder + "file" + std::to_string(n) + ".sst");
    auto s = db_->AddFile(files);
    ASSERT_NOK(s) << s.ToString();
    for (int key = 0; key < n * 100; ++key) {
      ASSERT_EQ("NOT_FOUND", Get(Key(key)));
    }

    // The info list only names the files that exist, so it succeeds.
    s = db_->AddFile(files_info);
    ASSERT_OK(s);
    for (int key = 0; key < n * 100; ++key) {
      const std::string expected = Key(key) + "_val";
      ASSERT_EQ(Get(Key(key)), expected);
    }
  } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
                         kSkipFIFOCompaction));
}
|
|
// This test reproduces a bug that can happen in some cases if the DB started
|
|
// purging obsolete files when we are adding an external sst file.
|
|
// This situation may result in deleting the file while it's being added.
|
|
// Regression test: while AddFile() is in flight (right after the external
// file has been copied), trigger flushes and a manual compaction so that the
// DB purges obsolete files. The file being added must survive the purge.
TEST_F(DBSSTTest, AddExternalSstFilePurgeObsoleteFilesBug) {
  std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
  env_->CreateDir(sst_files_folder);
  Options options = CurrentOptions();
  options.env = env_;
  SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);

  // file1.sst (0 => 499)
  std::string sst_file_path = sst_files_folder + "file1.sst";
  Status s = sst_file_writer.Open(sst_file_path);
  ASSERT_OK(s);
  for (int i = 0; i < 500; i++) {
    std::string k = Key(i);
    s = sst_file_writer.Add(k, k + "_val");
    ASSERT_OK(s);
  }

  ExternalSstFileInfo sst_file_info;
  s = sst_file_writer.Finish(&sst_file_info);
  ASSERT_OK(s);

  // Purge obsolete files as eagerly as possible, and make sure only the
  // manual compaction in the callback below runs.
  options.delete_obsolete_files_period_micros = 0;
  options.disable_auto_compactions = true;
  DestroyAndReopen(options);

  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::AddFile:FileCopied", [&](void* /*arg*/) {
        ASSERT_OK(Put("aaa", "bbb"));
        ASSERT_OK(Flush());
        ASSERT_OK(Put("aaa", "xxx"));
        ASSERT_OK(Flush());
        // Check the status instead of dropping it: a failed compaction
        // would mean the purge scenario was never actually exercised.
        ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
      });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  s = db_->AddFile(std::vector<std::string>(1, sst_file_path));
  ASSERT_OK(s);

  // Every key of the ingested file must be readable.
  for (int i = 0; i < 500; i++) {
    std::string k = Key(i);
    std::string v = k + "_val";
    ASSERT_EQ(Get(k), v);
  }

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
|
|
|
|
// Verifies the `move_file` flag of DB::AddFile():
//   * move_file = true  : the source file is gone after a successful add
//   * move_file = false : the source file is left in place
//   * a failed add with move_file = true must not remove the source file
TEST_F(DBSSTTest, AddExternalSstFileNoCopy) {
  std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
  env_->CreateDir(sst_files_folder);
  Options options = CurrentOptions();
  options.env = env_;

  SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);

  // file1.sst (0 => 99)
  std::string file1 = sst_files_folder + "file1.sst";
  ASSERT_OK(sst_file_writer.Open(file1));
  for (int k = 0; k < 100; k++) {
    ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
  }
  ExternalSstFileInfo file1_info;
  Status s = sst_file_writer.Finish(&file1_info);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_EQ(file1_info.file_path, file1);
  ASSERT_EQ(file1_info.num_entries, 100);
  ASSERT_EQ(file1_info.smallest_key, Key(0));
  ASSERT_EQ(file1_info.largest_key, Key(99));

  // file2.sst (100 => 299)
  std::string file2 = sst_files_folder + "file2.sst";
  ASSERT_OK(sst_file_writer.Open(file2));
  for (int k = 100; k < 300; k++) {
    ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val"));
  }
  ExternalSstFileInfo file2_info;
  s = sst_file_writer.Finish(&file2_info);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_EQ(file2_info.file_path, file2);
  ASSERT_EQ(file2_info.num_entries, 200);
  ASSERT_EQ(file2_info.smallest_key, Key(100));
  ASSERT_EQ(file2_info.largest_key, Key(299));

  // file3.sst (110 => 124) .. overlaps with file2.sst
  std::string file3 = sst_files_folder + "file3.sst";
  ASSERT_OK(sst_file_writer.Open(file3));
  for (int k = 110; k < 125; k++) {
    ASSERT_OK(sst_file_writer.Add(Key(k), Key(k) + "_val_overlap"));
  }
  ExternalSstFileInfo file3_info;
  s = sst_file_writer.Finish(&file3_info);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_EQ(file3_info.file_path, file3);
  ASSERT_EQ(file3_info.num_entries, 15);
  ASSERT_EQ(file3_info.smallest_key, Key(110));
  ASSERT_EQ(file3_info.largest_key, Key(124));

  // Moving file1 into the DB removes it from its original location.
  s = db_->AddFile(std::vector<ExternalSstFileInfo>(1, file1_info),
                   true /* move file */);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_EQ(Status::NotFound(), env_->FileExists(file1));

  // Copying file2 into the DB leaves the original in place.
  s = db_->AddFile(std::vector<ExternalSstFileInfo>(1, file2_info),
                   false /* copy file */);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_OK(env_->FileExists(file2));

  // file3 overlaps with the existing data, so the add must fail, and a
  // failed move must leave the source file untouched. This step used to
  // re-add file2_info, which left file3 unexercised even though the
  // assertion below checks file3.
  s = db_->AddFile(std::vector<ExternalSstFileInfo>(1, file3_info),
                   true /* move file */);
  ASSERT_FALSE(s.ok()) << s.ToString();
  ASSERT_OK(env_->FileExists(file3));

  // Only the non-overlapping values of file1 and file2 are visible.
  for (int k = 0; k < 300; k++) {
    ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
  }
}
|
|
|
|
// Checks the skip_snapshot_check argument of DB::AddFile(): by default an add
// fails while a snapshot is held; passing `true` allows it, and the ingested
// keys are then visible through the pre-existing snapshot as well.
TEST_F(DBSSTTest, AddExternalSstFileSkipSnapshot) {
  const std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
  env_->CreateDir(sst_files_folder);
  Options options = CurrentOptions();
  options.env = env_;

  SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);

  // file1.sst (0 => 99)
  const std::string file1 = sst_files_folder + "file1.sst";
  ASSERT_OK(sst_file_writer.Open(file1));
  for (int key = 0; key < 100; ++key) {
    ASSERT_OK(sst_file_writer.Add(Key(key), Key(key) + "_val"));
  }
  ExternalSstFileInfo file1_info;
  Status s = sst_file_writer.Finish(&file1_info);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_EQ(file1_info.file_path, file1);
  ASSERT_EQ(file1_info.num_entries, 100);
  ASSERT_EQ(file1_info.smallest_key, Key(0));
  ASSERT_EQ(file1_info.largest_key, Key(99));

  // file2.sst (100 => 299)
  const std::string file2 = sst_files_folder + "file2.sst";
  ASSERT_OK(sst_file_writer.Open(file2));
  for (int key = 100; key < 300; ++key) {
    ASSERT_OK(sst_file_writer.Add(Key(key), Key(key) + "_val"));
  }
  ExternalSstFileInfo file2_info;
  s = sst_file_writer.Finish(&file2_info);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_EQ(file2_info.file_path, file2);
  ASSERT_EQ(file2_info.num_entries, 200);
  ASSERT_EQ(file2_info.smallest_key, Key(100));
  ASSERT_EQ(file2_info.largest_key, Key(299));

  ASSERT_OK(db_->AddFile(std::vector<ExternalSstFileInfo>(1, file1_info)));

  const Snapshot* s1 = db_->GetSnapshot();
  if (s1 != nullptr) {
    // With the default skip_snapshot_check (false) the add fails while a
    // snapshot is held.
    ASSERT_NOK(db_->AddFile(std::vector<ExternalSstFileInfo>(1, file2_info)));

    // With skip_snapshot_check set to true the add succeeds even though the
    // DB is still holding the snapshot.
    ASSERT_OK(db_->AddFile(std::vector<ExternalSstFileInfo>(1, file2_info),
                           false, true));
    db_->ReleaseSnapshot(s1);
  }

  // file3.sst (300 => 399)
  const std::string file3 = sst_files_folder + "file3.sst";
  ASSERT_OK(sst_file_writer.Open(file3));
  for (int key = 300; key < 400; ++key) {
    ASSERT_OK(sst_file_writer.Add(Key(key), Key(key) + "_val"));
  }
  ExternalSstFileInfo file3_info;
  s = sst_file_writer.Finish(&file3_info);
  ASSERT_TRUE(s.ok()) << s.ToString();
  ASSERT_EQ(file3_info.file_path, file3);
  ASSERT_EQ(file3_info.num_entries, 100);
  ASSERT_EQ(file3_info.smallest_key, Key(300));
  ASSERT_EQ(file3_info.largest_key, Key(399));

  // Key(300) is absent before the add; after adding file3 with the snapshot
  // check skipped it is visible both in the latest view and through the
  // snapshot taken before the add.
  ASSERT_EQ(Get(Key(300)), "NOT_FOUND");
  const Snapshot* s2 = db_->GetSnapshot();
  ASSERT_OK(db_->AddFile(std::vector<ExternalSstFileInfo>(1, file3_info),
                         false, true));
  ASSERT_EQ(Get(Key(300)), Key(300) + ("_val"));
  ASSERT_EQ(Get(Key(300), s2), Key(300) + ("_val"));

  db_->ReleaseSnapshot(s2);
}
|
|
|
|
// Writes SST files from several threads in parallel, then ingests each file
// from two competing threads and checks that exactly one add per file
// succeeds and that the resulting data is correct.
TEST_F(DBSSTTest, AddExternalSstFileMultiThreaded) {
  const std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
  // Bulk load 10 files, each containing 1000 keys.
  int num_files = 10;
  int keys_per_file = 1000;

  // Generate the file names.
  std::vector<std::string> file_names;
  for (int i = 0; i < num_files; i++) {
    file_names.push_back(sst_files_folder + ("file_" + ToString(i) + ".sst"));
  }

  do {
    env_->CreateDir(sst_files_folder);
    Options options = CurrentOptions();

    // Each writer thread claims a distinct file index from this counter.
    std::atomic<int> thread_num(0);
    std::function<void()> write_file_func = [&]() {
      const int file_idx = thread_num.fetch_add(1);
      const int range_start = file_idx * keys_per_file;
      const int range_end = range_start + keys_per_file;

      SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);

      ASSERT_OK(sst_file_writer.Open(file_names[file_idx]));

      for (int key = range_start; key < range_end; ++key) {
        ASSERT_OK(sst_file_writer.Add(Key(key), Key(key)));
      }

      Status finish_status = sst_file_writer.Finish();
      ASSERT_TRUE(finish_status.ok()) << finish_status.ToString();
    };

    // Write num_files files in parallel.
    std::vector<std::thread> sst_writer_threads;
    for (int i = 0; i < num_files; ++i) {
      sst_writer_threads.emplace_back(write_file_func);
    }
    for (auto& writer : sst_writer_threads) {
      writer.join();
    }

    fprintf(stderr, "Wrote %d files (%d keys)\n", num_files,
            num_files * keys_per_file);

    thread_num.store(0);
    std::atomic<int> files_added(0);
    std::function<void()> load_file_func = [&]() {
      // Every file is intentionally added twice; exactly one of the two
      // adds is expected to succeed.
      const int thread_id = thread_num.fetch_add(1);
      const int file_idx = thread_id / 2;
      // Sometimes copy, sometimes link; the result should be the same.
      const bool move_file = (thread_id % 3 == 0);

      Status add_status = db_->AddFile(
          std::vector<std::string>(1, file_names[file_idx]), move_file);
      if (add_status.ok()) {
        files_added.fetch_add(1);
      }
    };

    // Bulk load num_files files in parallel.
    std::vector<std::thread> add_file_threads;
    DestroyAndReopen(options);
    for (int i = 0; i < num_files * 2; ++i) {
      add_file_threads.emplace_back(load_file_func);
    }
    for (auto& loader : add_file_threads) {
      loader.join();
    }
    ASSERT_EQ(files_added.load(), num_files);
    fprintf(stderr, "Loaded %d files (%d keys)\n", num_files,
            num_files * keys_per_file);

    // Overwrite the values of keys divisible by 100.
    for (int k = 0; k < num_files * keys_per_file; k += 100) {
      const std::string key = Key(k);
      Status put_status = Put(key, key + "_new");
      ASSERT_TRUE(put_status.ok());
    }

    // Values must be correct both before and after flush/compaction.
    for (int round = 0; round < 2; ++round) {
      for (int k = 0; k < num_files * keys_per_file; ++k) {
        const std::string key = Key(k);
        std::string expected = key;
        if (k % 100 == 0) {
          expected = key + "_new";
        }
        ASSERT_EQ(Get(key), expected);
      }
      ASSERT_OK(Flush());
      ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
    }

    fprintf(stderr, "Verified %d values\n", num_files * keys_per_file);
  } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
                         kSkipFIFOCompaction));
}
|
|
|
|
// Randomized test: inserts 500 random key ranges, most through
// DB::AddFile() and some through DB::Put(), while tracking the expected DB
// contents in `true_data`. An AddFile() must fail exactly when its range
// overlaps keys already tracked in `true_data`. The final DB contents are
// compared against `true_data`.
TEST_F(DBSSTTest, AddExternalSstFileOverlappingRanges) {
  std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
  Random rnd(301);
  do {
    env_->CreateDir(sst_files_folder);
    Options options = CurrentOptions();
    DestroyAndReopen(options);

    SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);

    printf("Option config = %d\n", option_config_);
    // Each pair is an inclusive [start, end] key range.
    std::vector<std::pair<int, int>> key_ranges;
    for (int i = 0; i < 500; i++) {
      int range_start = rnd.Uniform(20000);
      int keys_per_range = 10 + rnd.Uniform(41);

      key_ranges.emplace_back(range_start, range_start + keys_per_range);
    }

    int memtable_add = 0;
    int success_add_file = 0;
    int failed_add_file = 0;
    std::map<std::string, std::string> true_data;
    for (size_t i = 0; i < key_ranges.size(); i++) {
      int range_start = key_ranges[i].first;
      int range_end = key_ranges[i].second;

      Status s;
      std::string range_val = "range_" + ToString(i);

      // For 20% of ranges we use DB::Put, for 80% we use DB::AddFile
      if (i && i % 5 == 0) {
        // Use DB::Put to insert range (insert into memtable)
        range_val += "_put";
        for (int k = range_start; k <= range_end; k++) {
          s = Put(Key(k), range_val);
          ASSERT_OK(s);
        }
        memtable_add++;
      } else {
        // Use DB::AddFile to insert range
        range_val += "_add_file";

        // Generate the file containing the range
        std::string file_name = sst_files_folder + env_->GenerateUniqueId();
        ASSERT_OK(sst_file_writer.Open(file_name));
        for (int k = range_start; k <= range_end; k++) {
          s = sst_file_writer.Add(Key(k), range_val);
          ASSERT_OK(s);
        }
        ExternalSstFileInfo file_info;
        s = sst_file_writer.Finish(&file_info);
        ASSERT_OK(s);

        // Insert the generated file
        s = db_->AddFile(std::vector<ExternalSstFileInfo>(1, file_info));

        // The first tracked key >= range_start tells us whether anything
        // already in the DB falls inside [range_start, range_end].
        auto it = true_data.lower_bound(Key(range_start));
        if (it != true_data.end() && it->first <= Key(range_end)) {
          // This range overlaps with data that already exists in the DB
          ASSERT_NOK(s);
          failed_add_file++;
        } else {
          ASSERT_OK(s);
          success_add_file++;
        }
      }

      if (s.ok()) {
        // Update true_data map to include the newly inserted data
        for (int k = range_start; k <= range_end; k++) {
          true_data[Key(k)] = range_val;
        }
      }

      // Flush / Compact the DB. The statuses are checked so that a failing
      // flush or compaction is reported at the point where it happens.
      if (i && i % 50 == 0) {
        ASSERT_OK(Flush());
      }
      if (i && i % 75 == 0) {
        ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
      }
    }

    printf(
        "Total: %" ROCKSDB_PRIszt " ranges\n"
        "AddFile()|Success: %d ranges\n"
        "AddFile()|RangeConflict: %d ranges\n"
        "Put(): %d ranges\n",
        key_ranges.size(), success_add_file, failed_add_file, memtable_add);

    // Verify the correctness of the data
    for (const auto& kv : true_data) {
      ASSERT_EQ(Get(kv.first), kv.second);
    }
    printf("keys/values verified\n");
  } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
                         kSkipFIFOCompaction));
}
|
|
|
|
// Checks which level an ingested file lands in (observed via
// FilesPerLevel()), including while an L0 compaction is held mid-run through
// sync-point dependencies.
TEST_F(DBSSTTest, AddExternalSstFilePickedLevel) {
  const std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
  env_->CreateDir(sst_files_folder);
  Options options = CurrentOptions();
  options.disable_auto_compactions = false;
  options.level0_file_num_compaction_trigger = 4;
  options.num_levels = 4;
  options.env = env_;
  DestroyAndReopen(options);

  // file_to_keys[i] holds the keys written to external file number i.
  std::vector<std::vector<int>> file_to_keys;

  // File 0 will go to the last level (L3).
  file_to_keys.push_back({1, 10});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  EXPECT_EQ(FilesPerLevel(), "0,0,0,1");

  // File 1 will go to level L2 (since it overlaps with file 0 in L3).
  file_to_keys.push_back({2, 9});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  EXPECT_EQ(FilesPerLevel(), "0,0,1,1");

  // Each pair {A, B} makes sync point B wait until sync point A was reached.
  rocksdb::SyncPoint::GetInstance()->LoadDependency({
      {"DBSSTTest::AddExternalSstFilePickedLevel:0",
       "BackgroundCallCompaction:0"},
      {"DBImpl::BackgroundCompaction:Start",
       "DBSSTTest::AddExternalSstFilePickedLevel:1"},
      {"DBSSTTest::AddExternalSstFilePickedLevel:2",
       "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
  });
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Flush 4 files containing the same keys.
  for (int flush_round = 0; flush_round < 4; ++flush_round) {
    ASSERT_OK(Put(Key(3), Key(3) + "put"));
    ASSERT_OK(Put(Key(8), Key(8) + "put"));
    ASSERT_OK(Flush());
  }

  // Wait for BackgroundCompaction() to be called.
  TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevel:0");
  TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevel:1");

  EXPECT_EQ(FilesPerLevel(), "4,0,1,1");

  // This file overlaps with file 0 (L3), file 1 (L2) and the output of the
  // compaction going to L1.
  file_to_keys.push_back({4, 7});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  EXPECT_EQ(FilesPerLevel(), "5,0,1,1");

  // This file overlaps neither any file nor the running compaction.
  file_to_keys.push_back({9000, 9001});
  ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
                                       file_to_keys.size() - 1));
  EXPECT_EQ(FilesPerLevel(), "5,0,1,2");

  // Until this point is reached the compaction is held from finishing.
  TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevel:2");

  dbfull()->TEST_WaitForCompact();
  EXPECT_EQ(FilesPerLevel(), "1,1,1,2");

  // Keys 3 and 8 were overwritten via Put(); every other key must carry the
  // id of the external file it was written to.
  for (size_t file_id = 0; file_id < file_to_keys.size(); ++file_id) {
    for (int key_id : file_to_keys[file_id]) {
      const std::string k = Key(key_id);
      const std::string v = (key_id == 3 || key_id == 8)
                                ? k + "put"
                                : k + ToString(file_id);

      ASSERT_EQ(Get(k), v);
    }
  }

  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
|
|
|
|
TEST_F(DBSSTTest, AddExternalSstFilePickedLevelDynamic) {
|
|
std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
|
|
env_->CreateDir(sst_files_folder);
|
|
Options options = CurrentOptions();
|
|
options.disable_auto_compactions = false;
|
|
options.level0_file_num_compaction_trigger = 4;
|
|
options.level_compaction_dynamic_level_bytes = true;
|
|
options.num_levels = 4;
|
|
options.env = env_;
|
|
DestroyAndReopen(options);
|
|
|
|
rocksdb::SyncPoint::GetInstance()->LoadDependency({
|
|
{"DBSSTTest::AddExternalSstFilePickedLevelDynamic:0",
|
|
"BackgroundCallCompaction:0"},
|
|
{"DBImpl::BackgroundCompaction:Start",
|
|
"DBSSTTest::AddExternalSstFilePickedLevelDynamic:1"},
|
|
{"DBSSTTest::AddExternalSstFilePickedLevelDynamic:2",
|
|
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun"},
|
|
});
|
|
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
|
|
|
|
// Flush 4 files containing the same keys
|
|
for (int i = 0; i < 4; i++) {
|
|
for (int k = 20; k <= 30; k++) {
|
|
ASSERT_OK(Put(Key(k), Key(k) + "put"));
|
|
}
|
|
for (int k = 50; k <= 60; k++) {
|
|
ASSERT_OK(Put(Key(k), Key(k) + "put"));
|
|
}
|
|
ASSERT_OK(Flush());
|
|
}
|
|
|
|
// Wait for BackgroundCompaction() to be called
|
|
TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevelDynamic:0");
|
|
TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevelDynamic:1");
|
|
std::vector<std::vector<int>> file_to_keys;
|
|
|
|
// This file overlaps with the output of the compaction (going to L3)
|
|
// so the file will be added to L0 since L3 is the base level
|
|
file_to_keys.push_back({31, 32, 33, 34});
|
|
ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
|
|
file_to_keys.size() - 1));
|
|
EXPECT_EQ(FilesPerLevel(), "5");
|
|
|
|
// This file does not overlap with the current running compactiong
|
|
file_to_keys.push_back({9000, 9001});
|
|
ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
|
|
file_to_keys.size() - 1));
|
|
EXPECT_EQ(FilesPerLevel(), "5,0,0,1");
|
|
|
|
// Hold compaction from finishing
|
|
TEST_SYNC_POINT("DBSSTTest::AddExternalSstFilePickedLevelDynamic:2");
|
|
|
|
// Output of the compaction will go to L3
|
|
dbfull()->TEST_WaitForCompact();
|
|
EXPECT_EQ(FilesPerLevel(), "1,0,0,2");
|
|
|
|
Close();
|
|
options.disable_auto_compactions = true;
|
|
Reopen(options);
|
|
|
|
file_to_keys.push_back({1, 15, 19});
|
|
ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
|
|
file_to_keys.size() - 1));
|
|
ASSERT_EQ(FilesPerLevel(), "1,0,0,3");
|
|
|
|
file_to_keys.push_back({1000, 1001, 1002});
|
|
ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
|
|
file_to_keys.size() - 1));
|
|
ASSERT_EQ(FilesPerLevel(), "1,0,0,4");
|
|
|
|
file_to_keys.push_back({500, 600, 700});
|
|
ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
|
|
file_to_keys.size() - 1));
|
|
ASSERT_EQ(FilesPerLevel(), "1,0,0,5");
|
|
|
|
// File 5 overlaps with file 2 (L3 / base level)
|
|
file_to_keys.push_back({2, 10});
|
|
ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
|
|
file_to_keys.size() - 1));
|
|
ASSERT_EQ(FilesPerLevel(), "2,0,0,5");
|
|
|
|
// File 6 overlaps with file 2 (L3 / base level) and file 5 (L0)
|
|
file_to_keys.push_back({3, 9});
|
|
ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
|
|
file_to_keys.size() - 1));
|
|
ASSERT_EQ(FilesPerLevel(), "3,0,0,5");
|
|
|
|
// Verify data in files
|
|
for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) {
|
|
for (auto& key_id : file_to_keys[file_id]) {
|
|
std::string k = Key(key_id);
|
|
std::string v = k + ToString(file_id);
|
|
|
|
ASSERT_EQ(Get(k), v);
|
|
}
|
|
}
|
|
|
|
// Write range [5 => 10] to L0
|
|
for (int i = 5; i <= 10; i++) {
|
|
std::string k = Key(i);
|
|
std::string v = k + "put";
|
|
ASSERT_OK(Put(k, v));
|
|
}
|
|
ASSERT_OK(Flush());
|
|
ASSERT_EQ(FilesPerLevel(), "4,0,0,5");
|
|
|
|
// File 7 overlaps with file 4 (L3)
|
|
file_to_keys.push_back({650, 651, 652});
|
|
ASSERT_OK(GenerateAndAddExternalFile(options, file_to_keys.back(),
|
|
file_to_keys.size() - 1));
|
|
ASSERT_EQ(FilesPerLevel(), "5,0,0,5");
|
|
|
|
for (size_t file_id = 0; file_id < file_to_keys.size(); file_id++) {
|
|
for (auto& key_id : file_to_keys[file_id]) {
|
|
std::string k = Key(key_id);
|
|
std::string v = k + ToString(file_id);
|
|
if (key_id >= 5 && key_id <= 10) {
|
|
v = k + "put";
|
|
}
|
|
|
|
ASSERT_EQ(Get(k), v);
|
|
}
|
|
}
|
|
|
|
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
|
|
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// 1 Create some SST files by inserting K-V pairs into DB
|
|
// 2 Close DB and change suffix from ".sst" to ".ldb" for every other SST file
|
|
// 3 Open DB and check if all key can be read
|
|
TEST_F(DBSSTTest, SSTsWithLdbSuffixHandling) {
|
|
Options options = CurrentOptions();
|
|
options.write_buffer_size = 110 << 10; // 110KB
|
|
options.num_levels = 4;
|
|
DestroyAndReopen(options);
|
|
|
|
Random rnd(301);
|
|
int key_id = 0;
|
|
for (int i = 0; i < 10; ++i) {
|
|
GenerateNewFile(&rnd, &key_id, false);
|
|
}
|
|
Flush();
|
|
Close();
|
|
int const num_files = GetSstFileCount(dbname_);
|
|
ASSERT_GT(num_files, 0);
|
|
|
|
std::vector<std::string> filenames;
|
|
GetSstFiles(dbname_, &filenames);
|
|
int num_ldb_files = 0;
|
|
for (size_t i = 0; i < filenames.size(); ++i) {
|
|
if (i & 1) {
|
|
continue;
|
|
}
|
|
std::string const rdb_name = dbname_ + "/" + filenames[i];
|
|
std::string const ldb_name = Rocks2LevelTableFileName(rdb_name);
|
|
ASSERT_TRUE(env_->RenameFile(rdb_name, ldb_name).ok());
|
|
++num_ldb_files;
|
|
}
|
|
ASSERT_GT(num_ldb_files, 0);
|
|
ASSERT_EQ(num_files, GetSstFileCount(dbname_));
|
|
|
|
Reopen(options);
|
|
for (int k = 0; k < key_id; ++k) {
|
|
ASSERT_NE("NOT_FOUND", Get(Key(k)));
|
|
}
|
|
Destroy(options);
|
|
}
|
|
|
|
} // namespace rocksdb
|
|
|
|
int main(int argc, char** argv) {
|
|
rocksdb::port::InstallStackTraceHandler();
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|